Commit 8d26273e by chenyuanjie

利润率每日更新

parent 72393bc8
......@@ -7,29 +7,27 @@ from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
class EsAsinProfitRate(object):
    """Daily ES update of ASIN profit-rate data for one site.

    Reads incremental profit-rate rows from Hive and pushes them into
    Elasticsearch, then backfills monthly history indexes.
    """

    def __init__(self, site_name, date_info):
        # FIX: removed the stale pre-commit signature line
        # `def __init__(self, site_name):` that the diff rendering left
        # directly above this def — a bodyless def is a syntax error, and
        # lines below require date_info, so the two-arg form is current.
        self.site_name = site_name    # site key, e.g. "us" — used in index/table names
        self.date_info = date_info    # run date string "YYYY-MM-DD"
        # Incremental window starts one day before the run date.
        self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
        self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}")
        # ES-related configuration.
        self.es_client = EsUtils.get_es_client()
        self.es_index = f"{self.site_name}_profit_rate_extra"
        self.es_body = self.get_es_body()
        self.es_options = self.get_es_options(self.es_index)
        self.profit_rate_policy = f"{self.site_name}_profit_rate_policy"
        self.user_mask_asin_policy = "user_mask_asin_policy"
        self.user_mask_category_policy = "user_mask_category_policy"
        self.pipeline_id = f"{self.site_name}_user_mask_and_profit_rate_pipeline"
        self.es_profit_rate_index = f"{self.site_name}_profit_rate_extra_v2"
        self.es_profit_rate_body = self.get_es_profit_rate_body()
        self.es_profit_rate_options = self.get_es_profit_rate_options(self.es_profit_rate_index)
        # Placeholder dataframes; real data is assigned in save_profit_rate_add().
        self.df_asin_profit_rate = self.spark.sql(f"select 1+1;")
        self.df_keepa_asin = self.spark.sql(f"select 1+1;")
@staticmethod
def get_es_body():
def get_es_profit_rate_body():
    # NOTE(review): rendered git diff — the first def line is the pre-commit
    # name, the second its renamed replacement; only one exists in the real file.
    # Returns the ES settings/mappings body for the profit-rate index; the
    # new "update_time" date field is part of this commit.
    return {
        "settings": {
            "number_of_shards": "3",
......@@ -51,13 +49,17 @@ class EsAsinProfitRate(object):
            # (diff hunk gap above — intermediate settings/mapping lines are not shown)
            },
            "air_profit": {
                "type": "float"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd"
            }
        }
    }
}
@staticmethod
def get_es_options(index_name):
def get_es_profit_rate_options(index_name):
    # NOTE(review): rendered git diff — old and new def lines both present.
    # Builds the elasticsearch-hadoop connector options for the given index;
    # the tail of the dict is cut off by the diff hunk below.
    return {
        "es.nodes": EsUtils.__es_ip__,
        "es.port": EsUtils.__es_port__,
......@@ -74,173 +76,121 @@ class EsAsinProfitRate(object):
    }
def run(self):
    # Job entry point.
    # NOTE(review): rendered git diff — the calls below mix the pre-commit
    # flow (read_data/es_save/enrich setup) with the newly added
    # save_profit_rate_add/update_history_index steps; which of the old
    # calls were removed cannot be told from this view.
    self.read_data()
    self.es_save()
    self.create_enrich_policy()
    self.create_enrich_pipeline()
    self.save_profit_rate_add()
    self.update_history_index()
def read_data(self):
def save_profit_rate_add(self):
    # NOTE(review): rendered git diff — the old read_data body and the new
    # save_profit_rate_add body are interleaved below; duplicate SQL and
    # duplicate df_asin_profit_rate assignments are diff artifacts.
    # Read the consolidated profit-rate data (incremental rows only).
    sql = f"""
    select asin, price, ocean_profit, air_profit, package_length, package_width, package_height, weight
    from dim_asin_profit_rate_info where site_name = '{self.site_name}'
    select asin, price, ocean_profit, air_profit, updated_time from dim_asin_profit_rate_info
    where site_name = '{self.site_name}' and updated_time >= '{self.last_date_info}'
    """
    self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
    sql = f"""
    select asin, package_length, package_width, package_height, weight
    from dim_keepa_asin_info where site_name = '{self.site_name}'
    """
    self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
    # Old-version logic: dim_asin_profit_rate_info can contain duplicated
    # profit computations — keep only rows matching the latest keepa data.
    self.df_asin_profit_rate = self.df_asin_profit_rate.join(
        self.df_keepa_asin, on=['asin', 'package_length', 'package_width', 'package_height', 'weight'], how='inner'
    ).select(
        'asin', 'price', 'ocean_profit', 'air_profit'
    ).withColumn(
    self.df_asin_profit_rate = self.df_asin_profit_rate.withColumn(
        'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
    ).withColumn(
        "update_time",
        F.when(
            F.col("updated_time").isNotNull(),
            F.substring(F.col("updated_time"), 1, 10)
        ).otherwise(F.lit("1970-01-01"))
    ).select(
        'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time'
    ).cache()
    print(f"增量利润率数据如下:")
    self.df_asin_profit_rate.show(10, False)
def es_save(self):
    # Create the ES indexes via EsUtils, then append the incremental
    # profit-rate dataframe to the enrich index.
    print(f"创建富集索引:{self.es_index}!")
    EsUtils.create_index(self.es_index, self.es_client, self.es_body)
    print(f"创建利润率索引:{self.es_profit_rate_index}!")
    EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body)
    try:
        self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
            .options(**self.es_options) \
            .mode("append") \
            .save()
        print(f"ES {self.es_index} 索引更新完毕!")
        # Write to the v2 profit-rate index is commented out in this commit:
        # self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
        #     .options(**self.es_profit_rate_options) \
        #     .mode("append") \
        #     .save()
        print(f"ES {self.es_profit_rate_index} 索引更新完毕!")
    except Exception as e:
        # Best-effort: failures are printed and reported to WeChat, not raised.
        print("An error occurred while writing to Elasticsearch:", str(e))
        CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES数据更新失败', f'失败索引:{self.es_index}')
def create_enrich_policy(self):
    # Re-execute the existing enrich policy (per ES docs this rebuilds the
    # policy's enrich index from the source index). The one-off policy and
    # pipeline creation/deletion below is kept commented out:
    # self.es_client.ingest.delete_pipeline(id=self.pipeline_id)
    # self.es_client.enrich.delete_policy(name=self.policy_name)
    # print(f"创建富集策略:{self.policy_name}!")
    # policy_body = {
    #     "match": {
    #         "indices": f"{self.es_index}",
    #         "match_field": "profit_key",
    #         "enrich_fields": ["ocean_profit", "air_profit"]
    #     }
    # }
    # self.es_client.enrich.put_policy(name=self.policy_name, body=policy_body)
    print(f"刷新富集策略:{self.profit_rate_policy}!")
    self.es_client.enrich.execute_policy(self.profit_rate_policy, request_timeout=1800)
