Commit 4e5a044a by chenyuanjie

利润率更新方式fix

parent 96c4c74c
......@@ -110,15 +110,28 @@ class EsAsinProfitRate(object):
def run(self):
self.read_profit_rate_add()
# 利润率主索引
self.save_profit_rate_to_es()
# 更新最新3个月度详情索引和信息库月索引的利润率字段
for index in self.get_recent_indexes("st_detail_month"):
self.update_index_profit_rate(index)
for index in self.get_recent_indexes("ai_asin_analyze_detail"):
self.update_index_profit_rate(index)
# 更新信息库年表索引和流量选品30天索引的利润率字段
self.update_index_profit_rate(f"{self.site_name}_ai_asin_analyze_detail_last365_day")
self.update_index_profit_rate(f"{self.site_name}_flow_asin_30day")
# st_detail_month 近3个月
for index_name in self.get_recent_indexes("st_detail_month"):
date_info = self.get_date_info_from_index(index_name)
df_asin = self.spark.sql(f"""
select asin from dwt_flow_asin
where site_name='{self.site_name}' and date_type='month' and date_info='{date_info}'
""").repartition(40, 'asin')
self.update_by_hive_asin(index_name, df_asin)
# 年度信息库
df_year_asin = self.spark.sql(f"""
select asin from dwt_ai_asin_year
where site_name='{self.site_name}'
and date_info = (select max(date_info) from dwt_ai_asin_year where site_name='{self.site_name}')
""").repartition(40, 'asin')
self.update_by_hive_asin(f"{self.site_name}_ai_asin_analyze_detail_last365_day", df_year_asin)
# ai_asin_analyze_detail 近3个月(从 ES 读取 asin+price,按 asin+price 关联)
for index_name in self.get_recent_indexes("ai_asin_analyze_detail"):
self.update_by_es_asin_price(index_name)
# 30天流量选品(从 ES 读取 asin+price,按 asin+price 关联)
self.update_by_es_asin_price(f"{self.site_name}_flow_asin_30day")
def read_profit_rate_add(self):
# 读取利润率整合数据(增量数据)
......@@ -128,6 +141,8 @@ class EsAsinProfitRate(object):
"""
self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
self.df_asin_profit_rate = self.df_asin_profit_rate.withColumn(
'price', F.round(F.col('price'), 2)
).withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn(
"update_time", F.substring(F.col("updated_time"), 1, 10)
......@@ -184,19 +199,14 @@ class EsAsinProfitRate(object):
print(f"[{prefix}] 找到 {len(indexes)} 个索引:{indexes}")
return indexes
def update_index_profit_rate(self, index_name):
    """Write profit-rate data directly into the target index.

    Documents whose asin already exists get their profit-rate fields
    updated; asins not present in the index are skipped.
    """
    # Guard: skip silently (with a log line) when the target index
    # has not been created yet.
    if not EsUtils.exist_index(index_name, self.es_client):
        print(f"索引 {index_name} 不存在,跳过更新")
        return
def get_date_info_from_index(self, index_name):
    """Derive ``date_info`` from the trailing year/month segments of an index name.

    Example: ``us_st_detail_month_2026_02`` -> ``2026-02``.
    """
    # Split off at most the last two underscore-separated segments:
    # [-2] is the year, [-1] is the month.
    segments = index_name.rsplit('_', 2)
    return "{}-{}".format(segments[-2], segments[-1])
print(f"\n{'='*60}")
print(f"开始更新索引利润率字段:{index_name}")
print(f"{'='*60}")
df_update = self.df_asin_profit_rate.withColumn(
def build_profit_update_df(self, df):
# 组装 profit_rate_extra 结构体,选出 update 所需列
return df.withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
......@@ -209,6 +219,7 @@ class EsAsinProfitRate(object):
)
).select("asin", "profit_key", "profit_rate_extra")
def write_profit_update(self, df, index_name):
write_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
......@@ -222,12 +233,15 @@ class EsAsinProfitRate(object):
"es.batch.write.refresh": "false",
"es.batch.size.entries": "2000",
"es.batch.write.concurrency": "5",
"es.batch.write.retry.count": "5",
"es.batch.write.retry.count": "3",
"es.batch.write.retry.wait": "60s",
"es.nodes.wan.only": "true"
"es.nodes.wan.only": "false"
}
print(f"\n{'='*60}")
print(f"开始更新索引利润率字段:{index_name}")
print(f"{'='*60}")
try:
df_update.repartition(10).write.format("org.elasticsearch.spark.sql") \
df.repartition(10).write.format("org.elasticsearch.spark.sql") \
.options(**write_options) \
.mode("append") \
.save()
......@@ -236,6 +250,37 @@ class EsAsinProfitRate(object):
print(f"更新索引 {index_name} 失败: {str(e)}")
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES利润率更新失败', f'失败索引:{index_name}')
def update_by_hive_asin(self, index_name, df_hive_asin):
    """Update profit-rate fields for the ASINs supplied by a Hive query.

    Inner-joins the Hive ASIN frame with the incremental profit-rate
    frame on ``asin`` so that only changed ASINs are written back to ES,
    keeping the write volume down.
    """
    # Bail out early when the target index does not exist.
    if not EsUtils.exist_index(index_name, self.es_client):
        print(f"索引 {index_name} 不存在,跳过更新")
        return
    joined = df_hive_asin.join(self.df_asin_profit_rate, on='asin', how='inner')
    self.write_profit_update(self.build_profit_update_df(joined), index_name)
def update_by_es_asin_price(self, index_name):
    """Update profit-rate fields by matching on asin + price read from ES.

    Reads only ``asin`` and ``price`` from the target index, rounds the
    price to 2 decimals (mirroring how ``profit_key`` is built), then
    inner-joins on ``asin`` + ``price`` before writing back — so every
    written document is guaranteed to exist (no document_missing errors).
    """
    # Bail out early when the target index does not exist.
    if not EsUtils.exist_index(index_name, self.es_client):
        print(f"索引 {index_name} 不存在,跳过更新")
        return
    read_conf = {
        "es.nodes": EsUtils.__es_ip__,
        "es.port": EsUtils.__es_port__,
        "es.net.http.auth.user": EsUtils.__es_user__,
        "es.net.http.auth.pass": EsUtils.__es_passwd__,
        "es.nodes.wan.only": "true",
        # Fetch only the two fields needed for the join.
        "es.read.field.include": "asin,price"
    }
    source_df = (
        self.spark.read.format("org.elasticsearch.spark.sql")
        .options(**read_conf)
        .load(index_name)
        .withColumn('price', F.round(F.col('price'), 2))
        .repartition(40, 'asin')
    )
    matched = source_df.join(self.df_asin_profit_rate, on=['asin', 'price'], how='inner')
    self.write_profit_update(self.build_profit_update_df(matched), index_name)
if __name__ == "__main__":
site_name = sys.argv[1]
......
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
    # CLI args: <site_name> <date_type> <date_info>
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    print(f"执行参数为{sys.argv}")

    # Remote PostgreSQL engine.
    # NOTE(review): site_name is fixed to 'us' here even when exporting
    # another site's data — presumably the target DB lives on the us
    # cluster; confirm against the engine's routing.
    db_type = "postgresql_15"
    engine = get_remote_engine(
        site_name='us',
        db_type=db_type
    )

    # Target table name: the us site omits the site prefix.
    month_suffix = date_info.replace('-', '_')
    export_tb = (
        f"ai_asin_detail_month_{month_suffix}"
        if site_name == 'us'
        else f"{site_name}_ai_asin_detail_month_{month_suffix}"
    )

    # Export the selected Hive partition to Postgres with 30 mappers.
    engine.sqoop_raw_export(
        hive_table="dwt_ai_asin_add",
        import_table=export_tb,
        partitions={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        },
        m=30,
        cols="site_name,asin,weight,bought_month,category,img,title,brand,account_name,account_addr,buy_box_seller_type,"
        "launch_time,img_num,variation_flag,variation_num,ao_val,category_id,category_current_id,parent_asin,bsr_rank,"
        "price,rating,total_comments,seller_id,fb_country_name,review_json_list,launch_time_type,describe,product_json,"
        "product_detail_json,bought_month_mom,bought_month_yoy,is_new_flag,is_ascending_flag"
    )
    print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment