Commit dac2671d by hejiangming

Monthly search terms: add new fields. Add an intermediate table, dim_st_detail_history, populated right after the dim_st_detail flow, recording the first and last month each search term appeared. Exports gain a Hive partition check: if the partition to be exported is empty, the job aborts.

parent 75ae57fd
"""
@Description : ABA搜索词全历史出现中间表 - 每月增量更新脚本
@业务背景 :
本中间表 dim_st_detail_history 服务于 dwt_aba_st_analytics.is_first_ever_text 字段判断
(全历史首次出现)。每月 dim_st_detail 跑完后,通过本脚本把当月数据合并进中间表,
保持中间表始终是"截止当月"的最新状态。
@数据流向 :
dim_st_detail_history (M-1 状态) + dim_st_detail (当月分区)
↓ UNION + groupby search_term + MIN(first) / MAX(last)
dim_st_detail_history (M 状态)
@调度位置 : 每月 dim_st_detail 跑完之后、dwt_aba_st_analytics 跑之前
@幂等性 :
- 当月 dim 词集合不变 → 重跑结果完全一致
- 当月 dim 词集合缩小(某词被剔除)→ 该词在中间表里被自动清除
- 当月 dim 词集合扩大(多了新词)→ 新词被正确追加
@SourceTable : dim_st_detail (date_type='month', date_info=当月)
+ dim_st_detail_history (本表自身,读取历史状态)
@SinkTable : dim_st_detail_history
@CreateTime : 2026-05-07
@Param : site_name (us / uk / de) + date_info (YYYY-MM, 当月)
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

class DimStDetailHistory(object):
    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        self.hive_tb = "dim_st_detail_history"
        self.hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dim/{self.hive_tb}/site_name={self.site_name}"
        self.partitions_by = ['site_name']
        app_name = f"{self.hive_tb}: {site_name}, {date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # placeholder DataFrames, filled in by read_data() / handle_data()
        self.df_history = self.spark.sql("select 1+1;")
        self.df_current_month = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def run(self):
        # startup guard: refuse to run against a historical month, which would corrupt the table
        self.check_rollback()
        # read the table's historical state + the current-month dim data
        self.read_data()
        # merge via UNION + GROUP BY
        self.handle_data()
        # write back to the intermediate table (overwrite the site_name partition)
        self.save_data()

    def check_rollback(self):
        # max(date_info_last) in the table should be ≤ the date_info passed in.
        # If it is greater, the table already contains data for a later month, so this
        # run must be a backfill of a historical month. Running the incremental merge
        # then would wrongly reset date_info_last for the few terms whose first
        # appearance is the current month, so rebuilding via the init script is safer.
        check_sql = f"""
            SELECT MAX(date_info_last) AS max_last
            FROM {self.hive_tb}
            WHERE site_name = '{self.site_name}'
        """
        max_last = self.spark.sql(check_sql).collect()[0]['max_last']
        if max_last is not None and max_last > self.date_info:
            raise Exception(
                f"intermediate table max(date_info_last)={max_last} > current {self.date_info}; "
                f"this looks like a historical-month backfill.\n"
                f"Run dim_st_detail_history_init.py instead to rebuild the table."
            )
        print(f"[check passed] intermediate table max(date_info_last)={max_last} ≤ current {self.date_info}")

    def read_data(self):
        # Source for branch 1: every term in the table whose first appearance is NOT
        # the current month. '!=' rather than '<' minimizes the damage if a historical
        # month is passed by mistake: with current=2026-05 but date_info=2026-04, '!='
        # only excludes the few terms with first=2026-04, whereas '<' would exclude
        # everything with first=2026-04 or first=2026-05.
        history_sql = f"""
            SELECT
                search_term,
                date_info_first,
                date_info_last
            FROM {self.hive_tb}
            WHERE site_name = '{self.site_name}'
              AND date_info_first != '{self.date_info}'
        """
        print(f"\n[history SQL]\n{history_sql}")
        self.df_history = self.spark.sql(history_sql)
        # Source for branch 2: all current-month terms from dim. The current-month
        # partition of dim_st_detail should hold one row per term, but DISTINCT guards
        # against dirty data.
        current_sql = f"""
            SELECT DISTINCT search_term
            FROM dim_st_detail
            WHERE site_name = '{self.site_name}'
              AND date_type = 'month'
              AND date_info = '{self.date_info}'
        """
        print(f"\n[current-month dim SQL]\n{current_sql}")
        self.df_current_month = self.spark.sql(current_sql)

    def handle_data(self):
        # Branch 2: current-month terms with placeholder first=last=current month.
        # After UNION + GROUP BY with MIN/MAX:
        #   - existing term: branch 1 (first=old month) UNION branch 2 (first=current)
        #       → MIN keeps the old month ✓
        #     branch 1 (last=earlier month) UNION branch 2 (last=current)
        #       → MAX advances last to the current month ✓
        #   - new term: only in branch 2 → first=last=current month ✓
        #   - term dropped from this month's dim: in neither branch 1 (filtered out
        #     by '!=') nor branch 2 (gone from dim) → disappears automatically ✓
        df_branch2 = self.df_current_month.select(
            'search_term',
            F.lit(self.date_info).alias('date_info_first'),
            F.lit(self.date_info).alias('date_info_last')
        )
        # UNION + GROUP BY aggregation
        df_union = self.df_history.unionByName(df_branch2)
        self.df_save = df_union.groupBy('search_term').agg(
            F.min('date_info_first').alias('date_info_first'),
            F.max('date_info_last').alias('date_info_last')
        )
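        # Worked example (illustrative values): a term with history
        # (first=2024-01, last=2026-04) that also appears in the 2026-05 dim partition
        # contributes rows ('t', 2024-01, 2026-04) and ('t', 2026-05, 2026-05) to the
        # union; MIN/MAX collapse them to ('t', 2024-01, 2026-05). A brand-new term
        # contributes only ('t', 2026-05, 2026-05), which survives unchanged.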

    def save_data(self):
        # add the partition column (saveAsTable + partitionBy needs it on the DataFrame)
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
        # repartition to control the output file count (us → 15 files, uk/de → 10;
        # note the init script uses smaller counts, 5/3)
        target_partitions = 15 if self.site_name == 'us' else 10
        self.df_save = self.df_save.repartition(target_partitions)
        print(f"\n[repartition] target file count: {target_partitions}")
        # Critical: cache + count first to force the data to materialize.
        # This script reads and writes the same table (reads dim_st_detail_history →
        # writes dim_st_detail_history), so the source must be fully read before
        # delete_file_in_folder runs. Otherwise Spark's lazy execution defers the read
        # until saveAsTable, by which point the table's files have already been
        # deleted, and the history would be lost.
        self.df_save = self.df_save.cache()
        row_count = self.df_save.count()
        print(f"\n[materialized] rows to write: {row_count}, target file count: {target_partitions}")
        # delete_file_in_folder + saveAsTable(format='hive', mode='append'): the project's standard pattern
        #   delete_file_in_folder removes files but keeps the directory, avoiding the Hive "directory not found" WARN
        #   format='hive' makes Spark write via the table's SerDe (LZO text)
        #   mode='append' plus the prior delete implements "overwrite the whole site_name partition" semantics
        print(f"\nclearing partition files: {self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"writing table: {self.hive_tb}, partitions: {self.partitions_by}")
        self.df_save.write.saveAsTable(
            name=self.hive_tb,
            format='hive',
            mode='append',
            partitionBy=self.partitions_by
        )
        self.df_save.unpersist()
        print("[write complete]")
        # verification
        verify_sql = f"""
            SELECT
                COUNT(*) AS row_count,
                COUNT(DISTINCT search_term) AS distinct_terms,
                MIN(date_info_first) AS earliest_first,
                MAX(date_info_last) AS latest_last
            FROM {self.hive_tb}
            WHERE site_name = '{self.site_name}'
        """
        print(f"\n[verify SQL]\n{verify_sql}")
        self.spark.sql(verify_sql).show(truncate=False)
        print(f"\n[done] incremental update of dim_st_detail_history site_name={self.site_name}, date_info={self.date_info}")

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    if not site_name or not date_info:
        print("usage:   spark-submit dim_st_detail_history.py <site_name> <date_info>")
        print("example: spark-submit dim_st_detail_history.py us 2026-05")
        sys.exit(1)
    print(f"{'=' * 60}")
    print("starting incremental update of intermediate table dim_st_detail_history")
    print(f"site_name = {site_name}, date_info = {date_info}")
    print(f"{'=' * 60}")
    obj = DimStDetailHistory(site_name=site_name, date_info=date_info)
    obj.run()
dim_st_detail_history_init.py

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

class DimStDetailHistoryInit(object):
    def __init__(self, site_name):
        self.site_name = site_name
        self.hive_tb = "dim_st_detail_history"
        # HDFS path of the accumulating table, used by delete_file_in_folder to clear
        # the target partition's files
        self.hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dim/{self.hive_tb}/site_name={self.site_name}"
        self.partitions_by = ['site_name']
        app_name = f"{self.hive_tb}_init: {site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)

    def run(self):
        # Step 1: aggregate (GROUP BY with the default 200 shuffle partitions for performance)
        aggregate_sql = f"""
            SELECT
                search_term,
                MIN(date_info) AS date_info_first,
                MAX(date_info) AS date_info_last
            FROM dim_st_detail
            WHERE site_name = '{self.site_name}'
              AND date_type = 'month'
            GROUP BY search_term
        """
        print(f"\n[aggregate SQL]\n{aggregate_sql}")
        df = self.spark.sql(aggregate_sql)
        # Step 2: add the partition column (saveAsTable + partitionBy needs it on the DataFrame)
        df = df.withColumn('site_name', F.lit(self.site_name))
        # Step 3: repartition to control the final output file count and avoid small files:
        # us is large (~11.83M rows) → 5 files; uk/de are small (~3.5M rows) → 3 files
        target_partitions = 5 if self.site_name == 'us' else 3
        df = df.repartition(target_partitions)
        print(f"\n[repartition] target file count: {target_partitions}")
        # Step 4: clear the target partition's files, then saveAsTable.
        #   delete_file_in_folder: removes files but keeps the directory, avoiding a later Hive "directory not found" WARN
        #   saveAsTable(format='hive', mode='append'): the project's standard pattern;
        #     format='hive' writes via the table's SerDe (LZO text here) instead of forcing ORC,
        #     and mode='append' plus the prior delete gives "overwrite" semantics (project-wide convention)
        print(f"\nclearing partition files: {self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"writing table: {self.hive_tb}, partitions: {self.partitions_by}")
        df.write.saveAsTable(
            name=self.hive_tb,
            format='hive',
            mode='append',
            partitionBy=self.partitions_by
        )
        print("[write complete]")
        # Step 5: verify
        verify_sql = f"""
            SELECT
                COUNT(*) AS row_count,
                COUNT(DISTINCT search_term) AS distinct_terms,
                MIN(date_info_first) AS earliest_first,
                MAX(date_info_last) AS latest_last
            FROM {self.hive_tb}
            WHERE site_name = '{self.site_name}'
        """
        print(f"\n[verify SQL]\n{verify_sql}")
        self.spark.sql(verify_sql).show(truncate=False)
        print(f"\n[done] init dim_st_detail_history site_name={self.site_name}")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    if not site_name:
        print("usage:   spark-submit dim_st_detail_history_init.py <site_name>")
        print("example: spark-submit dim_st_detail_history_init.py us")
        sys.exit(1)
    print(f"{'=' * 60}")
    print("starting init of accumulating table dim_st_detail_history")
    print(f"site_name = {site_name}")
    print(f"{'=' * 60}")
    obj = DimStDetailHistoryInit(site_name=site_name)
    obj.run()
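
# Scheduling sketch (assembled from the docstring and usage strings above; the actual
# scheduler configuration is not part of this commit):
#
#   # one-off: rebuild the full history from all monthly dim_st_detail partitions
#   spark-submit dim_st_detail_history_init.py us
#
#   # monthly: after dim_st_detail completes and before dwt_aba_st_analytics runs
#   spark-submit dim_st_detail_history.py us 2026-05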

@@ -6,6 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
 from utils.ssh_util import SSHUtil
 from utils.common_util import CommonUtil, DateTypes
 from utils.db_util import DBUtil
+from utils.hdfs_utils import HdfsUtils

 if __name__ == '__main__':
     site_name = CommonUtil.get_sys_arg(1, None)

@@ -31,7 +32,14 @@ if __name__ == '__main__':
     # get the database connection
     engine = DBUtil.get_db_engine(db_type, site_name)

+    # Before exporting, verify that the Hive partition has data; an empty partition
+    # would trigger the table swap and wipe the PG data
+    hive_partition_path = f"/home/big_data_selection/dwt/dwt_aba_last_change_rate/site_name={site_name}/date_type={date_type}/date_info={date_info}"
+    hive_files = HdfsUtils.read_list(hive_partition_path)
+    if not hive_files:
+        print(f"[ERROR] no data files in Hive partition, path: {hive_partition_path}; aborting export. Check that the DWT job wrote its output!")
+        engine.dispose()
+        sys.exit(1)
+    print(f"Hive partition file count: {len(hive_files)}, path: {hive_partition_path}; continuing export")

     # export table
     export_base_tb = f"{site_name}_aba_last_change_rate"

@@ -76,13 +84,8 @@ if __name__ == '__main__':
         "date_type": date_type,
         "date_info": date_info
     }
-    # export table name
-    sh = CommonUtil.build_export_sh(
-        site_name=site_name,
-        db_type=db_type,
-        hive_tb="dwt_aba_last_change_rate",
-        export_tb=export_tb,
-        col=[
+    # base export columns (shared by every date_type)
+    export_base_cols = [
         "search_term",
         "search_term_id",
         "date_type",

@@ -99,7 +102,39 @@ if __name__ == '__main__':
         "amazon_rate_of_change",
         "created_time",
         "updated_time"
-    ],
+    ]
+
+    # Requirement 1: the 12 new columns are month-only.
+    #   - in dwt_aba_last_change_rate_new.py, handle_rank_rate_history computes real
+    #     values only for the month date_type
+    #   - every other date_type (day/week/last30day/last365day) goes through
+    #     handle_rank_rate_padding, which fills null
+    #   - since non-month values are all null, they should not be exported (avoids
+    #     polluting PG and forcing new columns onto the separate last365day tables)
+    if date_type == DateTypes.month.name:
+        month_extra_cols = [
+            "rank_rate_last_1_month",
+            "rank_rate_1_month_ago",
+            "rank_rate_2_month_ago",
+            "rank_rate_3_month_ago",
+            "rank_rate_4_month_ago",
+            "rank_rate_5_month_ago",
+            "rank_change_last_1_month",
+            "rank_change_1_month_ago",
+            "rank_change_2_month_ago",
+            "rank_change_3_month_ago",
+            "rank_change_4_month_ago",
+            "rank_change_5_month_ago"
+        ]
+        export_cols = export_base_cols + month_extra_cols
+    else:
+        export_cols = export_base_cols
+
+    # export table name
+    sh = CommonUtil.build_export_sh(
+        site_name=site_name,
+        db_type=db_type,
+        hive_tb="dwt_aba_last_change_rate",
+        export_tb=export_tb,
+        col=export_cols,
         partition_dict=partition_dict
     )
......

@@ -6,6 +6,7 @@ sys.path.append(os.path.dirname(sys.path[0]))
 from utils.ssh_util import SSHUtil
 from utils.common_util import CommonUtil, DateTypes
 from utils.db_util import DBUtil
+from utils.hdfs_utils import HdfsUtils

 if __name__ == '__main__':
     # get input arguments

@@ -37,6 +38,16 @@ if __name__ == '__main__':
     # get the database connection
     engine = DBUtil.get_db_engine(db_type, site_name)

+    # Before exporting, verify that the Hive partition has data; an empty partition
+    # would trigger the table swap and wipe the PG data
+    hive_partition_path = f"/home/big_data_selection/dwt/dwt_aba_st_analytics/site_name={site_name}/date_type={date_type}/date_info={date_info}"
+    hive_files = HdfsUtils.read_list(hive_partition_path)
+    if not hive_files:
+        print(f"[ERROR] no data files in Hive partition, path: {hive_partition_path}; aborting export. Check that the DWT job wrote its output!")
+        engine.dispose()
+        sys.exit(1)
+    print(f"Hive partition file count: {len(hive_files)}, path: {hive_partition_path}; continuing export")

     suffix = str(date_info).replace("-", "_")
     # export table -- base table name

@@ -175,7 +186,13 @@ if __name__ == '__main__':
     # month-only export columns
     tb_cols = [
         "is_new_market_segment", "color_proportion", "supply_demand", "market_cycle_type", "is_high_return_text",
-        "st_zr_counts", "st_sp_counts", "st_self_asin_counts", "st_self_asin_proportion"
+        "st_zr_counts", "st_sp_counts", "st_self_asin_counts", "st_self_asin_proportion",
+        # Requirements 2 + 3: month-only columns (meaningful only in the month flow)
+        #   is_first_ever_text depends on the accumulating table dim_st_detail_history (month data only)
+        #   brand_asin_proportion / seller_asin_proportion serve the monthly search-term filter page
+        "is_first_ever_text",
+        "brand_asin_proportion",
+        "seller_asin_proportion"
     ]
     # build the export tables
     export_master_tb = f"{export_base_tb}_{date_type}_{year_str}"
......