def create_enrich_pipeline(self):
    # Build/overwrite the ingest pipeline that enriches documents with
    # profit-rate fields and user-mask flags, then kick off an
    # update_by_query so existing docs are re-processed through it.
    print(f"创建富集管道:{self.pipeline_id}!")
    pipeline_body = {
        "description": "asin profit_rate and user_mask pipeline",
        "processors": [
            {
                # Attach profit data matched on the "profit_key" field.
                "enrich": {
                    "policy_name": self.profit_rate_policy,
                    "field": "profit_key",
                    "target_field": "profit_rate_extra",
                    "max_matches": 1,
                    "ignore_missing": True
                },
            },
            {
                # User-mask enrichment matched on ASIN.
                "enrich": {
                    "policy_name": f"{self.user_mask_asin_policy}",
                    "field": "asin",
                    "target_field": "policy_add_1",
                    "max_matches": 1,
                    "ignore_missing": True
                },
            },
            {
                # User-mask enrichment matched on category id.
                "enrich": {
                    "policy_name": f"{self.user_mask_category_policy}",
                    "field": "category_id",
                    "target_field": "policy_add_2",
                    "max_matches": 1,
                    "ignore_missing": True
                },
            },
            {
                # Copy enriched values up to top-level document fields.
                "set": {
                    "field": "usr_mask_type",
                    "value": "{{policy_add_1.usr_mask_type}}",
                    "ignore_empty_value": True
                }
            },
            {
                "set": {
                    "field": "usr_mask_progress",
                    "value": "{{policy_add_1.usr_mask_progress}}",
                    "ignore_empty_value": True
                }
            },
            {
                "set": {
                    "field": "package_quantity",
                    "value": "{{policy_add_1.package_quantity}}",
                    "ignore_empty_value": True
                }
            },
            {
                # Category-level mask overrides the ASIN-level value when present.
                "set": {
                    "field": "usr_mask_type",
                    "value": "{{policy_add_2.usr_mask_type}}",
                    "ignore_empty_value": True
                }
            },
            {
                # Drop the intermediate enrich target objects.
                "remove": {
                    "field": "policy_add_1",
                    "ignore_missing": True
                }
            },
            {
                "remove": {
                    "field": "policy_add_2",
                    "ignore_missing": True
                }
            },
            {
                "convert": {
                    "field": "package_quantity",
                    "type": "integer",
                    "ignore_missing": True
                }
            }
        ]
    }
    # NOTE(review): the send_wx_msg line below appears diff-misplaced — it
    # looks like the new replacement for the alert in es_save's except block.
    CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES数据更新失败', f'失败索引:{self.es_profit_rate_index}')
    self.es_client.ingest.put_pipeline(id=self.pipeline_id, body=pipeline_body)
    # Refresh ES data so the pipeline takes effect (empty body = all docs).
    # body = {
    #     "query": {
    #         "bool": {
    #             "must_not": {
    #                 "exists": {
    #                     "field": "profit_rate_extra"
    #                 }
    #             }
    #         }
    #     }
    # }
    body = {
    }
    self.es_client.update_by_query(
        index="us_st_detail_month_2025_11",
        body=body,
        pipeline=self.pipeline_id,
        refresh=True,
        wait_for_completion=False,
        request_timeout=600
    # NOTE(review): the closing ")" of update_by_query is missing in this
    # diff view — it is cut off by the next method's added lines.
def update_history_index(self):
    """Backfill profit-rate data into the monthly history indexes.

    Walks month by month from 2025-05 and stops at the first month whose
    index does not exist; each existing index is updated via
    update_single_history_index, and a failure on one index is printed
    without stopping the walk.
    """
    year, month = 2025, 5
    while True:
        month_str = f"{year}-{month:02d}"
        index_name = f"{self.site_name}_st_detail_month_{year}_{month:02d}"
        # Stop as soon as a month's index is missing.
        if not self.es_client.indices.exists(index=index_name):
            print(f"索引 {index_name} 不存在,停止遍历")
            break
        print(f"\n{'='*60}")
        print(f"开始处理索引: {index_name}")
        print(f"{'='*60}")
        try:
            self.update_single_history_index(index_name, month_str)
        except Exception as e:
            # Log and continue with the next month.
            print(f"更新索引 {index_name} 失败: {str(e)}")
        # Advance to the next calendar month.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
def update_single_history_index(self, index_name, month_str):
    """Push incremental profit-rate data into one monthly history index.

    :param index_name: target ES index, e.g. "us_st_detail_month_2025_05"
    :param month_str: month partition key "YYYY-MM" used to filter dwt_flow_asin
    """
    # ASIN/price pairs present in this month's flow data.
    hive_sql = f"""
    select asin, asin_price as price from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month'
    and date_info = '{month_str}' and asin_price is not null
    """
    df_hive = self.spark.sql(hive_sql)
    # Inner join on (asin, price) so a doc is only updated when the profit
    # rate was computed for the same price; both profits are packed into one
    # nested "profit_rate_extra" object for the ES partial update.
    # FIX: removed a stray `pass` statement that was left inside this
    # parenthesized expression (diff artifact) and made the method a
    # syntax error.
    df_update = self.df_asin_profit_rate.join(
        df_hive, on=['asin', 'price'], how='inner'
    ).withColumn(
        "profit_rate_extra",
        F.struct(
            F.col("ocean_profit").alias("ocean_profit"),
            F.col("air_profit").alias("air_profit")
        )
    ).select("asin", "profit_rate_extra")
    # elasticsearch-hadoop options: partial document update keyed by asin.
    es_options = {
        "es.nodes": EsUtils.__es_ip__,
        "es.port": EsUtils.__es_port__,
        "es.net.http.auth.user": EsUtils.__es_user__,
        "es.net.http.auth.pass": EsUtils.__es_passwd__,
        "es.mapping.id": "asin",
        "es.resource": f"{index_name}/_doc",
        "es.batch.write.refresh": "false",
        "es.batch.size.entries": "5000",
        "es.write.operation": "update",
        "es.batch.write.retry.count": "3",
        "es.batch.write.retry.wait": "10s",
        "es.internal.es.version.ignore": "true"  # skip the ES version check
    }
    print(f"索引 {index_name} 待更新数据量: {df_update.count()}")
    df_update.show(5, False)
    df_update.write.format("org.elasticsearch.spark.sql") \
        .options(**es_options) \
        .mode("append") \
        .save()
    print(f"索引 {index_name} 更新完毕!")
if __name__ == "__main__":
    # CLI: python <script> <site_name> <date_info "YYYY-MM-DD">
    # FIX: removed the stale pre-commit line `handle_obj = EsAsinProfitRate(site_name)`
    # left by the diff — with the new two-argument __init__ it would raise
    # TypeError before the real handler object was ever built.
    site_name = sys.argv[1]
    date_info = sys.argv[2]
    handle_obj = EsAsinProfitRate(site_name, date_info)
    handle_obj.run()
    print("success!!!")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment