日常数据需求讲解2-json内容改造
背景
目前有这样一张表ods_cs_json表(算法模型返回客服舆情数据)有2个字段user_id、message,后续会将数据传输到后端kafka,message消息如下:
[ { "criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容", "matchingDegree": "100%", "profilingTag": "语兴好物客诉舆情用户", "tagType": "modelTag", "datasource":"社区" }, { "criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容", "matchingDegree": "100%", "profilingTag": "语兴好物客诉舆情用户", "tagType": "modelTag", "datasource":"电商" }, { "criteriaOfJudgment": "该用户在客诉工单中有1条语兴好物圈需净化内容", "matchingDegree": "100%", "profilingTag": "语兴好物客诉舆情用户", "tagType": "modelTag", "datasource":"金融" } ]
内容属于list套json,可能多个json在list中,目前需求是接入一个新的数据源,动态舆情,看下有客诉的用户他动态舆情多少(通过user_id关联),并把原来客诉工单号也给带上,动态舆情对应数据表样式:
CREATE TABLE IF NOT EXISTS ods_trend_intelligence_di( user_id STRING COMMENT 'user_id', trend_id STRING COMMENT '动态id', url STRING COMMENT '备注', )
最终实现样式为,就是把动态舆情数据与算法推送topic数据合并一起
[ { "criteriaOfJudgment": "该用户在客诉工单中有1条疑似被骗内容", "matchingDegree": "100%", "profilingTag": "疑似被骗用户", "tagType": "modelTag", "ticketList":"xxx", "datasource":"社区" }, { "criteriaOfJudgment": "该用户共发布动态数1条", "matchingDegree": "0%", "profilingTag": "动态舆情用户", "tagType": "bizTag", "ticketList":"xxx,xxx,xx", "datasource":"社区" } ]
思路
思路1:首先拿到这个需求先要明确2个json的内容映射,需要找产品和后端对齐
json中字段 |
映射字段 |
备注 |
criteriaOfJudgment |
计算好的动态数 |
|
matchingDegree |
0% |
由于是业务打标所以这里算法识别度为0% |
profilingTag |
动态舆情用户(文本) |
|
tagType |
bizTag(文本) |
|
datasource |
社区 |
思路2:由于原来list数据中是多json的,因此需要对list去炸裂,并解析出每个元素
思路3:对动态舆情数据进行指标计算,并且封装json
思路4:将动态舆情数据和算法推送客诉数据合并到一个list中
操作
insert overwrite table xxx.dwd_xxx_di partition(pt='${bizdate}') select t0.user_id ,t1.message from ods_cs_json t0 left join ( select concat('[',CONCAT_WS(',',message),coalesce(if(t1_1.content_json is not null ,concat(',',t1_1.content_json),null),''),']') as message ,user_id from ( SELECT collect_list(to_json(map( 'criteriaOfJudgment',criteriaOfJudgment, 'matchingDegree',matchingDegree, 'profilingTag',profilingTag, 'tagType',tagType, 'datasource',datasource, 'ticketList',concat_ws(',',t1_0_1.ticket_list) ) ) ) as message ,user_id from ( SELECT GET_JSON_OBJECT(message3,'$.criteriaOfJudgment') as criteriaOfJudgment ,GET_JSON_OBJECT(message3,'$.matchingDegree') as matchingDegree ,GET_JSON_OBJECT(message3,'$.profilingTag') as profilingTag --取出元素 ,GET_JSON_OBJECT(message3,'$.datasource') as datasource --取出元素 ,'modelTag' as tagType ,user_id from ( select concat(replace(replace(message2,'[{','{'),',{','{'),'}') as message3 ,user_id from ( select user_id ,message ,datasource from ods_cs_json where pt='${bizdate_0}' --由于是实时返回数据,需要取t-0分区 ) lateral view explode(split(message,'}')) t as message2--炸裂 ) where message3<>']}' ) t1_0_0 left join ( select collect_list(ticket_id) as ticket_list ,user_id ,datasource from dwd_cs_json_di where pt>='${bizdate_30}' group by user_id ,datasource ) t1_0_1 on t1_0_0.user_id=t1_0_1.user_id AND t1_0_0.datasource=t1_0_1.datasource group by user_id ) t1_0 left join ( select user_id ,concat('{"criteriaOfJudgment":"',criteriaOfJudgment,'",', '"matchingDegree":"','0%','",', '"profilingTag":"','动态舆情用户','",', '"datasource":"','社区','",', '"ticketList":"',ticketList,'",', '"tagType":"','bizTag','"}', ) as content_json from ( select concat('该用户共发布动态数',trend_cnt,'条') as criteriaOfJudgment concat_ws(',',trend_list)) as ticketList ,user_id from ( SELECT collect_list(concat(trend_id,'-',url)) as trend_list ,count(1) as trend_cnt ,user_id FROM ods_trend_intelligence_di WHERE pt >= '${bizdate_30}'--t-1 group by user_id ) ) ) t1_1 ON cast(t1_0.user_id as string)=cast(t1_1.user_id as string) ) t1 on t0.user_id = t1.user_id#数据人offer决赛圈怎么选##数据人的面试交流地##牛客创作赏金赛##聊聊我眼中的AI##Java#