Commit 2f887935 by chenyuanjie

fix

parent fd4b7fdd
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
author: CT author: CT
description: 从 Hive dim_keepa_asin_info 读取 keepa 数据,过滤 updated_time 超过 3 个月的 asin, description: 从 Hive dim_keepa_asin_info 读取 keepa 数据,过滤 updated_time 超过 3 个月的 asin,
附带当前月份字段 month(yyyy-MM),导出到 PG us_asin_profit_keepa_add,触发爬虫重新抓取 Keepa 附带当前月份字段 month(yyyy-MM),导出到 PG us_asin_profit_keepa_add,触发爬虫重新抓取 Keepa
导出前 LEFT ANTI 剔除 PG 表中已存在的 asin,避免重复触发
执行示例: spark-submit export_keepa_asin_del.py us 执行示例: spark-submit export_keepa_asin_del.py us
""" """
import os import os
...@@ -35,15 +36,31 @@ if __name__ == '__main__': ...@@ -35,15 +36,31 @@ if __name__ == '__main__':
current_month = datetime.now().strftime('%Y-%m') current_month = datetime.now().strftime('%Y-%m')
print(f"过滤阈值 updated_time < {three_months_ago};附加 month={current_month}") print(f"过滤阈值 updated_time < {three_months_ago};附加 month={current_month}")
con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=site_name)
table_name = 'us_asin_profit_keepa_add'
# 3. 读取 PG 已导出 asin 集合,用于 LEFT ANTI 剔除(避免重复触发爬虫)
# PG 端只取 asin 列全量,Spark 端 dropDuplicates 去重
df_pg_existing = spark.read.format("jdbc") \
.option("url", con_info["url"]) \
.option("dbtable", table_name) \
.option("user", con_info["username"]) \
.option("password", con_info["pwd"]) \
.load() \
.select('asin') \
.dropDuplicates(['asin']).cache()
print(f"PG 已导出 asin 数量:{df_pg_existing.count():,}")
# 4. 过滤 3 个月以前 + LEFT ANTI 剔除已导出
df_need_export = df_all.filter(F.col('updated_time') < F.lit(three_months_ago)) \ df_need_export = df_all.filter(F.col('updated_time') < F.lit(three_months_ago)) \
.select( .select(
F.col('asin'), F.col('asin'),
F.lit(current_month).alias('month'), F.lit(current_month).alias('month'),
).cache() ) \
.join(df_pg_existing, on='asin', how='left_anti') \
.cache()
print(f"导出数据量:{df_need_export.count():,}") print(f"导出数据量:{df_need_export.count():,}")
con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=site_name)
table_name = 'us_asin_profit_keepa_add'
df_need_export.write.format("jdbc") \ df_need_export.write.format("jdbc") \
.option("url", con_info["url"]) \ .option("url", con_info["url"]) \
.option("dbtable", table_name) \ .option("dbtable", table_name) \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment