故事背景
数据处理逻辑:
- 将一个json的数组从map结构里面扣出来
- 然后将json数组里面的每一个元素和map结构里面的其他元素重新组成一个新的map,存入一个新表
实现方式:采用SparkSQL实现(Spark 3.1.2)
问题:数据少了很多
原始代码逻辑
insert overwrite table iceberg_table
select
id,
map_concat(map_filter(config, (k, v) -> k != 'items'), elem)
from (
select
id,
config,
from_json(config['items'], 'array<map<string,string>>') list
from table
) as a
LATERAL view explode(list) as elem
怎么发现问题的
- 总体条数的减少
- 条数不少,但是
config
这个map
中kv
个数的减少,采用sum(size(config))
问题
explode的问题
explode
的参数如果是null
或者数组为空的时候,整行记录都会被清除掉,而不是用一个null
来补充数据,而且如果你缺省设置为 array()
也是不行的,因为这个数组为空,这行数据还是会被清除掉的
map_concat的问题
map_concat
如果传入的后续参数里面有null
的话,整个函数的返回值也是为null
了
总结
基于以上2点的问题,导致了数据的大量减少
调整之后的逻辑
初版
insert overwrite table iceberg_table
select
id,
map_concat(map_filter(config, (k, v) -> k != 'items'), COALESCE(elem, map()))
from (
select
id,
config,
from_json(config['items'], 'array<map<string,string>>') list
from table
) as a
LATERAL view
explode(COALESCE(if(size(list)=0, null, list), array(null))) as elem
这版可以看到,我们需要在2个地方进行修改
终版
切入点:将2处修改合并到一处
insert overwrite table iceberg_table
select
id,
map_concat(map_filter(config, (k, v) -> k != 'items'), elem)
from (
select
id,
config,
from_json(config['items'], 'array<map<string,string>>') list
from table
) as a
LATERAL view
explode(COALESCE(if(size(list)=0, null, list), array(map())) as elem