Commit d6bfe7bf by chenyuanjie

每日更新asin上榜数据

parent b0dd0bb5
......@@ -56,6 +56,7 @@ class EsAsinProfitRate(object):
self.df_asin_profit_rate = self.spark.sql(f"select 1+1;")
self.df_keepa_add = self.spark.sql(f"select 1+1;")
self.df_cate_flag = self.spark.sql(f"select 1+1;")
@staticmethod
def get_es_profit_rate_body():
......@@ -135,10 +136,12 @@ class EsAsinProfitRate(object):
date_info = self.get_date_info_from_index(index_name)
self.update_index_combined(index_name, self.get_month_last_day(date_info))
# 30天流量选品(base_date=None → 以每个 asin 的 asin_crawl_date 为基准)
self.read_cate_flag()
self.update_index_combined(f"{self.site_name}_flow_asin_30day")
# 释放缓存
self.df_asin_profit_rate.unpersist()
self.df_keepa_add.unpersist()
self.df_cate_flag.unpersist()
# ------------------------------------------------------------------ #
# 数据读取
......@@ -187,6 +190,22 @@ class EsAsinProfitRate(object):
print(f"增量keepa数据如下:")
self.df_keepa_add.show(10, False)
def read_cate_flag(self):
    # Load the 30-day category-flag records that feed the extra fields on the
    # 30day flow-selection index; columns are aliased here to the ES field names.
    query = f"""
        select asin,
               asin_cate_flag as asin_source_flag,
               bsr_latest_date as bsr_last_seen_at,
               bsr_30day_count as bsr_seen_count_30d,
               nsr_latest_date as nsr_last_seen_at,
               nsr_30day_count as nsr_seen_count_30d
        from dwd_asin_cate_flag
        where site_name = '{self.site_name}' and date_type = '30day'
    """
    df = self.spark.sql(sqlQuery=query)
    # Co-partition by asin for the downstream join against the ES snapshot,
    # then cache: the frame is reused by count(), show() and the partial update.
    self.df_cate_flag = df.repartition(40, 'asin').cache()
    print(f"cate_flag数据量:{self.df_cate_flag.count()}")
    self.df_cate_flag.show(10, False)
# ------------------------------------------------------------------ #
# 利润率主索引
# ------------------------------------------------------------------ #
......@@ -275,9 +294,10 @@ class EsAsinProfitRate(object):
"es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.nodes.wan.only": "true",
"es.nodes.wan.only": "false",
"es.mapping.date.rich": "false",
"es.read.field.include": fields
"es.read.field.include": fields,
"es.scroll.size": "5000"
}
def write_combined_update(self, df, index_name):
......@@ -292,8 +312,8 @@ class EsAsinProfitRate(object):
"es.batch.write.abort.on.failure": "false",
"es.update.retry.on.conflict": "3",
"es.batch.write.refresh": "false",
"es.batch.size.entries": "2000",
"es.batch.write.concurrency": "5",
"es.batch.size.entries": "5000",
"es.batch.write.concurrency": "20",
"es.batch.write.retry.count": "3",
"es.batch.write.retry.wait": "60s",
"es.nodes.wan.only": "false"
......@@ -328,12 +348,14 @@ class EsAsinProfitRate(object):
# 1. 读取ES现有数据,date字段以原始字符串返回,profit_rate_extra以StructType读回
# asin_crawl_date 仅 30day 动态模式需要,按需读取
# cache:30day索引步骤9需复用 df_es.select('asin'),避免二次全量读取
df_es = self.spark.read.format("org.elasticsearch.spark.sql") \
.options(**self.get_es_read_options(include_crawl_date=(base_date is None))) \
.load(index_name) \
.withColumn('price', F.round(F.col('price'), 2)) \
.withColumn('tracking_since_type', F.col('tracking_since_type').cast('int')) \
.repartition(40, 'asin')
.repartition(40, 'asin') \
.cache()
# 2. 准备利润率增量(重命名避免列冲突)
df_profit = self.df_asin_profit_rate.select(
......@@ -419,6 +441,27 @@ class EsAsinProfitRate(object):
)
self.write_combined_update(df_update, index_name)
# 9. 30day 索引额外更新 cate_flag 相关字段(partial update,不影响其他字段)
# inner join df_es 确保只更新索引中已存在的 asin,避免 doc missing 报错
if base_date is None:
print(f"[30day] 开始更新 cate_flag 字段:asin_source_flag / bsr / nsr")
df_cate_update = self.df_cate_flag.join(
df_es.select('asin'), on='asin', how='inner'
).select(
'asin', 'asin_source_flag',
'bsr_last_seen_at', 'bsr_seen_count_30d',
'nsr_last_seen_at', 'nsr_seen_count_30d'
).na.fill({
'asin_source_flag': '0',
'bsr_last_seen_at': '1970-01-01',
'bsr_seen_count_30d': 0,
'nsr_last_seen_at': '1970-01-01',
'nsr_seen_count_30d': 0
})
self.write_combined_update(df_cate_update, index_name)
df_es.unpersist()
if __name__ == "__main__":
site_name = sys.argv[1]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment