dwt_nsr_asin_detail.py 13.1 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql.types import BooleanType, IntegerType, StructType, StructField, StringType
from yswg_utils.common_df import get_self_asin_df, get_asin_unlanuch_df, get_node_first_id_df
from yswg_utils.common_udf import category_craw_flag, udf_get_package_quantity

"""
聚合nsr榜单Asin
"""


class DwtNsrAsinDetail(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_nsr_asin_detail"
        self.current_month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m", )
        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())

        self.udf_cal_first_category_rank = F.udf(self.cal_first_category_rank,
                                                 StructType([StructField("first_category_rank", IntegerType(), True),
                                                             StructField("first_category_rank_date", StringType(), True)])
                                                 )

        self.udf_cal_package_quantity = F.udf(self.cal_package_quantity, IntegerType())
        self.udf_cal_seller_country_type = F.udf(self.cal_seller_country_type, IntegerType())

        pass

    @staticmethod
    def cal_first_category_rank(category_id, category_first_id, bsr_date_info, current_bsr_rank, flow_update_time,
                                first_category_rank):
        if category_id == category_first_id:
            #  如果是大类直接取 bsr_rank
            return (current_bsr_rank, bsr_date_info)
        else:
            # 小类则取流量选品
            return (first_category_rank, flow_update_time)

    @staticmethod
    def cal_package_quantity(title):
        """
        打包数量
        """
        val = udf_get_package_quantity(title)
        if val is None:
            val = 1
        return val

    @staticmethod
    def cal_seller_country_type(asin_buy_box_seller_type, seller_country_name):
        """
        卖家所属地类型
        :param asin_buy_box_seller_type:
        :param seller_country_name:
        :return:
        """
        if str(seller_country_name).lower() not in ['none', 'null']:
            if asin_buy_box_seller_type == 1:
                return 4
            elif asin_buy_box_seller_type != 1 and str(seller_country_name).upper().find('US') != -1:
                return 1
            elif asin_buy_box_seller_type != 1 and str(seller_country_name).upper().find('CN') != -1:
                return 2
            else:
                return 3
        else:
            return 0

    def run(self):
        df_dwt_flow_asin_part = CommonUtil.select_partitions_df(self.spark, "dwt_flow_asin")

        # dwt_flow_asin month 最新分区
        dwt_flow_asin_last_month = df_dwt_flow_asin_part \
            .filter(f"date_type = '{DateTypes.month.name}' and site_name = '{self.site_name}' ") \
            .selectExpr("max(date_info)").rdd.flatMap(lambda it: it).collect()[0]

        #  近30天的asin
        day_30_before = CommonUtil.get_day_offset(self.date_info, -30)

        sql = f"""
                select tmp.asin,
                       title,
                       img_url,
                       img_type,
                       ao_val,
                       rating,
                       total_comments,
                       bsr_orders,
                       bsr_orders_change,
                       price,
                       weight,
                       launch_time,
                       brand_name,
                       buy_box_seller_type,
                       account_name,
                       volume,
                       last_update_time,
                       asin_air_freight_gross_margin,
                       asin_ocean_freight_gross_margin,
                       category_first_id,
                        first_category_rank,
                        seller_id,
                        account_name,
                        seller_country_name,
                        asin_bought_month,
                        tmp_category_id
            from (
                         select asin , first(category_id) as tmp_category_id
                         from dwd_nsr_asin_rank
                         where site_name = '{self.site_name}'
                              and date_info <= "{self.date_info}"
                              and date_info >= "{day_30_before}"
                         group by asin
                 ) tmp
                     left join (
               select asin,
                       fd_unique              as seller_id,
                       first(fd_account_name) as account_name,
                       first(fd_country_name) as seller_country_name
                from dim_fd_asin_info
                where site_name = '{self.site_name}'
                group by asin, fd_unique
            ) tmp1 on tmp.asin = tmp1.asin
                     left join (
                    select asin,
                           asin_title               as title,
                           asin_img_url             as img_url,
                           asin_img_type            as img_type,
                           asin_rating              as rating,
                           asin_total_comments      as total_comments,
                           asin_price               as price,
                           asin_weight              as weight,
                           asin_launch_time         as launch_time,
                           asin_volume              as volume,
                           asin_brand_name          as brand_name,
                           asin_buy_box_seller_type as buy_box_seller_type,
                           asin_crawl_date          as last_update_time,
                           asin_rank                as first_category_rank,
                           category_first_id        as category_first_id
                from dim_cal_asin_history_detail
                where site_name = '{self.site_name}'
            ) tmp2 on tmp.asin = tmp2.asin
                     left join (
            select asin,
                   asin_ao_val                     as ao_val,
                   bsr_orders                      as bsr_orders,
                   asin_bsr_orders_change          as bsr_orders_change,
                   asin_air_freight_gross_margin   as asin_air_freight_gross_margin,
                   asin_ocean_freight_gross_margin as asin_ocean_freight_gross_margin,
                   cast(asin_bought_month as int ) as asin_bought_month
                from dwt_flow_asin
                where site_name = '{self.site_name}'
                  and date_type = '{DateTypes.month.name}'
                  and date_info = '{dwt_flow_asin_last_month}'
            ) tmp3 on tmp.asin = tmp3.asin
            """
        print("======================查询sql如下======================")
        print(sql)

        df_all = self.spark.sql(sql)

        # 内部asin
        df_self_asin = get_self_asin_df(self.site_name, self.spark).select(
            F.col("asin"),
            F.lit(1).alias("asin_type"),
        )
        # 下架asin
        df_unlanuch = get_asin_unlanuch_df(self.site_name, self.spark)

        df_node_first_id = get_node_first_id_df(self.site_name, self.spark).select(
            F.col("node_id").alias("tmp_category_id"),
            F.col("category_first_id").alias("tmp_category_first_id"),
        )

        df_all = df_all \
            .join(df_self_asin, on=["asin"], how='left') \
            .join(df_unlanuch, on=["asin"], how='left') \
            .join(df_node_first_id, on=["tmp_category_id"], how='left')

        # 新品没一级分类id的补全一级分类id
        df_all = df_all.withColumn("category_first_id", F.coalesce(F.col("category_first_id"), F.col("tmp_category_first_id")))

        df_all = df_all.withColumn("crawl_flag", self.udf_category_craw_flag(F.col("category_first_id"), F.col("asin")))

        # df_all = df_all.withColumn("first_category_rank_tuple", self.udf_cal_first_category_rank(
        #     F.col("category_id"),
        #     F.col("category_first_id"),
        #     F.col("bsr_date_info"),
        #     F.col("current_bsr_rank"),
        #     F.col("last_update_time"),
        #     F.col("first_category_rank"),
        # ))

        day_before_30 = CommonUtil.get_day_offset(self.date_info, -30)
        day_before_90 = CommonUtil.get_day_offset(self.date_info, -90)
        day_before_180 = CommonUtil.get_day_offset(self.date_info, -180)
        day_before_360 = CommonUtil.get_day_offset(self.date_info, -360)
        day_before_450 = CommonUtil.get_day_offset(self.date_info, -450)
        day_before_720 = CommonUtil.get_day_offset(self.date_info, -720)
        day_before_1080 = CommonUtil.get_day_offset(self.date_info, -1080)

        # 上架时间类型
        df_all = df_all.withColumn(
            'asin_launch_time_type',
            F.when(F.col("launch_time") >= day_before_30, F.lit(1))
                .when((F.col("launch_time") >= day_before_90) & (F.col("launch_time") < day_before_30), F.lit(2))
                .when((F.col("launch_time") >= day_before_180) & (F.col("launch_time") < day_before_90), F.lit(3))
                .when((F.col("launch_time") >= day_before_360) & (F.col("launch_time") < day_before_180), F.lit(4))
                .when((F.col("launch_time") >= day_before_450) & (F.col("launch_time") < day_before_360), F.lit(5))
                .when((F.col("launch_time") >= day_before_720) & (F.col("launch_time") < day_before_450), F.lit(6))
                .when((F.col("launch_time") >= day_before_1080) & (F.col("launch_time") < day_before_720), F.lit(7))
                .otherwise(F.lit(0))
        )

        # 打包数量
        df_all = df_all.withColumn("package_quantity", self.udf_cal_package_quantity(F.col("title")))
        # 卖家所属地
        df_all = df_all.withColumn("seller_country_type",
                                   self.udf_cal_seller_country_type(F.col("buy_box_seller_type"), F.col("seller_country_name")))

        df_all = df_all.select(
            F.col('asin'),
            F.col('title'),
            F.col('img_url'),
            F.col('img_type'),
            F.col('ao_val'),
            F.col('rating'),
            # 评论数量
            F.col('total_comments'),
            F.col('bsr_orders'),
            F.col('bsr_orders_change'),
            F.col('price'),
            F.col('weight'),
            F.col('launch_time'),
            F.trim(F.col('brand_name')).alias("brand_name"),
            F.col('buy_box_seller_type'),
            F.col('account_name'),
            F.col('volume'),
            F.col('last_update_time'),
            F.col('asin_air_freight_gross_margin'),
            F.col('asin_ocean_freight_gross_margin'),
            # 大类默认值空字符串
            F.coalesce('category_first_id', F.lit("")).alias("category_first_id"),

            F.when(F.col("asin_type").isNotNull(), F.lit(1))
                .when(F.col("crawl_flag") == False, F.lit(2))
                .otherwise(0).alias("asin_type"),
            #  asin 下架时间
            F.col("asin_unlaunch_time"),

            # 店铺唯一id
            F.col("seller_id"),
            # 卖家所属地
            F.col("seller_country_name"),
            # 大类排名
            F.col("first_category_rank").alias('first_category_rank'),
            # 大类排名时间
            F.col("last_update_time").alias('first_category_rank_date'),
            # 打包数量
            F.col("package_quantity"),
            # 上架时间类型
            F.col("asin_launch_time_type"),
            # 卖家所属地类型
            F.col("seller_country_type"),

            # 亚马逊销量(月)默认值给-1
            F.coalesce('asin_bought_month', F.lit(-1)).alias("asin_bought_month"),

            F.lit(self.site_name).alias("site_name"),
            F.lit(self.current_month).alias("date_info")
        )

        # 自动对齐
        df_all = CommonUtil.format_df_with_template(self.spark, df_all, self.hive_tb, roundDouble=True)
        #  分区数设置为6
        df_all = df_all.repartition(6)

        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.current_month
        }

        partition_by = list(partition_dict.keys())
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        print(f"清除hdfs目录中:{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

        hive_tb = self.hive_tb
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtNsrAsinDetail(site_name=site_name, date_info=date_info)
    obj.run()