"""
   @Author      : HuangJian
   @Description : 时间周期内-asin品牌标签(搜索词前3页asin抓取)
   @SourceTable :
                  ①ods_other_search_term_data
   @SinkTable   : dim_asin_label
   @CreateTime  : 2023/05/04 15:20
   @UpdateTime  : 2022/05/04 15:20
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from yswg_utils.common_udf import udf_handle_string_null_value as NullUDF
from functools import reduce


class DimAsinLabel(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_table = "dim_asin_label"
        # Get the tuple of weeks covered by this period (for the weekly pipeline)
        self.complete_date_info_tuple = CommonUtil.transform_week_tuple(self.spark, self.date_type, self.date_info)
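        # (assumed from its use below: expands a month/month_week period into
        # the tuple of week date_info values it covers)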
        self.hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dim/{self.hive_table}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        self.partitions_num = CommonUtil.reset_partitions(site_name, 1)
        self.df_date = object()  # placeholder for the DataFrame to be persisted
        self.date_sql = self.date_sql_padding()

        # Initialize the shared DataFrame with a dummy query
        self.df_asin_label = self.spark.sql("select 1+1")
        # UDF that normalizes null/empty string values in the output columns
        self.handle_string_num_value = F.udf(NullUDF, StringType())

    def date_sql_padding(self):
        if 'us' == self.site_name:
            if self.date_type == DateTypes.month_week.name:
                date_sql = f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
            elif self.date_type == DateTypes.month.name and self.date_info >= '2023-10':
                date_sql = f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
            else:
                date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}"
        elif self.site_name in ['uk', 'de']:
            if self.date_type == DateTypes.month.name and self.date_info >= '2024-05':
                date_sql = f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
            elif self.date_type == DateTypes.month_week.name and self.date_info >= '2024-06':
                date_sql = f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
            else:
                date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}"
        else:
            # Assumed fallback for any other site: aggregate the weekly
            # partitions, mirroring the week branches above
            date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}"
        print(date_sql)
        return date_sql

    def run(self):
        print("======================查询sql如下======================")
        # 读取ods_other_search_term_data
        sql = f"""
            select 
                asin, label 
            from 
                (select 
                    asin, 
                    lower(label) as label, 
                    created_time, 
                    row_number() over(partition by asin, label order by created_time desc) as crank
                from ods_other_search_term_data
                where site_name = '{self.site_name}' {self.date_sql} 
                and trim(label) not in ('null', '')
                ) t 
            where t.crank = 1 
        """
        print(sql)
        self.df_asin_label = self.spark.sql(sqlQuery=sql).cache()

        # Processing: collect the distinct labels per ASIN (multiple labels per
        # ASIN are concatenated with &&& when serialized)
        self.df_asin_label = self.df_asin_label.groupby(["asin"]).agg(
            F.collect_set("label").alias("asin_label_list")
        )
        movie_label_list = ['prime video', 'dvd', 'blu-ray', 'kindle', 'app', 'paperback', 'audible audiobook',
                            'kindle edition', 'kindle & comixology', 'hardcover', 'comic', 'multi-format', '4k',
                            'library binding', 'vinyl', 'audio cd', 'mp3 music', 'single issue magazine',
                            'print magazine', 'unknown binding']
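        # Flag media/e-book formats: OR together an `exists` check over the
        # collected labels for each keyword in movie_label_list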
        condition = reduce(
            lambda acc, keyword: acc | F.expr(f"exists(asin_label_list, x -> x like '%{keyword}%')"),
            movie_label_list,
            F.lit(False)
        )
        self.df_asin_label = self.df_asin_label.withColumn("asin_label_type", condition.cast("int"))

        # Default asin_label_type to 0 where no label could be matched
        self.df_asin_label = self.df_asin_label.na.fill({"asin_label_type": 0})

        # Add the partition columns
        df_save = self.df_asin_label.select(
            F.col('asin'),
            self.handle_string_num_value('asin_label_list').alias('asin_label_list'),
            F.col('asin_label_type'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_time'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_time'),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )

        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name", "date_type", "date_info"]
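        # Deleting the target partition directory first makes the 'append'
        # write below behave like a partition overwrite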
        print(f"清除hdfs目录中.....{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"当前存储的表名为:{self.hive_table},分区为{partition_by}")
        df_save.write.saveAsTable(name=self.hive_table, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DimAsinLabel(site_name, date_type, date_info)
    obj.run()
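    # Example invocation (hypothetical values):
    #   spark-submit dim_asin_label.py us month 2023-10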