"""
   @Author      : HuangJian
   @Description : 上架日期补充表
   @SourceTable :
                  ①dim_asin_launchtime_info
   @SinkTable   : dim_asin_label
   @CreateTime  : 2023/12/12 11:20
   @UpdateTime  : 2022/12/12 11:20
"""
import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
from yswg_utils.common_udf import udf_handle_string_null_value as NullUDF
from utils.redis_utils import RedisUtils



class DimAsinLaunchtimeInfo(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_table = "dim_asin_launchtime_info"
        self.partition_dict = {
            "site_name": site_name
        }
        # HDFS path of the target table partition (for path checks before writing)
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=self.partition_dict)
        self.partitions_num = CommonUtil.reset_partitions(site_name, 50)

        # Global DataFrames, initialized as placeholders and assigned in read_data()
        self.df_asin_detail = self.spark.sql("select 1+1")
        self.df_history_launchtime = self.spark.sql("select 1+1")
        self.df_asin_keepa_date = self.spark.sql("select 1+1")
        self.df_asin_handle_launchtime = self.spark.sql("select 1+1")


        # UDFs: normalize null-like string values (registered for both the DataFrame API and SQL use)
        self.handle_string_num_value = F.udf(NullUDF, StringType())
        self.spark.udf.register('u_handle_string_num_value', NullUDF, StringType())

    def read_data(self):
        # Read asin detail data
        if self.date_type == 'all':
            # Full-volume execution: integrate data across all partitions
            print("================== Full-volume data integration ==================")
            sql = f"""
                with st_asin as(
                select
                    asin,
                    u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                    date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time,
                    concat(site_name,',',date_type,',',date_info) as period_label
                    from ods_asin_detail
                    where site_name = '{self.site_name}'
                    ),
                bsr_asin as(
                select
                    asin,
                    u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                    date_format(created_at, '{CommonUtil._date_time_format}') as updated_time,
                    concat(site_name,',',date_type,',',date_info) as period_label
                    from ods_self_asin_detail
                    where site_name = '{self.site_name}'
                )
                select asin, crawl_asin_launch_time,updated_time,period_label from st_asin 
                union 
                select asin, crawl_asin_launch_time,updated_time,period_label from bsr_asin
            """
            print(sql)
            self.df_asin_detail = self.spark.sql(sql)
        else:
            print("================== Partition-level data integration ==================")
            # Check the given partition for newly added ASINs
            if self.date_type in (DateTypes.week.name, DateTypes.month.name, DateTypes.month_week.name):
                # Take ASINs crawled under the search-term (st) dimension
                sql = f"""
                            select 
                                asin,
                                u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                                date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time,
                                concat(site_name,',',date_type,',',date_info) as period_label
                            from ods_asin_detail
                            where site_name = '{self.site_name}'
                            and date_type = '{self.date_type}'
                            and date_info = '{self.date_info}'
                            """
            elif self.date_type == DateTypes.day.name:
                # Take ASINs from the BSR daily dimension
                sql = f"""
                    select
                        asin,
                        u_handle_string_num_value(launch_time) as crawl_asin_launch_time,
                        date_format(created_at, '{CommonUtil._date_time_format}') as updated_time,
                        concat(site_name,',',date_type,',',date_info) as period_label
                    from ods_self_asin_detail
                    where site_name = '{self.site_name}'
                    and date_type = '{self.date_type}'
                    and date_info = '{self.date_info}'
                """
            print(sql)
            # For partition-level runs the data can be cached
            self.df_asin_detail = self.spark.sql(sql).cache()

        # Read keepa data
        if self.site_name == 'us':
            sql = f"""select asin, 
                            launch_time as keepa_asin_launch_time, 
                            updated_time as updated_time,
                            1 as keepa_crawl_flag 
                        from ods_asin_keepa_date_tmp
                          where site_name='{self.site_name}'
                          and state = 1 """
        else:
            sql = f"""select asin, 
                            launch_time as keepa_asin_launch_time, 
                            updated_at as updated_time,
                            1 as keepa_crawl_flag  
                        from ods_asin_keep_date 
                        where site_name='{self.site_name}'
                        and state = 3 """
        print(sql)
        self.df_asin_keepa_date = self.spark.sql(sqlQuery=sql).cache()
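        # Keep the most recently updated keepa record per ASIN
        # (note: orderBy followed by drop_duplicates does not strictly guarantee which row survives across partitions)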
        self.df_asin_keepa_date = self.df_asin_keepa_date.orderBy(self.df_asin_keepa_date.updated_time.desc_nulls_last())
        self.df_asin_keepa_date = self.df_asin_keepa_date.drop_duplicates(['asin'])

        # Read launch dates already consolidated in the history table
        sql = f"""
                    select asin,
                           crawl_asin_launch_time,
                           '9999-12-31 23:59:59' as updated_time,
                           appear_period_label as period_label 
                    from dim_asin_launchtime_info 
                    where site_name='{self.site_name}'
                """
        print(sql)
        self.df_history_launchtime = self.spark.sql(sqlQuery=sql)
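        # Note: history rows carry a far-future updated_time ('9999-12-31') so they win the
        # per-ASIN dedup in handle_launchtime_data(), preserving previously consolidated launch dates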


    def handle_launchtime_data(self):
        if self.date_type != 'all':
            # Not an 'all' run, so this is a routine scheduled run; include the existing historical launch dates in the dedup
            self.df_asin_detail = self.df_asin_detail.unionByName(self.df_history_launchtime).cache()
        df_all_asin = self.df_asin_detail.orderBy(self.df_asin_detail.updated_time.desc_nulls_last())
        # Dedup, keeping one row per ASIN
        df_all_asin = df_all_asin.drop_duplicates(['asin'])
        df_all_asin = df_all_asin.select(
            F.col("asin"),F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_left"), 'period_label')

        # Keep rows where the crawled launch date is not null
        df_not_null_asin = self.df_asin_detail.filter(" crawl_asin_launch_time is not null")
        df_not_null_asin = df_not_null_asin.orderBy(df_not_null_asin.updated_time.desc_nulls_last())
        df_not_null_asin = df_not_null_asin.drop_duplicates(['asin'])
        df_not_null_asin = df_not_null_asin.select(
            F.col("asin"),
            F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_right"),
            F.col("period_label").alias("period_label_right")
        )
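        # Left-join the non-null launch dates back onto the full ASIN set; the select below backfills
        # a missing launch date (and its period label) from any other record of the same ASIN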
        self.df_asin_handle_launchtime = df_all_asin.join(
            df_not_null_asin, on='asin', how='left'
        )

        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
            F.col("asin"),
            F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()), F.col("crawl_asin_launch_time_right"))
            .otherwise(F.col("crawl_asin_launch_time_left")).alias("crawl_asin_launch_time"),
            F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()), F.col("period_label_right"))
            .otherwise(F.col("period_label")).alias("period_label")
        )

        # Join with keepa data to supplement launch_time
        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.join(
            self.df_asin_keepa_date, on='asin', how='left'
        )
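        # Final output: prefer the crawled launch date and fall back to keepa's; normalize keepa_crawl_flag to 0/1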

        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
            F.col("asin"),
            F.when(F.col("crawl_asin_launch_time").isNull(), F.col("keepa_asin_launch_time"))
                .otherwise(F.col("crawl_asin_launch_time")).alias("asin_launch_time"),
            F.col("crawl_asin_launch_time"),
            F.col("keepa_asin_launch_time"),
            F.col("period_label").alias("appear_period_label"),
            F.when(F.col("keepa_crawl_flag") == 1, F.lit(1)).otherwise(F.lit(0)).alias("keepa_crawl_flag"),
            F.lit(self.site_name).alias("site_name")
        )

    def save_data(self):
        # print(f"Clearing HDFS path: {self.hdfs_path}")
        # HdfsUtils.delete_file_in_folder(self.hdfs_path)
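        # Repartition to control the number of output files, then save or update the Hive table partition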
        df_save = self.df_asin_handle_launchtime.repartition(self.partitions_num)
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_table,
            partition_dict=self.partition_dict,
            df_save=df_save,
            drop_exist_tmp_flag=False
        )
        print("success")

    def run(self):
        self.read_data()
        self.handle_launchtime_data()
        self.save_data()


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    lock_name = "dim_asin_launchtime_info"
    if date_type == "all":
        # When running with date_type 'all', lock the table for 2h as an expiry safeguard in case the lock is not released normally
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=120 * 60, retry_flag=True, retry_time=10*60)
    else:
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=30 * 60, retry_flag=True, retry_time=10 * 60)
    if lock_flag:
        try:
            obj = DimAsinLaunchtimeInfo(site_name, date_type, date_info)
            obj.run()
        finally:
            # Release the lock once execution completes
            RedisUtils.release_redis_lock(lock_name)