Commit b34a7b2c by chenyuanjie

keepa导出pg

parent 5efdfc6e
......@@ -60,7 +60,7 @@ if __name__ == '__main__':
print(f"取最近 3 个月:{months_3}")
sql_dwt = f"""
SELECT asin, asin_bought_month, asin_crawl_date
SELECT asin, asin_bought_month
FROM dwt_flow_asin
WHERE site_name = '{site_name}'
AND date_type = 'month'
......@@ -68,40 +68,30 @@ if __name__ == '__main__':
AND asin_bought_month > 0
"""
print(f"sql_dwt=\n{sql_dwt}")
# 统一 asin_crawl_date 为 string 类型(dwt 已是 string,确保 union 兼容)
df_dwt_sales = spark.sql(sqlQuery=sql_dwt) \
.withColumn('asin_crawl_date', F.col('asin_crawl_date').cast('string')) \
.repartition(40, 'asin')
df_dwt_sales = spark.sql(sqlQuery=sql_dwt).repartition(40, 'asin')
# 4. 读取 Doris dwt.{site}_flow_asin_30day asin + 月销 + 抓取时间
# 4. 读取 Doris dwt.{site}_flow_asin_30day asin + 月销
# spark_import_with_connector 不支持 WHERE 下推,asin_bought_month > 0 过滤改到 Spark 端;
# doris.read.field 不保证只返回指定字段,显式 select 收窄 schema 确保 union 兼容
table_identifier = f"dwt.{site_name}_flow_asin_30day"
read_fields = "asin,asin_bought_month,asin_crawl_date"
read_fields = "asin,asin_bought_month"
df_doris_sales = DorisHelper.spark_import_with_connector(spark, table_identifier, read_fields) \
.filter(F.col('asin_bought_month') > 0) \
.select(
F.col('asin'),
F.col('asin_bought_month'),
F.date_format(F.col('asin_crawl_date'), 'yyyy-MM-dd HH:mm:ss').alias('asin_crawl_date'),
) \
.select(F.col('asin'), F.col('asin_bought_month')) \
.repartition(40, 'asin')
# 5. union 两源,按 asin 聚合:月销累加 + 抓取时间取最大,过滤累加月销 >= 50
# 5. union 两源,按 asin 聚合月销累加,过滤累加月销 >= 50
SALES_THRESHOLD = 50
df_asin_with_sales = df_dwt_sales.unionByName(df_doris_sales) \
.groupBy('asin') \
.agg(
F.sum('asin_bought_month').alias('total_bought_month'),
F.max('asin_crawl_date').alias('max_asin_crawl_date'),
) \
.agg(F.sum('asin_bought_month').alias('total_bought_month')) \
.filter(F.col('total_bought_month') >= SALES_THRESHOLD) \
.select('asin', 'max_asin_crawl_date') \
.select('asin') \
.repartition(40, 'asin') \
.cache()
print(f"累加月销 >= {SALES_THRESHOLD} 的 asin 数量:{df_asin_with_sales.count():,}")
# 6. INNER JOIN:只保留"过期 + 近期有月销"的 asin,携带最大抓取时间
# 6. INNER JOIN:只保留"过期 + 近期有月销"的 asin
df_target = df_keepa_expired.join(df_asin_with_sales, on='asin', how='inner').cache()
print(f"过期且近期有月销的 asin 数量:{df_target.count():,}")
......@@ -118,12 +108,12 @@ if __name__ == '__main__':
.dropDuplicates(['asin']).cache()
print(f"PG 已导出 asin 数量:{df_pg_existing.count():,}")
# 8. LEFT ANTI 剔除已导出 + 附加 month 字段(从 max_asin_crawl_date 截取 yyyy-MM
# 8. LEFT ANTI 剔除已导出 + 附加 month 字段(过期 keepa 默认 month = '1970-01'
df_need_export = df_target \
.join(df_pg_existing, on='asin', how='left_anti') \
.select(
F.col('asin'),
F.substring(F.col('max_asin_crawl_date'), 1, 7).alias('month'),
F.lit('1970-01').alias('month'),
).cache()
print(f"导出数据量:{df_need_export.count():,}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment