""" @Author : HuangJian @Description : 上架日期补充表 @SourceTable : ①dim_asin_launchtime_info @SinkTable : dim_asin_label @CreateTime : 2023/12/12 11:20 @UpdateTime : 2022/12/12 11:20 """ import os import sys import re sys.path.append(os.path.dirname(sys.path[0])) from utils.common_util import CommonUtil, DateTypes from utils.hdfs_utils import HdfsUtils from utils.spark_util import SparkUtil from pyspark.sql.window import Window from pyspark.sql import functions as F from pyspark.sql.types import StringType, IntegerType,DoubleType from yswg_utils.common_udf import udf_handle_string_null_value as NullUDF from utils.redis_utils import RedisUtils class DimAsinLaunchtimeInfo(object): def __init__(self, site_name, date_type, date_info): self.site_name = site_name self.date_type = date_type self.date_info = date_info app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}" self.spark = SparkUtil.get_spark_session(app_name) self.hive_table = "dim_asin_launchtime_info" self.partition_dict = { "site_name": site_name } # 落表路径校验 self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=self.partition_dict) self.partitions_num = CommonUtil.reset_partitions(site_name, 50) # 自定义全局df self.df_asin_detail = self.spark.sql(f"select 1+1;") self.df_history_launchtime = self.spark.sql(f"select 1+1;") self.df_asin_keepa_date = self.spark.sql(f"select 1+1;") self.df_asin_handle_launchtime = self.spark.sql(f"select 1+1;") # udf函数 self.handle_string_num_value = F.udf(NullUDF, StringType()) self.spark.udf.register('u_handle_string_num_value', NullUDF, StringType()) def read_data(self): # 读取asin_detail数据 if self.date_type == 'all': # 说明全局数据量执行 print("==================执行全量数据整合=================") sql = f""" with st_asin as( select asin, u_handle_string_num_value(launch_time) as crawl_asin_launch_time, date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time, concat(site_name,',',date_type,',',date_info) as period_label from ods_asin_detail where site_name = '{self.site_name}' ), bsr_asin as( select asin, u_handle_string_num_value(launch_time) as crawl_asin_launch_time, date_format(created_at, '{CommonUtil._date_time_format}') as updated_time, concat(site_name,',',date_type,',',date_info) as period_label from ods_self_asin_detail where site_name = '{self.site_name}' ) select asin, crawl_asin_launch_time,updated_time,period_label from st_asin union select asin, crawl_asin_launch_time,updated_time,period_label from bsr_asin """ print(sql) self.df_asin_detail = self.spark.sql(sql) else: print("==================执行分区数据整合=================") # 按分区检测是否有新增的asin if self.date_type in (DateTypes.week.name,DateTypes.month.name,DateTypes.month_week.name): # 取st维度的st下的asin数据 sql = f""" select asin, u_handle_string_num_value(launch_time) as crawl_asin_launch_time, date_format(updated_at, '{CommonUtil._date_time_format}') as updated_time, concat(site_name,',',date_type,',',date_info) as period_label from ods_asin_detail where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}' """ elif self.date_type in (DateTypes.day.name): # 取bsr日维度的asin sql = f""" select asin, u_handle_string_num_value(launch_time) as crawl_asin_launch_time, date_format(created_at, '{CommonUtil._date_time_format}') as updated_time, concat(site_name,',',date_type,',',date_info) as period_label from ods_self_asin_detail where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}' """ print(sql) # 如果是分区执行可进行缓存数据 
            self.df_asin_detail = self.spark.sql(sql).cache()

        # Read the Keepa launch-date data
        if self.site_name == 'us':
            sql = f"""
                select asin,
                       launch_time as keepa_asin_launch_time,
                       updated_time as updated_time,
                       1 as keepa_crawl_flag
                from ods_asin_keepa_date_tmp
                where site_name = '{self.site_name}'
                  and state = 1
            """
        else:
            sql = f"""
                select asin,
                       launch_time as keepa_asin_launch_time,
                       updated_at as updated_time,
                       1 as keepa_crawl_flag
                from ods_asin_keep_date
                where site_name = '{self.site_name}'
                  and state = 3
            """
        print(sql)
        self.df_asin_keepa_date = self.spark.sql(sqlQuery=sql).cache()
        # Keep only the most recently updated Keepa record per asin
        self.df_asin_keepa_date = self.df_asin_keepa_date.orderBy(self.df_asin_keepa_date.updated_time.desc_nulls_last())
        self.df_asin_keepa_date = self.df_asin_keepa_date.drop_duplicates(['asin'])

        # Read the launch dates that were already consolidated historically
        sql = f"""
            select asin,
                   crawl_asin_launch_time,
                   '9999-12-31 23:59:59' as updated_time,
                   appear_period_label as period_label
            from dim_asin_launchtime_info
            where site_name = '{self.site_name}'
        """
        print(sql)
        self.df_history_launchtime = self.spark.sql(sqlQuery=sql)

    def handle_launchtime_data(self):
        if self.date_type != 'all':
            # Routine scheduled run (not 'all'): merge in the historical launch dates
            # so they take part in the deduplication below
            self.df_asin_detail = self.df_asin_detail.unionByName(self.df_history_launchtime).cache()

        # Deduplicate while keeping every asin, preferring the most recently updated record
        df_all_asin = self.df_asin_detail.orderBy(self.df_asin_detail.updated_time.desc_nulls_last())
        df_all_asin = df_all_asin.drop_duplicates(['asin'])
        df_all_asin = df_all_asin.select(
            F.col("asin"),
            F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_left"),
            'period_label'
        )

        # Keep only rows whose crawled launch date is not null
        df_not_null_asin = self.df_asin_detail.filter("crawl_asin_launch_time is not null")
        df_not_null_asin = df_not_null_asin.orderBy(df_not_null_asin.updated_time.desc_nulls_last())
        df_not_null_asin = df_not_null_asin.drop_duplicates(['asin'])
        df_not_null_asin = df_not_null_asin.select(
            F.col("asin"),
            F.col("crawl_asin_launch_time").alias("crawl_asin_launch_time_right"),
            F.col("period_label").alias("period_label_right")
        )

        self.df_asin_handle_launchtime = df_all_asin.join(
            df_not_null_asin, on='asin', how='left'
        )
        # Prefer the latest record; fall back to any non-null crawled launch date
        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
            F.col("asin"),
            F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()),
                   F.col("crawl_asin_launch_time_right"))
            .otherwise(F.col("crawl_asin_launch_time_left")).alias("crawl_asin_launch_time"),
            F.when((F.col("crawl_asin_launch_time_left").isNull()) & (F.col("crawl_asin_launch_time_right").isNotNull()),
                   F.col("period_label_right"))
            .otherwise(F.col("period_label")).alias("period_label")
        )

        # Join with the Keepa data to backfill launch_time values that are still missing
        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.join(
            self.df_asin_keepa_date, on='asin', how='left'
        )
        self.df_asin_handle_launchtime = self.df_asin_handle_launchtime.select(
            F.col("asin"),
            F.when(F.col("crawl_asin_launch_time").isNull(), F.col("keepa_asin_launch_time"))
            .otherwise(F.col("crawl_asin_launch_time")).alias("asin_launch_time"),
            F.col("crawl_asin_launch_time"),
            F.col("keepa_asin_launch_time"),
            F.col("period_label").alias("appear_period_label"),
            F.when(F.col("keepa_crawl_flag") == 1, F.lit(1)).otherwise(F.lit(0)).alias("keepa_crawl_flag"),
            F.lit(self.site_name).alias("site_name")
        )

    def save_data(self):
        # print(f"Clearing HDFS directory: {self.hdfs_path}")
        # HdfsUtils.delete_file_in_folder(self.hdfs_path)
        df_save = self.df_asin_handle_launchtime.repartition(self.partitions_num)
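        # CommonUtil.save_or_update_table is an in-house helper; it is assumed here to
        # write df_save into the site_name partition of dim_asin_launchtime_info, and
        # drop_exist_tmp_flag=False presumably keeps its temporary staging table.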
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_table,
            partition_dict=self.partition_dict,
            df_save=df_save,
            drop_exist_tmp_flag=False
        )
        print("success")

    def run(self):
        self.read_data()
        self.handle_launchtime_data()
        self.save_data()


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    lock_name = "dim_asin_launchtime_info"
    if date_type == "all":
        # For an 'all' run, lock the table for 2 hours in case the lock is not released normally
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=120 * 60, retry_flag=True, retry_time=10 * 60)
    else:
        lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=30 * 60, retry_flag=True, retry_time=10 * 60)

    if lock_flag:
        try:
            obj = DimAsinLaunchtimeInfo(site_name, date_type, date_info)
            obj.run()
        finally:
            # Release the lock once the run finishes
            RedisUtils.release_redis_lock(lock_name)
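# Example invocation (illustrative only; the script name and the argument values are
# assumptions, not taken from this repository):
#   spark-submit dim_asin_launchtime_info.py <site_name> <date_type> <date_info>
#   e.g. spark-submit dim_asin_launchtime_info.py us month 2023-12
# Positional arguments: site_name, date_type ('all' for a full rebuild, or
# week / month / month_week / day for a single crawl partition), date_info.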