""" @Author : HuangJian @Description : 时间周期内-asin品牌标签(搜索词前3页asin抓取) @SourceTable : ①ods_other_search_term_data @SinkTable : dim_asin_label @CreateTime : 2023/05/04 15:20 @UpdateTime : 2022/05/04 15:20 """ import os import sys import re sys.path.append(os.path.dirname(sys.path[0])) from utils.common_util import CommonUtil, DateTypes from utils.hdfs_utils import HdfsUtils from utils.spark_util import SparkUtil from pyspark.sql import functions as F from pyspark.sql.types import StringType, ArrayType from yswg_utils.common_udf import udf_handle_string_null_value as NullUDF from functools import reduce class DimAsinLabel(object): def __init__(self, site_name, date_type, date_info): self.site_name = site_name self.date_type = date_type self.date_info = date_info app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}" self.spark = SparkUtil.get_spark_session(app_name) self.hive_table = "dim_asin_label" # 获取周流程的周tuple整合数据 self.complete_date_info_tuple = CommonUtil.transform_week_tuple(self.spark, self.date_type, self.date_info) self.hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dim/{self.hive_table}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}" self.partitions_num = CommonUtil.reset_partitions(site_name, 1) self.df_date = object() # 需要存储的df数据对象 self.date_sql = self.date_sql_padding() # 初始化全局df self.df_asin_label = self.spark.sql(f"select 1+1;") self.handle_string_num_value = F.udf(NullUDF, StringType()) def date_sql_padding(self): if 'us' == self.site_name: if self.date_type == DateTypes.month_week.name: date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'" elif self.date_type == DateTypes.month.name and self.date_info >= '2023-10': date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'" else: date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}" elif self.site_name in ['uk', 'de']: if self.date_type == DateTypes.month.name and self.date_info >= '2024-05': date_sql = f"and date_type='{self.date_type}' and date_info='{self.date_info}'" elif self.date_type == DateTypes.month_week.name and self.date_info >= '2024-06': date_sql = f"and date_type='{self.date_type}' and date_info='{self.date_info}'" else: date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}" print(date_sql) return date_sql def run(self): print("======================查询sql如下======================") # 读取ods_other_search_term_data sql = f""" select asin, label from (select asin, lower(label) as label, created_time, row_number() over(partition by asin, label order by created_time desc) as crank from ods_other_search_term_data where site_name = '{self.site_name}' {self.date_sql} and trim(label) not in ('null', '') ) t where t.crank = 1 """ print(sql) self.df_asin_label = self.spark.sql(sqlQuery=sql).cache() # 逻辑处理--多asin多标签采用&&&拼接 self.df_asin_label = self.df_asin_label.groupby(["asin"]).agg( F.collect_set("label").alias("asin_label_list") ) movie_label_list = ['prime video', 'dvd', 'blu-ray', 'kindle', 'app', 'paperback', 'audible audiobook', 'kindle edition', 'kindle & comixology', 'hardcover', 'comic', 'multi-format', '4k', 'library binding', 'vinyl', 'audio cd', 'mp3 music', 'single issue magazine', 'print magazine', 'unknown binding'] condition = reduce( lambda acc, keyword: acc | F.expr(f"exists(asin_label_list, x -> x like '%{keyword}%')"), movie_label_list, F.lit(False) ) self.df_asin_label = self.df_asin_label.withColumn("asin_label_type", condition.cast("int")) # 
        # Labels that could not be classified default to 0
        self.df_asin_label = self.df_asin_label.na.fill({"asin_label_type": 0})

        # Fill in the partition fields and audit timestamps
        df_save = self.df_asin_label.select(
            F.col('asin'),
            self.handle_string_num_value('asin_label_list').alias('asin_label_list'),
            F.col('asin_label_type'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )
        df_save = df_save.repartition(self.partitions_num)

        partition_by = ["site_name", "date_type", "date_info"]
        print(f"Clearing hdfs dir: {self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"Saving to table {self.hive_table}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_table, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    obj = DimAsinLabel(site_name, date_type, date_info)
    obj.run()
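# Example invocation (a sketch: the script filename is an assumption; the three
# positional arguments are site_name, date_type, date_info as parsed above):
#   python dim_asin_label.py us month 2023-10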