Commit d4397441 by hejiangming

词频分类代码

parent b6374899
import os
import sys
from collections import defaultdict
import inflect
sys.path.append(os.path.dirname(sys.path[0]))
from utils.templates import Templates
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from utils.hdfs_utils import HdfsUtils
from utils.word_normalize import clean_word, to_base_form # 公共清洗+归一模块(与 dws_aba_word_heat 同源)
from pyspark.sql import functions as F
from pyspark import StorageLevel
# 默认类型白名单(196):keyword_usr_mask.mask_type 落在这些值的词才进 scope=default(与 holiday_aba_word 同一份,写死)
MASK_TYPE_WHITELIST = [
"无",
"100th Day of School(1-3月热卖)",
"2-5月可卖",
"3月可卖",
"4月可卖",
"5月热卖",
"7月17日",
"8/9月热卖",
"NBA全明星赛(2月)",
"NFC 冠军赛(1月)",
"万圣节",
"万圣节/圣诞节",
"万圣节/复活节",
"万圣节/夏季产品",
"万圣节(平时可卖)",
"世界唐氏综合症日(3月21日)",
"世界地球日(4月22日)",
"世界肾脏日(3月14)",
"东正教圣周(4月初)",
"中国农历新年(新年的前1个月热卖)",
"主显节(基督教,1月6日)",
"九夜节(印度节日,10月)",
"五旬节(5月热卖)",
"五月五日节",
"五朔节 / 国际劳动节",
"亚太裔美国人传统月(5月)",
"亡灵节(11月2日)",
"仲夏节(6月24日)",
"休斯顿牲畜展览和牛仔竞技会(每年3月左右)",
"住棚节(9-10月)",
"侯丽节(3月)",
"元旦新年(12月中旬-12月底热卖)",
"兄弟姐妹节(印度节,8月)",
"光明节(11-12月初)",
"全国医生节(3月30日)",
"全国最甜蜜日(10月第三个星期六)",
"全国社会工作月(每年3月)",
"全国穿红色日(2月的1天)",
"全国结直肠癌宣传月(3月)",
"全国脑瘫宣传日(3月25日)",
"六月节(黑奴解放日-6月19日)",
"军人子女月(4月)",
"冬季产品",
"冬季产品(平时可卖)",
"冬季产品(平时可卖-圣诞节上涨)",
"冬季产品(圣诞节上涨)",
"初领圣体季(4-5月)",
"加拿大国庆日(7月1日)",
"匹配日(3月第三个周五)",
"双鱼座(2/19-3/20)",
"员工感谢日(3月的第一个星期五)",
"国家粗花呢日(4月3日)",
"国旗日(5月中下旬-6月初)",
"国殇纪念日(11月11日)",
"国际医生节(3月30日)",
"国际妇女节",
"国际秘书日(4月)",
"圆周率日(3月14)",
"土拨鼠日(2月2日)",
"圣诞节",
"圣诞节/夏季",
"圣诞节/护士节",
"圣诞节/教师节",
"圣诞节/新年",
"圣诞节/母亲节",
"圣诞节/爱尔兰节",
"圣诞节/父亲节",
"圣诞节(11月热卖)",
"圣诞节(1月热卖)",
"圣诞节(平时可卖)",
"地球日(4月22)",
"墨西哥独立日(9月16日)",
"墨西哥节(4月中旬-五月初)",
"复活节",
"复活节/圣诞节",
"复活节(平时可卖)",
"夏季产品",
"夏季产品(平时可卖)",
"多发性硬化症宣传月(3月)",
"大斋首日(复活节的日期前四十天)",
"大词",
"妇女历史月(3月第一天)",
"妇女历史月(3月)",
"婚礼季",
"子宫内膜异位症宣传月(3月)",
"实验周(4月)",
"宰牲节(5-6月)",
"宽扎节(12月26日至1月1日)",
"巴士底日(法国国庆日,7月14日)",
"巴西六月节(丰收节,6月)",
"带女儿和儿子上班日(4月)",
"平时可卖--10/11/12月上涨",
"平时可卖--11/12月上涨",
"平时可卖-1月上涨",
"平时可卖-8月上涨",
"平时可卖(万圣节/爱尔兰节上涨)",
"平时可卖(圣诞节-情人节上涨)",
"平时可卖(圣诞节/情人节/母亲节上涨)",
"平时可卖(夏季-圣诞节上涨)",
"平时可卖(开学季-圣诞节上涨)",
"平时可卖(开学季)",
"平时可卖(春季-圣诞节上涨)",
"平时可卖(母亲节-圣诞节上涨)",
"平时可卖(父亲节-圣诞节上涨)",
"开学季",
"开学季-9月热卖",
"开斋节/斋月(2-3月)",
"心理健康月(5月)",
"性侵犯意识月(4月)",
"总统日(2月15)",
"情人节",
"愚人节(3月初-3月底)",
"感恩节(11月第四个周四,9月下旬-11月中旬热卖",
"感恩节(平时可卖)",
"慕尼黑啤酒节",
"护士节(5月12日) ",
"护士节/圣诞节",
"排灯节(万灯节,印度灯节或屠妖节-每年10月或11月中)",
"教师节",
"教师节/圣诞节",
"教师节/开学季",
"文字",
"星球大战日(5月4日)",
"春季产品",
"春季产品(平时可卖)",
"春秋产品",
"普珥节(2-3月)",
"朋友节(十二月的第三个星期六)",
"查尔斯顿葡萄酒与美食(每年3月上涨)",
"校长日(5月1日)",
"格莱美(2月)",
"棕枝主日(复活节前)",
"植树节",
"樱花节(3-4月)",
"橙色衬衫日(9月30日)",
"母亲节",
"母亲节--11/12月",
"母亲节--冬季产品",
"毕业季(4月初-6月中旬热卖)",
"毕业季/开学季",
"毕业舞会(4月下旬或5月)",
"水瓶座(1月20-2月18)",
"波士顿马拉松(四月的第三个星期一)",
"洛里节(1月)",
"爱乳日(10月初-10月中旬)",
"爱尔兰节",
"父亲节",
"牧师感恩月(10月)",
"犹太新年(9月)",
"狂欢节",
"独立日(7月4日,6月上旬-七月初热卖)",
"狼疮宣传月(5月)",
"男子篮球比赛(3月热卖)",
"癫痫日(3月26为癫痫日,11月为癫痫月)",
"白羊座(3/21-4/19)",
"白血病宣传月(9月)",
"祖父母节(9月)",
"秋季产品",
"秋季产品/感恩节",
"秋季产品(平时可卖)",
"秋季返校季/开学季",
"税务表格(24年1月修订)",
"纳吾肉孜节(3月20日)",
"纳税日(4月15日)",
"美国劳动节(9月第一个星期一)",
"美国原住民传统月(11月)",
"美国心脏月(2月)",
"美国教师节(谢师周-5月第一个整周的周二)",
"美国教师节(谢师周-5月第一个整周的周二) ",
"美国爱国者日(9月11日)",
"老兵节即退伍军人节(11月11日)",
"老板日(10月16日)",
"职业护士助理日(6月)",
"职业治疗月(4月)",
"肯塔基德比马会(5月第一个星期六)",
"自闭症日(4月2日)",
"苏斯博士日(3月2日)",
"西班牙裔传统月(9月)",
"认证护士日(3月19)",
"读书月(3月)",
"贝尔蒙特锦标赛(6月)",
"赎罪日(犹太人节日,10月)",
"赖买丹月(伊斯兰历斋戒,2-3月)",
"超大件产品",
"超级碗(1月中旬-2月中旬)",
"越南新年(春节)",
"逾越节(3月中旬-4月下旬热卖)",
"闺蜜节(庆祝女性友谊,2月13号)",
"阵亡将士纪念日(4月中旬-5月底)",
"阿拉伯裔美国人传统月(4月)",
"预防虐待儿童月(4月)",
"马丁路德金纪念日(12月中旬-1月20日)",
"骄傲月(同志节,每年6月)",
"高尔夫大师赛(4月)",
"黑人历史月(26年2月1日-3月1日)",
"黑人历史节(1.1-2.20热卖)",
]
# 全部类型白名单(219):default 全集 + 非节日的产品/属性/语种等类型,scope=all 用(写死)
MASK_TYPE_WHITELIST_ALL = [
"无",
"100th Day of School(1-3月热卖)",
"2-5月可卖",
"3月可卖",
"4月可卖",
"5月热卖",
"7月17日",
"8/9月热卖",
"NBA全明星赛(2月)",
"NFC 冠军赛(1月)",
"万圣节",
"万圣节/圣诞节",
"万圣节/复活节",
"万圣节/夏季产品",
"万圣节(平时可卖)",
"下滑趋势",
"不可做分类",
"世界唐氏综合症日(3月21日)",
"世界地球日(4月22日)",
"世界肾脏日(3月14)",
"东正教圣周(4月初)",
"中国农历新年(新年的前1个月热卖)",
"主显节(基督教,1月6日)",
"九夜节(印度节日,10月)",
"五旬节(5月热卖)",
"五月五日节",
"五朔节 / 国际劳动节",
"亚太裔美国人传统月(5月)",
"亡灵节(11月2日)",
"仲夏节(6月24日)",
"休斯顿牲畜展览和牛仔竞技会(每年3月左右)",
"住棚节(9-10月)",
"侯丽节(3月)",
"元旦新年(12月中旬-12月底热卖)",
"兄弟姐妹节(印度节,8月)",
"光明节(11-12月初)",
"全国医生节(3月30日)",
"全国最甜蜜日(10月第三个星期六)",
"全国社会工作月(每年3月)",
"全国穿红色日(2月的1天)",
"全国结直肠癌宣传月(3月)",
"全国脑瘫宣传日(3月25日)",
"六月节(黑奴解放日-6月19日)",
"军人子女月(4月)",
"冬季产品",
"冬季产品(平时可卖)",
"冬季产品(平时可卖-圣诞节上涨)",
"冬季产品(圣诞节上涨)",
"初领圣体季(4-5月)",
"加拿大国庆日(7月1日)",
"化妆品",
"匹配日(3月第三个周五)",
"印地语",
"双鱼座(2/19-3/20)",
"员工感谢日(3月的第一个星期五)",
"品牌词",
"国家粗花呢日(4月3日)",
"国旗日(5月中下旬-6月初)",
"国殇纪念日(11月11日)",
"国际医生节(3月30日)",
"国际妇女节",
"国际秘书日(4月)",
"圆周率日(3月14)",
"土拨鼠日(2月2日)",
"圣诞节",
"圣诞节/夏季",
"圣诞节/护士节",
"圣诞节/教师节",
"圣诞节/新年",
"圣诞节/母亲节",
"圣诞节/爱尔兰节",
"圣诞节/父亲节",
"圣诞节(11月热卖)",
"圣诞节(1月热卖)",
"圣诞节(平时可卖)",
"地球日(4月22)",
"墨西哥独立日(9月16日)",
"墨西哥节(4月中旬-五月初)",
"复活节",
"复活节/圣诞节",
"复活节(平时可卖)",
"夏季产品",
"夏季产品(平时可卖)",
"多发性硬化症宣传月(3月)",
"大斋首日(复活节的日期前四十天)",
"大词",
"奥运会(四年一次,下次为2028年)",
"妇女历史月(3月第一天)",
"妇女历史月(3月)",
"婚礼季",
"子宫内膜异位症宣传月(3月)",
"实验周(4月)",
"宰牲节(5-6月)",
"宽扎节(12月26日至1月1日)",
"巴士底日(法国国庆日,7月14日)",
"巴西六月节(丰收节,6月)",
"带女儿和儿子上班日(4月)",
"平时可卖--10/11/12月上涨",
"平时可卖--11/12月上涨",
"平时可卖-1月上涨",
"平时可卖-8月上涨",
"平时可卖(万圣节/爱尔兰节上涨)",
"平时可卖(圣诞节-情人节上涨)",
"平时可卖(圣诞节/情人节/母亲节上涨)",
"平时可卖(夏季-圣诞节上涨)",
"平时可卖(开学季-圣诞节上涨)",
"平时可卖(开学季)",
"平时可卖(春季-圣诞节上涨)",
"平时可卖(母亲节-圣诞节上涨)",
"平时可卖(父亲节-圣诞节上涨)",
"开学季",
"开学季-9月热卖",
"开斋节/斋月(2-3月)",
"影视IP",
"德语",
"心理健康月(5月)",
"性侵犯意识月(4月)",
"总统日(2月15)",
"情人节",
"情趣类产品",
"愚人节(3月初-3月底)",
"感恩节(11月第四个周四,9月下旬-11月中旬热卖",
"感恩节(平时可卖)",
"慕尼黑啤酒节",
"护士节(5月12日) ",
"护士节/圣诞节",
"护肤品",
"排灯节(万灯节,印度灯节或屠妖节-每年10月或11月中)",
"教师节",
"教师节/圣诞节",
"教师节/开学季",
"文字",
"日全食(美国下一周期日全食为2044年8月23)",
"易燃易爆类",
"星球大战日(5月4日)",
"春季产品",
"春季产品(平时可卖)",
"春秋产品",
"普珥节(2-3月)",
"朋友节(十二月的第三个星期六)",
"查尔斯顿葡萄酒与美食(每年3月上涨)",
"校长日(5月1日)",
"格莱美(2月)",
"棕枝主日(复活节前)",
"植树节",
"樱花节(3-4月)",
"橙色衬衫日(9月30日)",
"母亲节",
"母亲节--11/12月",
"母亲节--冬季产品",
"毕业季(4月初-6月中旬热卖)",
"毕业季/开学季",
"毕业舞会(4月下旬或5月)",
"水瓶座(1月20-2月18)",
"法语",
"波士顿马拉松(四月的第三个星期一)",
"洗护品类",
"洛里节(1月)",
"液体",
"爱乳日(10月初-10月中旬)",
"爱尔兰节",
"父亲节",
"牧师感恩月(10月)",
"犹太新年(9月)",
"狂欢节",
"独立日(7月4日,6月上旬-七月初热卖)",
"狼疮宣传月(5月)",
"球队",
"电子产品",
"电子产品壳/膜等",
"电池产品",
"男子篮球比赛(3月热卖)",
"癫痫日(3月26为癫痫日,11月为癫痫月)",
"白羊座(3/21-4/19)",
"白血病宣传月(9月)",
"祖父母节(9月)",
"秋季产品",
"秋季产品/感恩节",
"秋季产品(平时可卖)",
"秋季返校季/开学季",
"税务表格(24年1月修订)",
"纳吾肉孜节(3月20日)",
"纳税日(4月15日)",
"美国劳动节(9月第一个星期一)",
"美国原住民传统月(11月)",
"美国心脏月(2月)",
"美国教师节(谢师周-5月第一个整周的周二)",
"美国教师节(谢师周-5月第一个整周的周二) ",
"美国爱国者日(9月11日)",
"老兵节即退伍军人节(11月11日)",
"老板日(10月16日)",
"职业护士助理日(6月)",
"职业治疗月(4月)",
"肯塔基德比马会(5月第一个星期六)",
"自闭症日(4月2日)",
"苏斯博士日(3月2日)",
"西班牙裔传统月(9月)",
"西语",
"认证护士日(3月19)",
"读书月(3月)",
"贝尔蒙特锦标赛(6月)",
"赎罪日(犹太人节日,10月)",
"赖买丹月(伊斯兰历斋戒,2-3月)",
"超大件产品",
"超级碗(1月中旬-2月中旬)",
"越南新年(春节)",
"越南语",
"逾越节(3月中旬-4月下旬热卖)",
"闺蜜节(庆祝女性友谊,2月13号)",
"阵亡将士纪念日(4月中旬-5月底)",
"阿拉伯裔美国人传统月(4月)",
"鞋",
"预防虐待儿童月(4月)",
"食物",
"马丁路德金纪念日(12月中旬-1月20日)",
"骄傲月(同志节,每年6月)",
"高尔夫大师赛(4月)",
"黑人历史月(26年2月1日-3月1日)",
"黑人历史节(1.1-2.20热卖)",
]
# 分组主键:分类(id+层级) + 归一词。所有"分类内"指标都按这三列 groupBy
KEYS = ['category_id', 'category_level', 'base_form']
class DwsAbaWordFreqCate(Templates):
def __init__(self, site_name="us", date_type="month", date_info="2026-04"):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.hive_tb = "dws_aba_word_freq_cate"
self.spark = self.create_spark_object(
app_name=f"{self.hive_tb}: {self.site_name},{self.date_type},{self.date_info}")
self.partitions_by = ['site_name', 'date_type', 'date_info']
self.reset_partitions(partitions_num=20)
# 近24个月(含当月):趋势图 + 同比(M12)/环比(M1)/近6月变化量(M0..M6) 都从这派生
self.months = [CommonUtil.get_month_offset(self.date_info, -i) for i in range(0, 24)]
self.year_ago = CommonUtil.get_month_offset(self.date_info, -11) # 新词数近1年起点(含当月共12个月)
self.p = inflect.engine()
# 初始化全局 df
self.df_mask = self.spark.sql("select 1+1;") # 屏蔽表 mask_type
self.lvl2_map = self.spark.sql("select 1+1;") # 二级分类归属 member_cid→category_id
self.cur_cat = self.spark.sql("select 1+1;") # 当月 已归属分类的搜索词级数据
self.all_cat = self.spark.sql("select 1+1;") # 24个月 已归属分类的搜索词级数据
self.base_map_df = self.spark.sql("select 1+1;") # word→(clean_word, base_form) 归一映射
self.df_rate = self.spark.sql("select 1+1;") # 当月变化率(上升占比用)
self.df_brand = self.spark.sql("select 1+1;") # 当月品牌词库
self.attr_df = self.spark.sql("select 1+1;") # base 词→属性维度(theme_ch)
self.hist_base_df = self.spark.sql("select 1+1;") # 历史 base 集合(是否新增词)
self.hist_first = self.spark.sql("select 1+1;") # 当月全站首次出现月(新词数)
self.df_save = self.spark.sql("select 1+1;")
# ============================================================
# 读数据:屏蔽表 / 分类归属 / 搜索词源 / 各维表 + 驱动端归一映射
# ============================================================
def read_data(self):
# ---- 二级分类归属表:category_full_name 的 id_path 第 2 段为二级分类 ----
# 例:cid=99, id_path='fashion->7147444011->88->99' → 该 cid 归二级分类 7147444011
# distinct:一个 cid 多条路径落到同一二级分类,去重避免词在同一分类被算多次
cfn_sql = f"""
select cast(category_id as char) as member_cid, id_path
from category_full_name
where site = '{self.site_name}' and id_path is not null
"""
conn = DBUtil.get_connection_info("mysql", "us")
cfn = SparkUtil.read_jdbc_query(
session=self.spark, url=conn["url"], pwd=conn["pwd"], username=conn["username"], query=cfn_sql)
comps = F.split(F.col('id_path'), '->')
self.lvl2_map = cfn.withColumn('comps', comps).filter(F.size('comps') >= 2) \
.withColumn('category_id', F.element_at('comps', 2)) \
.select('member_cid', 'category_id').distinct().cache()
print(f"二级分类归属表行数: {self.lvl2_map.count()}")
# ---- 屏蔽表(PG keyword_usr_mask,静态一份)----
self.df_mask = SparkUtil.read_jdbc_query(
session=self.spark,
**{k: DBUtil.get_connection_info("postgresql_cluster", self.site_name)[k] for k in ('url', 'pwd', 'username')},
query=f"select keyword as mask_keyword, mask_type from public.keyword_usr_mask where site_name='{self.site_name}'"
).cache()
# ---- 搜索词源 + 分类归属:当月一份(详情)/ 24月一份(热度)----
self.cur_cat = self._read_with_cat([self.date_info]).persist(StorageLevel.MEMORY_AND_DISK)
self.all_cat = self._read_with_cat(self.months)
# ---- 单复数归一映射:24个月所有 distinct word → (clean_word, base_form),driver 端 inflect ----
words_df = self.all_cat.select(F.explode(F.split(F.lower('search_term'), ' ')).alias('word')) \
.filter("word != ''").distinct()
mappings = []
for w in words_df.toPandas()['word']:
cw = clean_word(w)
if cw is None:
continue
mappings.append((w, cw, to_base_form(cw, self.p)))
self.base_map_df = self.spark.createDataFrame(mappings, schema=['word', 'clean_word', 'base_form']).cache()
print(f"归一映射词数: {len(mappings)}")
# ---- 变化率表(当月,上升占比用)----
self.df_rate = self.spark.sql(f"""
SELECT search_term, rank_change_rate, rank_rate_of_change
FROM dwt_aba_last_change_rate
WHERE site_name='{self.site_name}' AND date_type='month' AND date_info='{self.date_info}'
""").dropDuplicates(['search_term'])
# ---- 品牌词库(当月,精确相等口径)----
self.df_brand = self.spark.sql(f"""
SELECT DISTINCT st_brand_name_lower AS brand_match
FROM dim_st_brand_info
WHERE site_name='{self.site_name}' AND date_type='month' AND date_info='{self.date_info}'
AND black_flag = 0 AND length(st_brand_name_lower) > 1
AND st_brand_name_lower NOT IN ('nan','null','none','n/a')
""")
# ---- 属性词典(PG keywords_match_theme):base 词→维度(theme_ch),多值逗号串 ----
theme_conn = DBUtil.get_connection_info("postgresql_cluster", "us")
theme_rows = SparkUtil.read_jdbc_query(
session=self.spark, url=theme_conn["url"], pwd=theme_conn["pwd"], username=theme_conn["username"],
query="select theme_ch, label_en_lower from public.keywords_match_theme "
"where label_ch is not null and label_en_lower is not null and theme_ch is not null"
).collect()
attr_map = defaultdict(set)
for r in theme_rows:
w = (r['label_en_lower'] or '').strip().lower()
if not w or len(w.split()) > 1: # 空 / 短语跳过(只用单词级词典)
continue
cw = clean_word(w)
if cw is None:
continue
if r['theme_ch']:
# theme_ch 可能是逗号双值(中/英文逗号都可能),如 "主题,风格"/"主题,风格"
# 必须拆成原子维度再入集合:否则落库后是一个复合值,且全角逗号导出 string_to_array(',') 拆不开
base = to_base_form(cw, self.p)
for dim in r['theme_ch'].replace(',', ',').split(','):
dim = dim.strip()
if dim:
attr_map[base].add(dim)
attr_rows = [(b, ','.join(sorted(s))) for b, s in attr_map.items()]
self.attr_df = F.broadcast(self.spark.createDataFrame(attr_rows, schema=['base_form', 'attr_dim_src']))
print(f"属性词典 base 词数: {len(attr_rows)}")
# ---- 历史 base 集合(dim_st_detail_history 全历史,判"是否新增词")----
hist_words = [r['word'] for r in self.spark.sql(f"""
SELECT DISTINCT lower(w.word) AS word
FROM dim_st_detail_history
LATERAL VIEW explode(split(lower(search_term), ' ')) w AS word
WHERE site_name='{self.site_name}' AND date_info_first < '{self.date_info}'
""").filter("word != ''").collect()]
hist_base_set = set()
for w in hist_words:
cw = clean_word(w)
if cw is not None:
hist_base_set.add(to_base_form(cw, self.p))
self.hist_base_df = self.spark.createDataFrame([(b,) for b in hist_base_set], schema=['base_form']) \
.withColumn('hist_seen', F.lit(1))
print(f"历史 base 词数: {len(hist_base_set)}")
# ---- 当月全站首次出现月(新词数近1年口径)----
self.hist_first = self.spark.sql(f"""
SELECT search_term, date_info_first
FROM dim_st_detail_history
WHERE site_name='{self.site_name}'
""")
def _read_with_cat(self, month_list):
"""读多个月份搜索词级数据(is_self_max + rank>0 + join mask_type)并铺开到所属分类。
返回列:search_term, rank, date_info, mask_type, category_id, category_level"""
months_in = "','".join(month_list)
fact = self.spark.sql(f"""
SELECT search_term, rank, date_info,
st_bsr_cate_1_id_new AS slug,
cast(st_bsr_cate_current_id_new as string) AS cid
FROM dwt_aba_st_analytics
WHERE site_name='{self.site_name}' AND date_type='month' AND date_info IN ('{months_in}')
AND rank > 0
AND (is_self_max_num_asin <> 1 OR is_self_max_num_asin IS NULL)
""").join(self.df_mask, F.col('search_term') == self.df_mask['mask_keyword'], 'left').drop('mask_keyword')
common = ['search_term', 'rank', 'date_info', 'mask_type']
# 一级:直接用 slug(不展开)。slug 为空的极少数行无一级分类,丢弃
fact_l1 = fact.filter(F.col('slug').isNotNull()) \
.withColumn('category_id', F.col('slug')).withColumn('category_level', F.lit(1)) \
.select(*common, 'category_id', 'category_level')
# 二级:current_id join 归属表,一个词可命中多条 → 铺成多行
fact_l2 = fact.join(self.lvl2_map, fact['cid'] == self.lvl2_map['member_cid'], 'inner') \
.withColumn('category_level', F.lit(2)) \
.select(*common, 'category_id', 'category_level')
return fact_l1.unionByName(fact_l2)
# ============================================================
# 计算:24月热度宽表 + 两份白名单各跑一遍 + base 级标记 → self.df_save
# ============================================================
def handle_data(self):
# ---- 24个月拆词明细 → base 热度(两份白名单一次条件求和:hd=默认/ha=全部)----
words_all = self.all_cat.select(
'category_id', 'category_level', 'date_info', 'rank', 'mask_type',
F.explode(F.split(F.lower('search_term'), ' ')).alias('word')
).filter("word != ''").join(self.base_map_df, on='word', how='inner') \
.persist(StorageLevel.MEMORY_AND_DISK)
in_default = F.col('mask_type').isNull() | F.col('mask_type').isin(*MASK_TYPE_WHITELIST)
in_all = F.col('mask_type').isNull() | F.col('mask_type').isin(*MASK_TYPE_WHITELIST_ALL)
heat_long = words_all.groupBy(*KEYS, 'date_info').agg(
F.round(F.sum(F.when(in_default, F.lit(100000.0) / F.col('rank'))), 2).alias('hd'),
F.round(F.sum(F.when(in_all, F.lit(100000.0) / F.col('rank'))), 2).alias('ha'),
).persist(StorageLevel.MEMORY_AND_DISK)
# pivot:每月一行 → 每月一列(hd_m{i}/ha_m{i},months[i]=i个月前)
heat_wide = heat_long.groupBy(*KEYS).pivot('date_info', self.months) \
.agg(F.first('hd').alias('hd'), F.first('ha').alias('ha'))
for i, m in enumerate(self.months):
heat_wide = heat_wide.withColumnRenamed(f'{m}_hd', f'hd_m{i}') \
.withColumnRenamed(f'{m}_ha', f'ha_m{i}')
heat_wide = heat_wide.cache()
# ---- 当月拆词明细(详情指标,两份白名单共用)----
cur_words = self.cur_cat.select(
'category_id', 'category_level', 'search_term', 'rank', 'mask_type',
F.explode(F.split(F.lower('search_term'), ' ')).alias('word')
).filter("word != ''").join(self.base_map_df, on='word', how='inner') \
.persist(StorageLevel.MEMORY_AND_DISK)
def heat_trend_df(val_col):
"""从 heat_long 取该口径非空月,拼成 {"YYYY-MM":热度} JSON(仅含有数据月,缺月不写)。
array_sort 按 struct 第一字段 date_info 升序:月份从早到晚,且重跑结果稳定(幂等)。"""
return heat_long.filter(F.col(val_col).isNotNull()).groupBy(*KEYS) \
.agg(F.to_json(F.map_from_entries(F.array_sort(
F.collect_list(F.struct('date_info', val_col))))).alias('heat_trend'))
def build_scope(scope_name, mask_set, heat_prefix, val_col):
"""对单份白名单跑完整"分类×base"统计,返回带 scope 列的 DataFrame。"""
cond = F.col('mask_type').isNull() | F.col('mask_type').isin(*mask_set)
cur_s = cur_words.filter(cond)
# 词频 + 展示词(分类内 rank 最好那条的清洗词)
freq = cur_s.groupBy(*KEYS).agg(
F.count('*').alias('word_freq'),
F.expr('min_by(clean_word, rank)').alias('display_word'))
# 按 (分类, base, search_term) 去重,再算关联数/最佳排名/新词数/示例(左 join 首次出现月)
dedup = cur_s.dropDuplicates(['category_id', 'category_level', 'base_form', 'search_term']) \
.join(self.hist_first, 'search_term', 'left')
relate = dedup.groupBy(*KEYS).agg(F.count('*').alias('relate_st_num'))
others = dedup.groupBy(*KEYS).agg(
F.min('rank').alias('min_rank'),
F.sum(F.when(F.col('date_info_first') >= self.year_ago, 1).otherwise(0)).alias('new_st_num'),
F.expr('min_by(search_term, rank)').alias('top_aba_example'))
# 同比/环比上升占比:该分类·base 去重搜索词里 rank 变化率<0=上升 的占比
# (口径锁定 dwt_aba_last_change_rate:负数=排名上升,固定不变)
ratio = dedup.select(*KEYS, 'search_term').join(self.df_rate, 'search_term', 'left') \
.groupBy(*KEYS).agg(
F.count('*').alias('st_total'),
F.sum(F.when(F.col('rank_change_rate') < 0, 1).otherwise(0)).alias('yoy_up'),
F.sum(F.when(F.col('rank_rate_of_change') < 0, 1).otherwise(0)).alias('mom_up'),
).withColumn('yoy_up_ratio', F.round(F.col('yoy_up') / F.col('st_total'), 4)) \
.withColumn('mom_up_ratio', F.round(F.col('mom_up') / F.col('st_total'), 4)) \
.select(*KEYS, 'yoy_up_ratio', 'mom_up_ratio')
# 24个月热度(取本口径 prefix 列)+ 趋势 JSON
heat_sel = heat_wide.select(
*KEYS, *[F.col(f'{heat_prefix}_m{i}').alias(f'heat_m{i}') for i in range(24)])
trend = heat_trend_df(val_col)
full = freq.join(relate, KEYS, 'left').join(others, KEYS, 'left') \
.join(ratio, KEYS, 'left').join(heat_sel, KEYS, 'left').join(trend, KEYS, 'left')
# 当月=heat_m0;去年=m12(无→-1);上月=m1(无→-1);热度同比/环比(对比期无→新词+1000,正数=上升)
cz = lambda n: F.coalesce(F.col(n), F.lit(0.0))
full = full \
.withColumn('word_heat', F.col('heat_m0')) \
.withColumn('word_heat_last_year', F.coalesce(F.col('heat_m12'), F.lit(-1.0))) \
.withColumn('word_heat_last_month', F.coalesce(F.col('heat_m1'), F.lit(-1.0))) \
.withColumn('word_heat_change_rate', F.when(F.col('heat_m12').isNull(), F.lit(1000.0)).otherwise(
F.round((F.col('heat_m0') - F.col('heat_m12')) / F.col('heat_m12'), 4))) \
.withColumn('word_heat_rate_of_change', F.when(F.col('heat_m1').isNull(), F.lit(1000.0)).otherwise(
F.round((F.col('heat_m0') - F.col('heat_m1')) / F.col('heat_m1'), 4))) \
.withColumn('word_heat_change_last_1_month', F.round(cz('heat_m0') - cz('heat_m1'), 2)) \
.withColumn('word_heat_change_1_month_ago', F.round(cz('heat_m1') - cz('heat_m2'), 2)) \
.withColumn('word_heat_change_2_month_ago', F.round(cz('heat_m2') - cz('heat_m3'), 2)) \
.withColumn('word_heat_change_3_month_ago', F.round(cz('heat_m3') - cz('heat_m4'), 2)) \
.withColumn('word_heat_change_4_month_ago', F.round(cz('heat_m4') - cz('heat_m5'), 2)) \
.withColumn('word_heat_change_5_month_ago', F.round(cz('heat_m5') - cz('heat_m6'), 2)) \
.withColumn('scope', F.lit(scope_name)) \
.filter(F.col('word_freq') >= 2) # 长尾噪音过滤(与 verify 一致)
return full
union = build_scope('default', MASK_TYPE_WHITELIST, 'hd', 'hd') \
.unionByName(build_scope('all', MASK_TYPE_WHITELIST_ALL, 'ha', 'ha'))
# ---- base 级标记(两 scope 共用,union 后一次性 join)----
# 品牌词:用展示原词撞品牌库(避免归一后 crocs→croc 漏判)
union = union.join(self.df_brand, union['display_word'] == self.df_brand['brand_match'], 'left') \
.withColumn('brand_word_flag', F.when(F.col('brand_match').isNotNull(), F.lit(1)).otherwise(F.lit(0))) \
.drop('brand_match')
# 是否新增词:base 不在历史集合 = 全历史首次出现
union = union.join(self.hist_base_df, 'base_form', 'left') \
.withColumn('new_word_flag', F.when(F.col('hist_seen').isNull(), F.lit(1)).otherwise(F.lit(0))) \
.drop('hist_seen')
# 属性维度:无匹配填 '-1'(导出转 {-1})
union = union.join(self.attr_df, 'base_form', 'left') \
.withColumn('attr_dim', F.coalesce(F.col('attr_dim_src'), F.lit('-1'))).drop('attr_dim_src')
# ---- 最终列(对齐 Hive DDL;时间格式与兄弟表一致)----
now = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')
self.df_save = union.select(
F.col('scope'),
F.col('category_id'),
F.col('category_level').cast('int').alias('category_level'),
F.col('base_form').alias('base_word'),
F.col('display_word'),
F.col('word_freq').cast('int').alias('word_freq'),
F.col('word_heat'),
F.col('relate_st_num').cast('int').alias('relate_st_num'),
F.col('min_rank').cast('int').alias('min_rank'),
F.col('new_st_num').cast('int').alias('new_st_num'),
F.col('word_heat_last_year'),
F.col('word_heat_change_rate'),
F.col('word_heat_last_month'),
F.col('word_heat_rate_of_change'),
F.col('yoy_up_ratio'),
F.col('mom_up_ratio'),
F.col('word_heat_change_last_1_month'),
F.col('word_heat_change_1_month_ago'),
F.col('word_heat_change_2_month_ago'),
F.col('word_heat_change_3_month_ago'),
F.col('word_heat_change_4_month_ago'),
F.col('word_heat_change_5_month_ago'),
F.col('top_aba_example'),
F.col('brand_word_flag'),
F.col('new_word_flag'),
F.col('attr_dim'),
F.lit('').alias('word_cn'),
F.col('heat_trend'),
now.alias('created_time'),
now.alias('updated_time'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info'),
)
# ============================================================
# 存数据:先抽样打印看数据(测试用,后续不需要可整段删),再落 Hive。
# 落表前先 cache + count 触发计算,下面打印和 saveAsTable 都复用这份 cache,不重算整条链路。
# 注意:表需已按 DDL 建好(dws_aba_word_freq_cate)。
# ============================================================
def save_data(self):
df = self.df_save.cache()
# ---- 测试抽样打印(后续不需要可把这一段删掉)----
total = df.count()
print(f"========== 总行数: {total} ==========")
print("---- 按 scope × 层级 行数 ----")
df.groupBy('scope', 'category_level').count().orderBy('scope', 'category_level').show()
print("---- 抽样 30 行(默认类型)----")
df.filter("scope='default'").show(30, truncate=False)
print("---- 占位/空值体检(word_heat 不应为 null)----")
df.selectExpr(
"sum(case when word_heat is null then 1 else 0 end) as word_heat_null",
"sum(case when attr_dim='-1' then 1 else 0 end) as attr_dim_empty",
"sum(case when word_heat_change_rate=1000 then 1 else 0 end) as yoy_newword",
"sum(brand_word_flag) as brand_cnt",
"sum(new_word_flag) as newword_cnt",
).show()
# ---- 落表:清 HDFS 分区 + saveAsTable append(复用上面已 cache 的 df,避免重算)----
hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict={
'site_name': self.site_name, 'date_type': self.date_type, 'date_info': self.date_info})
print(f"清除 HDFS 目录: {hdfs_path}")
HdfsUtils.delete_file_in_folder(hdfs_path)
print(f"当前存储的表名为:{self.hive_tb}, 分区为 {self.partitions_by}")
df.repartition(self.partitions_num).write.saveAsTable(
name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
print("success")
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
assert site_name is not None, "site_name 不能为空(us/uk/de/...)"
assert date_type == 'month', "本作业仅支持 month"
assert date_info is not None, "date_info 不能为空(如 2026-04)"
obj = DwsAbaWordFreqCate(site_name=site_name, date_type=date_type, date_info=date_info)
obj.run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from utils.db_util import DBUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
# 最后一个参数为 test 标志
test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
print(f"执行参数为{sys.argv}")
assert date_type == DateTypes.month.name, "本脚本仅支持 month 类型导出"
if test_flag == 'test':
db_type = 'postgresql_test'
print("导出到测试库中")
else:
# 工时校验(与新 ABA 流程其它导出脚本一致):非工作时段或负责人不在班则跳过
CommonUtil.judge_is_work_hours(
site_name=site_name, date_type=date_type, date_info=date_info,
principal='hejiangming',
priority=2,
export_tools_type=1,
belonging_to_process=f'分类词频_{date_type}'
)
db_type = 'postgresql_cluster'
print("导出到PG集群中")
# 1) 校验 Hive 分区有数据,避免空分区导出后 PG 数据被清空
hive_partition_path = (
f"/home/big_data_selection/dws/dws_aba_word_freq_cate/"
f"site_name={site_name}/date_type={date_type}/date_info={date_info}"
)
hive_files = HdfsUtils.read_list(hive_partition_path)
if not hive_files:
print(f"[ERROR] Hive 分区无数据文件,路径:{hive_partition_path},跳过导出!")
sys.exit(1)
print(f"Hive 分区文件数:{len(hive_files)},路径:{hive_partition_path},继续导出")
engine = DBUtil.get_db_engine(db_type, site_name)
# 2) 表名拼装
# master 表(DBA 按 DDL 建好,PARTITION BY RANGE(date_info)):us_aba_word_freq_cate_2026
# 子分区表:us_aba_word_freq_cate_2026_04
# copy 表:us_aba_word_freq_cate_2026_04_copy
suffix = str(date_info).replace("-", "_")
year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
next_val = CommonUtil.get_next_val(date_type, date_info)
export_base_tb = f"{site_name}_aba_word_freq_cate"
export_master_tb = f"{export_base_tb}_{year_str}"
export_table = f"{export_base_tb}_{suffix}"
export_tb_copy = f"{export_table}_copy"
# 3) 在 master 表上建当月子分区(首次跑当月才真正创建,重跑幂等)
sql_create_partition = f"""
create table if not exists {export_table} partition of {export_master_tb}
for values from ('{date_info}') to ('{next_val}');
"""
DBUtil.engine_exec_sql(engine, sql_create_partition)
# 4) 创建 copy 表(继承子分区结构 like ... including all),并清空
# copy 表是独立普通表,Sqoop 先写到这里,最后通过分区交换替换正式子分区,避免空窗期
sql_copy = f"""
create table if not exists {export_tb_copy}
(
like {export_table} including all
);
truncate table {export_tb_copy};
"""
DBUtil.engine_exec_sql(engine, sql_copy)
# 5) ALTER copy 表的 attr_dim 列类型 VARCHAR[] → VARCHAR(500)
# 原因:Sqoop 不支持向 PG 数组类型写数据,先临时改普通 VARCHAR,
# 让 Sqoop 把 Hive 端 "材质,颜色" 逗号串原样写进来;交换前再 ALTER 回 VARCHAR[]
sql_alter_to_varchar = f"""
ALTER TABLE {export_tb_copy} ALTER COLUMN attr_dim TYPE VARCHAR(500);
"""
DBUtil.engine_exec_sql(engine, sql_alter_to_varchar)
# 6) 拼装 Sqoop 导出脚本(字段顺序与 Hive dws_aba_word_freq_cate schema 一致;
# site_name/date_type 编进表名不导出列,date_info 作为普通列导出)
export_cols = [
"scope",
"category_id",
"category_level",
"base_word",
"display_word",
"word_freq",
"word_heat",
"relate_st_num",
"min_rank",
"new_st_num",
"word_heat_last_year",
"word_heat_change_rate",
"word_heat_last_month",
"word_heat_rate_of_change",
"yoy_up_ratio",
"mom_up_ratio",
"word_heat_change_last_1_month",
"word_heat_change_1_month_ago",
"word_heat_change_2_month_ago",
"word_heat_change_3_month_ago",
"word_heat_change_4_month_ago",
"word_heat_change_5_month_ago",
"top_aba_example",
"brand_word_flag",
"new_word_flag",
"attr_dim",
"word_cn",
"heat_trend",
"created_time",
"updated_time",
"date_info",
]
sh = CommonUtil.build_export_sh(
site_name=site_name,
db_type=db_type,
hive_tb="dws_aba_word_freq_cate",
export_tb=export_tb_copy,
col=export_cols,
partition_dict={
"site_name": site_name,
"date_type": date_type,
"date_info": date_info
}
)
client = SSHUtil.get_ssh_client()
SSHUtil.exec_command_async(client, sh, ignore_err=False)
client.close()
# 7) Sqoop 写完后,ALTER copy 表的 attr_dim 回 VARCHAR[]
# USING string_to_array(...) 把逗号串 "材质,颜色" 拆成数组 {材质,颜色}
# 词典无匹配的词 PySpark 已填 "-1",转换后是 {-1},与 Java 占位约定一致
sql_alter_back = f"""
ALTER TABLE {export_tb_copy}
ALTER COLUMN attr_dim TYPE VARCHAR[]
USING string_to_array(attr_dim, ',')::varchar[];
"""
DBUtil.engine_exec_sql(engine, sql_alter_back)
# 8) 分区交换:copy 表替换正式子分区,无空窗期
DBUtil.exchange_pg_part_tb(
engine,
source_tb_name=export_tb_copy,
part_master_tb=export_master_tb,
part_target_tb=export_table,
cp_index_flag=False,
part_val={"from": [date_info], "to": [next_val]}
)
# 9) 删除 copy 表(交换后 copy 表里是旧数据,留着没意义)
DBUtil.engine_exec_sql(engine, f"drop table if exists {export_tb_copy};")
print(f"==================表 {export_table} 导出完成==================================")
print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment