""" @Author : HuangJian @SourceTable : ①ods_seller_account_syn ②ods_seller_asin_account ③ods_seller_account_feedback @SinkTable : ①dim_seller_asin_info @CreateTime : 2022/11/2 9:56 @UpdateTime : 2022/11/2 9:56 """ import datetime import traceback import os import sys from datetime import date, timedelta sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from pyspark.sql.functions import ceil from pyspark.sql.types import IntegerType from utils.templates import Templates # from ..utils.templates import Templates from pyspark.sql import functions as F class DwdSellerAsinInfo(Templates): def __init__(self, site_name='us', date_type="week", date_info='2022-40'): super().__init__() self.db_save = "dim_seller_asin_info" self.site_name = site_name self.date_type = date_type self.date_info = date_info self.spark = self.create_spark_object( app_name=f"{self.db_save}:{self.site_name}_{self.date_type}_{self.date_info}") self.df_date = self.get_year_week_tuple() self.partitions_by = ['site_name', 'date_type', 'date_info'] self.partitions_num = 20 self.reset_partitions(partitions_num=self.partitions_num) # 初始化全局变量df--ods获取数据的原始df self.df_seller_account_syn = self.spark.sql("select 1+1;") self.df_seller_asin_account = self.spark.sql("select 1+1;") self.df_seller_account_feedback = self.spark.sql("select 1+1;") # 初始化全局变量df--dwd层转换输出的df self.df_save = self.spark.sql(f"select 1+1;") # 1.获取原始数据 def read_data(self): # 获取ods_seller相关原始表 print("获取 ods_seller_account_syn") sql = f"select id as account_id, account_name from ods_seller_account_syn where site_name='{self.site_name}' and date_type='{self.date_type}'and date_info='{self.date_info}' group by id,account_name" self.df_seller_account_syn = self.spark.sql(sqlQuery=sql) print("self.df_seller_account_syn:", self.df_seller_account_syn.show(10, truncate=False)) print("获取 ods_seller_asin_account") sql = f"select account_name, asin from ods_seller_asin_account where site_name = '{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' group by account_name, asin" self.df_seller_asin_account = self.spark.sql(sqlQuery=sql).cache() print("self.df_seller_asin_account:", self.df_seller_asin_account.show(10, truncate=False)) print("获取 ods_seller_account_feedback") # 抓取会有重复,因此需要全局去重,count取最大即是最近的一个数据 sql = f"select account_id, " \ f" country_name, " \ f" count_30_day as fb_count_30_day, " \ f" count_1_year as fb_count_1_year, " \ f" count_lifetime as fb_count_lifetime " \ f"from (select account_id,country_name,count_30_day, count_1_year, count_lifetime, " \ f" row_number() over (partition by account_id order by created_at desc) sort_flag " \ f" from ods_seller_account_feedback " \ f" where site_name = '{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' " \ f" ) t1 " \ f"where sort_flag = 1; " self.df_seller_account_feedback = self.spark.sql(sqlQuery=sql) # print("self.df_seller_account_feedback", self.df_seller_account_feedback.show(10, truncate=False)) def handle_data(self): self.handle_seller_asin_base() def handle_seller_asin_base(self): # ods_seller_asin_account与ods_seller_account_syn关联,得到account_id self.df_save = self.df_seller_asin_account.\ join(self.df_seller_account_syn, on='account_name', how='left') # print("df_seller_asin_account join df_seller_account_syn: ", self.df_save.show(10, truncate=False)) self.df_save = self.df_save.\ join( self.df_seller_account_feedback, on='account_id', how='left') # print("df_save join df_seller_account_feedback: ", self.df_save.show(10, truncate=False)) # 空值处理 # 
        # Fill nulls in int-type columns (use integer 0: na.fill silently
        # skips values whose type does not match the column type)
        self.df_save = self.df_save \
            .na.fill({"fb_count_30_day": 0, "fb_count_1_year": 0, "fb_count_lifetime": 0})
        # Fill nulls in string-type columns
        self.df_save = self.df_save \
            .na.fill({"country_name": "null"})

        # Populate reserved fields
        self.df_save = self.df_save.withColumn("re_string_field1", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_string_field2", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_string_field3", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_int_field1", F.lit(0))
        self.df_save = self.df_save.withColumn("re_int_field2", F.lit(0))
        self.df_save = self.df_save.withColumn("re_int_field3", F.lit(0))

        # Populate partition columns
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        # self.df_save.show(10, truncate=False)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwdSellerAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
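
# Example invocation (a sketch; the script filename and any cluster-specific
# spark-submit options are assumptions, not part of this repo's docs):
#   spark-submit dim_seller_asin_info.py us week 2022-40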