Commit 67cc3704 by chenyuanjie

asin利润率相关链路整合优化

parent cc39dbf0
"""
author: CT
description: 利润率数据增量同步 + 去重整合 — 一站式 PG → Hive → Doris
步骤:
1) sqoop 从 PG {site_name}_asin_profit_rate_calc 增量拉取
时间窗 [last_date_info, 当天 00:00:00),先写入 Hive
dim_asin_profit_rate_info (site_name, date_info=今日) 分区作为"原始增量"
2) Spark 读当日 sqoop 增量 + Hive 历史所有分区,按 (asin, price) 去重
排序键:updated_time desc 保留最新一行
3) 覆盖当日分区为整合后全量快照;写入校验通过后删除所有 < 今日 的历史分区
4) 整合后的当日数据写入 Doris dwd.dwd_asin_profit_rate_latest
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=asin_crawl_date 自动取最新
执行示例: spark-submit dim_asin_profit_rate_info.py us 2026-05-15
"""
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
from utils.hdfs_utils import HdfsUtils
from datetime import datetime, timedelta
from utils.DorisHelper import DorisHelper
class DimAsinProfitRateInfo(object):
......@@ -16,90 +33,183 @@ class DimAsinProfitRateInfo(object):
self.site_name = site_name
self.date_info = date_info
self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
self.df_asin_profit = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
# 上限:程序运行当天 00:00:00(拉昨天及之前完整数据,今天还在写不拉)
self.upper_bound = datetime.now().strftime("%Y-%m-%d") + " 00:00:00"
self.spark = SparkUtil.get_spark_session(
f"{self.__class__.__name__}: {self.site_name} {self.date_info}"
)
self.hive_table = "dim_asin_profit_rate_info"
self.doris_db = "dwd"
self.doris_table = "dwd_asin_profit_rate_latest"
self.df_today = None
self.df_history = None
self.df_save = None
def run(self):
self.sqoop_to_hive()
self.read_data()
self.handle_data()
self.save_data()
self.write_to_doris()
def sqoop_to_hive(self):
"""sqoop 从 PG 增量拉到 Hive dim_asin_profit_rate_info 当日分区(原始增量)"""
partition_dict = {"site_name": self.site_name, "date_info": self.date_info}
hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=partition_dict)
engine = get_remote_engine(site_name=self.site_name, db_type='postgresql_cluster')
import_table = f"{self.site_name}_asin_profit_rate_calc"
# 增量区间:[last_date_info, 程序运行当天 00:00:00)
# 上限用 self.upper_bound(当天 00:00:00),避免拉到 PG 当日正在写入的数据
query = f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
updated_time, asin_crawl_date
FROM {import_table}
WHERE updated_time >= '{self.last_date_info}' AND updated_time < '{self.upper_bound}' AND \\$CONDITIONS
"""
print(f"sqoop query:\n{query}")
engine.sqoop_raw_import(
query=query,
hive_table=self.hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
check_count=True,
)
def read_data(self):
# 清洗整合利润率表
sql = f"""
select asin, price, category, ocean_profit, air_profit, package_length, package_width, package_height, weight, updated_time, date_info
from dim_asin_profit_rate_info where site_name = '{self.site_name}';
"""读当日 sqoop 增量 + 历史所有分区"""
sql_today = f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
asin_crawl_date, updated_time
FROM {self.hive_table}
WHERE site_name = '{self.site_name}' AND date_info = '{self.date_info}'
"""
self.df_asin_profit = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
# 读取keepa数据
sql = f"""
select asin, package_length, package_width, package_height, item_weight as weight
from dim_keepa_asin_info where site_name = '{self.site_name}';
print(f"sql_today =\n{sql_today}")
self.df_today = self.spark.sql(sqlQuery=sql_today).repartition(40, 'asin', 'price')
sql_history = f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
asin_crawl_date, updated_time
FROM {self.hive_table}
WHERE site_name = '{self.site_name}' AND date_info != '{self.date_info}'
"""
self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
print(f"sql_history =\n{sql_history}")
self.df_history = self.spark.sql(sqlQuery=sql_history).repartition(40, 'asin', 'price').cache()
def handle_data(self):
# 因为keepa数据存在更新的情况,保留与keepa最新数据所对应的数据行
self.df_asin_profit = self.df_asin_profit.join(
self.df_keepa_asin, on=['asin', 'package_length', 'package_width', 'package_height', 'weight'], how='inner'
"""union 当日 + 历史,按 (asin, price) 去重保留 updated_time desc 最新一行"""
df_all = self.df_today.unionByName(self.df_history)
window = Window.partitionBy('asin', 'price').orderBy(
F.col('updated_time').desc_nulls_last(),
)
# 去重
window = Window.partitionBy(['asin', 'price']).orderBy(
self.df_asin_profit.updated_time.desc_nulls_last()
)
self.df_asin_profit = self.df_asin_profit.withColumn(
'rank', F.row_number().over(window=window)
).filter('rank = 1').drop('rank', 'date_info')
self.df_save = df_all.withColumn('rk', F.row_number().over(window)) \
.filter('rk = 1') \
.drop('rk')
def save_data(self):
self.df_save = self.df_asin_profit.withColumn(
"site_name", F.lit(self.site_name)
).withColumn(
"date_info", F.lit(self.date_info)
).repartition(10).cache()
"""覆盖当日分区为整合后全量快照;写入校验通过后删除历史分区"""
self.df_save = self.df_save \
.withColumn('site_name', F.lit(self.site_name)) \
.withColumn('date_info', F.lit(self.date_info)) \
.repartition(40).cache()
new_count = self.df_save.count()
print(f"最新数据量:{new_count}")
old_count = self.df_history.count()
print(f"整合后 new_count={new_count:,},历史 old_count={old_count:,}")
hive_tb = "dim_asin_profit_rate_info"
partition_dict = {
"site_name": self.site_name,
"date_info": self.date_info
}
partition_dict = {"site_name": self.site_name, "date_info": self.date_info}
partition_by = list(partition_dict.keys())
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict)
# 先清掉 sqoop 写入的当日分区(hdfs + metastore),避免重复 append
print(f"清空当日分区 hdfs:{hdfs_path}")
HdfsUtils.delete_hdfs_file(hdfs_path)
print(f"正在进行数据存储,当前存储的表名为:{hive_tb},存储路径:{hdfs_path}")
self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
# 验证实际写入数量,确保写入成功后再删除历史分区
written_count = self.spark.sql(f"""
select count(1) as cnt from {hive_tb}
where site_name='{self.site_name}' and date_info='{self.date_info}'
""").collect()[0]['cnt']
print(f"实际写入数量:{written_count},预期:{new_count}")
self.spark.sql(
f"ALTER TABLE {self.hive_table} DROP IF EXISTS "
f"PARTITION (site_name='{self.site_name}', date_info='{self.date_info}')"
)
# 写入整合后的当日分区(try-except 保护:写入失败立即抛出,绝不进入删除历史分区流程)
print(f"写入 Hive 当日分区:{hdfs_path}")
try:
self.df_save.write.saveAsTable(
name=self.hive_table, format='hive', mode='append', partitionBy=partition_by
)
except Exception as e:
print(f"❌ Hive 当日分区写入失败:{e};保留所有历史分区不删,请人工排查!")
self.df_history.unpersist()
raise
# 验证当日分区实际写入行数与预期一致(防止 saveAsTable 静默丢数据)
written_count = self.spark.sql(
f"SELECT COUNT(1) AS cnt FROM {self.hive_table} "
f"WHERE site_name='{self.site_name}' AND date_info='{self.date_info}'"
).collect()[0]['cnt']
print(f"实际写入行数 written_count={written_count:,},预期 new_count={new_count:,}")
if written_count != new_count:
raise RuntimeError(
f"写入数量校验失败!实际写入 {written_count} != 预期 {new_count},终止删除历史分区,请人工检查。"
print(f"❌ 写入校验失败 written_count={written_count} != new_count={new_count},保留历史分区不删!")
self.df_history.unpersist()
return
# 安全控制:当日整合分区行数必须 >= 历史总和才允许删历史(防止整合丢失数据)
if new_count < old_count:
print(f"⚠️ 校验未通过 new_count={new_count} < old_count={old_count},跳过历史分区删除,请人工排查!")
self.df_history.unpersist()
return
print(f"校验通过,删除所有 date_info < {self.date_info} 的历史分区")
partitions = self.spark.sql(f"SHOW PARTITIONS {self.hive_table}").collect()
for row in partitions:
# row[0] 形如 "site_name=us/date_info=2026-05-10"
parts = dict(p.split('=') for p in row[0].split('/'))
p_site = parts.get('site_name')
p_date = parts.get('date_info')
if p_site == self.site_name and p_date < self.date_info:
print(f" 删除分区: site_name={p_site}, date_info={p_date}")
self.spark.sql(
f"ALTER TABLE {self.hive_table} DROP IF EXISTS "
f"PARTITION (site_name='{p_site}', date_info='{p_date}')"
)
print("写入校验通过!")
print(f"正在删除历史分区数据")
self.spark.sql(f"""
ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}')
""")
HdfsUtils.delete_hdfs_file(
CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": self.site_name, "date_info": self.last_date_info})
CommonUtil.build_hdfs_path(
self.hive_table,
partition_dict={"site_name": p_site, "date_info": p_date},
)
)
print("历史分区清理完毕!")
self.df_history.unpersist()
def write_to_doris(self):
"""整合后的当日数据写 Doris dwd_asin_profit_rate_latest
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=asin_crawl_date 自动按抓取时间取最新
"""
df_to_doris = self.df_save.select(
F.lit(self.site_name).alias('site_name'),
F.col('asin'),
F.round(F.col('price'), 2).cast('decimal(20,2)').alias('price'),
F.col('ocean_profit'),
F.col('air_profit'),
F.to_timestamp(F.col('asin_crawl_date')).alias('asin_crawl_date'),
F.to_timestamp(F.col('updated_time')).alias('update_time'),
).cache()
count = df_to_doris.count()
print(f"写入 Doris 数据量:{count:,}")
df_to_doris.show(10, truncate=False)
table_columns = "site_name, asin, price, ocean_profit, air_profit, asin_crawl_date, update_time"
DorisHelper.spark_export_with_columns(
df_save=df_to_doris,
db_name=self.doris_db,
table_name=self.doris_table,
table_columns=table_columns,
)
df_to_doris.unpersist()
self.df_save.unpersist()
print("success!")
if __name__ == "__main__":
site_name = sys.argv[1]
date_info = sys.argv[2]
handle_obj = DimAsinProfitRateInfo(site_name, date_info)
handle_obj.run()
DimAsinProfitRateInfo(site_name, date_info).run()
"""
author: CT
description: 导出待计算利润率的 ASIN
1) Hive dwt_flow_asin 月维度读取 date_info >= '2025-05' 的所有 ASIN:
asin / price / category_first_id / asin_crawl_date
2) Doris dwt.{site}_flow_asin_30day 读取所有相关 ASIN:
asin / price / category_first_id / asin_crawl_date
3) union 后按 (asin, price) 去重保留 asin_crawl_date 最新
4) LEFT JOIN 分类、INNER JOIN 当日 keepa 增量
5) keepa 关联到的 ASIN 全部导出 PG {site}_asin_profit_rate_calc 重新计算利润率
执行示例: spark-submit export_need_profit_rate.py us 2026-05-15
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F, Window
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
START_MONTH = '2025-05'
class ExportNeedProfitRate(object):
def __init__(self, site_name, date_info):
self.site_name = site_name
self.date_info = date_info # 计算时间 yyyy-MM-dd(用作当日 keepa 增量分区)
self.spark = SparkUtil.get_spark_session(
f"{self.__class__.__name__}: {self.site_name} {self.date_info}"
)
def run(self):
df_export = self.build_export_df()
self.write_to_pg(df_export)
def build_export_df(self):
# 1. Hive dwt_flow_asin 月维度,date_info >= 2025-05 所有月份
sql_dwt = f"""
SELECT asin,
asin_price AS price,
category_first_id,
asin_crawl_date
FROM dwt_flow_asin
WHERE site_name = '{self.site_name}'
AND date_type = 'month'
AND date_info >= '{START_MONTH}'
AND asin_price > 0
"""
print(f"sql_dwt =\n{sql_dwt}")
df_dwt = self.spark.sql(sqlQuery=sql_dwt) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(20,2)')) \
.withColumn('asin_crawl_date', F.to_timestamp(F.col('asin_crawl_date')))
# 2. Doris dwt.{site}_flow_asin_30day 所有相关 ASIN
doris_sql = f"""
SELECT asin, price, category_first_id, asin_crawl_date
FROM dwt.{self.site_name}_flow_asin_30day
WHERE price > 0
"""
print(f"doris_sql =\n{doris_sql}")
df_doris = DorisHelper.spark_import_with_sql(self.spark, doris_sql) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(20,2)')) \
.withColumn('asin_crawl_date', F.col('asin_crawl_date').cast('timestamp')) \
.select('asin', 'price', 'category_first_id', 'asin_crawl_date')
# 3. union + 按 (asin, price) 去重保留 asin_crawl_date 最新
df_flow = df_dwt.unionByName(df_doris).repartition(40, 'asin', 'price')
window = Window.partitionBy('asin', 'price').orderBy(F.col('asin_crawl_date').desc_nulls_last())
df_flow = df_flow.withColumn('rk', F.row_number().over(window)) \
.filter('rk = 1') \
.drop('rk') \
.cache()
# 4. 分类名 LEFT JOIN
sql_cate = f"""
SELECT category_first_id, en_name AS category
FROM dim_bsr_category_tree
WHERE site_name = '{self.site_name}' AND nodes_num = 2
"""
df_cate = self.spark.sql(sqlQuery=sql_cate)
# 5. keepa 当日增量 INNER JOIN
sql_keepa = f"""
SELECT asin, package_length, package_width, package_height, item_weight AS weight
FROM dim_keepa_asin_info
WHERE site_name = '{self.site_name}' AND date_info = '{self.date_info}'
"""
df_keepa = self.spark.sql(sqlQuery=sql_keepa) \
.filter((F.col('package_length') > 0) &
(F.col('package_width') > 0) &
(F.col('package_height') > 0) &
(F.col('weight') > 0)) \
.repartition(40, 'asin')
df_result = df_flow \
.join(df_cate, on='category_first_id', how='left') \
.join(df_keepa, on='asin', how='inner') \
.withColumn('source_month', F.date_format(F.col('asin_crawl_date'), 'yyyy-MM')) \
.withColumn('part_key', F.ntile(50).over(Window.orderBy(F.rand()))) \
.select(
'asin', 'price', 'category',
'package_length', 'package_width', 'package_height', 'weight',
'part_key', 'source_month', 'asin_crawl_date',
)
count = df_result.count()
print(f"待计算利润率数据量:{count:,}")
df_result.show(10, truncate=False)
df_flow.unpersist()
return df_result
def write_to_pg(self, df_export):
con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=self.site_name)
table_name = f"{self.site_name}_asin_profit_rate_calc"
print(f"导出到 PG {table_name}")
df_export.write.format("jdbc") \
.option("url", con_info["url"]) \
.option("dbtable", table_name) \
.option("user", con_info["username"]) \
.option("password", con_info["pwd"]) \
.mode("append") \
.save()
print("success")
if __name__ == "__main__":
site_name = sys.argv[1]
date_info = sys.argv[2]
ExportNeedProfitRate(site_name, date_info).run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment