Commit 4db15b90 by hejiangming

词频热度新增父级ID字段

parent a858f39d
......@@ -478,6 +478,7 @@ class DwsAbaWordFreqCate(Templates):
# 初始化全局 df
self.df_mask = self.spark.sql("select 1+1;") # 屏蔽表 mask_type
self.lvl2_map = self.spark.sql("select 1+1;") # 二级分类归属 member_cid→category_id
self.l2_parent = self.spark.sql("select 1+1;") # 二级 id→父级(一级) 映射(多父级则多行)
self.cur_cat = self.spark.sql("select 1+1;") # 当月 已归属分类的搜索词级数据
self.all_cat = self.spark.sql("select 1+1;") # 24个月 已归属分类的搜索词级数据
self.base_map_df = self.spark.sql("select 1+1;") # word→(clean_word, base_form) 归一映射
......@@ -503,12 +504,23 @@ class DwsAbaWordFreqCate(Templates):
conn = DBUtil.get_connection_info("mysql", "us")
cfn = SparkUtil.read_jdbc_query(
session=self.spark, url=conn["url"], pwd=conn["pwd"], username=conn["username"], query=cfn_sql)
# comps[2]=二级 id,comps[1]=一级父级(id_path 第1段)。先拆好缓存,下面两处复用。
comps = F.split(F.col('id_path'), '->')
self.lvl2_map = cfn.withColumn('comps', comps).filter(F.size('comps') >= 2) \
.withColumn('category_id', F.element_at('comps', 2)) \
.select('member_cid', 'category_id').distinct().cache()
cfn_comps = cfn.withColumn('comps', comps).filter(F.size('comps') >= 2) \
.withColumn('category_id', F.element_at('comps', 2)).cache()
# 二级归属表(保持原样:member_cid→二级 id,喂 _read_with_cat 做二级归类,逻辑/计数一字不动)
self.lvl2_map = cfn_comps.select('member_cid', 'category_id').distinct().cache()
print(f"二级分类归属表行数: {self.lvl2_map.count()}")
# 二级 → 父级(一级) 映射:单独一张,只给最后输出 parent_category_id 用(绝不碰上面的二级归类)。
# 保留所有父级(含数字残缺路径,按"有父级就保留"口径,不过滤);
# distinct 后,真跨多部门的二级(如 172574→electronics+office-products)天然铺成多行。
self.l2_parent = cfn_comps \
.withColumn('parent_category_id', F.element_at('comps', 1)) \
.select('category_id', 'parent_category_id').distinct().cache()
print(f"二级→父级映射行数: {self.l2_parent.count()}")
# ---- 屏蔽表(PG keyword_usr_mask,静态一份)----
self.df_mask = SparkUtil.read_jdbc_query(
session=self.spark,
......@@ -749,6 +761,19 @@ class DwsAbaWordFreqCate(Templates):
union = union.join(self.attr_df, 'base_form', 'left') \
.withColumn('attr_dim', F.coalesce(F.col('attr_dim_src'), F.lit(''))).drop('attr_dim_src')
# ---- parent_category_id:一级=自己、二级=所属一级(多父级则铺多行)----
# 末尾才贴父级:parent 不进前面的 groupBy(*KEYS),统计在此之前已全部算完,
# 这里 join 只是给每行"贴上"父级、按需复制整行,绝不重算 → 词频/热度一个数都不变。
# 二级多父级 → 这个 join 一对多,自动把该行铺成多行(每个父级一行、统计值原样复制),
# 正好实现"每个一级各存一份这个二级的数据"。
# 一级 category_id 是字母 slug、self.l2_parent 的 key 是数字二级 id,撞不上,
# 故一级行 join 不到、保持单行,再用 when 置成它自己。
# coalesce 兜底:极少数二级在映射里查不到父级时置成自己,不留 NULL(对齐 Java 不吃 NULL)。
union = union.join(F.broadcast(self.l2_parent), 'category_id', 'left') \
.withColumn('parent_category_id',
F.when(F.col('category_level') == 1, F.col('category_id')) # 一级=自己,单行
.otherwise(F.coalesce(F.col('parent_category_id'), F.col('category_id'))))
# ---- 最终列(对齐 Hive DDL;时间格式与兄弟表一致)----
now = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')
self.df_save = union.select(
......@@ -783,34 +808,20 @@ class DwsAbaWordFreqCate(Templates):
F.col('heat_trend'),
now.alias('created_time'),
now.alias('updated_time'),
F.col('parent_category_id'), # 一级家族归并键:一级=自己,二级=所属一级slug
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info'),
)
# ============================================================
# 存数据:先抽样打印看数据(测试用,后续不需要可整段删),再落 Hive。
# 落表前先 cache + count 触发计算,下面打印和 saveAsTable 都复用这份 cache,不重算整条链路。
# 注意:表需已按 DDL 建好(dws_aba_word_freq_cate)。
# 存数据:cache + count 触发计算后落 Hive(saveAsTable 复用 cache,不重算整条链路)。
# 注意:表需已按 DDL 建好(dws_aba_word_freq_cate,含 parent_category_id 列)。
# ============================================================
def save_data(self):
df = self.df_save.cache()
# ---- 测试抽样打印(后续不需要可把这一段删掉)----
total = df.count()
total = df.count() # 触发计算 + 落表前 cache,下面 saveAsTable 直接复用
print(f"========== 总行数: {total} ==========")
print("---- 按 scope × 层级 行数 ----")
df.groupBy('scope', 'category_level').count().orderBy('scope', 'category_level').show()
print("---- 抽样 30 行(默认类型)----")
df.filter("scope='default'").show(30, truncate=False)
print("---- 占位/空值体检(word_heat 不应为 null)----")
df.selectExpr(
"sum(case when word_heat is null then 1 else 0 end) as word_heat_null",
"sum(case when attr_dim='' then 1 else 0 end) as attr_dim_empty",
"sum(case when word_heat_change_rate=1000 then 1 else 0 end) as yoy_newword",
"sum(brand_word_flag) as brand_cnt",
"sum(new_word_flag) as newword_cnt",
).show()
# ---- 落表:清 HDFS 分区 + saveAsTable append(复用上面已 cache 的 df,避免重算)----
hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict={
......
......@@ -125,6 +125,7 @@ if __name__ == '__main__':
"heat_trend",
"created_time",
"updated_time",
"parent_category_id",
"date_info",
]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment