Commit 971fa3ce by hejiangming

no message

parent 4bbac15a
......@@ -2,6 +2,8 @@ import os
import sys
import re
import inflect
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql.types import StringType, MapType, IntegerType, ArrayType
from utils.common_util import CommonUtil
......@@ -11,6 +13,8 @@ from utils.db_util import DBUtil
from pyspark.sql import functions as F
from yswg_utils.common_udf import udf_handle_string_null_value
from utils.templates import Templates
# 单复数归一 / 白名单 / 不规则复数表统一从公共模块引入(原先各文件各写一份,已抽到 utils.word_normalize)
from utils.word_normalize import to_base_form, IRREGULAR_PLURAL, SEMANTIC_DISTINCT_WORDS
class DwsStTheme(Templates):
......@@ -62,13 +66,19 @@ class DwsStTheme(Templates):
return F.udf(udf_ele_mattch, StringType())
# 通过主题正则关系表拼接正则规范--汪瑞
# 通过主题正则关系表拼接正则规范
@staticmethod
def create_regexp_rules():
theme_rules_regexp_dict = {}
engine = DBUtil.get_db_engine('mysql', 'us')
# GROUP_CONCAT 按长度降序:让长标签在 | 交替组里排在它的"词前缀"短标签前面
# type 0 正则形如 \b(plus|plus size)\b,交替组后面没有必匹配内容(没有回溯保护),
# 短标签 plus 先匹配成功后引擎不会回溯,plus size 永远没机会 → 标签错打成 plus
# (与 handle_st_theme 词典大正则的排序修复是同一个病;实测规则表有 plus/plus size、
# heavy/heavy duty、full/full length 三对词前缀冲突,都在 尺寸 type 0)
# type 1-6 交替组后面有必匹配的数字等内容,短标签选错会回溯重试长标签,加排序无影响也无害
rules_sql = f"""
select theme_ch,regular_expression_type, GROUP_CONCAT(label_en_lower) as key_list from aba_match_theme_rules group by theme_ch, regular_expression_type;
select theme_ch,regular_expression_type, GROUP_CONCAT(label_en_lower ORDER BY LENGTH(label_en_lower) DESC) as key_list from aba_match_theme_rules group by theme_ch, regular_expression_type;
"""
with engine.connect() as connect:
rules_result = connect.execute(rules_sql)
......@@ -162,7 +172,12 @@ class DwsStTheme(Templates):
pattern_flag = bool(re.match(num_pattern, pattern_word))
count = 0
if not pattern_flag:
count = sum(1 for word in pattern_list if re.search(r'\b{}\b'.format(re.escape(pattern_word)), word))
# set() 去重后再数:本意是"被另一个更长的标签整词包含才删"(AB词去重,如 make 被 make up 包含)。
# 但同一个匹配词可能经"关键词词典(pattern_type=0)"+"数字规则表(pattern_type=1)"两条路各命中一次,
# 在 pattern_list 里出现两次(如 weighted 既在 keywords_match_theme=功能、又在 aba_match_theme_rules=尺寸)。
# 老逻辑按出现次数 count,"自己撞自己"也会 count=2 被误判成"已被包含"→整条搜索词标签被删光。
# 去重后只有"存在另一个不同的、更长的标签包含本词"时才 count>1,真AB词去重不受影响。
count = sum(1 for word in set(pattern_list) if re.search(r'\b{}\b'.format(re.escape(pattern_word)), word))
# 如果匹配到的pattern_word大于1则说明有已经匹配过的单词
return 0 if count > 1 else 1
......@@ -200,23 +215,124 @@ class DwsStTheme(Templates):
self.df_st_key = self.spark.sql(sql3).repartition(40, 'search_term').cache()
# 获取主题词
# 词典源从 MySQL selection.aba_match_theme 改为 PG 集群 public.keywords_match_theme
# 改动原因:业务方词典统一迁移到 PG 集群维护,跟现有 PG 数据架构对齐
sql4 = f"""
select
select
theme_en,
theme_ch,
label_ch,
label_en_lower
from selection.aba_match_theme
label_en_lower
from public.keywords_match_theme
where label_ch is not null
"""
print("sql:", sql4)
conn_info = DBUtil.get_connection_info("mysql", "us")
conn_info = DBUtil.get_connection_info("postgresql_cluster", "us")
self.df_theme = SparkUtil.read_jdbc_query(
session=self.spark,
url=conn_info["url"],
pwd=conn_info["pwd"],
username=conn_info["username"],
query=sql4
)
# 词典单复数双向扩展(替代业务方手工录复数列)
# 业务问题:业务方录入只覆盖一种形式(girl 或 girls),搜索词里的另一种形式正则匹配不上
# 解决方案:driver 端用 inflect 双向扩展原词 + base + plural 三种形式
# 性能:1w 多行词典在 driver 单机跑只需几秒
self.df_theme = self._expand_theme_with_plural(self.df_theme)
def _expand_theme_with_plural(self, df_theme):
"""
词典单复数双向扩展:业务方录"girl"自动补出"girls"(反之亦然),让搜索词两种形式都能匹配上。
单词扩展 [原词, base, p lural] 三种形式;短语保留原词不扩展(避免错拆)。
"""
# ============================================================
# Step 1:toPandas 拉到 driver(词典约 1.1w 行,单机可承载)
# 为什么在 driver 跑:inflect/WordNet 没有 spark UDF 版本,
# 每行词典要单独跑归一+加复数,spark UDF 序列化开销反而比 driver 单线程慢
# 性能:1.1w 行约 3-5 秒,可接受
# ============================================================
pdf = df_theme.toPandas()
p = inflect.engine()
expanded = [] # 最终收集的 (variant, theme_en, theme_ch, label_ch) 行
seen = set() # 去重 (variant, theme_en) 组合,避免重复入表
phrase_count = 0 # 统计短语数量(仅用于跑批日志,方便业务抽查)
# ============================================================
# Step 2:遍历词典每行,按"短语 vs 单词"两种情况扩展
# ============================================================
for row in pdf.itertuples(index=False):
word = row.label_en_lower
if not word:
continue
# --------------------------------------------------------
# Step 2a:短语跳过扩展(关键 bug 防护)
# 为什么:如果对短语按单词扩展,会错误拆出短语第一个词单独打标
# 反例:词典 'dusty rose' → 颜色
# 如果扩展会拆出 base='dusty'(仅第一个词)加进词典
# → 任何含 'dusty' 的搜索词(如 "dusty old furniture")都被错标"颜色"
# 正确做法:短语保留原词不扩展,让正则匹配整段短语
# --------------------------------------------------------
if word in SEMANTIC_DISTINCT_WORDS:
# 白名单词只保留原词,不扩展单复数:
# 否则词典录了 short(长度) 会扩展出 shorts,导致"短裤"搜索词被误打"长度"主题(跨语义污染)
variants = {word}
elif len(word.split()) > 1:
phrase_count += 1
variants = {word}
else:
# --------------------------------------------------------
# Step 2b:单词扩展 [原词, 单数 base, 复数 plural] 三种形式
# 为什么三种都加:业务方录哪种形式都行,三种 variant 都能匹配上搜索词
# 例:业务方录 'girl' → base='girl', plural='girls' → 扩展 [girl, girls]
# 业务方录 'girls' → base='girl'(还原), plural='girls' → 同样扩展 [girl, girls]
# 不规则复数:业务方录 'goose' → [goose, geese](inflect 内置支持)
# --------------------------------------------------------
base = to_base_form(word, p)
try:
plural = p.plural(base) or base
except Exception:
plural = base
variants = {word, base, plural} # set 自动去重(word==base==plural 时只保留 1 个)
# 补不规则名词的"省撇号所有格"写法(womens/womans/mens/mans):
# :woman 的复数是 women(不规则),inflect 产不出 womens;但搜索词里 womens/mens很多 dws 搜索词侧是【字面正则匹配、不做归一】,导致匹配不到数据
# 在这条路用不上,只能反过来把这些写法补进词典变体,让搜索词的 womens 能字面撞上 women 的标签。
# 例:base=woman → 补 {womens, womans};base=man → 补 {mens, mans}
# 只影响 base 落在 woman/man/tooth 的词,其它词不变,blast radius 极小
variants |= {k for k, v in IRREGULAR_PLURAL.items() if v == base}
# --------------------------------------------------------
# Step 2c:把所有 variant 加入 expanded 列表(按 (variant, theme_en) 去重)
# 为什么按 (variant, theme_en) 去重而不是只按 variant:
# 同一个 word 在词典里可能对应多个 theme_en
# 例:'rose' 既是 颜色(color) 又是 元素(element)
# 按 (variant, theme_en) 去重既能保留两个 theme,
# 又避免业务方同时录入 girl/girls 两行造成 (girl, crowd) 重复入表
# --------------------------------------------------------
for variant in variants:
if not variant:
continue
key = (variant, row.theme_en)
if key in seen:
continue
seen.add(key)
expanded.append((variant, row.theme_en, row.theme_ch, row.label_ch))
print(f"单复数扩展:词典原始 {len(pdf)} 行 → 扩展后 {len(expanded)} 行(其中短语 {phrase_count} 个未扩展)")
# ============================================================
# Step 3:list + 显式 schema 创建 DataFrame,避开 pandas iteritems bug
# 为什么不走 createDataFrame(pdf):pandas 2.x 移除了 DataFrame.iteritems(),
# PySpark 3.1.x 内部还在调用,会报 "'DataFrame' object has no attribute 'iteritems'"
# 走 list + schema 方式与 pandas 版本完全解耦
# cache 原因:下游 handle_st_theme 多次用到这个扩展后的词典 df
# ============================================================
return self.spark.createDataFrame(
expanded, schema=['label_en_lower', 'theme_en', 'theme_ch', 'label_ch']
).cache()
def handle_data(self):
......@@ -232,6 +348,17 @@ class DwsStTheme(Templates):
self.df_st_base = self.df_st_asin_info.unionByName(
self.df_st_detail
).drop_duplicates(['search_term'])
# 业务规则:单个单词的搜索词(如 gloves / wool)不打任何属性标签(关键词路 + 数字规则路都不打)。
# 在这里(两条匹配路之前)过滤掉单词搜索词,让它们根本不参与匹配 → dws_st_theme 不产生其主题行。
# 词数判定要稳,避免把"一个词"误算成两个:
# trim 去首尾空白 + 按连续空白 \s+ 拆 → "gloves"/"gloves "/"gloves "(带尾/多空格) 都算 1 个词;
# 只按空白拆、不按符号拆 → "men's"/"tree-skirt"(带符号无空格) 仍算 1 个词。
# 词数 > 1 才保留(如 "wool gloves")。
self.df_st_base = self.df_st_base.filter(
F.size(F.split(F.trim(F.col('search_term')), r'\s+')) > 1
)
self.df_st_base = self.df_st_base.join(
self.df_st_key, on='search_term', how='inner'
).cache()
......@@ -239,7 +366,13 @@ class DwsStTheme(Templates):
# 给每个搜索词打上主题标签
def handle_st_theme(self):
pdf_theme = self.df_theme.toPandas()
theme_list = list(set(pdf_theme.label_en_lower))
# 按长度降序排,让长短语优先于它的子词被匹配
# 为什么:Python 正则 | 是"先到先得"而非最长匹配,且 set 顺序随机;
# 如果 'mothers' 排在 'mothers day' 前面,findall 在位置 0 先吃掉 'mothers'(不重叠),
# 'mothers day'(节日)就永远匹配不上 → 节日标签随机丢失
# 例:'mothers day gifts' 修复前可能 → [mothers, gifts](丢节日)
# 长度降序后固定 → [mothers day, gifts] ✓(子词让位短语,与 AB 词去重设计意图一致)
theme_list = sorted(set(pdf_theme.label_en_lower), key=len, reverse=True)
pattern = re.compile(r'(?<!\+|\*|\-|\%|\.|\')\b({})\b'.format('|'.join([re.escape(x) for x in theme_list])),
flags=re.IGNORECASE)
self.df_st_theme = self.df_st_base.withColumn(
......@@ -296,9 +429,16 @@ class DwsStTheme(Templates):
# 通过正则匹配结果拿回中文单位信息
def match_search_term_ch_unit(self):
# 过滤掉 num_info 和 unit_info 都为空的行(没有任何匹配信息的脏行)
# 为什么这么写:每个字段的"空"要同时覆盖 null 和空串 ""(用 isNull() | == ""),
# 再整体取反 ~(A空 and B空) → 只要有一个字段非空就保留。
# 老写法的 bug:用 isNull() & (== ""),但一个字段不可能既是 null 又等于 "",
# 该条件恒为 False → ~False 恒为 True → 整个 filter 形同虚设、一行都没过滤掉。
df_st_mate = self.df_st_topic_base.filter(
~(F.col("num_info").isNull() & (F.col("num_info") == "") &
F.col("unit_info").isNull() & (F.col("unit_info") == ""))
~(
(F.col("num_info").isNull() | (F.col("num_info") == "")) &
(F.col("unit_info").isNull() | (F.col("unit_info") == ""))
)
)
df_st_no_num = df_st_mate.filter(F.col("num_info") == '')
df_st_with_num = df_st_mate.exceptAll(df_st_no_num)
......
......@@ -13,6 +13,7 @@ from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, MapType, IntegerType, StructType, StructField, DoubleType
from utils.common_util import CommonUtil, DateTypes
from utils.word_normalize import to_base_form, SEMANTIC_DISTINCT_WORDS # 归一/白名单统一公共模块
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
......@@ -21,6 +22,11 @@ from yswg_utils.common_udf import udf_ele_mattch
import numpy as np
# 模块级 inflect 引擎单例:driver 端 st_word_filter_condition / _build_word_to_base_mapping 调 to_base_form 时传入
# (SEMANTIC_DISTINCT_WORDS / IRREGULAR_PLURAL / to_base_form 已统一抽到 utils.word_normalize)
_INFLECT_ENGINE = inflect.engine()
class DwtStThemeAgg(object):
def __init__(self, site_name, date_type, date_info):
self.site_name = site_name
......@@ -36,6 +42,12 @@ class DwtStThemeAgg(object):
app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
# 把 word_normalize.py 分发到各 executor:udf_word_restoration 在 executor 端要 import 它,
# 但 executor 的 sys.path 没有 driver 顶部 sys.path.append 的 py_demo 目录,直接 import utils.word_normalize 会 ModuleNotFoundError。
# addPyFile 运行时把该文件临时发到每个 executor 并加进其 sys.path(用完即焚),UDF 即可 import 顶层 word_normalize。
import utils.word_normalize as _wn_mod
self.spark.sparkContext.addPyFile(_wn_mod.__file__)
# 注册自定义函数 (UDF)
self.u_theme_pattern = F.udf(udf_ele_mattch, StringType())
self.u_theme_contain_judge = F.udf(self.udf_theme_contain_judge, IntegerType())
......@@ -85,8 +97,23 @@ class DwtStThemeAgg(object):
words = st_word.split(" ")
word_list = []
for word in words:
# 跳过空串:search_term 带双空格/首尾空格时 split(" ") 会拆出 '',
# 新版 inflect 用 typeguard 校验入参(Word 要求长度≥1),p.plural('') 直接抛
# TypeCheckError: argument "text" (str) is not an instance of inflect.Word
# 上游 rlike 过滤是对"去空格后"的串校验的('gift box'→'giftbox' 合法通过),挡不住双空格
if not word:
continue
word_list.append(word)
word_list.append(p.plural(word))
# 白名单词(short/shorts、person/people 等单复数语义不同)不加复数对偶。
# inflect.plural() 是"双向 toggle"——plural('short')='shorts',
# 而 plural('shorts') 又变回 'short',一来一回闭环。
# 于是 'short' 和 'shorts' 凑出的集合都是 {short, shorts} → 签名相同
# → 在下面 Window.partitionBy('similar_word_list') 去重时被判为重复、删掉一个,
# 把"长度(short)"和"短裤(shorts)"两个不同语义的模板词合并掉。
# 必须显式判白名单,不能赌 inflect 对某些词(person→people→peoples)碰巧不闭环。
# 与 udf_word_restoration / st_word_filter_condition 三处保持同一套白名单逻辑。
if word.lower() not in SEMANTIC_DISTINCT_WORDS:
word_list.append(p.plural(word))
word_list.sort()
return ','.join(set(word_list))
......@@ -94,11 +121,36 @@ class DwtStThemeAgg(object):
@staticmethod
def udf_word_restoration():
# ============================================================
# dedup 第二轮 UDF:对 search_term 拆词后逐词归一,拼成签名用于"词形相同"去重。
# 归一统一调 utils.word_normalize.to_base_form(与匹配侧 _build_word_to_base_mapping 同一套口径)。
# 为什么"函数内 import + function attribute 单例":
# UDF 在 executor 端逐行跑,函数内 import 让每个 executor 进程构造一次依赖(单例缓存),
# 且 to_base_form 在独立模块里、cloudpickle 按"模块名+函数名"引用而非按值 pickle,
# 避开了原先内联想绕的那个 `args[0] from __newobj__` PickleError。
# 注:to_base_form 内部已带白名单/不规则表,不再需要本地内联那几张表。
# ============================================================
def udf_restoration_words(st_word):
# executor 端首次调用时初始化(function attribute 当 per-process 单例缓存)
if not getattr(udf_restoration_words, '_inited', False):
import inflect as _inflect
from textblob import TextBlob as _TB
# 顶层 word_normalize(由 __init__ 的 addPyFile 发到 executor),不是 utils.word_normalize:
# executor sys.path 无 py_demo 目录,addPyFile 把单文件按顶层模块名分发
from word_normalize import to_base_form as _to_base_form
udf_restoration_words._engine = _inflect.engine()
udf_restoration_words._TextBlob = _TB
udf_restoration_words._to_base_form = _to_base_form
udf_restoration_words._inited = True
p = udf_restoration_words._engine
TextBlob = udf_restoration_words._TextBlob
to_base = udf_restoration_words._to_base_form
word_list = []
blob = TextBlob(st_word)
for word in blob.words:
word_list.append(word.lemmatize())
word_list.append(to_base(str(word), p))
word_list.sort()
return ','.join(set(word_list))
......@@ -106,10 +158,18 @@ class DwtStThemeAgg(object):
@staticmethod
def udf_judge_twin_words(st_word):
words = st_word.split(" ")
# 剔除空串后再拆词:search_term 带多余空格时 split(" ") 会拆出 '',混进 set 让叠词判断失效
# 修复前:'gun gun '.split(" ") → ['gun','gun',''] → set={'gun',''} 2个元素 ≠ 1 → 叠词漏网 ✗
# 修复后:剔除空串 → ['gun','gun'] → set={'gun'} 1个元素 → 正常识别为叠词过滤掉 ✓
# 不带多余空格的正常词('gun gun'/'red dress')拆词结果与修复前完全一致,行为不变
words = [w for w in st_word.split(" ") if w]
judge_flag = 0
# 先判断是否完全一致的同属性叠词,如gun gun;这种用户不需要
if len(set(words)) == 1:
# len(words) > 1:叠词定义要求"至少2个词且全相同"。
# 不加的话 "gun "(单词+尾空格)剔空串后只剩 ['gun'],len(set)==1 会被误判成叠词丢弃;
# 外层 Spark 侧的单词豁免(F.size(F.split)==1)对 "gun " 不生效(Spark split 出 ["gun",""] size=2 进了本UDF),
# 所以必须在这里自己兜住单词情况
if len(words) > 1 and len(set(words)) == 1:
judge_flag = 1
return judge_flag
......@@ -130,7 +190,29 @@ class DwtStThemeAgg(object):
# 生成排序后的单数和复数列表
singular_plural_list = []
for word in base_keywords:
sort_list = [word, CommonUtil.convert_singular_plural(word)]
# 白名单词不做单复数对偶,只用原词生成 rlike:
# 否则 short 模板会拼出 \b(short|shorts)\b 把 shorts 的搜索词也匹配进来(反之亦然),
# 两个不同语义的模板词互相"借"对方的流量,指标被污染
if word.lower() in SEMANTIC_DISTINCT_WORDS:
sort_list = [word]
else:
# 用 to_base_form + round-trip 校验拿对偶形式,替代老的 CommonUtil.convert_singular_plural
# 修复两个老 bug:①错拼词借真词流量(sunglasse→sunglasses)②-ss/-sis 真词被砍 s(dress→dres)
w = word.lower()
base = to_base_form(w, _INFLECT_ENGINE)
if base != w:
# word 是复数 → 对偶是单数 base(如 shorts→short、dresses→dress)
counterpart = base
else:
# word 是单数或错拼词 → 算复数再 round-trip 校验:
# 复数转回单数 == base 才接受(真单数 short→shorts ✓);
# 错拼词转回对不上则不配对(sunglasse 的复数 sunglasses 转回是 sunglass≠sunglasse ✗)
try:
plural = _INFLECT_ENGINE.plural(base)
except Exception:
plural = base
counterpart = plural if to_base_form(plural, _INFLECT_ENGINE) == base else word
sort_list = [word, counterpart]
sort_list.sort()
singular_plural_word = "|".join(sort_list)
singular_plural_list.append(singular_plural_word)
......@@ -381,14 +463,20 @@ class DwtStThemeAgg(object):
'row_num', F.row_number().over(window=similar_words_window)
).filter('row_num = 1').drop(*['similar_word_list', 'row_num']).cache()
# 处理品牌词逻辑
df_without_brand_words = self.df_base_filter_date.filter('st_brand_label = 0')
df_brand_words = self.df_base_filter_date.filter('st_brand_label = 1')
df_brand_words = df_brand_words.withColumn(
'brand_match_detail', self.udf_theme_regex(self.brand_pattern)(F.col("search_term"))
).filter('brand_match_detail is not null').drop('brand_match_detail')
# 将处理后的品牌词与非品牌词合并
self.df_base_filter_date = df_without_brand_words.unionByName(df_brand_words)
# 处理品牌词逻辑【已注释:不再做品牌过滤】
# 注释原因:原逻辑把 st_brand_label=1 的词只保留命中"二三级词专用品牌词库"白名单的,其余砍掉。
# 但上游 dim_st_brand_info 会把常用词误判为品牌(如某卖家品牌名就叫 "Dress" → 搜索词 dress 被标 st_brand_label=1),
# 而专用白名单又没收录 dress,导致 dress(rank 108 的真实热词)被错误砍掉,
# 反而留下错拼词 dresse。直接去掉这层过滤,让所有词正常通过,由后续 dedup/黑名单/主题过滤把关。
# 注意:去掉后纯品牌搜索词(如 "anrabess dress")也会进模板词集合,业务若不接受需重新设计品牌识别
# (根因在上游 dim_st_brand_info 品牌库被常用词污染,要先清洗上游再谈下游怎么用)。
# df_without_brand_words = self.df_base_filter_date.filter('st_brand_label = 0')
# df_brand_words = self.df_base_filter_date.filter('st_brand_label = 1')
# df_brand_words = df_brand_words.withColumn(
# 'brand_match_detail', self.udf_theme_regex(self.brand_pattern)(F.col("search_term"))
# ).filter('brand_match_detail is not null').drop('brand_match_detail')
# # 将处理后的品牌词与非品牌词合并
# self.df_base_filter_date = df_without_brand_words.unionByName(df_brand_words)
# 处理黑名单词逻辑
pd_match_blacklist = self.df_match_blacklist.toPandas()
......@@ -396,6 +484,52 @@ class DwtStThemeAgg(object):
'st_blacklist_flag', self.filter_blacklist_words(pd_match_blacklist)("search_term")
).filter('st_blacklist_flag != 1').cache()
def _build_word_to_base_mapping(self, df_st_base, df_pat_base):
"""
Driver 端构建权威 word → base_form 映射并 broadcast。
子词侧 + 模板词侧 union 后归一,保证两侧用同一份规则;复用 to_base_form(带白名单)。
"""
# ============================================================
# Step 1:子词侧 + 模板词侧 union 后 distinct
# 为什么 union:保证两侧归一规则一致,避免同一个词在两侧 base_form 不同 → join miss
# 拆词用 r'\W+'(非 word 字符全当分隔符),与下游 explode 完全一致
# 例 "tree-skirt" → ["tree","skirt"](对齐原 rlike `\btree\b` 在 "tree-skirt" 命中行为)
# 例 "5tree" → ["5tree"](数字+字母粘连不拆,与 `\btree\b` 不命中一致)
# ============================================================
df_st_words = df_st_base.select(
F.explode(F.split(F.lower('search_term'), r'\W+')).alias('word')
)
df_pat_words = df_pat_base.select(
F.explode(F.split(F.lower('search_term'), r'\W+')).alias('word')
)
df_all = df_st_words.union(df_pat_words) \
.filter((F.col('word') != '') & F.col('word').isNotNull()) \
.distinct() \
.cache()
cnt = df_all.count()
print(f"distinct word 总数(子词+模板词 union): {cnt}")
# ============================================================
# Step 2:toPandas 拉到 driver 单线程归一
# 为什么 driver 端跑:inflect/WordNet 没有 Spark UDF 版本,row-by-row PythonUDF 序列化开销
# 反而比 driver 单线程还慢;distinct word ~10w 量级,driver 处理可控(1~2 分钟)
# 与 dws_aba_word_heat._build_word_to_base_mapping 同款做法
# ============================================================
pdf = df_all.toPandas()
df_all.unpersist()
p = inflect.engine()
mappings = [(w, to_base_form(w, p)) for w in pdf['word']]
# ============================================================
# Step 3:list + 显式 schema 创建 DataFrame,broadcast 出去
# 为什么 broadcast:下游子词侧/模板词侧 join 都用这份映射,小表(几 MB),无 shuffle
# 为什么 list+schema 而不是 createDataFrame(pdf):pandas 2.x 移除 iteritems,PySpark 3.1.x
# 内部还在调,走 list 方式与 pandas 版本完全解耦
# ============================================================
df_map = self.spark.createDataFrame(mappings, schema=['word', 'base_form'])
return F.broadcast(df_map)
def handle_st_filter_table(self):
# 过滤特殊词
self.df_base_filter_date = self.df_base_filter_date.join(
......@@ -417,40 +551,123 @@ class DwtStThemeAgg(object):
F.lit(self.date_info).alias('date_info')
).cache()
pattern_words = self.df_base_filter_date.select('search_term')
# 将数据转换成pandas_df
dict_df = pattern_words.toPandas()
# 提取二级词和是否叠词标签转换成list[dict{}]
self.st_word_list = dict_df.to_dict(orient='records')
row_size = 40000
batch_size = 200
# 落表路径校验
# 落表路径校验(新旧实现都需要:写入前先清当前分区,保证幂等)
del_hdfs_path = CommonUtil.build_hdfs_path('tmp_pattern_st_info', partition_dict=self.partition_dict)
print(f"清除hdfs目录中:{del_hdfs_path}")
HdfsUtils.delete_file_in_folder(del_hdfs_path)
partition_by = ["site_name", "date_type", "date_info"]
word_batches = [self.st_word_list[i:i + row_size] for i in range(0, len(self.st_word_list), row_size)]
for word_batch in word_batches:
df_list = [] # 用于存储 DataFrame
for row in word_batch:
# 获取处理后的多级词
pattern_st = row["search_term"]
# 通过方法拆分,获取完全匹配的过滤条件
filter_condition = self.st_word_filter_condition(pattern_st)
filter_condition_expr = F.expr(filter_condition)
df_union_filter = df_st_filter_base.filter(
filter_condition_expr
).withColumn(
"pattern_st", F.lit(pattern_st)
)
df_list.append(df_union_filter)
for i in range(0, len(df_list), batch_size):
print(f"当前是word_batches的轮回:f{word_batches.index(word_batch)},当前写入表的df索引位置:{i + 1}")
tmp_df = df_list[i:i + batch_size]
result_df = self.udf_unionAll(*tmp_df)
result_df = result_df.repartition(1)
result_df.write.saveAsTable(name='tmp_pattern_st_info', format='hive', mode='append', partitionBy=partition_by)
# ===== 原写法(逐模板词 rlike 全表扫描,保留备查,已停用)=====
# 问题:每个模板词单独 rlike 扫一遍 df_st_filter_base,N 模板词(~3w) × 写批次(~150) 起步
# ≈ 3×10¹⁰ 次行级正则判定 → 单月跑批 1~2 天。
# 雪上加霜:repartition(1) 写入单线程;150 次独立 saveAsTable append 各起一个 Spark job。
# 详细问题/方案/口径对齐见
# Pyspark_job/spark_hjm/aba_month_new/分支B_搜索词主题标签/反查子体join化优化方案.md
# pattern_words = self.df_base_filter_date.select('search_term')
# # 将数据转换成pandas_df
# dict_df = pattern_words.toPandas()
# # 提取二级词和是否叠词标签转换成list[dict{}]
# self.st_word_list = dict_df.to_dict(orient='records')
# row_size = 40000
# batch_size = 200
# word_batches = [self.st_word_list[i:i + row_size] for i in range(0, len(self.st_word_list), row_size)]
# for word_batch in word_batches:
# df_list = [] # 用于存储 DataFrame
# for row in word_batch:
# # 获取处理后的多级词
# pattern_st = row["search_term"]
# # 通过方法拆分,获取完全匹配的过滤条件
# filter_condition = self.st_word_filter_condition(pattern_st)
# filter_condition_expr = F.expr(filter_condition)
# df_union_filter = df_st_filter_base.filter(
# filter_condition_expr
# ).withColumn(
# "pattern_st", F.lit(pattern_st)
# )
# df_list.append(df_union_filter)
# for i in range(0, len(df_list), batch_size):
# print(f"当前是word_batches的轮回:f{word_batches.index(word_batch)},当前写入表的df索引位置:{i + 1}")
# tmp_df = df_list[i:i + batch_size]
# result_df = self.udf_unionAll(*tmp_df)
# result_df = result_df.repartition(1)
# result_df.write.saveAsTable(name='tmp_pattern_st_info', format='hive', mode='append', partitionBy=partition_by)
# ===== 新写法:单词级 broadcast join + 多重性判定 =====
# 核心思路:把"子词 rlike 匹配模板词所有单词"改写成
# "拆词 → base_form 归一 → 计数 → 按 base_form join → 验证全 key 命中",
# 从 O(N模板 × N子词) 的暴力扫描降到 O(N子词),预计从天级降到分钟级。
# Step 1:构建权威 word → base_form 映射(driver 端 inflect 归一 + broadcast)
# 复用 to_base_form(带 SEMANTIC_DISTINCT_WORDS 白名单):
# - 白名单词(short/shorts/person/people 等)原样保留 → 与原 rlike `\b(short)\b` 不加复数对偶一致
# - 非白名单(girl/girls/dress/dresses)归到同一 base → 与原 rlike `\b(girl|girls)\b` 双形态等价
df_word_to_base = self._build_word_to_base_mapping(df_st_filter_base, self.df_base_filter_date)
# Step 2:子词侧拆词 + 归一 + 按 (st_key, base_form) 计数
# 例:search_term="cashmere pashmina wrap"
# → 拆词 [cashmere, pashmina, wrap] → 归一 [cashmere, pashmina, wrap]
# → 3 行 (st_key=X, ..., base_form=cashmere, st_cnt=1) / (...,pashmina,1) / (...,wrap,1)
# join 不上 word_to_base 时 coalesce 用原词兜底(理论上 Step1 已 union 全量、不会发生)
df_st_word_cnt = df_st_filter_base.select(
'st_key', 'search_term', 'bsr_orders',
F.explode(F.split(F.lower('search_term'), r'\W+')).alias('word')
).filter("word != ''") \
.join(df_word_to_base, on='word', how='left') \
.withColumn('base_form', F.coalesce(F.col('base_form'), F.col('word'))) \
.groupBy('st_key', 'search_term', 'bsr_orders', 'base_form') \
.agg(F.count('*').alias('st_cnt'))
# Step 3:模板词侧同款拆词 + 归一 + 按 (pattern_st, base_form) 计数
# 例:pattern_st="red dress" → 拆出 {red:1, dress:1}
# pattern_st="cashmere" → {cashmere:1}
# 叠词模板("gun gun")已在 read_data 阶段被 udf_judge_twin_words 过滤,这里不会出现
df_pat_word_cnt = self.df_base_filter_date.select(
F.col('search_term').alias('pattern_st'),
F.explode(F.split(F.lower('search_term'), r'\W+')).alias('word')
).filter("word != ''") \
.join(df_word_to_base, on='word', how='left') \
.withColumn('base_form', F.coalesce(F.col('base_form'), F.col('word'))) \
.groupBy('pattern_st', 'base_form') \
.agg(F.count('*').alias('pat_cnt'))
# 模板词需要满足的 distinct base_form 总数(Step5 "全 key 命中"判定用)
# 例:pattern_st="red dress" → total_keys=2;pattern_st="cashmere" → 1
df_pat_total_keys = df_pat_word_cnt.groupBy('pattern_st').agg(
F.count('*').alias('total_keys')
)
# Step 4:broadcast 模板词侧(小表 ~4w 行)+ inner join 子词侧 on base_form
# 为什么 broadcast:模板词 base_form 表 ~4w 行(3w 模板 × 1.5 词),几 MB 可全量广播;
# 子词侧(百万级)无需 shuffle,避免高频 base_form("for"/"set"/"with")倾斜爆炸。
# filter st_cnt >= pat_cnt:满足多重性要求("tree tree"需子词至少 2 个 tree)
df_match = df_st_word_cnt.join(
F.broadcast(df_pat_word_cnt),
on='base_form', how='inner'
).filter(F.col('st_cnt') >= F.col('pat_cnt'))
# Step 5:按 (pattern_st, st_key) 汇总命中的 distinct base_form 数 → 与 total_keys 比对
# matched_keys == total_keys → 该子词包含模板词的所有单词(顺序无关,与原 rlike 一致)
# 例:pattern_st="red dress" (total_keys=2)
# 子词 "red dress for women" 命中 {red, dress} → matched_keys=2 → 保留 ✓
# 子词 "red shoes" 只命中 {red} → matched_keys=1 → 过滤 ✗
df_matched_keys = df_match.groupBy('pattern_st', 'st_key', 'search_term', 'bsr_orders').agg(
F.countDistinct('base_form').alias('matched_keys')
)
df_result = df_matched_keys.join(
F.broadcast(df_pat_total_keys), on='pattern_st', how='inner'
).filter(F.col('matched_keys') == F.col('total_keys')) \
.select('st_key', 'search_term', 'bsr_orders', 'pattern_st')
# Step 6:补分区字段 + 合理并行度一次性写入
# 取代原来 150 次 append + repartition(1) → 1 次 saveAsTable + 20 分区并行写
df_result = df_result.withColumn('site_name', F.lit(self.site_name)) \
.withColumn('date_type', F.lit(self.date_type)) \
.withColumn('date_info', F.lit(self.date_info)) \
.repartition(20)
df_result.write.saveAsTable(
name='tmp_pattern_st_info', format='hive', mode='append',
partitionBy=partition_by
)
sql = f"""
select
......@@ -621,15 +838,43 @@ class DwtStThemeAgg(object):
print(f"清除hdfs目录中:{hdfs_path_asin_info}")
HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
self.df_st_theme_agg = self.df_st_theme_agg.unionByName(self.df_st_match_topic_agg)
# 如果模板词本身也有匹配词,则相应的统计需要过滤掉
df_agg_filter = self.df_st_theme_base.select(
F.col('search_term'),
F.col('theme_label_en').alias('theme_label_en_join'),
F.lit(1).alias('join_flag')
# 如果模板词本身也有匹配词,则相应的统计需要过滤掉(把"模板词自己名字里就含的属性"从它的反查子体里剔除)
# ===== 原写法(保留备查,已停用)=====
# 问题:自匹配按英文 theme_label_en 配对。但聚合侧的 theme_label_en 是 concat_ws("/", ...) 拼接串
# (如 "cashmere/pashmina"、"cars/automotive/auto/automobile"——多个同义词/单复数/拼写变体都映射到同一个中文标签,
# 在 handle_st_pattern_common_agg 里被 collect_set 收进同一组拼起来),而明细侧是单个英文词。
# "cashmere/pashmina" == "cashmere" 永远不成立 → 凡是命中多个同义词的属性组都漏删(普遍存在,历史月份均有)。
# df_agg_filter = self.df_st_theme_base.select(
# F.col('search_term'),
# F.col('theme_label_en').alias('theme_label_en_join'),
# F.lit(1).alias('join_flag')
# )
# self.df_st_theme_agg = self.df_st_theme_agg.join(
# df_agg_filter, on=(self.df_st_theme_agg.pattern_st == df_agg_filter.search_term) & (self.df_st_theme_agg.theme_label_en == df_agg_filter.theme_label_en_join), how='left'
# ).filter(F.col('join_flag').isNull())
# ===== 新写法:按中文标签 theme_label_ch + 主题类别 theme_ch 配对,用 left_anti 做减法 =====
# 为什么按 theme_label_ch(中文含义)而不是英文拼接串:
# 聚合本就是按 theme_label_ch 分组的,一个 "/" 组里的英文词必然共享同一个中文标签(这正是它们被收进一组的原因),
# 所以按中文含义配对、删掉整组是精确的——整组本就都是同一个含义,不会误删别的含义。
# 且 theme_label_ch 是单值、不受英文 "/" 拼接影响,从根上避开漏删。
# 为什么再加 theme_ch:万一同一个中文标签挂在两个不同主题类别下,只按 theme_label_ch 会把另一类的行也误删,
# 加 theme_ch 精确到"同一主题类别下的同一中文标签"(防御性加固,正常数据用不到)。
# 为什么用 left_anti:语义就是"聚合表 减去 模板词自身命中的 (中文标签, 主题类别)",
# 且对右表重复天然不敏感(无需 distinct 防放大),比 left join + filter(flag is null) 更直观。
df_self_label = self.df_st_theme_base.select(
F.col('search_term').alias('self_search_term'),
F.col('theme_label_ch').alias('self_theme_label_ch'),
F.col('theme_ch').alias('self_theme_ch')
)
self.df_st_theme_agg = self.df_st_theme_agg.join(
df_agg_filter, on=(self.df_st_theme_agg.pattern_st == df_agg_filter.search_term) & (self.df_st_theme_agg.theme_label_en == df_agg_filter.theme_label_en_join), how='left'
).filter(F.col('join_flag').isNull())
df_self_label,
on=(self.df_st_theme_agg.pattern_st == df_self_label.self_search_term) &
(self.df_st_theme_agg.theme_label_ch == df_self_label.self_theme_label_ch) &
(self.df_st_theme_agg.theme_ch == df_self_label.self_theme_ch),
how='left_anti'
)
self.df_st_theme_agg = self.df_st_theme_agg.select(
F.col('pattern_st'),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment