Commit ff9987d0 by hejiangming

no message

parent e22e3efe
"""
月度:dws_aba_word_heat.py
计算 ABA 月度词频热度 + 同比 + 主题标签,写入 Hive dws_aba_word_heat 表
业务场景:
前端"词频统计"页面,用户从筛选页面进入后前端拆词,展示每个单词的:
词频(前端算)+ 热度(本表)+ 同比(本表)+ 主题标签(本表)
本表按"单词"维度预计算,前端按单词查询。
热度算法:
对每个 ABA 词 split(' ') 拆词,单词热度 = 100000/rank
累计所有 ABA 词中同名单词的热度 → 该单词当月总热度
例:'grinch tree skirt' 排名 4993 → grinch/tree/skirt 各贡献 100000/4993 = 20.03
同比:(当月 - 去年) / 去年;新词(去年同月无)→ 1000(标记为上升)
"""
import os
import re
import sys
import inflect
import pandas as pd
sys.path.append(os.path.dirname(sys.path[0]))
from utils.templates import Templates
from utils.common_util import CommonUtil
from utils.word_normalize import to_base_form, clean_word # 归一/清洗统一公共模块
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from pyspark.sql import functions as F
class DwsAbaWordHeat(Templates):
    """Monthly ABA word heat, year-over-year change and theme labels.

    Every ABA search term is lower-cased and split on single spaces; each
    token receives ``100000 / rank`` of its search term.  Contributions are
    summed per normalized base form (so variants such as girl/girls share one
    heat value), compared with the same month of the previous year, tagged
    with dictionary themes and appended to Hive table ``dws_aba_word_heat``,
    one row per raw word observed in the current month.
    """

    def __init__(self, site_name="us", date_type="month", date_info="2026-05"):
        """Create the Spark session and initialise placeholders.

        Args:
            site_name: Site partition value, e.g. ``"us"``.
            date_type: Date-type partition value, e.g. ``"month"``.
            date_info: Month partition value, formatted ``YYYY-MM``.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dws_aba_word_heat"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type},{self.date_info}")
        # Output frame placeholder and partition settings.
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=10)
        # Same month of the previous year (year-over-year baseline).
        self.last_year_month = CommonUtil.get_month_offset(self.date_info, -12)
        # Placeholders that read_data() replaces with the real inputs.
        self.sp_symbols = []
        self.df_curr = self.spark.sql("select 1+1;")  # current month: search_term + rank
        self.df_last = self.spark.sql("select 1+1;")  # same month last year: search_term + rank
        self.df_theme_agg = self.spark.sql("select 1+1;")  # dictionary: word + theme_ch_list (aggregated)
        self.df_word_to_base = self.spark.sql("select 1+1;")  # variant word -> base_form (full mapping)

    def _read_search_terms(self, date_info):
        """Load ``(search_term, rank)`` of ``dwt_aba_st_analytics`` for one month.

        Rows with ``rank <= 0``, rows whose first-level BSR category is in the
        excluded media/software list, and rows flagged
        ``is_self_max_num_asin = 1`` are dropped.  The result is repartitioned
        by ``search_term`` and marked for caching.
        """
        sql = f"""
            select search_term, rank
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{date_info}'
              and rank > 0
              and (st_bsr_cate_1_id_new not in ('audible','books','digital-text','dmusic','mobile-apps','movies-tv','music','software','videogames')
                   or st_bsr_cate_1_id_new is null)
              and (is_self_max_num_asin <> 1 or is_self_max_num_asin is null)
        """
        return self.spark.sql(sql).repartition(40, 'search_term').cache()

    @staticmethod
    def _explode_words(df, *keep_cols):
        """Split ``search_term`` into lower-cased tokens, one row per token.

        Splitting is on a single space (the module notes say this mirrors the
        front end).  Empty and NULL tokens are removed; repeated tokens inside
        one search term are kept.  ``keep_cols`` are carried along unchanged.
        """
        return df.select(
            F.explode(F.split(F.lower('search_term'), ' ')).alias('word'),
            *keep_cols
        ).filter((F.col('word') != '') & F.col('word').isNotNull())

    def read_data(self):
        """Load both ABA months, the theme dictionary, the symbol list and the word mapping."""
        # ========== 1) Current month of dwt_aba_st_analytics ==========
        # Fail fast when the upstream partition is empty instead of silently
        # producing an empty table.  take(1) is used rather than count()
        # because, per the original author, count() scans the whole LZO text
        # partition while take(1) returns after the first row.
        has_curr = self.spark.sql(f"""
            SELECT 1 FROM dwt_aba_st_analytics
            WHERE site_name='{self.site_name}'
              AND date_type='{self.date_type}'
              AND date_info='{self.date_info}'
            LIMIT 1
        """).take(1)
        assert len(has_curr) > 0, (
            f"上游 dwt_aba_st_analytics 当月分区无数据 "
            f"site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info},"
            f"请先确认上游调度是否完成"
        )
        self.df_curr = self._read_search_terms(self.date_info)
        print("self.df_curr:")
        self.df_curr.show(10, truncate=True)

        # ========== 2) Same month of the previous year (YoY baseline) ==========
        # Business confirmed us/uk/de all have 2025 data, so there is no
        # fallback for a missing partition.
        self.df_last = self._read_search_terms(self.last_year_month)
        print(f"self.df_last (去年同月 date_info={self.last_year_month}):")
        self.df_last.show(10, truncate=True)

        # ========== 3) Theme dictionary from PG: keywords_match_theme ==========
        # Update 21821: the dictionary moved from MySQL selection.aba_match_theme
        # to the PG cluster table public.keywords_match_theme (same source as
        # dws_st_theme); it maps a single word to theme_ch.
        sql_theme = """
            select label_en_lower as word, theme_ch
            from public.keywords_match_theme
            where label_ch is not null
              and label_en_lower is not null
              and theme_ch is not null
        """
        pg_conn = DBUtil.get_connection_info("postgresql_cluster", "us")
        df_theme_raw = SparkUtil.read_jdbc_query(
            session=self.spark, url=pg_conn["url"],
            pwd=pg_conn["pwd"], username=pg_conn["username"],
            query=sql_theme
        )
        # One word can carry several themes.  Aggregate per word here
        # (sort_array keeps the joined string stable across reruns);
        # _build_base_theme aggregates again per base form.
        self.df_theme_agg = df_theme_raw.groupBy('word').agg(
            F.concat_ws(',', F.sort_array(F.collect_set('theme_ch'))).alias('theme_ch_list')
        ).cache()
        print("self.df_theme_agg:")
        self.df_theme_agg.show(10, truncate=False)

        # ========== 4) Special-character list from MySQL (filter currently disabled) ==========
        # The front end keeps isolated special characters (?/!/&) as "words"
        # when splitting on spaces, so this job keeps them too.  The list is
        # still loaded so the filter in _calc_word_heat can be re-enabled.
        sql_sp = """
            select character_name
            from match_character_dict
            where match_type = '特殊字符'
        """
        mysql_conn = DBUtil.get_connection_info("mysql", "us")
        sp_df = SparkUtil.read_jdbc_query(
            session=self.spark, url=mysql_conn["url"],
            pwd=mysql_conn["pwd"], username=mysql_conn["username"],
            query=sql_sp
        )
        self.sp_symbols = [row['character_name'] for row in sp_df.collect()]
        print(f"sp_symbols(当前未启用过滤): {self.sp_symbols}")

        # ========== 5) Full word -> base_form mapping ==========
        # Built from split tokens plus dictionary words so both sides share one
        # normalization and theme lookups cannot miss because of a mismatch.
        self.df_word_to_base = self._build_word_to_base_mapping()

    def _build_word_to_base_mapping(self):
        """Build the word -> clean_word -> base_form mapping, with a broadcast hint.

        The distinct words of both ABA months and of the theme dictionary are
        collected to the driver, cleaned with ``clean_word`` (words for which
        it returns ``None`` are left out and therefore dropped by later inner
        joins) and normalized with ``to_base_form``.

        Returns:
            DataFrame with string columns ``word``, ``clean_word`` and
            ``base_form``, wrapped in ``F.broadcast``.
        """
        # Step 1: union of split tokens (both months) and dictionary words.
        df_curr_words = self._explode_words(self.df_curr)
        df_last_words = self._explode_words(self.df_last)
        df_theme_words = self.df_theme_agg.select('word')
        # Cached because two actions follow (count + toPandas); without it the
        # union/distinct would be computed twice.
        df_all = df_curr_words.union(df_last_words).union(df_theme_words) \
            .filter((F.col('word') != '') & F.col('word').isNotNull()) \
            .distinct() \
            .cache()
        cnt = df_all.count()
        print(f"distinct word 总数(拆词+词典 union): {cnt}")

        # Step 2: normalize on the driver (the original author notes there is
        # no Spark UDF version of the inflect-based normalizer), then release
        # the executor cache.
        pdf = df_all.toPandas()
        df_all.unpersist()
        p = inflect.engine()
        mappings = []
        for w in pdf['word']:
            cw = clean_word(w)
            if cw is None:
                # Dirty word: keep it out of the mapping so joins discard it.
                continue
            mappings.append((w, cw, to_base_form(cw, p)))

        # Step 3: log up to 20 rows that were actually changed, for spot checks.
        # pandas is used for display only, never for createDataFrame.
        pdf_map = pd.DataFrame(mappings, columns=['word', 'clean_word', 'base_form'])
        diff_sample = pdf_map[
            (pdf_map['word'] != pdf_map['clean_word']) | (pdf_map['clean_word'] != pdf_map['base_form'])
        ].head(20)
        print("word → clean_word → base_form 示例(前 20 条有变化的):")
        print(diff_sample.to_string(index=False))

        # Step 4: build the Spark frame from plain tuples.  createDataFrame(pdf)
        # is avoided because, per the original note, PySpark 3.1.x still calls
        # DataFrame.iteritems(), which pandas 2.x removed.  An explicit DDL
        # schema replaces name-only inference so an empty mapping list (or a
        # column holding only None) cannot make schema inference fail.
        df_map = self.spark.createDataFrame(
            mappings, schema='word string, clean_word string, base_form string'
        )
        return F.broadcast(df_map)

    def _calc_word_heat(self, df_aba):
        """Return heat per base form: ``sum(100000 / rank)`` over all tokens.

        Args:
            df_aba: DataFrame with columns ``search_term`` and ``rank``.

        Returns:
            DataFrame with columns ``base_form`` and ``word_heat``.
        """
        # Step 1: tokenise.  Repeated tokens are kept on purpose, so
        # 'shoe shoe rack' adds the heat of 'shoe' twice.
        df_words = self._explode_words(df_aba, F.col('rank'))
        # The front end does not filter isolated special characters (?/!/&),
        # so neither does this job.  Uncomment to enable the filter:
        # if self.sp_symbols:
        #     df_words = df_words.filter(~F.col('word').isin(self.sp_symbols))

        # Step 2: inner join against the broadcast mapping.  Words that
        # clean_word rejected are absent from the mapping and drop out here.
        df_normalized = df_words.join(
            self.df_word_to_base, on='word', how='inner'
        )

        # Step 3: sum per base form, e.g. rank 10 and rank 100 give
        # 100000/10 + 100000/100 = 11000.
        return df_normalized.groupBy('base_form').agg(
            F.sum(F.lit(100000.0) / F.col('rank')).alias('word_heat')
        )

    def _build_base_theme(self):
        """Aggregate dictionary themes per base form.

        Themes of every dictionary word sharing a base form are merged with
        ``collect_set`` so they no longer overwrite each other.

        Returns:
            DataFrame with columns ``base_form`` and ``theme_ch_list``
            (sorted, comma-joined).
        """
        # Step 1: back to one theme per row so it can be regrouped by base form.
        df_exploded = self.df_theme_agg \
            .withColumn('theme', F.explode(F.split('theme_ch_list', ','))) \
            .filter(F.col('theme') != '')
        # Step 2: normalize dictionary words with the same mapping the tokens use.
        df_normalized = df_exploded \
            .join(self.df_word_to_base, on='word', how='inner')
        # Step 3: merge per base form; sort_array keeps the output stable
        # across reruns.
        return df_normalized.groupBy('base_form').agg(
            F.concat_ws(',', F.sort_array(F.collect_set('theme'))).alias('theme_ch_list')
        )

    def handle_data(self):
        """Assemble ``self.df_save``: one row per raw word of the current month."""
        # ========== 1) Heat per base form for both months ==========
        df_curr_heat = self._calc_word_heat(self.df_curr)
        df_last_heat = self._calc_word_heat(self.df_last)

        # ========== 2) YoY = (current - last) / last; new words -> 1000 ==========
        # Left join keeps every base form that has heat this month; a missing
        # baseline leaves last_heat NULL, which maps to the 1000 marker.
        df_heat = df_curr_heat.withColumnRenamed('word_heat', 'curr_heat').join(
            df_last_heat.withColumnRenamed('word_heat', 'last_heat'),
            on='base_form', how='left'
        ).withColumn(
            'word_heat_change_rate',
            F.when(
                F.col('last_heat').isNull(), F.lit(1000.0)
            ).otherwise(
                F.round((F.col('curr_heat') - F.col('last_heat')) / F.col('last_heat'), 4)
            )
        ).select(
            'base_form',
            F.col('curr_heat').alias('word_heat'),
            'word_heat_change_rate'
        )

        # ========== 3) Themes per base form ==========
        df_base_theme = self._build_base_theme()

        # ========== 4) Expand back to the raw words seen this month ==========
        # Only variants that actually occur are emitted, because the front end
        # queries by its own raw token.  The `word` column stores the raw token
        # (not clean_word); heat, YoY and themes are those of its base form.
        df_curr_variants = self._explode_words(self.df_curr) \
            .distinct() \
            .join(self.df_word_to_base, on='word', how='inner') \
            .select('word', 'base_form') \
            .distinct()

        # ========== 5) Final assembly ==========
        # Every base form in df_curr_variants also exists in df_heat, so the
        # inner join below loses nothing compared with a left join.
        df_final = df_heat \
            .join(df_base_theme, on='base_form', how='left') \
            .fillna({'theme_ch_list': '-1'}) \
            .join(df_curr_variants, on='base_form', how='inner') \
            .select(
                F.col('word'),
                F.round('word_heat', 4).alias('word_heat'),
                F.col('word_heat_change_rate'),
                F.col('theme_ch_list')
            )

        # ========== 6) Output columns ==========
        # Pattern letters: 'ss' is seconds.  The previous 'SS' is the
        # fraction-of-second field and produced wrong timestamps.
        now_str = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = df_final.select(
            F.col('word'),
            F.col('word_heat'),
            F.col('word_heat_change_rate'),
            F.col('theme_ch_list'),
            now_str.alias('created_time'),
            now_str.alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

    def save_data(self):
        """Clear the target partition directory, append the result, release caches."""
        # Clearing first prevents leftover files from an earlier run being
        # duplicated by the append-mode write.
        hdfs_path = CommonUtil.build_hdfs_path(
            self.db_save,
            partition_dict={
                'site_name': self.site_name,
                'date_type': self.date_type,
                'date_info': self.date_info
            }
        )
        print(f"清除 HDFS 目录: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        self.df_save = self.df_save.repartition(10)
        print(f"当前存储的表名为:{self.db_save}, 分区为 {self.partitions_by}")
        self.df_save.write.saveAsTable(
            name=self.db_save, format='hive', mode='append', partitionBy=self.partitions_by
        )
        # unpersist must come after saveAsTable: handle_data only builds a lazy
        # plan, so releasing the caches earlier would make the write re-read
        # the upstream tables.
        self.df_curr.unpersist()
        self.df_last.unpersist()
        self.df_theme_agg.unpersist()
        print("success")
if __name__ == '__main__':
    # Positional CLI arguments: <site_name> <date_type> <date_info>.
    site_name, date_type, date_info = (
        CommonUtil.get_sys_arg(position, None) for position in (1, 2, 3)
    )
    # All three are mandatory.
    assert site_name is not None, "site_name 不能为空"
    assert date_type is not None, "date_type 不能为空"
    assert date_info is not None, "date_info 不能为空"
    DwsAbaWordHeat(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    ).run()
"""
年度:dws_aba_word_heat_last365.py
计算 ABA 年度滚动词频热度 + 同比 + 主题标签,写入 Hive dws_aba_word_heat_last365 表
业务场景:
前端"词频统计"页面支持"年度"切换:展示该单词在过去 12 个月的平均热度 + 年度同比
本表是滚动型:每跑一次基准月,输出"基于该基准月往前 12 个月汇总"的结果
仅 us 站点(业务要求)
聚合算法:
年度热度 = sum(12个月 word_heat) / count(distinct 实际出现的月份)
- 动态分母(不是固定除以 12):照顾季节性词(如 christmas 只在 11-12 月出现)
- 不除空缺月份,反映该词在出现的月份里的平均热度
年度同比:
(今年年度热度 - 去年年度热度) / 去年年度热度
- 去年年度热度同算法,从 dws_aba_word_heat 的去年 12 个月分区聚出来
- 新词(去年 12 个月都没该词)→ 1000(与月度同款占位)
theme_ch_list 取 12 个月合并去重:
跟 dwt_aba_last365 处理 st_attribute_label 同思路——把 12 个月的 theme_ch_list 全部合并去重
原因:原来只取最新月口径,但基准月该词没出现就丢标签(假阴性,前端筛选漏数据)
改造:collect_set + flatten + array_distinct + sort_array,保留所有历史月命中的主题
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.templates import Templates
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F
class DwsAbaWordHeatLast365(Templates):
    """Rolling 12-month word heat, yearly change and merged theme labels.

    Reads the monthly table ``dws_aba_word_heat`` for the 12 months ending at
    ``date_info`` and for the 12 months before that, averages each word's heat
    over the months in which it actually appears, computes the change between
    the two windows and appends the result to Hive table
    ``dws_aba_word_heat_last365``.  Only site ``us`` is accepted.
    """

    def __init__(self, site_name="us", date_type="month", date_info="2026-05"):
        """Create the Spark session and derive both 12-month windows.

        Args:
            site_name: Must be ``"us"``.
            date_type: Date-type partition value, e.g. ``"month"``.
            date_info: Base month, formatted ``YYYY-MM``.

        Raises:
            AssertionError: If ``site_name`` is not ``"us"``.
        """
        super().__init__()
        # Business rule: this table is produced for the us site only.
        assert site_name == 'us', "dws_aba_word_heat_last365 仅 us 站点跑"
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dws_aba_word_heat_last365"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type},{self.date_info}")
        # Output frame placeholder and partition settings.
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=10)
        # Base month plus the 11 months before it,
        # e.g. '2026-05' -> [2026-05, 2026-04, ..., 2025-06].
        self.last_12_month = [CommonUtil.get_month_offset(self.date_info, -i) for i in range(12)]
        # The 12 months before that window (baseline for the yearly change),
        # e.g. [2025-05, 2025-04, ..., 2024-06].
        self.prev_12_month = [CommonUtil.get_month_offset(self.date_info, -12 - i) for i in range(12)]
        # Placeholders that read_data() replaces with the real inputs.
        self.df_curr_year = self.spark.sql("select 1+1;")  # current window, monthly rows
        self.df_prev_year = self.spark.sql("select 1+1;")  # previous window, monthly rows
        self.df_theme_latest = self.spark.sql("select 1+1;")  # theme_ch_list rows of the current window

    def _read_months(self, columns, months):
        """Read ``columns`` of ``dws_aba_word_heat`` for the given months.

        The result is repartitioned by ``word`` and marked for caching.
        """
        sql = f"""
            select {columns}
            from dws_aba_word_heat
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info in ({CommonUtil.list_to_insql(months)})
        """
        return self.spark.sql(sql).repartition(40, 'word').cache()

    @staticmethod
    def _avg_heat_per_word(df_months, out_col):
        """Return ``sum(word_heat) / count(distinct date_info)`` per word.

        The denominator is the number of months in which the word appears, not
        a fixed 12, so seasonal words are averaged over their active months.
        """
        return df_months.groupBy('word').agg(
            F.sum('word_heat').alias('total_heat'),
            F.countDistinct('date_info').alias('active_months')
        ).withColumn(
            out_col,
            F.round(F.col('total_heat') / F.col('active_months'), 4)
        ).select('word', out_col)

    def read_data(self):
        """Validate the base month and load both windows plus theme rows."""
        # 1) The base month must exist.  Missing history months are tolerated
        #    (they only change the denominator).
        has_curr = self.spark.sql(f"""
            SELECT 1 FROM dws_aba_word_heat
            WHERE site_name='{self.site_name}'
              AND date_type='{self.date_type}'
              AND date_info='{self.date_info}'
            LIMIT 1
        """).take(1)
        assert len(has_curr) > 0, (
            f"上游 dws_aba_word_heat 基准月分区无数据 "
            f"site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info},"
            f"请先确认 dws_aba_word_heat 月度调度是否完成"
        )

        # 2) Current 12-month window (yearly average).
        self.df_curr_year = self._read_months('word, word_heat, date_info', self.last_12_month)
        print(f"self.df_curr_year (当年 12 个月,{self.last_12_month[-1]} ~ {self.last_12_month[0]}):")
        self.df_curr_year.show(10, truncate=True)

        # 3) Previous 12-month window (baseline for the yearly change).
        #    Early on the 24 months may be incomplete; missing months are
        #    simply not part of the aggregation.
        self.df_prev_year = self._read_months('word, word_heat, date_info', self.prev_12_month)
        print(f"self.df_prev_year (去年 12 个月,{self.prev_12_month[-1]} ~ {self.prev_12_month[0]}):")
        self.df_prev_year.show(10, truncate=True)

        # 4) theme_ch_list of all 12 months, merged per word in handle_data.
        #    Reading only the base month lost the labels of words that did not
        #    appear in that month.
        self.df_theme_latest = self._read_months('word, theme_ch_list', self.last_12_month)
        print(f"self.df_theme_latest (12 个月,{self.last_12_month[-1]} ~ {self.last_12_month[0]}):")
        self.df_theme_latest.show(10, truncate=False)

    def handle_data(self):
        """Build ``self.df_save`` (lazy); cached inputs are released in save_data."""
        # 1) + 2) Average heat per word for both windows.
        df_curr_agg = self._avg_heat_per_word(self.df_curr_year, 'curr_avg_heat')
        df_prev_agg = self._avg_heat_per_word(self.df_prev_year, 'prev_avg_heat')

        # 3) Yearly change; words absent from the previous window get 1000.
        df_heat = df_curr_agg.join(
            df_prev_agg, on='word', how='left'
        ).withColumn(
            'word_heat_change_rate',
            F.when(
                F.col('prev_avg_heat').isNull(), F.lit(1000.0)
            ).otherwise(
                F.round((F.col('curr_avg_heat') - F.col('prev_avg_heat')) / F.col('prev_avg_heat'), 4)
            )
        ).withColumnRenamed('curr_avg_heat', 'word_heat').drop('prev_avg_heat')

        # 4) Merge the themes of all months per word: split each month's
        #    comma string, flatten across months, de-duplicate, sort, re-join.
        #    The '-1' placeholder is skipped; when nothing is left the empty
        #    string becomes NULL via nullif and is turned into '-1' by fillna.
        #    A local variable is used so self.df_theme_latest keeps pointing at
        #    the cached frame that save_data must unpersist.
        df_theme_merged = self.df_theme_latest.groupBy('word').agg(
            F.expr("""
                nullif(
                    concat_ws(',',
                        sort_array(array_distinct(flatten(
                            collect_set(
                                case when theme_ch_list != '-1' then split(theme_ch_list, ',') end
                            )
                        )))
                    ),
                    ''
                )
            """).alias('theme_ch_list')
        )
        df_heat = df_heat.join(
            df_theme_merged, on='word', how='left'
        ).fillna({'theme_ch_list': '-1'})

        # 5) Output columns.  'ss' is seconds; the previous 'SS' is the
        #    fraction-of-second field and produced wrong timestamps.
        now_str = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = df_heat.select(
            F.col('word'),
            F.round('word_heat', 4).alias('word_heat'),
            F.col('word_heat_change_rate'),
            F.col('theme_ch_list'),
            now_str.alias('created_time'),
            now_str.alias('updated_time'),
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )

    def save_data(self):
        """Clear the target partition directory, append the result, release caches."""
        hdfs_path = CommonUtil.build_hdfs_path(
            self.db_save,
            partition_dict={
                'site_name': self.site_name,
                'date_type': self.date_type,
                'date_info': self.date_info
            }
        )
        print(f"清除 HDFS 目录: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        self.df_save = self.df_save.repartition(10)
        print(f"当前存储的表名为:{self.db_save}, 分区为 {self.partitions_by}")
        self.df_save.write.saveAsTable(
            name=self.db_save, format='hive', mode='append', partitionBy=self.partitions_by
        )
        # Release the input caches only after the write: handle_data builds a
        # lazy plan, so unpersisting there dropped the caches before the write
        # could use them.
        self.df_curr_year.unpersist()
        self.df_prev_year.unpersist()
        self.df_theme_latest.unpersist()
        print("success")
if __name__ == '__main__':
    # Positional CLI arguments: <site_name> <date_type> <date_info>.
    site_name, date_type, date_info = (
        CommonUtil.get_sys_arg(position, None) for position in (1, 2, 3)
    )
    # All three are mandatory.
    assert site_name is not None, "site_name 不能为空"
    assert date_type is not None, "date_type 不能为空"
    assert date_info is not None, "date_info 不能为空"
    DwsAbaWordHeatLast365(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    ).run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from utils.db_util import DBUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Export one monthly partition of Hive dws_aba_word_heat to PostgreSQL:
    # Sqoop loads a staging "copy" table, which is then swapped in as the
    # month's child partition of the yearly master table.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The last CLI argument doubles as the optional 'test' flag.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")
    assert date_type == DateTypes.month.name, "本脚本仅支持 month 类型导出"
    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        # Working-hours gate, as in the other export scripts of the new ABA flow.
        CommonUtil.judge_is_work_hours(
            site_name=site_name, date_type=date_type, date_info=date_info,
            principal='hejiangming',
            priority=2,
            export_tools_type=1,
            belonging_to_process=f'新ABA流程词频热度_{date_type}'
        )
        db_type = 'postgresql_cluster'
        print("导出到PG集群中")

    # 1) Refuse to export an empty Hive partition, which would otherwise
    #    replace the PG data with nothing.
    hive_partition_path = (
        f"/home/big_data_selection/dws/dws_aba_word_heat/"
        f"site_name={site_name}/date_type={date_type}/date_info={date_info}"
    )
    hive_files = HdfsUtils.read_list(hive_partition_path)
    if not hive_files:
        print(f"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出!")
        sys.exit(1)
    print(f"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出")
    engine = DBUtil.get_db_engine(db_type, site_name)

    # 2) Table names
    #    master (range-partitioned, created by the DBA): us_aba_word_heat_2026
    #    child partition:                                us_aba_word_heat_2026_05
    #    staging copy:                                   us_aba_word_heat_2026_05_copy
    suffix = str(date_info).replace("-", "_")
    year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
    next_val = CommonUtil.get_next_val(date_type, date_info)
    export_base_tb = f"{site_name}_aba_word_heat"
    export_master_tb = f"{export_base_tb}_{year_str}"
    export_table = f"{export_base_tb}_{suffix}"
    export_tb_copy = f"{export_table}_copy"

    # 3) Create the month's child partition (no-op when it already exists).
    sql_create_partition = f"""
        create table if not exists {export_table} partition of {export_master_tb}
        for values from ('{date_info}') to ('{next_val}');
    """
    DBUtil.engine_exec_sql(engine, sql_create_partition)

    # 4) Create the staging copy table with the partition's structure and
    #    empty it.  Sqoop writes here first so the live partition never has an
    #    empty window.
    sql_copy = f"""
        create table if not exists {export_tb_copy}
        (
            like {export_table} including all
        );
        truncate table {export_tb_copy};
    """
    DBUtil.engine_exec_sql(engine, sql_copy)

    # 5) Temporarily turn theme_ch_list from VARCHAR[] into VARCHAR(200):
    #    per the original note Sqoop cannot write PG array columns, so the
    #    comma-joined Hive string is loaded as plain text and converted back
    #    before the exchange.
    sql_alter_to_varchar = f"""
        ALTER TABLE {export_tb_copy} ALTER COLUMN theme_ch_list TYPE VARCHAR(200);
    """
    DBUtil.engine_exec_sql(engine, sql_alter_to_varchar)

    # 6) Build and run the Sqoop export script.
    export_cols = [
        "word",
        "word_heat",
        "word_heat_change_rate",
        "theme_ch_list",
        "created_time",
        "updated_time",
        "date_info"
    ]
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dws_aba_word_heat",
        export_tb=export_tb_copy,
        col=export_cols,
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )
    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Always release the SSH connection, including when the export fails.
        client.close()

    # 7) Convert theme_ch_list back to VARCHAR[]: "a,b" -> {a,b}.  Words
    #    without a dictionary match carry "-1" and become {-1}.
    sql_alter_back = f"""
        ALTER TABLE {export_tb_copy}
            ALTER COLUMN theme_ch_list TYPE VARCHAR[]
            USING string_to_array(theme_ch_list, ',')::varchar[];
    """
    DBUtil.engine_exec_sql(engine, sql_alter_back)

    # 8) Partition exchange: the copy table replaces the live child partition.
    DBUtil.exchange_pg_part_tb(
        engine,
        source_tb_name=export_tb_copy,
        part_master_tb=export_master_tb,
        part_target_tb=export_table,
        cp_index_flag=False,
        part_val={"from": [date_info], "to": [next_val]}
    )

    # 9) Drop the copy table (after the exchange it holds the old data).
    DBUtil.engine_exec_sql(engine, f"drop table if exists {export_tb_copy};")

    # 10) Registration in workflow_everyday (export-finished marker) is
    #     currently disabled; the code is kept verbatim for re-enabling.
    # if test_flag != 'test':
    #     mysql_engine = DBUtil.get_db_engine("mysql", "us")
    #     with mysql_engine.connect() as connection:
    #         sql = f"""
    #         replace into workflow_everyday (
    #             site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type
    #         )
    #         values (
    #             '{site_name}', '{date_info}', '导出pg完成', 14,
    #             '{export_master_tb}', 'month', 'AbaWordHeat', '是',
    #             'ABA词频热度月表', 'postgresql_cluster'
    #         );
    #         """
    #         print("================================更新 workflow_everyday================================")
    #         print(sql)
    #         connection.execute(sql)
    print(f"==================表 {export_table} 导出完成==================================")
    print("success")
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Export one partition of Hive dws_aba_word_heat_last365 to the fixed PG
    # table us_aba_word_heat_last_365_day via a staging "copy" table.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The last CLI argument doubles as the optional 'test' flag.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")
    assert site_name == 'us', "本表仅 us 站点导出"
    assert date_type == DateTypes.month.name, "本脚本仅支持 month 类型(基准月口径)"
    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        # Working-hours gate, as in the other export scripts of the new ABA flow.
        CommonUtil.judge_is_work_hours(
            site_name=site_name, date_type=date_type, date_info=date_info,
            principal='hejiangming',
            priority=2,
            export_tools_type=1,
            belonging_to_process=f'新ABA流程年词频热度_{date_type}'
        )
        db_type = "postgresql_cluster"
        print("导出到PG集群中")

    # 1) Refuse to export an empty Hive partition.
    hive_partition_path = (
        f"/home/big_data_selection/dws/dws_aba_word_heat_last365/"
        f"site_name={site_name}/date_type={date_type}/date_info={date_info}"
    )
    hive_files = HdfsUtils.read_list(hive_partition_path)
    if not hive_files:
        print(f"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出!")
        sys.exit(1)
    print(f"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出")

    # 2) Table names (the yearly table is fixed, no year suffix).
    export_tb_target = "us_aba_word_heat_last_365_day"  # live table
    export_tb_copy = f"{export_tb_target}_copy"  # staging copy
    engine = DBUtil.get_db_engine(db_type, site_name)

    # 3) Recreate the copy table and make the array column a plain VARCHAR so
    #    Sqoop can load the comma-joined string.  `including comments` (not
    #    `including all`) copies column definitions and comments only, leaving
    #    out indexes and constraints.
    with engine.connect() as connection:
        sql = f"""
            drop table if exists {export_tb_copy};
            create table if not exists {export_tb_copy}
            (
                like {export_tb_target} including comments
            );
            ALTER TABLE {export_tb_copy} ALTER COLUMN theme_ch_list TYPE VARCHAR(200);
        """
        print("================================执行 SQL================================")
        print(sql)
        connection.execute(sql)

    # 4) Sqoop export (partition column date_info goes last).
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dws_aba_word_heat_last365",
        export_tb=export_tb_copy,
        col=[
            "word",
            "word_heat",
            "word_heat_change_rate",
            "theme_ch_list",
            "created_time",
            "updated_time",
            "date_info"
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )
    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Always release the SSH connection, including when the export fails.
        client.close()

    # 5) Swap the copy table with the live table; cp_index_flag=True asks the
    #    helper to copy the indexes over.
    DBUtil.exchange_tb(
        engine,
        source_tb_name=export_tb_copy,
        target_tb_name=export_tb_target,
        cp_index_flag=True
    )

    # 6) Convert theme_ch_list of the live table back to VARCHAR[]:
    #    "a,b" -> {a,b}; the "-1" placeholder becomes {-1}.
    # NOTE(review): between step 5 and this ALTER the live table exposes
    # theme_ch_list as VARCHAR, whereas the monthly export converts the copy
    # table before swapping.  Confirm against DBUtil.exchange_tb whether the
    # conversion can safely be moved ahead of the exchange here as well.
    with engine.connect() as connection:
        sql = f"""
            ALTER TABLE {export_tb_target}
                ALTER COLUMN theme_ch_list TYPE VARCHAR[]
                USING string_to_array(theme_ch_list, ',')::varchar[];
        """
        print("================================执行 SQL================================")
        print(sql)
        connection.execute(sql)

    # 7) Registration in workflow_everyday (export-finished marker) is
    #    currently disabled; the code is kept verbatim for re-enabling.
    # if test_flag != 'test':
    #     mysql_engine = DBUtil.get_db_engine("mysql", "us")
    #     with mysql_engine.connect() as connection:
    #         sql = f"""
    #         replace into workflow_everyday (
    #             site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type
    #         )
    #         values (
    #             '{site_name}', '{date_info}', '导出pg完成', 14,
    #             '{export_tb_target}', '365_day', 'AbaWordHeatYear', '是',
    #             'ABA词频热度年表(最近12月,每月更新)', 'postgresql_cluster'
    #         );
    #         """
    #         print("================================更新 workflow_everyday================================")
    #         print(sql)
    #         connection.execute(sql)
    print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment