dim_seller_asin_info.py 5.75 KB
"""
   @Author      : HuangJian
   @SourceTable :
                  ①ods_seller_account_syn
                  ②ods_seller_asin_account
                  ③ods_seller_account_feedback
   @SinkTable   :
                  ①dim_seller_asin_info
   @CreateTime  : 2022/11/2 9:56
   @UpdateTime  : 2022/11/2 9:56
"""


import datetime
import traceback
import os
import sys
from datetime import date, timedelta

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.sql.functions import ceil
from pyspark.sql.types import IntegerType
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F


class DwdSellerAsinInfo(Templates):
    def __init__(self, site_name='us', date_type="week", date_info='2022-40'):
        super().__init__()
        self.db_save = "dim_seller_asin_info"
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}:{self.site_name}_{self.date_type}_{self.date_info}")
        self.df_date = self.get_year_week_tuple()
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.partitions_num = 20
        self.reset_partitions(partitions_num=self.partitions_num)


        # 初始化全局变量df--ods获取数据的原始df
        self.df_seller_account_syn = self.spark.sql("select 1+1;")
        self.df_seller_asin_account = self.spark.sql("select 1+1;")
        self.df_seller_account_feedback = self.spark.sql("select 1+1;")

        # 初始化全局变量df--dwd层转换输出的df
        self.df_save = self.spark.sql(f"select 1+1;")


    # 1.获取原始数据
    def read_data(self):
        # 获取ods_seller相关原始表
        print("获取 ods_seller_account_syn")
        sql = f"select id as account_id, account_name from ods_seller_account_syn where site_name='{self.site_name}' and  date_type='{self.date_type}'and date_info='{self.date_info}' group by id,account_name"
        self.df_seller_account_syn = self.spark.sql(sqlQuery=sql)
        print("self.df_seller_account_syn:", self.df_seller_account_syn.show(10, truncate=False))

        print("获取 ods_seller_asin_account")
        sql = f"select account_name, asin from ods_seller_asin_account where site_name = '{self.site_name}' and  date_type='{self.date_type}' and date_info='{self.date_info}' group by account_name, asin"
        self.df_seller_asin_account = self.spark.sql(sqlQuery=sql).cache()
        print("self.df_seller_asin_account:", self.df_seller_asin_account.show(10, truncate=False))

        print("获取 ods_seller_account_feedback")
        # 抓取会有重复,因此需要全局去重,count取最大即是最近的一个数据
        sql = f"select account_id, " \
              f"       country_name, " \
              f"       count_30_day   as fb_count_30_day, " \
              f"       count_1_year   as fb_count_1_year, " \
              f"       count_lifetime as fb_count_lifetime " \
              f"from (select account_id,country_name,count_30_day, count_1_year, count_lifetime, " \
              f"         row_number() over (partition by account_id order by created_at desc) sort_flag " \
              f"         from ods_seller_account_feedback  " \
              f"         where site_name = '{self.site_name}' and  date_type='{self.date_type}' and date_info='{self.date_info}'  " \
              f"    ) t1 " \
              f"where sort_flag = 1;  "
        self.df_seller_account_feedback = self.spark.sql(sqlQuery=sql)
        # print("self.df_seller_account_feedback", self.df_seller_account_feedback.show(10, truncate=False))


    def handle_data(self):
        self.handle_seller_asin_base()

    def handle_seller_asin_base(self):
        # ods_seller_asin_account与ods_seller_account_syn关联,得到account_id
        self.df_save = self.df_seller_asin_account.\
            join(self.df_seller_account_syn, on='account_name', how='left')
        # print("df_seller_asin_account join df_seller_account_syn: ", self.df_save.show(10, truncate=False))

        self.df_save = self.df_save.\
            join( self.df_seller_account_feedback, on='account_id', how='left')
        # print("df_save join df_seller_account_feedback: ", self.df_save.show(10, truncate=False))

        # 空值处理
        # int类型空值处理
        self.df_save = self.df_save. \
            na.fill({"fb_count_30_day": 0, "fb_count_1_year": 0.0, "fb_count_lifetime": 0.0})

        # String类型空值处理
        self.df_save = self.df_save. \
            na.fill({"country_name": "null"})

        # 预留字段补全
        self.df_save = self.df_save.withColumn("re_string_field1", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_string_field2", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_string_field3", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_int_field1", F.lit(0))
        self.df_save = self.df_save.withColumn("re_int_field2", F.lit(0))
        self.df_save = self.df_save.withColumn("re_int_field3", F.lit(0))

        # 分区字段补全
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        # print("self.df_save", self.df_save.show(10, truncate=False))

if __name__ == '__main__':
    site_name = sys.argv[1]  # 参数1:站点
    date_type = sys.argv[2]  # 参数2:类型:week/4_week/month/quarter
    date_info = sys.argv[3]  # 参数3:年-周/年-月/年-季, 比如: 2022-1
    handle_obj = DwdSellerAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()