Commit b16c59eb by chenyuanjie

fix

parent 0f05969c
......@@ -452,21 +452,19 @@ class EsAsinProfitRate(object):
# 8. 30day 索引额外更新 cate_flag 相关字段(partial update,不影响其他字段)
# inner join df_es 确保只更新索引中已存在的 asin,避免 doc missing 报错
# asin_source_flag:Hive 存为字符串,ES mapping 为 integer[],需转为 int 数组
if base_date is None:
print(f"[30day] 开始更新 cate_flag 字段:asin_source_flag / bsr / nsr")
df_cate_update = self.df_cate_flag.join(
df_es.select('asin'), on='asin', how='inner'
).select(
'asin', 'asin_source_flag',
'bsr_last_seen_at', 'bsr_seen_count_30d',
'nsr_last_seen_at', 'nsr_seen_count_30d'
).na.fill({
'asin_source_flag': '0',
'bsr_last_seen_at': '1970-01-01',
'bsr_seen_count_30d': 0,
'nsr_last_seen_at': '1970-01-01',
'nsr_seen_count_30d': 0
})
'asin',
F.transform(F.split(F.col('asin_source_flag'), ','), lambda x: x.cast('int')).alias('asin_source_flag'),
F.coalesce(F.col('bsr_last_seen_at'), F.lit('1970-01-01')).alias('bsr_last_seen_at'),
F.coalesce(F.col('bsr_seen_count_30d').cast('int'), F.lit(0)).alias('bsr_seen_count_30d'),
F.coalesce(F.col('nsr_last_seen_at'), F.lit('1970-01-01')).alias('nsr_last_seen_at'),
F.coalesce(F.col('nsr_seen_count_30d').cast('int'), F.lit(0)).alias('nsr_seen_count_30d')
)
self.write_combined_update(df_cate_update, index_name)
df_es.unpersist()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment