"""
   @Author      : HuangJian
   @Description : 关键词与Asin详情维表
   @SourceTable :
                  ①dwd_st_asin_measure
                  ②dwt_aba_st_analytics

   @SinkTable   : dwt_aba_last_change_rate
   @CreateTime  : 2022/03/13 14:55
   @UpdateTime  : 2022/03/13 14:55
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F


class DwtAbaLastChangeRate(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_aba_last_change_rate"
        app_name = f"{ self.hive_tb}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_num = CommonUtil.reset_partitions(site_name, 1)
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中数据:{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

        # Year used for the year-over-year comparison; populated by handle_date_offset
        self.last_year_index = None
        # Period-over-period (previous period) comparison date
        self.last_date_info = self.handle_date_offset(0)
        # Year-over-year (same period last year) comparison date
        self.last_year_date_info = self.handle_date_offset(1)

        # Initialize global DataFrame placeholders
        self.df_aba_analytics = self.spark.sql("select 1+1")
        self.df_aba_analytics_old = self.spark.sql("select 1+1")
        self.df_st_base_data = self.spark.sql("select 1+1")
        self.df_st_last_data = self.spark.sql("select 1+1")
        self.df_st_last_year_data = self.spark.sql("select 1+1")
        self.df_save = self.spark.sql("select 1+1")

    def handle_date_offset(self, handle_type: int):
        # handle_type = 0 computes the period-over-period date; handle_type = 1 computes the year-over-year date
        handle_date = self.date_info
        if handle_type == 0:
            # Period-over-period date: scan the table's partitions and take the latest period before the current one
            if self.date_type == DateTypes.last365day.name:
                # When date_type is 'last365day', scan the partitions of dwt_aba_last365
                date_df = CommonUtil.select_partitions_df(self.spark, "dwt_aba_last365")
                handle_date = date_df.filter(
                    f"site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info < '{self.date_info}' "
                ).selectExpr("max(date_info)").rdd.flatMap(lambda ele: ele).collect()[0]
            else:
                # When date_type is one of ['day','week','month','last30day'], scan the partitions of dwt_aba_st_analytics
                date_df = CommonUtil.select_partitions_df(self.spark, "dwt_aba_st_analytics")
                handle_date = date_df.filter(
                    f"site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info < '{self.date_info}' "
                ).selectExpr("max(date_info)").rdd.flatMap(lambda ele: ele).collect()[0]
        else:
            # Year-over-year date: whatever the date_type, the first '-'-separated field of date_info is the year
            year_int = int(CommonUtil.safeIndex(handle_date.split("-"), 0, None))
            last_year_int = year_int - 1
            self.last_year_index = last_year_int
            # Replace the current year with the previous year
            handle_date = handle_date.replace(str(year_int), str(last_year_int))
        print("Resolved comparison date:", handle_date)
        return handle_date

    def run(self):
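        """Dispatch on date_type: last365day is computed in a single SQL pass; other date types run the read/MoM/YoY DataFrame pipeline."""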
        # ABA: the source tables for last365day differ from those of the other date ranges (day, week, month, last30day)
        if self.date_type == DateTypes.last365day.name:
            self.handle_365_data()
        else:
            self.read_data()
            self.handle_base()
            self.handle_year_ratio()
        self.save_data()

    def read_data(self):
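        """
        Load the inputs for the non-365 pipeline: the current period and the previous
        period from dwt_aba_st_analytics, plus the same period last year from
        dwt_aba_st_analytics (dim_st_detail for periods up to 2022-09).
        """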
        sql1 = f"""
            select 
                id,
                search_term,
                rank,
                bsr_orders,
                asin_cn_count,
                asin_fbm_count,
                asin_amazon_count,
                search_volume
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_aba_analytics = self.spark.sql(sql1).repartition(40, 'id').cache()
        self.df_aba_analytics.show(10, truncate=True)

        sql2 = f"""
            select 
                id,
                rank              as last_rank,
                bsr_orders        as last_bsr_orders,
                asin_cn_count     as last_asin_cn_count,
                asin_fbm_count    as last_asin_fbm_count,
                asin_amazon_count as last_asin_amazon_count
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.last_date_info}'
        """
        self.df_aba_analytics_old = self.spark.sql(sql2).repartition(40, 'id').cache()
        self.df_aba_analytics_old.show(10, truncate=True)

        # Fetch the year-over-year period data (up to 2022-09, search-term rank is only available from dim_st_detail)
        if self.last_year_date_info <= '2022-09':
            sql = f"""
                select 
                    search_term,
                    st_rank as last_year_rank,
                    null as last_year_bsr_orders,
                    null as last_year_asin_cn_count,
                    null as last_year_asin_fbm_count,
                    null as last_year_asin_amazon_count,
                    st_search_num as last_year_search_volume
                from dim_st_detail
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.last_year_date_info}'
            """
        else:
            sql = f""" 
                select
                    search_term,
                    rank as last_year_rank, 
                    bsr_orders as last_year_bsr_orders,
                    asin_cn_count as last_year_asin_cn_count,
                    asin_fbm_count as last_year_asin_fbm_count,
                    asin_amazon_count as last_year_asin_amazon_count,
                    search_volume as last_year_search_volume
                from dwt_aba_st_analytics
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.last_year_date_info}'
            """
        self.df_st_last_year_data = self.spark.sql(sql).repartition(40, 'search_term').cache()
        self.df_st_last_year_data.show(10, truncate=True)

    def handle_base(self):
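        """Left-join the current period with the previous period on id and compute period-over-period change rates as (current - previous) / previous, rounded to 3 decimals."""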
        self.df_st_base_data = self.df_aba_analytics.join(
            self.df_aba_analytics_old, on='id', how='left'
        )
        self.df_st_base_data = self.df_st_base_data.withColumn(
            'rank_rate_of_change',
            F.round((F.col('rank') - F.col('last_rank')) / F.col('last_rank'), 3)
        ).withColumn(
            'bsr_orders_rate_of_change',
            F.round((F.col('bsr_orders') - F.col('last_bsr_orders')) / F.col('last_bsr_orders'), 3)
        ).withColumn(
            'cn_seller_rate_of_change',
            F.round((F.col('asin_cn_count') - F.col('last_asin_cn_count')) / F.col('last_asin_cn_count'), 3)
        ).withColumn(
            'fbm_rate_of_change',
            F.round((F.col('asin_fbm_count') - F.col('last_asin_fbm_count')) / F.col('last_asin_fbm_count'), 3)
        ).withColumn(
            'amazon_rate_of_change',
            F.round((F.col('asin_amazon_count') - F.col('last_asin_amazon_count')) / F.col('last_asin_amazon_count'), 3)
        ).select(
            'id', 'search_term', 'rank', 'bsr_orders', 'asin_cn_count', 'asin_fbm_count', 'asin_amazon_count',
            'rank_rate_of_change', 'bsr_orders_rate_of_change', 'cn_seller_rate_of_change',
            'fbm_rate_of_change', 'amazon_rate_of_change', 'search_volume'
        ).repartition(40, 'search_term').cache()
        self.df_aba_analytics.unpersist()
        self.df_aba_analytics_old.unpersist()

    def handle_year_ratio(self):
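        """Left-join the base data with last year's data on search_term, compute year-over-year change rates, and fill sentinels where the comparison period has no data."""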
        # Year-over-year computation
        df_year_ratio = self.df_st_base_data.join(
            self.df_st_last_year_data, on='search_term', how='left'
        )
        df_year_ratio = df_year_ratio.withColumn(
            "rank_change_rate",
            F.round(F.expr("(rank - last_year_rank) / last_year_rank"), 3)
        ).withColumn(
            "bsr_orders_change_rate",
            F.round(F.expr("(bsr_orders - last_year_bsr_orders) / last_year_bsr_orders"), 3)
        ).withColumn(
            "cn_seller_change_rate",
            F.round(F.expr("(asin_cn_count - last_year_asin_cn_count) / last_year_asin_cn_count"), 3)
        ).withColumn(
            "fbm_change_rate",
            F.round(F.expr("(asin_fbm_count - last_year_asin_fbm_count) / last_year_asin_fbm_count"), 3)
        ).withColumn(
            "amazon_change_rate",
            F.round(F.expr("(asin_amazon_count - last_year_asin_amazon_count) / last_year_asin_amazon_count"), 3)
        ).withColumn(
            "search_volume_change_rate",
            F.round(F.expr("(search_volume - last_year_search_volume) / last_year_search_volume"), 3)
        ).na.fill({
            # Sentinel 1000: the current period has data but the YoY (or MoM) period does not, so it counts as an increase; for rank, a negative value means the rank moved up
            "rank_change_rate": -1000.000,
            "rank_rate_of_change": -1000.000,
            "bsr_orders_change_rate": 1000.000,
            "bsr_orders_rate_of_change": 1000.000,
            "cn_seller_change_rate": 1000.000,
            "cn_seller_rate_of_change": 1000.000,
            "fbm_change_rate": 1000.000,
            "fbm_rate_of_change": 1000.000,
            "amazon_change_rate": 1000.000,
            "amazon_rate_of_change": 1000.000,
            "search_volume_change_rate": 1000.000
        })
        self.df_save = df_year_ratio

    def handle_365_data(self):
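        """last365day branch: compute rank and bsr_orders MoM/YoY change rates in a single SQL pass over dwt_aba_last365, then pad the columns the DataFrame pipeline would otherwise produce."""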
        sql = f"""
            with base_data as (
                select 
                    id,
                    search_term,
                    rank,
                    bsr_orders
                from dwt_aba_last365
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.date_info}'
            ),
            chain_ratio_data as (
                select 
                    id,
                    rank              as last_rank,
                    bsr_orders        as last_bsr_orders
                from dwt_aba_last365
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.last_date_info}'
            ),
            year_ratio_data as (
                select 
                    id,
                    rank              as last_year_rank,
                    bsr_orders        as last_year_bsr_orders
                from dwt_aba_last365
                where site_name = '{self.site_name}'
                and date_type = '{self.date_type}'
                and date_info = '{self.last_year_date_info}'
            )

            select 
                base.id,
                base.search_term,
                base.rank,
                base.bsr_orders,
                round((base.rank - chain.last_rank)/chain.last_rank,3) as rank_rate_of_change,
                round((base.bsr_orders - chain.last_bsr_orders)/chain.last_bsr_orders,3) as bsr_orders_rate_of_change,
                round((base.rank - year.last_year_rank)/year.last_year_rank,3) as rank_change_rate,
                round((base.bsr_orders - year.last_year_bsr_orders)/year.last_year_bsr_orders,3) as bsr_orders_change_rate
            from base_data base left join chain_ratio_data chain
            on base.id = chain.id
            left join year_ratio_data year
            on base.id = year.id
        """
        print("动态365的同比、环比计算sql语句:", sql)
        self.df_save = self.spark.sql(sqlQuery=sql)
        # Pad the columns that are not computed in the 365 branch (save_data selects them unconditionally)
        self.df_save = self.df_save.withColumn(
            "cn_seller_rate_of_change", F.lit(None)
        ).withColumn(
            "cn_seller_change_rate", F.lit(None)
        ).withColumn(
            "fbm_rate_of_change", F.lit(None)
        ).withColumn(
            "fbm_change_rate", F.lit(None)
        ).withColumn(
            "amazon_rate_of_change", F.lit(None)
        ).withColumn(
            "amazon_change_rate", F.lit(None)
        ).withColumn(
            "search_volume_change_rate", F.lit(None)
        )

    def save_data(self):
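        """Assemble the output schema, auto-cast columns to the Hive table's types, and append into dwt_aba_last_change_rate partitioned by site_name/date_type/date_info."""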
        self.df_save = self.df_save.select(
            F.col("id").alias("search_term_id"),
            F.col("search_term"),
            F.col("rank_rate_of_change"),
            F.col("rank_change_rate"),
            F.col("bsr_orders_rate_of_change"),
            F.col("bsr_orders_change_rate"),
            F.col("cn_seller_rate_of_change"),
            F.col("cn_seller_change_rate"),
            F.col("fbm_rate_of_change"),
            F.col("fbm_change_rate"),
            F.col("amazon_rate_of_change"),
            F.col("amazon_change_rate"),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("created_time"),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias("updated_time"),
            F.col("search_volume_change_rate"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )

        # Cast column types to match the Hive table schema
        self.df_save = CommonUtil.auto_transfer_type(self.spark, self.df_save, self.hive_tb)

        self.df_save = self.df_save.repartition(self.partitions_num)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        self.df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
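    # Assumed invocation (positional args: site_name, date_type, date_info), e.g.:
    #   spark-submit dwt_aba_last_change_rate.py us month 2022-10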
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwtAbaLastChangeRate(site_name, date_type, date_info)
    obj.run()