Commit 775f1ccb by chenyuanjie

keepa流程优化:断点续传+Doris表优化+筛选过期keepa

parent 4af80620
...@@ -24,8 +24,8 @@ class DimKeepaAsinInfo(object): ...@@ -24,8 +24,8 @@ class DimKeepaAsinInfo(object):
self.df_save = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;")
# doris相关配置 # doris相关配置
self.doris_db = "selection" self.doris_db = "dwd"
self.doris_table = f"{self.site_name}_keepa_asin_latest_detail" self.doris_table = "dwd_keepa_asin_detail"
self.df_to_doris = self.spark.sql(f"select 1+1;") self.df_to_doris = self.spark.sql(f"select 1+1;")
def run(self): def run(self):
...@@ -52,7 +52,8 @@ class DimKeepaAsinInfo(object): ...@@ -52,7 +52,8 @@ class DimKeepaAsinInfo(object):
F.get_json_object("last_detail", "$.title").alias("title"), F.get_json_object("last_detail", "$.title").alias("title"),
F.col('updated_time') F.col('updated_time')
).withColumn( ).withColumn(
'weight', F.greatest(F.col("package_weight"), F.col("item_weight")) # weight 字段已废弃(Doris 端 dwd_keepa_asin_detail 不再保留),Hive 兼容 schema 保留列但置 NULL
'weight', F.lit(None).cast('int')
).withColumn( ).withColumn(
"min_time", F.least( "min_time", F.least(
F.when(F.col("listed_since") != 0, F.col("listed_since")), F.when(F.col("listed_since") != 0, F.col("listed_since")),
...@@ -63,7 +64,7 @@ class DimKeepaAsinInfo(object): ...@@ -63,7 +64,7 @@ class DimKeepaAsinInfo(object):
F.when( F.when(
F.col("min_time").isNull(), F.lit(None) F.col("min_time").isNull(), F.lit(None)
).otherwise( ).otherwise(
F.date_format(F.from_unixtime((F.col("min_time") + F.lit(21564000)) * 60), "yyyy-MM-dd") F.date_format(F.from_unixtime((F.col("min_time") + F.lit(21564000)) * 60), "yyyy-MM-dd HH:mm:ss")
) )
).select( ).select(
F.col('asin'), F.col('asin'),
...@@ -84,9 +85,14 @@ class DimKeepaAsinInfo(object): ...@@ -84,9 +85,14 @@ class DimKeepaAsinInfo(object):
# 过滤脏数据:productType in (3,4,5) 且 title 为空的异常数据不做保留 # 过滤脏数据:productType in (3,4,5) 且 title 为空的异常数据不做保留
~(F.col("product_type").isin(3, 4, 5) & F.col("title").isNull()) ~(F.col("product_type").isin(3, 4, 5) & F.col("title").isNull())
).cache() ).cache()
# 写入 Doris 需带 site_name 分区字段,并把 keepa_launch_time 转为 DATETIME(DDL 类型已改)
# weight 字段在 Doris dwd_keepa_asin_detail 已废弃,select 不带
self.df_to_doris = self.df_keepa_asin.select( self.df_to_doris = self.df_keepa_asin.select(
'asin', 'package_length', 'package_width', 'package_height', 'package_weight', 'item_weight', 'weight', F.lit(self.site_name).alias('site_name'),
'listed_since', 'release_date', 'tracking_since', 'keepa_launch_time', 'updated_time' 'asin', 'package_length', 'package_width', 'package_height', 'package_weight', 'item_weight',
'listed_since', 'release_date', 'tracking_since',
F.to_timestamp(F.col('keepa_launch_time')).alias('keepa_launch_time'),
'updated_time'
) )
# 读取历史数据 # 读取历史数据
...@@ -126,19 +132,28 @@ class DimKeepaAsinInfo(object): ...@@ -126,19 +132,28 @@ class DimKeepaAsinInfo(object):
print("success!") print("success!")
if new_count >= old_count: if new_count >= old_count:
print(f"正在删除历史分区数据") # 删除所有 date_info < 当前 date_info 的历史分区(兼容间隔几天才执行的场景)
self.spark.sql(f""" print(f"正在删除所有 date_info < {self.date_info} 的历史分区数据")
ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{self.site_name}', date_info='{self.last_date_info}') partitions = self.spark.sql(f"SHOW PARTITIONS {hive_tb}").collect()
""") for row in partitions:
# row[0] 形如 "site_name=us/date_info=2026-05-10"
parts = dict(p.split('=') for p in row[0].split('/'))
p_site = parts.get('site_name')
p_date = parts.get('date_info')
if p_site == self.site_name and p_date < self.date_info:
print(f" 删除分区: site_name={p_site}, date_info={p_date}")
self.spark.sql(
f"ALTER TABLE {hive_tb} DROP IF EXISTS PARTITION (site_name='{p_site}', date_info='{p_date}')"
)
HdfsUtils.delete_hdfs_file( HdfsUtils.delete_hdfs_file(
CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": self.site_name, "date_info": self.last_date_info}) CommonUtil.build_hdfs_path(hive_tb, partition_dict={"site_name": p_site, "date_info": p_date})
) )
print("success!") print("历史分区清理完毕!")
# 写入Doris表 # 写入Doris表
print("往doris存储最新keepa详情信息:") print("往doris存储最新keepa详情信息:")
doris_table_columns = """ doris_table_columns = """
asin, package_length, package_width, package_height, package_weight, item_weight, weight, site_name, asin, package_length, package_width, package_height, package_weight, item_weight,
listed_since, release_date, tracking_since, keepa_launch_time, updated_time listed_since, release_date, tracking_since, keepa_launch_time, updated_time
""" """
DorisHelper.spark_export_with_columns( DorisHelper.spark_export_with_columns(
......
"""
author: CT
description: 从 Hive dim_keepa_asin_info 读取 keepa 数据,过滤 updated_time 超过 3 个月的 asin,
附带当前月份字段 month(yyyy-MM),导出到 PG us_asin_profit_keepa_add,触发爬虫重新抓取 Keepa
执行示例: spark-submit export_keepa_asin_del.py us
"""
import os
import sys
from datetime import datetime
from dateutil.relativedelta import relativedelta
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
def main(site_name):
    """Export ASINs whose Keepa data is stale to PostgreSQL.

    Reads ``asin`` and ``updated_time`` for ``site_name`` from the Hive table
    ``dim_keepa_asin_info``, keeps the rows whose ``updated_time`` is earlier
    than three months before now, tags each with the current month
    (``yyyy-MM``) and appends the result to the PostgreSQL table
    ``us_asin_profit_keepa_add`` over JDBC.

    :param site_name: site code used for the Hive ``site_name`` filter, the
        Spark application name and the PostgreSQL connection lookup.
    """
    spark = SparkUtil.get_spark_session(f'export_keepa_asin_del_{site_name}')

    # 1. Read all keepa rows for the site; the updated_time filter is applied
    #    on the Spark side below rather than inside the Hive SQL.
    sql = f"""
SELECT asin, updated_time
FROM dim_keepa_asin_info
WHERE site_name = '{site_name}'
"""
    print(f"sql=\n{sql}")
    df_all = spark.sql(sqlQuery=sql)

    # 2. Take a single clock reading so the cutoff and the month label are
    #    derived from the same instant, even if the job runs across midnight
    #    on the last day of a month.
    now = datetime.now()
    three_months_ago = (now - relativedelta(months=3)).strftime('%Y-%m-%d %H:%M:%S')
    current_month = now.strftime('%Y-%m')
    print(f"过滤阈值 updated_time < {three_months_ago};附加 month={current_month}")

    # NOTE(review): the comparison is against a 'yyyy-MM-dd HH:mm:ss' string
    # literal - assumes updated_time is a timestamp or a string in that same
    # format; confirm against the Hive schema. Rows with a NULL updated_time
    # do not satisfy the predicate and are therefore not exported.
    df_need_export = df_all.filter(F.col('updated_time') < F.lit(three_months_ago)) \
        .select(
            F.col('asin'),
            F.lit(current_month).alias('month'),
        ).cache()

    try:
        print(f"导出数据量:{df_need_export.count():,}")

        con_info = DBUtil.get_connection_info(db_type='postgresql_cluster', site_name=site_name)
        # NOTE(review): the target table name is fixed to the "us_" table
        # regardless of site_name - confirm this is intended for other sites.
        table_name = 'us_asin_profit_keepa_add'
        df_need_export.write.format("jdbc") \
            .option("url", con_info["url"]) \
            .option("dbtable", table_name) \
            .option("user", con_info["username"]) \
            .option("password", con_info["pwd"]) \
            .mode("append") \
            .save()
    finally:
        # Release the cached data even when the count or the JDBC write fails.
        df_need_export.unpersist()
    print("success")


if __name__ == '__main__':
    # The site name is the only positional argument, e.g.
    #   spark-submit export_keepa_asin_del.py us
    if len(sys.argv) < 2:
        sys.exit("usage: spark-submit export_keepa_asin_del.py <site_name>")
    main(sys.argv[1])
...@@ -31,9 +31,11 @@ if __name__ == '__main__': ...@@ -31,9 +31,11 @@ if __name__ == '__main__':
db_type=db_type db_type=db_type
) )
# 不再限制 update_at 上限:拉取 last_date_info 之后所有更新数据
# 适配间隔几天才执行的场景(间隔多日时传更早的 date_info,自动覆盖间隔区间全部增量)
query = f""" query = f"""
SELECT asin, last_detail::text as last_detail, update_at FROM {import_table} SELECT asin, last_detail::text as last_detail, update_at FROM {import_table}
WHERE update_at >= '{last_date_info}' AND update_at < '{date_info}' AND last_detail is not null AND \$CONDITIONS WHERE update_at >= '{last_date_info}' AND last_detail is not null AND \$CONDITIONS
""" """
engine.sqoop_raw_import( engine.sqoop_raw_import(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment