Commit 7edc66cc by fangxingjun

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

parents e56a458e 5655a77a
"""
author: CT
description: 利润率数据增量整合 — ODS → Hive → Doris
前置:ods_asin_profit_rate.py 已将当日 PG 增量 sqoop 到 ods_asin_profit_rate
步骤:
1) 读 ods_asin_profit_rate 当日分区(原始增量)+ dim_asin_profit_rate_info 历史分区
2) 按 (asin, price) 去重,updated_time desc 保留最新一行
3) 覆盖 dim_asin_profit_rate_info 当日分区为整合后全量快照;校验后删除历史分区
4) 当日 ODS 增量写入 Doris dwd.dwd_asin_profit_rate_latest
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=update_time 自动取最新
执行示例: spark-submit dim_asin_profit_rate_info.py us 2026-05-15
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from datetime import datetime, timedelta
from utils.DorisHelper import DorisHelper
class DimAsinProfitRateInfo(object):
def __init__(self, site_name, date_info):
self.site_name = site_name
self.date_info = date_info
self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
self.df_asin_profit = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
self.site_name = site_name
self.date_info = date_info
self.spark = SparkUtil.get_spark_session(
f"{self.__class__.__name__}: {self.site_name} {self.date_info}"
)
self.ods_table = "ods_asin_profit_rate"
self.hive_table = "dim_asin_profit_rate_info"
self.doris_db = "dwd"
self.doris_table = "dwd_asin_profit_rate_latest"
self.df_today = None
self.df_history = None
self.df_save = None
def run(self):
self.read_data()
self.handle_data()
self.save_data()
self.write_to_doris()
def read_data(self):
# 清洗整合利润率表
sql = f"""
select asin, price, category, ocean_profit, air_profit, package_length, package_width, package_height, weight, updated_time, date_info
from dim_asin_profit_rate_info where site_name = '{self.site_name}';
"""读当日 ODS 增量(ods_asin_profit_rate)+ dim 历史所有分区"""
sql_today = f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
asin_crawl_date, updated_time
FROM {self.ods_table}
WHERE site_name = '{self.site_name}' AND date_info = '{self.date_info}'
"""
self.df_asin_profit = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
# 读取keepa数据
sql = f"""
select asin, package_length, package_width, package_height, item_weight as weight
from dim_keepa_asin_info where site_name = '{self.site_name}';
print(f"sql_today =\n{sql_today}")
# cache:save_data 会 DROP PARTITION 当日分区,write_to_doris 还要复用此 df,必须先物化
# count() 强制立即物化,确保 cache 在 DROP/saveAsTable 之前已填充到 BlockManager
self.df_today = self.spark.sql(sqlQuery=sql_today).repartition(40, 'asin', 'price').cache()
self.df_today.count()
sql_history = f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
asin_crawl_date, updated_time
FROM {self.hive_table}
WHERE site_name = '{self.site_name}' AND date_info != '{self.date_info}'
"""
self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
print(f"sql_history =\n{sql_history}")
self.df_history = self.spark.sql(sqlQuery=sql_history).repartition(40, 'asin', 'price').cache()
def handle_data(self):
# 因为keepa数据存在更新的情况,保留与keepa最新数据所对应的数据行
self.df_asin_profit = self.df_asin_profit.join(
self.df_keepa_asin, on=['asin', 'package_length', 'package_width', 'package_height', 'weight'], how='inner'
)
# 去重
window = Window.partitionBy(['asin', 'price']).orderBy(
self.df_asin_profit.updated_time.desc_nulls_last()
"""union 当日 + 历史,按 (asin, price) 去重保留 updated_time desc 最新一行"""
df_all = self.df_today.unionByName(self.df_history)
window = Window.partitionBy('asin', 'price').orderBy(
F.col('updated_time').desc_nulls_last(),
)
self.df_asin_profit = self.df_asin_profit.withColumn(
'rank', F.row_number().over(window=window)
).filter('rank = 1').drop('rank', 'date_info')
self.df_save = df_all.withColumn('rk', F.row_number().over(window)) \
.filter('rk = 1') \
.drop('rk')
def save_data(self):
self.df_save = self.df_asin_profit.withColumn(
"site_name", F.lit(self.site_name)
).withColumn(
"date_info", F.lit(self.date_info)
).repartition(10).cache()
"""覆盖当日分区为整合后全量快照;写入校验通过后删除历史分区"""
self.df_save = self.df_save \
.withColumn('site_name', F.lit(self.site_name)) \
.withColumn('date_info', F.lit(self.date_info)) \
.repartition(40).cache()
new_count = self.df_save.count()
print(f"最新数据量:{new_count}")
hive_tb = "dim_asin_profit_rate_info"
partition_dict = {
"site_name": self.site_name,
"date_info": self.date_info
}
partition_by = list(partition_dict.keys())
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
old_count = self.df_history.count()
print(f"整合后 new_count={new_count:,},历史 old_count={old_count:,}")
partition_dict = {"site_name": self.site_name, "date_info": self.date_info}
partition_by = list(partition_dict.keys())
hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict)
# 先清掉 sqoop 写入的当日分区(hdfs + metastore),避免重复 append
print(f"清空当日分区 hdfs:{hdfs_path}")
HdfsUtils.delete_hdfs_file(hdfs_path)
print(f"正在进行数据存储,当前存储的表名为:{hive_tb},存储路径:{hdfs_path}")
self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
# 验证实际写入数量,确保写入成功后再删除历史分区
written_count = self.spark.sql(f"""
select count(1) as cnt from {hive_tb}
where site_name='{self.site_name}' and date_info='{self.date_info}'
""").collect()[0]['cnt']
print(f"实际写入数量:{written_count},预期:{new_count}")
if written_count != new_count:
raise RuntimeError(
f"写入数量校验失败!实际写入 {written_count} != 预期 {new_count},终止删除历史分区,请人工检查。"
self.spark.sql(
f"ALTER TABLE {self.hive_table} DROP IF EXISTS "
f"PARTITION (site_name='{self.site_name}', date_info='{self.date_info}')"
)
# 写入整合后的当日分区(try-except 保护:写入失败立即抛出,绝不进入删除历史分区流程)
print(f"写入 Hive 当日分区:{hdfs_path}")
try:
self.df_save.write.saveAsTable(
name=self.hive_table, format='hive', mode='append', partitionBy=partition_by
)
print("写入校验通过!")
print(f"正在删除历史分区数据")
self.spark.sql(f"""
ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}')
""")
HdfsUtils.delete_hdfs_file(
CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": self.site_name, "date_info": self.last_date_info})
except Exception as e:
print(f"❌ Hive 当日分区写入失败:{e};保留所有历史分区不删,请人工排查!")
self.df_history.unpersist()
raise
# 验证当日分区实际写入行数与预期一致(防止 saveAsTable 静默丢数据)
written_count = self.spark.sql(
f"SELECT COUNT(1) AS cnt FROM {self.hive_table} "
f"WHERE site_name='{self.site_name}' AND date_info='{self.date_info}'"
).collect()[0]['cnt']
print(f"实际写入行数 written_count={written_count:,},预期 new_count={new_count:,}")
if written_count != new_count:
print(f"❌ 写入校验失败 written_count={written_count} != new_count={new_count},保留历史分区不删!")
self.df_history.unpersist()
return
# 安全控制:当日整合分区行数必须 >= 历史总和才允许删历史(防止整合丢失数据)
if new_count < old_count:
print(f"⚠️ 校验未通过 new_count={new_count} < old_count={old_count},跳过历史分区删除,请人工排查!")
self.df_history.unpersist()
return
print(f"校验通过,删除所有 date_info < {self.date_info} 的历史分区")
partitions = self.spark.sql(f"SHOW PARTITIONS {self.hive_table}").collect()
for row in partitions:
# row[0] 形如 "site_name=us/date_info=2026-05-10"
parts = dict(p.split('=') for p in row[0].split('/'))
p_site = parts.get('site_name')
p_date = parts.get('date_info')
if p_site == self.site_name and p_date < self.date_info:
print(f" 删除分区: site_name={p_site}, date_info={p_date}")
self.spark.sql(
f"ALTER TABLE {self.hive_table} DROP IF EXISTS "
f"PARTITION (site_name='{p_site}', date_info='{p_date}')"
)
HdfsUtils.delete_hdfs_file(
CommonUtil.build_hdfs_path(
self.hive_table,
partition_dict={"site_name": p_site, "date_info": p_date},
)
)
print("历史分区清理完毕!")
self.df_history.unpersist()
def write_to_doris(self):
"""当日 sqoop 增量数据写 Doris dwd_asin_profit_rate_latest(不回灌历史)
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=update_time 自动按更新时间取最新
"""
df_to_doris = self.df_today.select(
F.lit(self.site_name).alias('site_name'),
F.col('asin'),
F.round(F.col('price'), 2).cast('decimal(20,2)').alias('price'),
F.col('ocean_profit'),
F.col('air_profit'),
F.to_timestamp(F.col('asin_crawl_date')).alias('asin_crawl_date'),
F.to_timestamp(F.col('updated_time')).alias('update_time'),
).cache()
count = df_to_doris.count()
print(f"写入 Doris 增量数据量:{count:,}")
df_to_doris.show(10, truncate=False)
table_columns = "site_name, asin, price, ocean_profit, air_profit, asin_crawl_date, update_time"
DorisHelper.spark_export_with_columns(
df_save=df_to_doris,
db_name=self.doris_db,
table_name=self.doris_table,
table_columns=table_columns,
)
df_to_doris.unpersist()
self.df_today.unpersist()
self.df_save.unpersist()
print("success!")
if __name__ == "__main__":
site_name = sys.argv[1]
date_info = sys.argv[2]
handle_obj = DimAsinProfitRateInfo(site_name, date_info)
handle_obj.run()
DimAsinProfitRateInfo(site_name, date_info).run()
"""
author: CT
description: Keepa 数据聚合 — 一站式 Hive → Hive + Doris
步骤:
1) 读 Hive ods_keepa_asin_detail 当日分区,解析 last_detail JSON 各字段
派生 keepa_launch_time = min(listed_since, tracking_since) 转 yyyy-MM-dd HH:mm:ss
weight 字段已废弃置 NULL(Doris 端不再保留)
2) 与 Hive 历史 dim_keepa_asin_info union 按 asin 去重保留 updated_time 最新
3) 写入 Hive dim_keepa_asin_info(当日分区)+ 删除所有 date_info < 今日的历史分区
4) 当日新数据(不含历史)写入 Doris dwd.dwd_keepa_asin_detail
Doris UNIQUE KEY(site_name, asin) + sequence_col=updated_time 自动取最新
执行示例: spark-submit dim_keepa_asin_info.py us 2026-05-15
"""
import os
import sys
......@@ -85,14 +98,14 @@ class DimKeepaAsinInfo(object):
# 过滤脏数据:productType in (3,4,5) 且 title 为空的异常数据不做保留
~(F.col("product_type").isin(3, 4, 5) & F.col("title").isNull())
).cache()
# 写入 Doris 需带 site_name 分区字段,并把 keepa_launch_time 转为 DATETIME(DDL 类型已改)
# 写入 Doris 需带 site_name 分区字段,并把 keepa_launch_time / updated_time 转为 DATETIME(DDL 类型已改)
# weight 字段在 Doris dwd_keepa_asin_detail 已废弃,select 不带
self.df_to_doris = self.df_keepa_asin.select(
F.lit(self.site_name).alias('site_name'),
'asin', 'package_length', 'package_width', 'package_height', 'package_weight', 'item_weight',
'listed_since', 'release_date', 'tracking_since',
F.to_timestamp(F.col('keepa_launch_time')).alias('keepa_launch_time'),
'updated_time'
F.to_timestamp(F.col('updated_time')).alias('updated_time')
)
# 读取历史数据
......@@ -117,7 +130,7 @@ class DimKeepaAsinInfo(object):
"site_name", F.lit(self.site_name)
).withColumn(
"date_info", F.lit(self.date_info)
).repartition(50).cache()
).repartition(40).cache()
new_count = self.df_save.count()
old_count = self.df_keepa_asin_history.count()
hive_tb = "dim_keepa_asin_info"
......
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from pyspark.sql import functions as F, Window
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
class DwtFlowKeepaAsin(object):
def __init__(self, site_name, date_info):
self.site_name = site_name
self.date_info = date_info
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
self.df_flow_asin = self.spark.sql(f"select 1+1;")
self.df_category_id = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;")
self.df_calc_asin = self.spark.sql(f"select 1+1;")
self.df_export_asin = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
def run(self):
self.read_data()
self.handle_data()
self.save_data()
def read_data(self):
# 读取流量选品月asin
sql = f"""
select asin, asin_price as price, category_first_id, date_info as source_month
from dwt_flow_asin
where site_name = '{self.site_name}'
and date_type = 'month'
and date_info >= '2025-05'
and asin_price is not null
and asin_price > 0
"""
df_flow_asin_month = self.spark.sql(sqlQuery=sql) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(10,2)'))
# 读取ES最近30天缺少利润率的asin
days_30_ago = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d 00:00:00")
es_read_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.nodes.wan.only": "false",
"es.mapping.date.rich": "false",
"es.scroll.size": "2000",
"es.read.field.include": "asin,price,category_first_id,asin_crawl_date",
"es.query": f'{{"query":{{"bool":{{"must":[{{"range":{{"price":{{"gt":0}}}}}},{{"range":{{"asin_crawl_date":{{"gte":"{days_30_ago}"}}}}}}],"must_not":{{"exists":{{"field":"profit_rate_extra.ocean_profit"}}}}}}}}}}'
}
df_flow_asin_30day = self.spark.read.format("org.elasticsearch.spark.sql") \
.options(**es_read_options) \
.load(f"{self.site_name}_flow_asin_30day") \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(10,2)')) \
.withColumn('source_month', F.date_format(F.col('asin_crawl_date'), 'yyyy-MM')) \
.select('asin', 'price', 'category_first_id', 'source_month')
# 合并两部分,按(asin, price)去重保留最新source_month
self.df_flow_asin = df_flow_asin_month.union(df_flow_asin_30day).repartition(40, 'asin')
window = Window.partitionBy(['asin', 'price']).orderBy(F.col('source_month').desc_nulls_last())
self.df_flow_asin = self.df_flow_asin.withColumn(
'rank', F.row_number().over(window=window)
).filter('rank = 1').drop('rank').cache()
# 读取分类数据
sql = f"""
select category_first_id, en_name as category from dim_bsr_category_tree where site_name = '{self.site_name}' and nodes_num = 2
"""
self.df_category_id = self.spark.sql(sqlQuery=sql).cache()
# 读取keepa数据
sql = f"""
select asin, package_length, package_width, package_height, item_weight as weight from dim_keepa_asin_info where site_name = '{self.site_name}' and date_info = '{self.date_info}'
"""
self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').filter(
(F.col("package_length") > 0) & (F.col("package_width") > 0) & (F.col("package_height") > 0) & (F.col("weight") > 0)
).cache()
# 读取已经计算过利润率的asin
sql = f"""
select asin, price, package_length, package_width, package_height, weight from dim_asin_profit_rate_info where site_name = '{self.site_name}' and date_info = '{self.date_info}'
"""
self.df_calc_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
# 读取已经导出过asin+price,避免重复计算
sql = f"""
select asin, price, package_length, package_width, package_height, weight from dwt_flow_keepa_asin where site_name = '{self.site_name}'
"""
self.df_export_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
def handle_data(self):
self.df_save = self.df_flow_asin.join(
self.df_category_id, on='category_first_id', how='left'
).join(
self.df_keepa_asin, on='asin', how='inner'
).join(
self.df_calc_asin, on=['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight'], how='left_anti'
).join(
self.df_export_asin, on=['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight'], how='left_anti'
).cache()
self.df_flow_asin.unpersist()
self.df_category_id.unpersist()
self.df_keepa_asin.unpersist()
self.df_calc_asin.unpersist()
self.df_export_asin.unpersist()
start_key = 1
self.df_save = self.df_save.withColumn(
'part_key', F.ntile(50).over(Window.orderBy(F.rand())) + (start_key - 1)
).select(
F.col('asin'),
F.col('price'),
F.col('category'),
F.col('package_length'),
F.col('package_width'),
F.col('package_height'),
F.col('weight'),
F.col('part_key'),
F.col('source_month'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_info).alias('date_info')
).repartition(10)
def save_data(self):
hive_tb = "dwt_flow_keepa_asin"
partition_dict = {
"site_name": self.site_name,
"date_info": self.date_info
}
partition_by = list(partition_dict.keys())
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
HdfsUtils.delete_file_in_folder(hdfs_path)
print(f"正在进行数据存储,当前存储的表名为:{hive_tb},存储路径:{hdfs_path}")
self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success")
if __name__ == "__main__":
site_name = sys.argv[1]
date_info = sys.argv[2]
handle_obj = DwtFlowKeepaAsin(site_name, date_info)
handle_obj.run()
......@@ -2,6 +2,7 @@
author: CT
description: 从 Hive dim_keepa_asin_info 读取 keepa 数据,过滤 updated_time 超过 3 个月的 asin,
附带当前月份字段 month(yyyy-MM),导出到 PG us_asin_profit_keepa_add,触发爬虫重新抓取 Keepa
导出前 LEFT ANTI 剔除 PG 表中已存在的 asin,避免重复触发
执行示例: spark-submit export_keepa_asin_del.py us
"""
import os
......@@ -28,22 +29,39 @@ if __name__ == '__main__':
WHERE site_name = '{site_name}'
"""
print(f"sql=\n{sql}")
df_all = spark.sql(sqlQuery=sql)
df_all = spark.sql(sqlQuery=sql).cache()
print(f"全量读取 keepa 数据:{df_all.count()}")
# 2. Spark 端过滤超过 3 个月的 asin(数据读取后处理,不在 Hive SQL 中算)
three_months_ago = (datetime.now() - relativedelta(months=3)).strftime('%Y-%m-%d %H:%M:%S')
current_month = datetime.now().strftime('%Y-%m')
print(f"过滤阈值 updated_time < {three_months_ago};附加 month={current_month}")
con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=site_name)
table_name = 'us_asin_profit_keepa_add'
# 3. 读取 PG 已导出 asin 集合,用于 LEFT ANTI 剔除(避免重复触发爬虫)
# PG 端只取 asin 列全量,Spark 端 dropDuplicates 去重
df_pg_existing = spark.read.format("jdbc") \
.option("url", con_info["url"]) \
.option("dbtable", table_name) \
.option("user", con_info["username"]) \
.option("password", con_info["pwd"]) \
.load() \
.select('asin') \
.dropDuplicates(['asin']).cache()
print(f"PG 已导出 asin 数量:{df_pg_existing.count():,}")
# 4. 过滤 3 个月以前 + LEFT ANTI 剔除已导出
df_need_export = df_all.filter(F.col('updated_time') < F.lit(three_months_ago)) \
.select(
F.col('asin'),
F.lit(current_month).alias('month'),
).cache()
) \
.join(df_pg_existing, on='asin', how='left_anti') \
.cache()
print(f"导出数据量:{df_need_export.count():,}")
con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=site_name)
table_name = 'us_asin_profit_keepa_add'
df_need_export.write.format("jdbc") \
.option("url", con_info["url"]) \
.option("dbtable", table_name) \
......
"""
author: CT
description: 导出待计算利润率的 ASIN
1) Hive dwt_flow_asin 月维度读取 date_info >= '2025-05' 的所有 ASIN:
asin / price / category_first_id / asin_crawl_date
2) Doris dwt.{site}_flow_asin_30day 读取所有相关 ASIN:
asin / price / category_first_id / asin_crawl_date
3) union 后按 (asin, price) 去重保留 asin_crawl_date 最新
4) LEFT JOIN 分类、INNER JOIN keepa 增量
keepa 表已整合为单分区快照,按 updated_time > last_date_info 筛增量
5) keepa 关联到的 ASIN 全部导出 PG {site}_asin_profit_rate_calc 重新计算利润率
执行示例: spark-submit export_need_profit_rate.py us 2026-05-15
"""
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F, Window
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
START_MONTH = '2025-05'
class ExportNeedProfitRate(object):
def __init__(self, site_name, date_info):
self.site_name = site_name
self.date_info = date_info # 计算时间 yyyy-MM-dd
# keepa 增量过滤下限:date_info - 1 天(如 date_info=2026-05-13 → last_date_info=2026-05-12)
self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
self.spark = SparkUtil.get_spark_session(
f"{self.__class__.__name__}: {self.site_name} {self.date_info}"
)
def run(self):
df_export = self.build_export_df()
self.write_to_pg(df_export)
def build_export_df(self):
# 1. Hive dwt_flow_asin 月维度,date_info >= 2025-05 所有月份
sql_dwt = f"""
SELECT asin,
asin_price AS price,
category_first_id,
asin_crawl_date
FROM dwt_flow_asin
WHERE site_name = '{self.site_name}'
AND date_type = 'month'
AND date_info >= '{START_MONTH}'
AND asin_price > 0
"""
print(f"sql_dwt =\n{sql_dwt}")
df_dwt = self.spark.sql(sqlQuery=sql_dwt) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(20,2)')) \
.withColumn('asin_crawl_date', F.to_timestamp(F.col('asin_crawl_date')))
# 2. Doris dwt.{site}_flow_asin_30day 所有相关 ASIN
# spark_import_with_connector 不支持 WHERE 下推,price > 0 过滤改到 Spark 端
table_identifier = f"dwt.{self.site_name}_flow_asin_30day"
read_fields = "asin,price,category_first_id,asin_crawl_date"
df_doris = DorisHelper.spark_import_with_connector(self.spark, table_identifier, read_fields) \
.filter(F.col('price') > 0) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(20,2)')) \
.withColumn('asin_crawl_date', F.col('asin_crawl_date').cast('timestamp')) \
.select('asin', 'price', 'category_first_id', 'asin_crawl_date')
# 3. union + 按 (asin, price) 去重保留 asin_crawl_date 最新
df_flow = df_dwt.unionByName(df_doris).repartition(40, 'asin', 'price')
window = Window.partitionBy('asin', 'price').orderBy(F.col('asin_crawl_date').desc_nulls_last())
df_flow = df_flow.withColumn('rk', F.row_number().over(window)) \
.filter('rk = 1') \
.drop('rk') \
.cache()
# 4. 分类名 LEFT JOIN
sql_cate = f"""
SELECT category_first_id, en_name AS category
FROM dim_bsr_category_tree
WHERE site_name = '{self.site_name}' AND nodes_num = 2
"""
df_cate = self.spark.sql(sqlQuery=sql_cate)
# 5. keepa 当日增量 INNER JOIN
# keepa 表已整合为单分区快照,用 updated_time > last_date_info 筛"近一天更新"的增量
sql_keepa = f"""
SELECT asin, package_length, package_width, package_height, item_weight AS weight
FROM dim_keepa_asin_info
WHERE site_name = '{self.site_name}' AND updated_time >= '{self.last_date_info}'
"""
df_keepa = self.spark.sql(sqlQuery=sql_keepa) \
.filter((F.col('package_length') > 0) &
(F.col('package_width') > 0) &
(F.col('package_height') > 0) &
(F.col('weight') > 0)) \
.repartition(40, 'asin')
df_result = df_flow \
.join(df_cate, on='category_first_id', how='left') \
.join(df_keepa, on='asin', how='inner') \
.withColumn('source_month', F.date_format(F.col('asin_crawl_date'), 'yyyy-MM')) \
.withColumn('part_key', F.ntile(50).over(Window.orderBy(F.rand()))) \
.select(
'asin', 'price', 'category',
'package_length', 'package_width', 'package_height', 'weight',
'part_key', 'source_month', 'asin_crawl_date',
).cache()
count = df_result.count()
print(f"待计算利润率数据量:{count:,}")
df_result.show(10, truncate=False)
df_flow.unpersist()
return df_result
def write_to_pg(self, df_export):
con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=self.site_name)
table_name = f"{self.site_name}_asin_profit_rate_calc"
print(f"导出到 PG {table_name}")
df_export.write.format("jdbc") \
.option("url", con_info["url"]) \
.option("dbtable", table_name) \
.option("user", con_info["username"]) \
.option("password", con_info["pwd"]) \
.mode("append") \
.save()
print("success")
if __name__ == "__main__":
site_name = sys.argv[1]
date_info = sys.argv[2]
ExportNeedProfitRate(site_name, date_info).run()
"""
author: CT
description: sqoop 从 PG {site_name}_asin_profit_rate_calc 增量拉取利润率数据
写入 Hive ods_asin_profit_rate 分区 (site_name, date_info)
增量时间窗:[date_info-1天 00:00:00, 当天运行时刻的 00:00:00)
执行示例: python ods_asin_profit_rate.py us 2026-05-15
"""
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_info = CommonUtil.get_sys_arg(2, None)
assert site_name is not None, "site_name 不能为空!"
assert date_info is not None, "date_info 不能为空!"
last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
# 上限:程序运行当天 00:00:00,避免拉到 PG 当日正在写入的数据
upper_bound = datetime.now().strftime("%Y-%m-%d") + " 00:00:00"
hive_table = "ods_asin_profit_rate"
import_table = f"{site_name}_asin_profit_rate_calc"
partition_dict = {"site_name": site_name, "date_info": date_info}
hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
engine = get_remote_engine(site_name=site_name, db_type='postgresql_cluster')
query = f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
updated_time, asin_crawl_date
FROM {import_table}
WHERE updated_time >= '{last_date_info}' AND updated_time < '{upper_bound}' AND \\$CONDITIONS
"""
print(f"sqoop query:\n{query}")
engine.sqoop_raw_import(
query=query,
hive_table=hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
check_count=True,
)
print("success!")
......@@ -33,9 +33,9 @@ if __name__ == '__main__':
db_type=db_type
)
# 增量区间:[last_date_info, 程序运行时刻)
# 增量区间:[last_date_info, 程序运行当天 00:00:00)
# 下限:last_date_info(前一日),间隔几天补跑时传更早 date_info 自动覆盖区间
# 上限:upper_bound(程序运行时刻),半开区间避免拉到 PG 正在写入的最新数据,且不固定为一天
# 上限:upper_bound(程序运行当天 00:00:00),半开区间避免拉到 PG 当日正在写入的数据
query = f"""
SELECT asin, last_detail::text as last_detail, update_at FROM {import_table}
WHERE update_at >= '{last_date_info}' AND update_at < '{upper_bound}' AND last_detail is not null AND \$CONDITIONS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment