"""
@Author      : HuangJian
@Description : Compute previous-period ("chain") and year-over-year change rates of ABA
               search-term metrics (rank, BSR orders, CN / FBM / Amazon seller counts,
               search volume) and write them to dwt_aba_last_change_rate.
@SourceTable : dwt_aba_st_analytics (day / week / month / last30day),
               dim_st_detail (year-over-year data up to 2022-09),
               dwt_aba_last365 (last365day)
@SinkTable   : dwt_aba_last_change_rate
@CreateTime  : 2022/03/13 14:55
@UpdateTime  : 2022/03/13 14:55
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F


class DwtAbaLastChangeRate(object):
    """Build one (site_name, date_type, date_info) partition of dwt_aba_last_change_rate.

    Columns named ``*_rate_of_change`` compare the current period with the previous
    period (chain ratio); columns named ``*_change_rate`` compare it with the same
    period one year earlier. Each rate is round((current - previous) / previous, 3).
    """

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_aba_last_change_rate"
        app_name = f"{self.hive_tb}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_num = CommonUtil.reset_partitions(site_name, 1)
        # Clear the target partition up front: save_data() writes with mode='append'.
        hdfs_path = (
            f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}"
            f"/date_type={self.date_type}/date_info={self.date_info}"
        )
        print(f"清除hdfs目录中数据:{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        # Year used for the year-over-year comparison; set by handle_date_offset(1).
        self.last_year_index = None
        # Previous-period (chain) comparison date.
        self.last_date_info = self.handle_date_offset(0)
        # Same-period-last-year comparison date.
        self.last_year_date_info = self.handle_date_offset(1)
        # Placeholder DataFrames; replaced by the read/handle steps in run().
        self.df_aba_analytics = self.spark.sql("select 1+1;")
        self.df_aba_analytics_old = self.spark.sql("select 1+1;")
        self.df_st_base_data = self.spark.sql("select 1+1;")
        self.df_st_last_data = self.spark.sql("select 1+1;")
        self.df_st_last_year_data = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    def _latest_partition_before(self, table_name: str):
        """Return the greatest date_info of `table_name` that is earlier than self.date_info.

        Only partitions with the same site_name and date_type are considered. Returns
        None when no such partition exists (max() over no rows is null).
        """
        date_df = CommonUtil.select_partitions_df(self.spark, table_name)
        return date_df.filter(
            f"site_name = '{self.site_name}' and date_type = '{self.date_type}' "
            f"and date_info < '{self.date_info}' "
        ).selectExpr("max(date_info)").first()[0]

    def handle_date_offset(self, handle_type: int):
        """Return the comparison date_info for this run.

        :param handle_type: 0 -> previous-period (chain) date, i.e. the latest existing
            partition earlier than the current date_info (None if there is none);
            any other value -> same period last year, i.e. the current date_info with
            its leading year decremented by one (also stored in self.last_year_index).
        :return: the comparison date_info string (or None, see above).
        """
        if handle_type == 0:
            # last365day is read from dwt_aba_last365; day/week/month/last30day are read
            # from dwt_aba_st_analytics, so the previous partition is searched there.
            if self.date_type == DateTypes.last365day.name:
                table_name = "dwt_aba_last365"
            else:
                table_name = "dwt_aba_st_analytics"
            handle_date = self._latest_partition_before(table_name)
        else:
            # For every date_type, the first '-'-separated field of date_info is the year.
            year_int = int(CommonUtil.safeIndex(self.date_info.split("-"), 0, None))
            last_year_int = year_int - 1
            self.last_year_index = last_year_int
            # Replace only the leading year (count=1) so no later digits are rewritten.
            handle_date = self.date_info.replace(str(year_int), str(last_year_int), 1)
        print("计算处理之后的日期:", handle_date)
        return handle_date

    @staticmethod
    def _rate_of_change(current: str, previous: str):
        """Return the column round((current - previous) / previous, 3)."""
        return F.round((F.col(current) - F.col(previous)) / F.col(previous), 3)

    def run(self):
        """Compute the change rates for the configured partition and save them."""
        # last365day reads from a different table than day/week/month/last30day.
        if self.date_type == DateTypes.last365day.name:
            self.handle_365_data()
        else:
            self.read_data()
            self.handle_base()
            self.handle_year_ratio()
        self.save_data()

    def read_data(self):
        """Load the current, previous-period and last-year data into cached DataFrames."""
        sql1 = f"""
            select id, search_term, rank, bsr_orders, asin_cn_count, asin_fbm_count,
                   asin_amazon_count, search_volume
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_aba_analytics = self.spark.sql(sql1).repartition(40, 'id').cache()
        self.df_aba_analytics.show(10, truncate=True)

        sql2 = f"""
            select id,
                   rank as last_rank,
                   bsr_orders as last_bsr_orders,
                   asin_cn_count as last_asin_cn_count,
                   asin_fbm_count as last_asin_fbm_count,
                   asin_amazon_count as last_asin_amazon_count
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.last_date_info}'
        """
        self.df_aba_analytics_old = self.spark.sql(sql2).repartition(40, 'id').cache()
        self.df_aba_analytics_old.show(10, truncate=True)

        # Last-year data: for periods up to 2022-09 only dim_st_detail is available, and it
        # only provides rank and search volume, so the other metrics are null.
        if self.last_year_date_info <= '2022-09':
            sql = f"""
                select search_term,
                       st_rank as last_year_rank,
                       null as last_year_bsr_orders,
                       null as last_year_asin_cn_count,
                       null as last_year_asin_fbm_count,
                       null as last_year_asin_amazon_count,
                       st_search_num as last_year_search_volume
                from dim_st_detail
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.last_year_date_info}'
            """
        else:
            sql = f"""
                select search_term,
                       rank as last_year_rank,
                       bsr_orders as last_year_bsr_orders,
                       asin_cn_count as last_year_asin_cn_count,
                       asin_fbm_count as last_year_asin_fbm_count,
                       asin_amazon_count as last_year_asin_amazon_count,
                       search_volume as last_year_search_volume
                from dwt_aba_st_analytics
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.last_year_date_info}'
            """
        self.df_st_last_year_data = self.spark.sql(sql).repartition(40, 'search_term').cache()
        self.df_st_last_year_data.show(10, truncate=True)

    def handle_base(self):
        """Join current and previous-period rows by id and compute the chain rates."""
        self.df_st_base_data = self.df_aba_analytics.join(
            self.df_aba_analytics_old, on='id', how='left'
        )
        self.df_st_base_data = self.df_st_base_data.withColumn(
            'rank_rate_of_change', self._rate_of_change('rank', 'last_rank')
        ).withColumn(
            'bsr_orders_rate_of_change', self._rate_of_change('bsr_orders', 'last_bsr_orders')
        ).withColumn(
            'cn_seller_rate_of_change', self._rate_of_change('asin_cn_count', 'last_asin_cn_count')
        ).withColumn(
            'fbm_rate_of_change', self._rate_of_change('asin_fbm_count', 'last_asin_fbm_count')
        ).withColumn(
            'amazon_rate_of_change', self._rate_of_change('asin_amazon_count', 'last_asin_amazon_count')
        ).select(
            'id', 'search_term', 'rank', 'bsr_orders', 'asin_cn_count', 'asin_fbm_count',
            'asin_amazon_count', 'rank_rate_of_change', 'bsr_orders_rate_of_change',
            'cn_seller_rate_of_change', 'fbm_rate_of_change', 'amazon_rate_of_change',
            'search_volume'
        ).repartition(40, 'search_term').cache()
        # NOTE(review): df_st_base_data is lazily cached, so unpersisting its inputs here
        # means they are recomputed when it is first materialized — confirm this is intended.
        self.df_aba_analytics.unpersist()
        self.df_aba_analytics_old.unpersist()

    def handle_year_ratio(self):
        """Join last-year rows by search_term, compute year-over-year rates, fill defaults."""
        df_year_ratio = self.df_st_base_data.join(
            self.df_st_last_year_data, on='search_term', how='left'
        )
        df_year_ratio = df_year_ratio.withColumn(
            "rank_change_rate", self._rate_of_change("rank", "last_year_rank")
        ).withColumn(
            "bsr_orders_change_rate", self._rate_of_change("bsr_orders", "last_year_bsr_orders")
        ).withColumn(
            "cn_seller_change_rate", self._rate_of_change("asin_cn_count", "last_year_asin_cn_count")
        ).withColumn(
            "fbm_change_rate", self._rate_of_change("asin_fbm_count", "last_year_asin_fbm_count")
        ).withColumn(
            "amazon_change_rate", self._rate_of_change("asin_amazon_count", "last_year_asin_amazon_count")
        ).withColumn(
            "search_volume_change_rate", self._rate_of_change("search_volume", "last_year_search_volume")
        ).na.fill({
            # Default magnitude 1000: the current period has data but the comparison period
            # does not, which is treated as a rise. For rank, negative means the rank improved.
            "rank_change_rate": -1000.000,
            "rank_rate_of_change": -1000.000,
            "bsr_orders_change_rate": 1000.000,
            "bsr_orders_rate_of_change": 1000.000,
            "cn_seller_change_rate": 1000.000,
            "cn_seller_rate_of_change": 1000.000,
            "fbm_change_rate": 1000.000,
            "fbm_rate_of_change": 1000.000,
            "amazon_change_rate": 1000.000,
            "amazon_rate_of_change": 1000.000,
            "search_volume_change_rate": 1000.000
        })
        self.df_save = df_year_ratio

    def handle_365_data(self):
        """Compute chain and year-over-year rates for last365day from dwt_aba_last365.

        Only rank and bsr_orders are available there; the remaining rate columns
        expected by save_data() are added as nulls.
        """
        sql = f"""
            with base_data as (
                select id, search_term, rank, bsr_orders
                from dwt_aba_last365
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.date_info}'
            ),
            chain_ratio_data as (
                select id, rank as last_rank, bsr_orders as last_bsr_orders
                from dwt_aba_last365
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.last_date_info}'
            ),
            year_ratio_data as (
                select id, rank as last_year_rank, bsr_orders as last_year_bsr_orders
                from dwt_aba_last365
                where site_name = '{self.site_name}'
                  and date_type = '{self.date_type}'
                  and date_info = '{self.last_year_date_info}'
            )
            select base.id,
                   base.search_term,
                   base.rank,
                   base.bsr_orders,
                   round((base.rank - chain.last_rank)/chain.last_rank,3) as rank_rate_of_change,
                   round((base.bsr_orders - chain.last_bsr_orders)/chain.last_bsr_orders,3) as bsr_orders_rate_of_change,
                   round((base.rank - year.last_year_rank)/year.last_year_rank,3) as rank_change_rate,
                   round((base.bsr_orders - year.last_year_bsr_orders)/year.last_year_bsr_orders,3) as bsr_orders_change_rate
            from base_data base
            left join chain_ratio_data chain on base.id = chain.id
            left join year_ratio_data year on base.id = year.id
        """
        print("动态365的同比、环比计算sql语句:", sql)
        self.df_save = self.spark.sql(sqlQuery=sql)
        # Fill in the columns save_data() selects but dwt_aba_last365 cannot provide.
        # search_volume_change_rate was previously missing, so save_data() failed for last365day.
        # NOTE(review): unlike handle_year_ratio(), no ±1000 defaults are filled here — confirm intent.
        for col_name in (
            "cn_seller_rate_of_change",
            "cn_seller_change_rate",
            "fbm_rate_of_change",
            "fbm_change_rate",
            "amazon_rate_of_change",
            "amazon_change_rate",
            "search_volume_change_rate",
        ):
            self.df_save = self.df_save.withColumn(col_name, F.lit(None))

    def save_data(self):
        """Project df_save onto the sink schema and append it as a Hive partition."""
        # 'ss' is second-of-minute; the previous 'SS' is fraction-of-second in Spark patterns.
        now_str = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = self.df_save.select(
            F.col("id").alias("search_term_id"),
            F.col("search_term"),
            F.col("rank_rate_of_change"),
            F.col("rank_change_rate"),
            F.col("bsr_orders_rate_of_change"),
            F.col("bsr_orders_change_rate"),
            F.col("cn_seller_rate_of_change"),
            F.col("cn_seller_change_rate"),
            F.col("fbm_rate_of_change"),
            F.col("fbm_change_rate"),
            F.col("amazon_rate_of_change"),
            F.col("amazon_change_rate"),
            now_str.alias("created_time"),
            now_str.alias("updated_time"),
            F.col("search_volume_change_rate"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )
        # Cast columns to the sink table's types.
        self.df_save = CommonUtil.auto_transfer_type(self.spark, self.df_save, self.hive_tb)
        self.df_save = self.df_save.repartition(self.partitions_num)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}")
        self.df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwtAbaLastChangeRate(site_name, date_type, date_info)
    obj.run()