Commit 94e558b1 by hejiangming

月搜索词优化代码 增加属性划分字段

parent 18c72b2a
...@@ -26,7 +26,7 @@ class DwtAbaStAnalytics(Templates): ...@@ -26,7 +26,7 @@ class DwtAbaStAnalytics(Templates):
# 写入、分区初始化 # 写入、分区初始化
self.df_save = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name', 'date_type', 'date_info'] self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=10) self.reset_partitions(partitions_num=40)
# 初始化列表 # 初始化列表
self.sp_symbols = [] self.sp_symbols = []
...@@ -54,6 +54,9 @@ class DwtAbaStAnalytics(Templates): ...@@ -54,6 +54,9 @@ class DwtAbaStAnalytics(Templates):
self.df_is_hidden_cate = self.spark.sql(f"select 1+1;") self.df_is_hidden_cate = self.spark.sql(f"select 1+1;")
self.df_asin_profit_rate = self.spark.sql(f"select 1+1;") self.df_asin_profit_rate = self.spark.sql(f"select 1+1;")
self.df_history_st = self.spark.sql(f"select 1+1;") # 累加表 dim_st_detail_history 历史搜索词,用于 is_first_ever_text 判断 self.df_history_st = self.spark.sql(f"select 1+1;") # 累加表 dim_st_detail_history 历史搜索词,用于 is_first_ever_text 判断
# 搜索词属性标签 df,来源 dws_st_theme 聚合 theme_ch(材质/颜色/细分人群等)
# 仅 month 流程在 read_data 阶段真正读取数据;非 month 流程下游统一用 lit("-1") 填充
self.df_st_attribute = self.spark.sql(f"select 1+1;")
# 自定义udf函数注册 # 自定义udf函数注册
self.u_contains = self.spark.udf.register('u_contains', self.udf_contains, IntegerType()) self.u_contains = self.spark.udf.register('u_contains', self.udf_contains, IntegerType())
...@@ -496,6 +499,42 @@ class DwtAbaStAnalytics(Templates): ...@@ -496,6 +499,42 @@ class DwtAbaStAnalytics(Templates):
print("self.df_history_st:") print("self.df_history_st:")
self.df_history_st.show(10, truncate=True) self.df_history_st.show(10, truncate=True)
# 读 dws_st_theme 计算搜索词属性标签 st_attribute_label
# 业务背景:前端要在 ABA 搜索词页面给每个词打"属性标签"(材质/颜色/细分人群等),用于属性筛选
# 数据来源:dws_st_theme
# 聚合:同一搜索词可能命中多个属性,按 search_term 聚合 collect_set(theme_ch) → sort_array → concat_ws(',')
# - 用 collect_set 而不是 collect_list:去重,避免"材质,材质"这种重复
# - 必须套 sort_array 强制排序:collect_set 返回顺序不稳定,重跑可能产出 "材质,颜色" 或 "颜色,材质",
# 破坏幂等性、影响 Java 端的字符串比对/缓存 key/diff 检测,加 sort_array 保证字典序固定
# take(1) 而非 count():take 读一行即返回,避免 LZO 文本格式下扫全表
has_theme_data = self.spark.sql(f"""
SELECT 1 FROM dws_st_theme
WHERE site_name='{self.site_name}'
AND date_type='{self.date_type}'
AND date_info='{self.date_info}'
LIMIT 1
""").take(1)
assert len(has_theme_data) > 0, (
f"上游 dws_st_theme 分区无数据 "
f"site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info},"
f"请确认dws_st_theme 调度是否完成;若强行跑 dwt_aba_st_analytics 会导致所有词的 st_attribute_label 错填 '-1'"
)
sql = f"""
select
search_term,
concat_ws(',', sort_array(collect_set(theme_ch))) as st_attribute_label
from dws_st_theme
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
and theme_ch is not null
group by search_term
"""
self.df_st_attribute = self.spark.sql(sqlQuery=sql).repartition(80, 'search_term').cache()
print("self.df_st_attribute:")
self.df_st_attribute.show(10, truncate=True)
def handle_data(self): def handle_data(self):
# 对基础计算表进行关联 # 对基础计算表进行关联
self.handle_base_join() self.handle_base_join()
...@@ -517,6 +556,15 @@ class DwtAbaStAnalytics(Templates): ...@@ -517,6 +556,15 @@ class DwtAbaStAnalytics(Templates):
else: else:
self.df_save = self.df_save.withColumn('is_first_ever_text', F.lit(-1)) self.df_save = self.df_save.withColumn('is_first_ever_text', F.lit(-1))
# 附加属性标签字段 st_attribute_label
# month 流程:left join read_data 阶段已读取的 dws_st_theme 聚合结果,join 不上的词在 handle_column 阶段 fillna("-1")
# 非 month 流程:业务不需要计算,直接 lit("-1") 占位(与 PG 端 '{-1}' 数组占位语义一致)
# 为什么独立加 if/else 而不在 handle_st_cal 的 join 链里:保持非 month 流程不读 dws_st_theme,避免空分区 join 报错
if self.date_type == 'month':
self.df_save = self.df_save.join(self.df_st_attribute, on='search_term', how='left')
self.df_st_attribute.unpersist()
else:
self.df_save = self.df_save.withColumn('st_attribute_label', F.lit('-1'))
# 处理输出字段 # 处理输出字段
self.handle_column() self.handle_column()
...@@ -834,7 +882,7 @@ class DwtAbaStAnalytics(Templates): ...@@ -834,7 +882,7 @@ class DwtAbaStAnalytics(Templates):
self.df_st_brand, on=['search_term'], how='left' self.df_st_brand, on=['search_term'], how='left'
).join( ).join(
self.df_is_hidden_cate, on=['st_bsr_cate_1_id_new'], how='left' self.df_is_hidden_cate, on=['st_bsr_cate_1_id_new'], how='left'
) ).cache()
self.df_st_detail.unpersist() self.df_st_detail.unpersist()
self.df_st_key.unpersist() self.df_st_key.unpersist()
self.df_st_num_stats.unpersist() self.df_st_num_stats.unpersist()
...@@ -981,7 +1029,8 @@ class DwtAbaStAnalytics(Templates): ...@@ -981,7 +1029,8 @@ class DwtAbaStAnalytics(Templates):
F.round("air_profit_avg", 4).alias("gross_profit_fee_air"), # PG numeric(25,4) F.round("air_profit_avg", 4).alias("gross_profit_fee_air"), # PG numeric(25,4)
"is_first_ever_text", # 需求2:全历史首次出现标记(1=当月首次, 0=历史出现过) "is_first_ever_text", # 需求2:全历史首次出现标记(1=当月首次, 0=历史出现过)
"brand_asin_proportion", # 需求3:前三页ASIN数最多品牌的ASIN数占比 "brand_asin_proportion", # 需求3:前三页ASIN数最多品牌的ASIN数占比
"seller_asin_proportion" # 需求3:前三页ASIN数最多卖家的ASIN数占比 "seller_asin_proportion", # 需求3:前三页ASIN数最多卖家的ASIN数占比
"st_attribute_label" # 搜索词属性标签(逗号分隔字符串,sqoop 导出 PG 后转 VARCHAR[])
) )
# 空值处理 # 空值处理
...@@ -999,7 +1048,8 @@ class DwtAbaStAnalytics(Templates): ...@@ -999,7 +1048,8 @@ class DwtAbaStAnalytics(Templates):
"bsr_orders": 0, "bsr_orders": 0,
"is_hidden_cate": 0, "is_hidden_cate": 0,
"brand_asin_proportion": -1, # 需求3:分子为 null(搜索词全无品牌) → 占位 -1 "brand_asin_proportion": -1, # 需求3:分子为 null(搜索词全无品牌) → 占位 -1
"seller_asin_proportion": -1 # 需求3:分子为 null(搜索词全无账号) → 占位 -1 "seller_asin_proportion": -1, # 需求3:分子为 null(搜索词全无账号) → 占位 -1
"st_attribute_label": "-1" # 词典无匹配 → 占位 "-1"(Java 侧转 null 返前端)
}) })
# 日期字段补全 # 日期字段补全
......
...@@ -44,7 +44,6 @@ if __name__ == '__main__': ...@@ -44,7 +44,6 @@ if __name__ == '__main__':
hive_files = HdfsUtils.read_list(hive_partition_path) hive_files = HdfsUtils.read_list(hive_partition_path)
if not hive_files: if not hive_files:
print(f"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出,请先检查 DWT 计算任务是否正常写入!") print(f"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出,请先检查 DWT 计算任务是否正常写入!")
engine.dispose()
sys.exit(1) sys.exit(1)
print(f"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出") print(f"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出")
...@@ -148,7 +147,7 @@ if __name__ == '__main__': ...@@ -148,7 +147,7 @@ if __name__ == '__main__':
sql = f""" sql = f"""
create table if not exists {export_tb_copy} create table if not exists {export_tb_copy}
( (
like {export_tb_target} including indexes including comments like {export_tb_target} including all
); );
truncate table {export_tb_copy}; truncate table {export_tb_copy};
""" """
...@@ -192,7 +191,11 @@ if __name__ == '__main__': ...@@ -192,7 +191,11 @@ if __name__ == '__main__':
# brand_asin_proportion / seller_asin_proportion 服务月搜索词筛选页面 # brand_asin_proportion / seller_asin_proportion 服务月搜索词筛选页面
"is_first_ever_text", "is_first_ever_text",
"brand_asin_proportion", "brand_asin_proportion",
"seller_asin_proportion" "seller_asin_proportion",
# 搜索词属性标签(材质/颜色/细分人群等),仅 month 计算
# Hive 端是逗号分隔 STRING(如 "材质,颜色"),sqoop 写入 PG copy 表需先 ALTER 成 VARCHAR
# 交换前再 ALTER 回 VARCHAR[](用 string_to_array 转换),与 dwt_aba_last365 处理 st_movie_brand_label 同款思路
"st_attribute_label"
] ]
# 处理导出表 # 处理导出表
export_master_tb = f"{export_base_tb}_{date_type}_{year_str}" export_master_tb = f"{export_base_tb}_{date_type}_{year_str}"
...@@ -203,13 +206,18 @@ if __name__ == '__main__': ...@@ -203,13 +206,18 @@ if __name__ == '__main__':
export_cols = export_base_cols + tb_cols export_cols = export_base_cols + tb_cols
# sql建表和创建分区 # sql建表和创建分区
# INCLUDING ALL:继承列定义、生成列、索引、约束等
# 补充显式创建GIN索引:LIKE从分区父表(relkind='p')复制时,分区索引(relkind='I')可能丢失
sql = f""" sql = f"""
create table if not exists {export_master_tb} create table if not exists {export_master_tb}
( (
like {export_tb_before} including indexes including comments like {export_tb_before} including all
) )
partition by range (date_info); partition by range (date_info);
create index if not exists {export_master_tb}_keyword_tsv_idx
on {export_master_tb} using gin (keyword_tsv);
create table if not exists {export_table} partition of {export_master_tb} for values from ('{date_info}') to ('{next_val}'); create table if not exists {export_table} partition of {export_master_tb} for values from ('{date_info}') to ('{next_val}');
""" """
DBUtil.engine_exec_sql(engine, sql) DBUtil.engine_exec_sql(engine, sql)
...@@ -220,11 +228,37 @@ if __name__ == '__main__': ...@@ -220,11 +228,37 @@ if __name__ == '__main__':
sql_copy = f""" sql_copy = f"""
create table if not exists {export_tb_copy} create table if not exists {export_tb_copy}
( (
like {export_table} including indexes including comments like {export_table} including all
); );
truncate table {export_tb_copy}; truncate table {export_tb_copy};
""" """
DBUtil.engine_exec_sql(engine, sql_copy) DBUtil.engine_exec_sql(engine, sql_copy)
# copy 表继承自正式分区表(含 st_attribute_label VARCHAR[]),
# 但 Sqoop 不支持直接写入 PG 数组类型,必须先把 copy 表的该列临时改成 VARCHAR
# 等 Sqoop 完成后、分区交换之前,再 ALTER 回 VARCHAR[](见下方 exchange_pg_part_tb 前的处理)
# 这是与 sqoop_export/dwt_aba_last365.py 中 st_movie_brand_label 同款的"VARCHAR 中转"模式
sql_alter_to_varchar = f"""
ALTER TABLE {export_tb_copy} ALTER COLUMN st_attribute_label TYPE VARCHAR(200);
"""
DBUtil.engine_exec_sql(engine, sql_alter_to_varchar)
# keyword_tsv 是 master 分区表的生成列(GENERATED ALWAYS AS to_tsvector(...) STORED,用于全文检索 GIN 索引)
# PG 已知行为:LIKE 从【分区子表】复制时,GENERATED 表达式不会保留(表达式只挂在 master 上,子表自己不存)
# → copy 表里 keyword_tsv 会退化成普通可空 tsvector 列
# → 后续 exchange_pg_part_tb 的 ATTACH PARTITION 会因列定义与 master 不一致而报错:
# "column keyword_tsv in child table must be a generated column"
# 解决:显式 drop 后以生成列方式重建。CASCADE 是为了同时把 LIKE 时附带继承的 keyword_tsv 索引一并清掉,
# 后面 ATTACH 时 master 的分区索引会自动给新分区补建对应索引。
# 重建在 Sqoop 之前做:生成列由 search_term 自动计算(STORED),Sqoop 不写也不会报错。
# 参考 sqoop_export/dwt_aba_last365.py 中对 keyword_tsv 的同款处理。
sql_fix_keyword_tsv = f"""
ALTER TABLE {export_tb_copy} DROP COLUMN IF EXISTS keyword_tsv CASCADE;
ALTER TABLE {export_tb_copy} ADD COLUMN keyword_tsv tsvector
GENERATED ALWAYS AS (to_tsvector('english_amazonword', search_term)) STORED;
"""
DBUtil.engine_exec_sql(engine, sql_fix_keyword_tsv)
# Sqoop导入的目标表改为copy表 # Sqoop导入的目标表改为copy表
export_table_original = export_table export_table_original = export_table
export_table = export_tb_copy export_table = export_tb_copy
...@@ -275,6 +309,17 @@ if __name__ == '__main__': ...@@ -275,6 +309,17 @@ if __name__ == '__main__':
VALUES('{site_name}', '{datetime.now().date()}', '导出PG数据库完成', 14, 'us_aba_last_30_day', '30_day', 'ABA搜索词', '是', 'ABA搜索词最近30天表','{db_type}'); VALUES('{site_name}', '{datetime.now().date()}', '导出PG数据库完成', 14, 'us_aba_last_30_day', '30_day', 'ABA搜索词', '是', 'ABA搜索词最近30天表','{db_type}');
""" """
elif date_type == DateTypes.month.name: elif date_type == DateTypes.month.name:
# 分区交换前必须把 copy 表的 st_attribute_label 从 VARCHAR 转回 VARCHAR[]
# 否则与 master 分区表 schema 不一致,exchange_pg_part_tb 会失败
# USING string_to_array(...) 把 Sqoop 写入的逗号串(如 "材质,颜色")拆成数组(如 {材质,颜色})
# 词典无匹配的词,PySpark 已 fillna 为 "-1",转换后是 {-1},与 Java 占位约定一致
sql_alter_back = f"""
ALTER TABLE {export_tb_copy}
ALTER COLUMN st_attribute_label TYPE VARCHAR[]
USING string_to_array(st_attribute_label, ',')::varchar[];
"""
DBUtil.engine_exec_sql(engine, sql_alter_back)
# month类型:Sqoop导入到copy表完成后,通过分区交换替换正式分区,避免空窗期 # month类型:Sqoop导入到copy表完成后,通过分区交换替换正式分区,避免空窗期
DBUtil.exchange_pg_part_tb(engine, DBUtil.exchange_pg_part_tb(engine,
source_tb_name=export_tb_copy, source_tb_name=export_tb_copy,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment