Commit 96c4c74c by chenyuanjie

更新历史数据利润率-增加信息库索引更新+代码重构

parent f7a08785
......@@ -33,6 +33,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from utils.DorisHelper import DorisHelper
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
......@@ -109,8 +110,12 @@ class EsAsinProfitRate(object):
def run(self):
    """Entry point: load the newly-added profit-rate data, write it to the
    profit-rate index, then push the profit-rate fields into every dependent
    ES index (recent monthly detail indexes, the last-365-day index and the
    30-day flow index).

    Note: the span shown in the diff still contained the pre-commit calls
    (`self.update_history_index()` and the old save call) — that method was
    deleted by this commit, so those stale lines are dropped here.
    """
    self.read_profit_rate_add()
    self.save_profit_rate_to_es()
    # Update the profit-rate fields of the 3 most recent monthly detail
    # indexes and the monthly info-library indexes.
    for index in self.get_recent_indexes("st_detail_month"):
        self.update_index_profit_rate(index)
    for index in self.get_recent_indexes("ai_asin_analyze_detail"):
        self.update_index_profit_rate(index)
    # Update the yearly info-library index and the 30-day flow index.
    self.update_index_profit_rate(f"{self.site_name}_ai_asin_analyze_detail_last365_day")
    self.update_index_profit_rate(f"{self.site_name}_flow_asin_30day")
......@@ -125,13 +130,22 @@ class EsAsinProfitRate(object):
self.df_asin_profit_rate = self.df_asin_profit_rate.withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn(
"update_time",
F.when(
F.col("updated_time").isNotNull(),
F.substring(F.col("updated_time"), 1, 10)
).otherwise(F.lit("1970-01-01"))
"update_time", F.substring(F.col("updated_time"), 1, 10)
).select(
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time'
)
# 从Doris获取asin_crawl_date
df_crawl_date = DorisHelper.spark_import_with_connector(
session=self.spark,
table_identifier=f"selection.{self.site_name}_asin_latest_detail",
read_fields="asin,asin_crawl_date"
).withColumn(
"asin_crawl_date", F.lit(None).cast("string") # 初始化为 null
"asin_crawl_date", F.substring(F.col("asin_crawl_date"), 1, 10)
).repartition(40, 'asin')
self.df_asin_profit_rate = self.df_asin_profit_rate.join(
df_crawl_date, on='asin', how='left'
).select(
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time', 'asin_crawl_date'
).cache()
......@@ -139,13 +153,9 @@ class EsAsinProfitRate(object):
self.df_asin_profit_rate.show(10, False)
def save_profit_rate_to_es(self):
"""遍历完成后,统一更新利润率索引"""
print(f"\n{'='*60}")
print(f"开始更新利润率索引:{self.es_profit_rate_index}")
print(f"{'='*60}")
print(f"最终利润率数据如下:")
self.df_asin_profit_rate.show(10, False)
EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body)
try:
self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
......@@ -157,97 +167,26 @@ class EsAsinProfitRate(object):
print("An error occurred while writing to Elasticsearch:", str(e))
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES数据更新失败', f'失败索引:{self.es_profit_rate_index}')
def update_history_index(self):
"""更新历史月度索引的利润率数据"""
# 从 2025-05 开始,遍历到最新索引
start_date = datetime(2025, 5, 1)
current_date = start_date
while True:
year = current_date.year
month = current_date.month
month_str = f"{year}-{month:02d}"
index_name = f"{self.site_name}_st_detail_month_{year}_{month:02d}"
# 检查索引是否存在
if not self.es_client.indices.exists(index=index_name):
print(f"索引 {index_name} 不存在,停止遍历")
break
print(f"\n{'='*60}")
print(f"开始处理索引: {index_name}")
print(f"{'='*60}")
try:
self.update_single_history_index(index_name, month_str)
except Exception as e:
print(f"更新索引 {index_name} 失败: {str(e)}")
# 移动到下个月
if month == 12:
current_date = datetime(year + 1, 1, 1)
def get_recent_indexes(self, prefix, max_count=3, max_lookback=24):
    """Walk backwards month-by-month from ``self.date_info`` and collect the
    names of the most recent ES indexes that actually exist.

    The diff span for these lines interleaved the two methods deleted by this
    commit (`update_history_index` / `update_single_history_index`) with the
    new code; only the surviving method is kept here.

    :param prefix: index-name infix, e.g. ``"st_detail_month"`` — the full
        name checked is ``{site_name}_{prefix}_{YYYY}_{MM}``.
    :param max_count: stop after this many existing indexes are found.
    :param max_lookback: safety cap on how many months to probe (new optional
        parameter; default 24 preserves the original behavior).
    :return: list of existing index names, newest first (may be shorter than
        ``max_count`` if fewer exist within the lookback window).
    """
    indexes = []
    # Normalize to the first day of the month so month arithmetic is safe.
    current = datetime.strptime(self.date_info, "%Y-%m-%d").replace(day=1)
    checked = 0
    while len(indexes) < max_count and checked < max_lookback:
        index_name = f"{self.site_name}_{prefix}_{current.year}_{current.month:02d}"
        if EsUtils.exist_index(index_name, self.es_client):
            indexes.append(index_name)
        # Step one month backwards, rolling the year over at January.
        if current.month == 1:
            current = current.replace(year=current.year - 1, month=12)
        else:
            current = current.replace(month=current.month - 1)
        checked += 1
    print(f"[{prefix}] 找到 {len(indexes)} 个索引:{indexes}")
    return indexes
def update_index_profit_rate(self, index_name):
"""
从指定索引读取 asin + profit_key,
与新增利润率数据 inner join 后,只更新利润率相关字段回索引
直接将利润率数据写入目标索引,asin存在则更新利润率字段,不存在则跳过
"""
if not EsUtils.exist_index(index_name, self.es_client):
print(f"索引 {index_name} 不存在,跳过更新")
......@@ -257,28 +196,7 @@ class EsAsinProfitRate(object):
print(f"开始更新索引利润率字段:{index_name}")
print(f"{'='*60}")
# 从索引中读取 asin + profit_key
read_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.nodes.wan.only": "true",
"es.read.field.include": "asin,profit_key"
}
df_index = self.spark.read.format("org.elasticsearch.spark.sql") \
.options(**read_options) \
.load(index_name) \
.select("asin", "profit_key") \
.dropna(subset=["profit_key"]) \
.repartition(40, "profit_key")
# 与新增利润率数据 inner join
df_update = df_index.join(
self.df_asin_profit_rate.select("profit_key", "ocean_profit", "air_profit"),
on="profit_key",
how="inner"
).withColumn(
df_update = self.df_asin_profit_rate.withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
......@@ -289,13 +207,7 @@ class EsAsinProfitRate(object):
F.col("air_profit").alias("air_profit")
)
)
).select("asin", "profit_rate_extra").cache()
count = df_update.count()
print(f"索引 {index_name} 待更新利润率数据量: {count}")
if count == 0:
print("无待更新数据,跳过")
return
).select("asin", "profit_key", "profit_rate_extra")
write_options = {
"es.nodes": EsUtils.__es_ip__,
......@@ -305,6 +217,7 @@ class EsAsinProfitRate(object):
"es.mapping.id": "asin",
"es.resource": f"{index_name}/_doc",
"es.write.operation": "update",
"es.batch.write.abort.on.failure": "false",
"es.update.retry.on.conflict": "3",
"es.batch.write.refresh": "false",
"es.batch.size.entries": "2000",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment