Commit 2ae61956 by chenyuanjie

Amazon搜索词top品牌

parent d10e99bf
"""
@Author : CT
@Description : Amazon品牌徽章明细(DWD层)
解析 ods_st_quantity_being_sold.brand_badge 字段,
提取品牌名称及推荐原因,按品牌去重保留当期最新一条
@SourceTable : ods_st_quantity_being_sold
@SinkTable : dwd_st_brand_badge
@CreateTime : 2026-06-17
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from utils.hdfs_utils import HdfsUtils
from utils.DorisHelper import DorisHelper
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.window import Window
DORIS_DWD_DB = 'dwd'
DORIS_TABLE = 'dwd_st_brand_badge'
DORIS_COLUMNS = 'site_name,date_info,brand,brand_badge_reason,update_time'
class DwdStBrandBadge(object):
def __init__(self, site_name, date_type, date_info):
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
self.spark = SparkUtil.get_spark_session(app_name)
self.hive_table = "dwd_st_brand_badge"
self.partition_dict = {
"site_name": site_name,
"date_type": date_type,
"date_info": date_info,
}
self.partitions_num = CommonUtil.reset_partitions(site_name, 10)
self.partitions_by = ["site_name", "date_type", "date_info"]
self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=self.partition_dict)
def run(self):
# Step1: 读取原始 brand_badge 数据,过滤空值
sql = f"""
select created_time,
brand_badge
from ods_st_quantity_being_sold
where site_name = '{self.site_name}'
and date_type = '{self.date_type}'
and date_info = '{self.date_info}'
and brand_badge is not null
and trim(brand_badge) not in ('', '[]')
"""
print(sql)
df_raw = self.spark.sql(sql)
# Step2: 解析 brand_badge JSON 数组 → (created_time, brand, brand_badge_reason)
# 原始格式示例:["推荐原因|Shop 品牌名称", ...]
df_parsed = (
df_raw
.withColumn("badge_arr", F.from_json(F.col("brand_badge"), ArrayType(StringType())))
.withColumn("item", F.explode(F.col("badge_arr")))
.withColumn("parts", F.split(F.col("item"), "\\|"))
.withColumn(
"brand_badge_reason",
F.trim(F.regexp_replace(F.col("parts").getItem(0), r'^"|"$', ""))
)
.withColumn(
"brand",
F.lower(F.trim(F.regexp_replace(
F.regexp_replace(F.col("parts").getItem(1), r"^\s*Shop\s+", ""),
r'^"|"$', ""
)))
)
.filter(
F.col("brand").isNotNull() & (F.col("brand") != "") &
F.col("brand_badge_reason").isNotNull() & (F.col("brand_badge_reason") != "")
)
.select("brand", "brand_badge_reason", "created_time")
)
# Step3: 按品牌去重,保留 created_time 最新的一条
w = Window.partitionBy("brand").orderBy(F.col("created_time").desc_nulls_last())
df_result = (
df_parsed
.withColumn("_rn", F.row_number().over(w))
.filter(F.col("_rn") == 1)
.drop("_rn")
.withColumn("site_name", F.lit(self.site_name))
.withColumn("date_type", F.lit(self.date_type))
.withColumn("date_info", F.lit(self.date_info))
.repartition(self.partitions_num)
.cache()
)
print(f"品牌推荐数据总量:{df_result.count()}")
df_result.show(10, truncate=False)
print(f"清除hdfs目录中:{self.hdfs_path}")
HdfsUtils.delete_file_in_folder(self.hdfs_path)
print(f"当前存储的表名为:{self.hive_table},分区为{self.partitions_by}")
df_result.write.saveAsTable(name=self.hive_table, format='hive', mode='append', partitionBy=self.partitions_by)
print("Hive写入完成,开始写入Doris DWD表")
self._write_to_doris(df_result)
df_result.unpersist()
print("success")
def _write_to_doris(self, df_result):
"""stream load 写 Doris dwd.dwd_st_brand_badge
表为 UNIQUE KEY(site_name, brand, date_info) + sequence_col=update_time,由建表模型自动处理更新去重。
"""
df_doris = df_result.select(
'site_name', 'date_info', 'brand', 'brand_badge_reason',
F.col('created_time').cast('timestamp').alias('update_time')
)
DorisHelper.spark_export_with_columns(
df_save=df_doris,
db_name=DORIS_DWD_DB,
table_name=DORIS_TABLE,
table_columns=DORIS_COLUMNS,
)
print(f"Doris {DORIS_DWD_DB}.{DORIS_TABLE} 写入完成")
if __name__ == '__main__':
site_name = CommonUtil.get_sys_arg(1, None)
date_type = CommonUtil.get_sys_arg(2, None)
date_info = CommonUtil.get_sys_arg(3, None)
DwdStBrandBadge(site_name, date_type, date_info).run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment