Commit b75a2b29 by chenyuanjie

Update keepa tracking time daily

parent 4e5a044a
@@ -38,6 +38,7 @@ from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from datetime import datetime, timedelta

class EsAsinProfitRate(object):
    def __init__(self, site_name, date_info):
@@ -54,6 +55,7 @@ class EsAsinProfitRate(object):
        self.es_profit_rate_options = self.get_es_profit_rate_options(self.es_profit_rate_index)
        self.df_asin_profit_rate = self.spark.sql(f"select 1+1;")
+        self.df_keepa_add = self.spark.sql(f"select 1+1;")
    @staticmethod
    def get_es_profit_rate_body():
@@ -110,29 +112,37 @@ class EsAsinProfitRate(object):
    def run(self):
        self.read_profit_rate_add()
+        self.read_keepa_add()
        # Profit-rate main index
        self.save_profit_rate_to_es()
        # st_detail_month: last 3 months
        for index_name in self.get_recent_indexes("st_detail_month"):
            date_info = self.get_date_info_from_index(index_name)
-            df_asin = self.spark.sql(f"""
-                select asin from dwt_flow_asin
-                where site_name='{self.site_name}' and date_type='month' and date_info='{date_info}'
-            """).repartition(40, 'asin')
-            self.update_by_hive_asin(index_name, df_asin)
+            self.update_index_combined(index_name, self.get_month_last_day(date_info))
        # Yearly information base
-        df_year_asin = self.spark.sql(f"""
-            select asin from dwt_ai_asin_year
-            where site_name='{self.site_name}'
-            and date_info = (select max(date_info) from dwt_ai_asin_year where site_name='{self.site_name}')
-        """).repartition(40, 'asin')
-        self.update_by_hive_asin(f"{self.site_name}_ai_asin_analyze_detail_last365_day", df_year_asin)
-        # ai_asin_analyze_detail: last 3 months (read asin+price from ES, join on asin+price)
+        year_max_date_info = self.spark.sql(f"""
+            select max(date_info) from dwt_ai_asin_year where site_name='{self.site_name}'
+        """).collect()[0][0]
+        if year_max_date_info is None:
+            print(f"dwt_ai_asin_year has no data, skipping the yearly index update")
+        else:
+            self.update_index_combined(
+                f"{self.site_name}_ai_asin_analyze_detail_last365_day",
+                self.get_month_last_day(str(year_max_date_info))
+            )
+        # ai_asin_analyze_detail: last 3 months
        for index_name in self.get_recent_indexes("ai_asin_analyze_detail"):
-            self.update_by_es_asin_price(index_name)
-        # 30-day traffic product selection (read asin+price from ES, join on asin+price)
-        self.update_by_es_asin_price(f"{self.site_name}_flow_asin_30day")
+            date_info = self.get_date_info_from_index(index_name)
+            self.update_index_combined(index_name, self.get_month_last_day(date_info))
+        # 30-day traffic product selection (base_date=None → use each asin's asin_crawl_date as the baseline)
+        self.update_index_combined(f"{self.site_name}_flow_asin_30day")
+        # Release the cached DataFrames
+        self.df_asin_profit_rate.unpersist()
+        self.df_keepa_add.unpersist()

+    # ------------------------------------------------------------------ #
+    # Data loading
+    # ------------------------------------------------------------------ #
    def read_profit_rate_add(self):
        # Read the consolidated profit-rate data (incremental)
        sql = f"""
@@ -150,7 +160,7 @@ class EsAsinProfitRate(object):
            'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time'
        )
-        # Fetch asin_crawl_date from Doris
+        # Fetch asin_crawl_date from Doris (used when writing the profit-rate main index)
        df_crawl_date = DorisHelper.spark_import_with_connector(
            session=self.spark,
            table_identifier=f"selection.{self.site_name}_asin_latest_detail",
@@ -167,6 +177,19 @@ class EsAsinProfitRate(object):
        print(f"Incremental profit-rate data:")
        self.df_asin_profit_rate.show(10, False)

+    def read_keepa_add(self):
+        # Read the incremental keepa data
+        sql = f"""
+            select asin, tracking_since from dim_keepa_asin_info
+            where site_name = '{self.site_name}' and updated_time >= '{self.last_date_info}'
+        """
+        self.df_keepa_add = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
+        print(f"Incremental keepa data:")
+        self.df_keepa_add.show(10, False)
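Note: as read here, tracking_since is keepa's raw minute-based timestamp rather than a date string; it is only converted to a yyyy-MM-dd date in update_index_combined (step 7 below) before being written back to ES.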
+    # ------------------------------------------------------------------ #
+    # Profit-rate main index
+    # ------------------------------------------------------------------ #
    def save_profit_rate_to_es(self):
        print(f"\n{'='*60}")
        print(f"Start updating profit-rate index: {self.es_profit_rate_index}")
@@ -182,6 +205,9 @@ class EsAsinProfitRate(object):
            print("An error occurred while writing to Elasticsearch:", str(e))
            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES data update failed', f'Failed index: {self.es_profit_rate_index}')

+    # ------------------------------------------------------------------ #
+    # Utility methods
+    # ------------------------------------------------------------------ #
    def get_recent_indexes(self, prefix, max_count=3):
        """Starting from the month of date_info, search backwards and return the names of the most recent max_count existing indexes"""
        indexes = []
@@ -204,22 +230,57 @@ class EsAsinProfitRate(object):
        parts = index_name.rsplit('_', 2)
        return f"{parts[-2]}-{parts[-1]}"
-    def build_profit_update_df(self, df):
-        # Assemble the profit_rate_extra struct and select the columns needed for the update
-        return df.withColumn(
-            "profit_rate_extra",
-            F.when(
-                F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
-                F.lit(None)
-            ).otherwise(
-                F.struct(
-                    F.col("ocean_profit").alias("ocean_profit"),
-                    F.col("air_profit").alias("air_profit")
-                )
-            )
-        ).select("asin", "profit_key", "profit_rate_extra")

+    def get_month_last_day(self, year_month_str):
+        # year_month_str: "yyyy-MM"; returns the last day of that month as "yyyy-MM-dd"
+        year, month = map(int, year_month_str.split('-'))
+        if month == 12:
+            last_day = datetime(year + 1, 1, 1) - timedelta(days=1)
+        else:
+            last_day = datetime(year, month + 1, 1) - timedelta(days=1)
+        return last_day.strftime('%Y-%m-%d')
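The month-end helper simply jumps to the first day of the following month and steps back one day, which avoids any leap-year or 30/31-day bookkeeping. A minimal standalone sketch of the same idea (hypothetical name month_last_day, not part of the commit):

from datetime import datetime, timedelta

def month_last_day(year_month: str) -> str:
    year, month = map(int, year_month.split('-'))
    # First day of the next month, then one day back gives the last day of this month
    first_of_next = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
    return (first_of_next - timedelta(days=1)).strftime('%Y-%m-%d')

print(month_last_day('2024-02'))  # 2024-02-29 (leap year)
print(month_last_day('2025-12'))  # 2025-12-31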
+    def get_tracking_interval_dict(self, base_date):
+        base = datetime.strptime(base_date, '%Y-%m-%d')
+        return {
+            "one_month": (base - timedelta(days=30)).strftime('%Y-%m-%d'),
+            "three_month": (base - timedelta(days=90)).strftime('%Y-%m-%d'),
+            "six_month": (base - timedelta(days=180)).strftime('%Y-%m-%d'),
+            "twelve_month": (base - timedelta(days=360)).strftime('%Y-%m-%d'),
+            "twenty_four_month": (base - timedelta(days=720)).strftime('%Y-%m-%d'),
+            "thirty_six_month": (base - timedelta(days=1080)).strftime('%Y-%m-%d'),
+        }
+    def get_tracking_since_type_expr(self, iv):
+        return f"""
+            CASE WHEN tracking_since >= '{iv['one_month']}' THEN 1
+                 WHEN tracking_since >= '{iv['three_month']}' AND tracking_since < '{iv['one_month']}' THEN 2
+                 WHEN tracking_since >= '{iv['six_month']}' AND tracking_since < '{iv['three_month']}' THEN 3
+                 WHEN tracking_since >= '{iv['twelve_month']}' AND tracking_since < '{iv['six_month']}' THEN 4
+                 WHEN tracking_since >= '{iv['twenty_four_month']}' AND tracking_since < '{iv['twelve_month']}' THEN 5
+                 WHEN tracking_since >= '{iv['thirty_six_month']}' AND tracking_since < '{iv['twenty_four_month']}' THEN 6
+                 WHEN tracking_since < '{iv['thirty_six_month']}' THEN 7
+                 ELSE 0 END
+        """
+    # ------------------------------------------------------------------ #
+    # ES read/write configuration
+    # ------------------------------------------------------------------ #
+    def get_es_read_options(self, include_crawl_date=False):
+        # es.mapping.date.rich=false: date fields are returned as raw strings, avoiding the precision issues of converting them to timestamps
+        fields = "asin,price,profit_key,profit_rate_extra,tracking_since,tracking_since_type"
+        if include_crawl_date:
+            fields += ",asin_crawl_date"
+        return {
+            "es.nodes": EsUtils.__es_ip__,
+            "es.port": EsUtils.__es_port__,
+            "es.net.http.auth.user": EsUtils.__es_user__,
+            "es.net.http.auth.pass": EsUtils.__es_passwd__,
+            "es.nodes.wan.only": "true",
+            "es.mapping.date.rich": "false",
+            "es.read.field.include": fields
+        }

-    def write_profit_update(self, df, index_name):
+    def write_combined_update(self, df, index_name):
        write_options = {
            "es.nodes": EsUtils.__es_ip__,
            "es.port": EsUtils.__es_port__,
@@ -238,48 +299,125 @@ class EsAsinProfitRate(object):
            "es.nodes.wan.only": "false"
        }
        print(f"\n{'='*60}")
-        print(f"Start updating the profit-rate fields of index: {index_name}")
+        print(f"Start writing back to index: {index_name}")
        print(f"{'='*60}")
        try:
            df.repartition(10).write.format("org.elasticsearch.spark.sql") \
                .options(**write_options) \
                .mode("append") \
                .save()
-            print(f"Profit-rate fields of index {index_name} updated!")
+            print(f"Profit-rate and tracking-time fields of index {index_name} updated!")
        except Exception as e:
            print(f"Failed to update index {index_name}: {str(e)}")
-            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES profit-rate update failed', f'Failed index: {index_name}')
+            CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES index update failed', f'Failed index: {index_name}')
-    def update_by_hive_asin(self, index_name, df_hive_asin):
-        # Inner join the Hive ASINs with the incremental profit-rate data; only asins that changed are updated, to reduce ES writes

+    # ------------------------------------------------------------------ #
+    # Core update logic: single read + single write
+    # ------------------------------------------------------------------ #
+    def update_index_combined(self, index_name, base_date=None):
+        """
+        base_date: fixed baseline date (yyyy-MM-dd) used to compute tracking_since_type for the monthly/yearly indexes.
+        None means the 30-day index, where the baseline is each asin's asin_crawl_date, computed dynamically per row.
+        """
        if not EsUtils.exist_index(index_name, self.es_client):
            print(f"Index {index_name} does not exist, skipping the update")
            return
-        df_update = df_hive_asin.join(self.df_asin_profit_rate, on='asin', how='inner')
-        df_update = self.build_profit_update_df(df_update)
-        self.write_profit_update(df_update, index_name)
+        print(f"\n{'='*60}")
+        print(f"Start processing index: {index_name}, base_date={base_date or 'asin_crawl_date (dynamic)'}")
+        print(f"{'='*60}")

-    def update_by_es_asin_price(self, index_name):
-        # Read asin+price from ES, join the profit rates on asin+price and write back, so that profit_key matches and no document_missing occurs
-        if not EsUtils.exist_index(index_name, self.es_client):
-            print(f"Index {index_name} does not exist, skipping the update")
-            return
-        es_read_options = {
-            "es.nodes": EsUtils.__es_ip__,
-            "es.port": EsUtils.__es_port__,
-            "es.net.http.auth.user": EsUtils.__es_user__,
-            "es.net.http.auth.pass": EsUtils.__es_passwd__,
-            "es.nodes.wan.only": "true",
-            "es.read.field.include": "asin,price"
-        }
+        # 1. Read the existing ES data; date fields come back as raw strings and profit_rate_extra is read back as a StructType
+        # asin_crawl_date is only needed in the dynamic 30-day mode, so it is read on demand
        df_es = self.spark.read.format("org.elasticsearch.spark.sql") \
-            .options(**es_read_options) \
+            .options(**self.get_es_read_options(include_crawl_date=(base_date is None))) \
            .load(index_name) \
            .withColumn('price', F.round(F.col('price'), 2)) \
+            .withColumn('tracking_since_type', F.col('tracking_since_type').cast('int')) \
            .repartition(40, 'asin')
-        df_update = df_es.join(self.df_asin_profit_rate, on=['asin', 'price'], how='inner')
-        df_update = self.build_profit_update_df(df_update)
-        self.write_profit_update(df_update, index_name)
+        # 2. Prepare the incremental profit-rate data (rename columns to avoid conflicts)
+        df_profit = self.df_asin_profit_rate.select(
+            'asin', 'price',
+            F.col('profit_key').alias('new_profit_key'),
+            F.col('ocean_profit').cast('float').alias('new_ocean_profit'),
+            F.col('air_profit').cast('float').alias('new_air_profit')
+        )

+        # 3. Prepare the incremental keepa data (rename columns to avoid conflicts)
+        df_keepa = self.df_keepa_add.select(
+            'asin',
+            F.col('tracking_since').alias('kp_tracking_since')
+        )

+        # 4. Left join both incremental datasets
+        df = df_es \
+            .join(df_profit, on=['asin', 'price'], how='left') \
+            .join(df_keepa, on='asin', how='left')

+        # 5. Filter: keep only rows where at least one side has incremental data, to cut down useless writes
+        df = df.filter(F.col('new_profit_key').isNotNull() | F.col('kp_tracking_since').isNotNull())

+        # 6. Merge the profit-rate fields: use the new values when present, otherwise keep the old ES values
+        # Access the sub-fields directly to build a struct with a fixed schema, so indexes whose profit_rate_extra has a different number of sub-fields do not cause schema conflicts
+        df = df.withColumn('profit_key',
+            F.when(F.col('new_profit_key').isNotNull(), F.col('new_profit_key'))
+            .otherwise(F.col('profit_key'))
+        ).withColumn('profit_rate_extra',
+            F.struct(
+                F.when(F.col('new_ocean_profit').isNotNull(), F.col('new_ocean_profit'))
+                .otherwise(F.col('profit_rate_extra.ocean_profit').cast('float'))
+                .alias('ocean_profit'),
+                F.when(F.col('new_air_profit').isNotNull(), F.col('new_air_profit'))
+                .otherwise(F.col('profit_rate_extra.air_profit').cast('float'))
+                .alias('air_profit')
+            )
+        )

+        # 7. Merge the keepa fields: use the new values when present, otherwise keep the old ES values
+        # Convert kp_tracking_since (keepa's raw minute value) to a date string
+        df = df.withColumn('tracking_since',
+            F.when(
+                F.col('kp_tracking_since').isNotNull(),
+                F.date_format(
+                    F.from_unixtime((F.col('kp_tracking_since') + F.lit(21564000)) * 60),
+                    'yyyy-MM-dd'
+                )
+            ).otherwise(F.col('tracking_since'))
+        )
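The magic constant is keepa's epoch offset: keepa stores timestamps as minutes, and adding 21564000 minutes (exactly 14975 days) shifts them onto the Unix epoch, which places keepa minute 0 at roughly 2011-01-01 UTC. A small standalone sketch of the same conversion (hypothetical helper, assuming kp_tracking_since really is a keepa minute value; note that Spark's from_unixtime uses the session time zone, while the sketch uses UTC):

from datetime import datetime, timezone

def keepa_minutes_to_date(keepa_minutes: int) -> str:
    # Mirrors from_unixtime((kp_tracking_since + 21564000) * 60) in the diff above
    unix_seconds = (keepa_minutes + 21564000) * 60
    return datetime.fromtimestamp(unix_seconds, tz=timezone.utc).strftime('%Y-%m-%d')

print(keepa_minutes_to_date(0))        # 2011-01-01 (keepa epoch)
print(keepa_minutes_to_date(7000000))  # 2024-04-23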
+        # Compute tracking_since_type
+        if base_date is not None:
+            # Fixed baseline: monthly/yearly indexes
+            iv = self.get_tracking_interval_dict(base_date)
+            df = df.withColumn('tracking_since_type',
+                F.when(F.col('kp_tracking_since').isNotNull(), F.expr(self.get_tracking_since_type_expr(iv)))
+                .otherwise(F.col('tracking_since_type'))
+            )
+        else:
+            # Dynamic baseline: 30-day index, using each asin's asin_crawl_date as the baseline
+            # asin_crawl_date has the format "yyyy-MM-dd HH:mm:ss"; keep the first 10 characters as a date string
+            # Cast the tracking_since string to DateType explicitly, to avoid a type mismatch with the value returned by date_sub
+            df = df.withColumn('asin_crawl_date', F.substring(F.col('asin_crawl_date'), 1, 10))
+            ts_date = F.to_date(F.col('tracking_since'), 'yyyy-MM-dd')
+            df = df.withColumn('tracking_since_type',
+                F.when(F.col('kp_tracking_since').isNotNull(),
+                    F.when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 30), F.lit(1))
+                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 90), F.lit(2))
+                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 180), F.lit(3))
+                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 360), F.lit(4))
+                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 720), F.lit(5))
+                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 1080), F.lit(6))
+                    .when(ts_date < F.date_sub(F.col('asin_crawl_date'), 1080), F.lit(7))
+                    .otherwise(F.lit(0))
+                ).otherwise(F.col('tracking_since_type'))
+            )
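A concrete row as a worked example: with asin_crawl_date '2025-06-15 08:30:00', the first cut-off is date_sub('2025-06-15', 30) = '2025-05-16', so a tracking_since of '2025-05-20' satisfies the first branch and gets type 1, while a tracking_since of '2019-01-01' falls before every cut-off down to date_sub('2025-06-15', 1080) and ends up as type 7. The buckets are the same as in the fixed-baseline CASE expression, only anchored to each asin's own crawl date instead of a shared month-end.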
+        # 8. Write back to the index (tracking_since_type is mapped as short in ES, so an explicit cast is needed)
+        df_update = df.select(
+            'asin', 'profit_key', 'profit_rate_extra', 'tracking_since',
+            F.col('tracking_since_type').cast('short')
+        )
+        self.write_combined_update(df_update, index_name)
if __name__ == "__main__":
......