Commit 0f05969c by chenyuanjie

fix

parent d6bfe7bf
......@@ -286,7 +286,10 @@ class EsAsinProfitRate(object):
# ------------------------------------------------------------------ #
def get_es_read_options(self, include_crawl_date=False):
# es.mapping.date.rich=false:date字段以原始字符串返回,避免转为timestamp带来的精度问题
fields = "asin,price,profit_key,profit_rate_extra.ocean_profit,profit_rate_extra.air_profit,tracking_since,tracking_since_type"
# profit_rate_extra 不从ES读取:月度索引所有文档初始值为{},ES-Hadoop推断为空struct,
# 写回时F.when/otherwise两分支类型不一致导致struct字段被丢弃,序列化为{}
# 改为直接从Hive新值构建struct写入,彻底绕开空struct的schema推断问题
fields = "asin,profit_key,tracking_since,tracking_since_type"
if include_crawl_date:
fields += ",asin_crawl_date"
return {
......@@ -332,7 +335,7 @@ class EsAsinProfitRate(object):
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES索引更新失败', f'失败索引:{index_name}')
# ------------------------------------------------------------------ #
# 核心更新逻辑:单次读 + 单次写
# 核心更新逻辑:单次读 + 分类写入
# ------------------------------------------------------------------ #
def update_index_combined(self, index_name, base_date=None):
"""
......@@ -346,21 +349,20 @@ class EsAsinProfitRate(object):
print(f"开始处理索引:{index_name},base_date={base_date or 'asin_crawl_date(动态)'}")
print(f"{'='*60}")
# 1. 读取ES现有数据,date字段以原始字符串返回,profit_rate_extra以StructType读回
# 1. 读取ES现有数据,date字段以原始字符串返回
# profit_rate_extra 不读取(见 get_es_read_options 注释)
# asin_crawl_date 仅 30day 动态模式需要,按需读取
# cache:30day索引步骤9需复用 df_es.select('asin'),避免二次全量读取
# cache:30day索引步骤8需复用 df_es.select('asin'),避免二次全量读取
df_es = self.spark.read.format("org.elasticsearch.spark.sql") \
.options(**self.get_es_read_options(include_crawl_date=(base_date is None))) \
.load(index_name) \
.withColumn('price', F.round(F.col('price'), 2)) \
.withColumn('tracking_since_type', F.col('tracking_since_type').cast('int')) \
.repartition(40, 'asin') \
.cache()
# 2. 准备利润率增量(重命名避免列冲突
# 2. 准备利润率增量(直接用 profit_key 关联,无需读取 ES price 字段
df_profit = self.df_asin_profit_rate.select(
'asin', 'price',
F.col('profit_key').alias('new_profit_key'),
'profit_key',
F.col('ocean_profit').cast('float').alias('new_ocean_profit'),
F.col('air_profit').cast('float').alias('new_air_profit')
)
......@@ -372,30 +374,15 @@ class EsAsinProfitRate(object):
)
# 4. left join 两个增量数据集
# profit_key 格式统一(asin_price,已 round),ES 与 Hive 两侧直接对齐,无需借助 price 列
df = df_es \
.join(df_profit, on=['asin', 'price'], how='left') \
.join(df_profit, on='profit_key', how='left') \
.join(df_keepa, on='asin', how='left')
# 5. 过滤:至少一方有增量数据,减少无效写入
df = df.filter(F.col('new_profit_key').isNotNull() | F.col('kp_tracking_since').isNotNull())
# 6. 利润率字段合并:有新数据用新的,否则保留ES旧值
# 直接访问子字段构建固定 schema 的 struct,避免不同索引 profit_rate_extra 子字段数不同导致 schema 冲突
df = df.withColumn('profit_key',
F.when(F.col('new_profit_key').isNotNull(), F.col('new_profit_key'))
.otherwise(F.col('profit_key'))
).withColumn('profit_rate_extra',
F.struct(
F.when(F.col('new_ocean_profit').isNotNull(), F.col('new_ocean_profit'))
.otherwise(F.col('profit_rate_extra.ocean_profit').cast('float'))
.alias('ocean_profit'),
F.when(F.col('new_air_profit').isNotNull(), F.col('new_air_profit'))
.otherwise(F.col('profit_rate_extra.air_profit').cast('float'))
.alias('air_profit')
)
)
df = df.filter(F.col('new_ocean_profit').isNotNull() | F.col('new_air_profit').isNotNull() | F.col('kp_tracking_since').isNotNull())
# 7. keepa字段合并:有新数据用新的,否则保留ES旧值
# 6. keepa字段合并:有新数据用新的,否则保留ES旧值
# 将 kp_tracking_since(keepa原始分钟值)转为日期字符串
df = df.withColumn('tracking_since',
F.when(
......@@ -434,14 +421,36 @@ class EsAsinProfitRate(object):
).otherwise(F.col('tracking_since_type'))
)
# 8. 写回索引(tracking_since_type 写回 ES short 类型,需显式 cast)
df_update = df.select(
'asin', 'profit_key', 'profit_rate_extra', 'tracking_since',
# 7. 写回索引(tracking_since_type 写回 ES short 类型,需显式 cast)
# 拆为两次写入:
# 7a. 有利润率数据的行 → profit_rate_extra struct 从新值直接构建,彻底绕开空struct schema推断问题
# 7b. 仅有 keepa 数据的行 → 不写 profit_rate_extra,避免 null 覆盖已有利润率值
df = df.cache()
df_profit_update = df.filter(
F.col('new_ocean_profit').isNotNull() | F.col('new_air_profit').isNotNull()
).select(
'asin', 'profit_key',
F.struct(
F.col('new_ocean_profit').cast('float').alias('ocean_profit'),
F.col('new_air_profit').cast('float').alias('air_profit')
).alias('profit_rate_extra'),
'tracking_since',
F.col('tracking_since_type').cast('short')
)
self.write_combined_update(df_update, index_name)
self.write_combined_update(df_profit_update, index_name)
df_keepa_update = df.filter(
F.col('new_ocean_profit').isNull() & F.col('new_air_profit').isNull()
).select(
'asin', 'profit_key', 'tracking_since',
F.col('tracking_since_type').cast('short')
)
self.write_combined_update(df_keepa_update, index_name)
df.unpersist()
# 9. 30day 索引额外更新 cate_flag 相关字段(partial update,不影响其他字段)
# 8. 30day 索引额外更新 cate_flag 相关字段(partial update,不影响其他字段)
# inner join df_es 确保只更新索引中已存在的 asin,避免 doc missing 报错
if base_date is None:
print(f"[30day] 开始更新 cate_flag 字段:asin_source_flag / bsr / nsr")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment