Commit 415179d3 by chenyuanjie

每日导出过期keepa

parent a3a44cc8
"""
author: CT
description: 从 Hive dim_keepa_asin_info 读取 keepa 数据,过滤 updated_time 超过 3 个月的 asin,
附带当前月份字段 month(yyyy-MM),导出到 PG us_asin_profit_keepa_add,触发爬虫重新抓取 Keepa
导出前 LEFT ANTI 剔除 PG 表中已存在的 asin,避免重复触发
description: 从 Hive dim_keepa_asin_info 读取 keepa 数据,过滤 updated_time 超过 3 个月的 asin(keepa 过期),
并关联近期有月销的 asin(dwt_flow_asin 最近 3 个月 ∪ Doris dwt.{site}_flow_asin_30day),
只导出"过期 + 近期有月销"的 asin 到 PG us_asin_profit_keepa_add,触发爬虫重新抓取 Keepa。
导出前 LEFT ANTI 剔除 PG 表中已存在的 asin,避免重复触发。
执行示例: spark-submit export_keepa_asin_del.py us
"""
import os
......@@ -16,6 +17,7 @@ from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
if __name__ == '__main__':
site_name = sys.argv[1]
......@@ -23,25 +25,89 @@ if __name__ == '__main__':
spark = SparkUtil.get_spark_session(f'export_keepa_asin_del_{site_name}')
# 1. 全量读取 keepa 数据(updated_time 过滤改到 Spark 端)
sql = f"""
sql_keepa = f"""
SELECT asin, updated_time
FROM dim_keepa_asin_info
WHERE site_name = '{site_name}'
"""
print(f"sql=\n{sql}")
df_all = spark.sql(sqlQuery=sql).cache()
print(f"全量读取 keepa 数据:{df_all.count()}")
print(f"sql_keepa=\n{sql_keepa}")
df_keepa = spark.sql(sqlQuery=sql_keepa).cache()
print(f"全量读取 keepa 数据:{df_keepa.count():,}")
# 2. Spark 端过滤超过 3 个月的 asin(数据读取后处理,不在 Hive SQL 中算)
# 2. 过滤超过 3 个月的 asin(keepa 过期)
three_months_ago = (datetime.now() - relativedelta(months=3)).strftime('%Y-%m-%d %H:%M:%S')
current_month = datetime.now().strftime('%Y-%m')
print(f"过滤阈值 updated_time < {three_months_ago};附加 month={current_month}")
print(f"过滤阈值 updated_time < {three_months_ago}")
# Keep only keepa rows whose updated_time is older than the cutoff computed
# above, reduce them to distinct asin values, and cache the result because it
# is counted here and joined again later in the job.
is_expired = F.col('updated_time') < F.lit(three_months_ago)
df_keepa_expired = (
    df_keepa
    .where(is_expired)
    .select('asin')
    .dropDuplicates(['asin'])
    .repartition(40, 'asin')
    .cache()
)
print(f"keepa 过期 asin 数量:{df_keepa_expired.count():,}")
# 3. Work out which 3 months of Hive dwt_flow_asin to read.
# The anchor month is the largest report_date stored in MySQL
# workflow_everyday for this site (date_type = 'month', page = '流量选品');
# the two months before it complete the window.
sql_max_month = f"select MAX(report_date) as date_info from workflow_everyday where site_name = '{site_name}' and date_type = 'month' and page = '流量选品'"
print(f"sql_max_month=\n{sql_max_month}")
mysql_con = DBUtil.get_connection_info('mysql', site_name)
max_date_info = SparkUtil.read_jdbc_query(
    session=spark, url=mysql_con['url'],
    pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql_max_month,
).collect()[0]['date_info']
print(f"workflow_everyday 最大 report_date:{max_date_info}")
# MAX() over zero matching rows yields NULL, which arrives here as None.
# Without this guard str(None) would be handed to strptime and fail with a
# misleading "time data 'None' does not match format" error.
if max_date_info is None:
    raise ValueError(
        f"workflow_everyday has no monthly report_date for site_name={site_name!r}; "
        "cannot determine the recent 3-month window"
    )
# NOTE(review): assumes report_date is formatted as yyyy-MM - confirm against the table.
base_dt = datetime.strptime(str(max_date_info), '%Y-%m')
# Anchor month first, then the two preceding months.
months_3 = [(base_dt - relativedelta(months=i)).strftime('%Y-%m') for i in range(3)]
# Quoted, comma-separated list ready to be placed inside a SQL IN (...) clause.
months_in = ",".join([f"'{m}'" for m in months_3])
print(f"取最近 3 个月:{months_3}")
# Hive side of the "recent sales" set: monthly dwt_flow_asin rows for the
# selected months whose asin_bought_month is positive.
sql_dwt = f"""
SELECT asin, asin_bought_month, asin_crawl_date
FROM dwt_flow_asin
WHERE site_name = '{site_name}'
AND date_type = 'month'
AND date_info IN ({months_in})
AND asin_bought_month > 0
"""
print(f"sql_dwt=\n{sql_dwt}")
# Cast asin_crawl_date to string so the schema lines up with the Doris frame
# for the later union (the original note says the column is already a string
# in dwt, so the cast is a safeguard).
df_dwt_sales = (
    spark.sql(sqlQuery=sql_dwt)
    .withColumn('asin_crawl_date', F.col('asin_crawl_date').cast('string'))
    .repartition(40, 'asin')
)
# 4. Doris side of the "recent sales" set: asin, monthly sales and crawl time
# from dwt.{site}_flow_asin_30day.
# Per the original author's notes: spark_import_with_connector cannot push a
# WHERE clause down, so the asin_bought_month > 0 filter runs in Spark; and
# doris.read.field is not guaranteed to return only the requested fields, so
# an explicit select narrows the schema to keep the later union compatible.
table_identifier = f"dwt.{site_name}_flow_asin_30day"
read_fields = "asin,asin_bought_month,asin_crawl_date"
df_doris_raw = DorisHelper.spark_import_with_connector(spark, table_identifier, read_fields)
# Render the crawl time as a 'yyyy-MM-dd HH:mm:ss' string under the same column name.
crawl_date_as_text = F.date_format(
    F.col('asin_crawl_date'), 'yyyy-MM-dd HH:mm:ss'
).alias('asin_crawl_date')
df_doris_sales = (
    df_doris_raw
    .filter(F.col('asin_bought_month') > 0)
    .select(F.col('asin'), F.col('asin_bought_month'), crawl_date_as_text)
    .repartition(40, 'asin')
)
# 5. Combine the Hive and Doris sales frames, then per asin sum the monthly
# sales and take the largest crawl-date string. Only asins whose summed sales
# reach SALES_THRESHOLD are kept.
SALES_THRESHOLD = 50
df_sales_union = df_dwt_sales.unionByName(df_doris_sales)
df_sales_by_asin = df_sales_union.groupBy('asin').agg(
    F.sum('asin_bought_month').alias('total_bought_month'),
    F.max('asin_crawl_date').alias('max_asin_crawl_date'),
)
# Cached because the frame is counted here and joined afterwards.
df_asin_with_sales = (
    df_sales_by_asin
    .where(F.col('total_bought_month') >= SALES_THRESHOLD)
    .select('asin', 'max_asin_crawl_date')
    .repartition(40, 'asin')
    .cache()
)
print(f"累加月销 >= {SALES_THRESHOLD} 的 asin 数量:{df_asin_with_sales.count():,}")
# 6. Inner join on asin: keep only asins that are both keepa-expired and in
# the recent-sales set; max_asin_crawl_date is carried along from the sales side.
df_target = (
    df_keepa_expired
    .join(df_asin_with_sales, on='asin', how='inner')
    .cache()
)
print(f"过期且近期有月销的 asin 数量:{df_target.count():,}")
# 7. 读取 PG 已导出 asin(去重),用于 LEFT ANTI 剔除
con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=site_name)
table_name = 'us_asin_profit_keepa_add'
# 3. 读取 PG 已导出 asin 集合,用于 LEFT ANTI 剔除(避免重复触发爬虫)
# PG 端只取 asin 列全量,Spark 端 dropDuplicates 去重
df_pg_existing = spark.read.format("jdbc") \
.option("url", con_info["url"]) \
.option("dbtable", table_name) \
......@@ -52,14 +118,13 @@ if __name__ == '__main__':
.dropDuplicates(['asin']).cache()
print(f"PG 已导出 asin 数量:{df_pg_existing.count():,}")
# 4. 过滤 3 个月以前 + LEFT ANTI 剔除已导出
df_need_export = df_all.filter(F.col('updated_time') < F.lit(three_months_ago)) \
# 8. LEFT ANTI 剔除已导出 + 附加 month 字段(从 max_asin_crawl_date 截取 yyyy-MM)
df_need_export = df_target \
.join(df_pg_existing, on='asin', how='left_anti') \
.select(
F.col('asin'),
F.lit(current_month).alias('month'),
) \
.join(df_pg_existing, on='asin', how='left_anti') \
.cache()
F.substring(F.col('max_asin_crawl_date'), 1, 7).alias('month'),
).cache()
print(f"导出数据量:{df_need_export.count():,}")
df_need_export.write.format("jdbc") \
......@@ -70,5 +135,10 @@ if __name__ == '__main__':
.mode("append") \
.save()
# Release every DataFrame this job cached, in the order they were created,
# then report completion.
for cached_df in (
    df_keepa,
    df_keepa_expired,
    df_asin_with_sales,
    df_target,
    df_pg_existing,
    df_need_export,
):
    cached_df.unpersist()
print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment