""" @Author : HuangJian @SourceTable : ①ods_seller_account_syn ②ods_seller_asin_account ③ods_seller_account_feedback @SinkTable : ①dim_fd_asin_info @CreateTime : 2022/12/19 9:56 @UpdateTime : 2022/12/19 9:56 """ import os import sys import re from functools import reduce sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from utils.spark_util import SparkUtil from pyspark.sql.types import StringType, IntegerType, DoubleType from utils.hdfs_utils import HdfsUtils from utils.common_util import CommonUtil class DwtFdAsinInfo(object): def __init__(self, site_name='us'): super().__init__() self.hive_tb = "dim_fd_asin_info" self.site_name = site_name self.partition_dict = { "site_name": site_name, } # 落表路径校验 self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict) app_name = f"{self.hive_tb}:{self.site_name}" self.spark = SparkUtil.get_spark_session(app_name) self.partitions_num = CommonUtil.reset_partitions(self.site_name, 80) # 初始化全局变量df--ods获取数据的原始df self.df_seller_account_syn = self.spark.sql("select 1+1;") self.df_seller_account_feedback = self.spark.sql("select 1+1;") self.df_fd_asin = self.spark.sql("select 1+1;") # 初始化全局变量df--dwd层转换输出的df self.df_save = self.spark.sql(f"select 1+1;") # 注册自定义函数 (UDF) self.u_handle_seller_unique = F.udf(self.udf_handle_seller_unique, StringType()) @staticmethod def udf_handle_seller_unique(fd_url): return fd_url.split("seller=")[-1].split('&')[0] # 1.获取原始数据 def read_data(self): # 获取ods_seller相关原始表 print("获取 ods_seller_account_syn") sql = f"select id as fd_account_id,seller_id as unique_id, account_name as fd_account_name," \ f"lower(account_name) as fd_account_name_lower, url as fd_url, created_at, updated_at from " \ f" ods_seller_account_syn where site_name='{self.site_name}' " \ # f"group by id,account_name" self.df_seller_account_syn = self.spark.sql(sqlQuery=sql) print(sql) # 避免重复数据 self.df_seller_account_syn = self.df_seller_account_syn.orderBy(self.df_seller_account_syn.fd_account_id.desc()) self.df_seller_account_syn = self.df_seller_account_syn.drop_duplicates(['unique_id']) # print("self.df_seller_account_syn:", self.df_seller_account_syn.show(10, truncate=False)) print("获取 ods_seller_account_feedback") # 抓取会有重复,因此需要全局去重,count取最大即是最近的一个数据 sql = f"select unique_id, " \ f" country_name as fd_country_name " \ f"from (select seller_id as unique_id,country_name, " \ f" row_number() over (partition by seller_id order by created_at desc) sort_flag " \ f" from ods_seller_account_feedback " \ f" where site_name = '{self.site_name}'" \ f" ) t1 " \ f"where sort_flag = 1; " self.df_seller_account_feedback = self.spark.sql(sqlQuery=sql) print(sql) # print("self.df_seller_account_feedback", self.df_seller_account_feedback.show(10, truncate=False)) # 获取我们内部的店铺与asin的数据库(从搜索词抓下来,店铺与asin的关系表) print("获取 ods_seller_asin_account") sql = f""" select seller_id as unique_id,asin from ods_seller_asin_account where site_name='{self.site_name}' """ self.df_fd_asin = self.spark.sql(sqlQuery=sql) self.df_fd_asin = self.df_fd_asin.drop_duplicates(['unique_id', 'asin']) print(sql) def sava_data(self): # 店铺+asin 通过fd_account_name 拼接获取到fd_unique df_save = self.df_seller_account_syn.join(self.df_seller_account_feedback, on='unique_id', how='left') df_save = df_save.join( self.df_fd_asin, on='unique_id', how='left' ) df_save = df_save.select( F.col('fd_account_id'), F.col('unique_id').alias('fd_unique'), 
            F.col('fd_account_name'),
            F.col('fd_account_name_lower'),
            F.col('fd_country_name'),
            F.col('fd_url'),
            F.col('asin'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_at'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_at'),
            F.lit(self.site_name).alias('site_name'),
        )

        print(f"Clearing HDFS directory: {self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)

        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name"]
        print(f"Writing to table: {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        self.read_data()
        self.save_data()


if __name__ == '__main__':
    site_name = sys.argv[1]  # argument 1: site
    # date_type = sys.argv[2]  # argument 2: date type: week/4_week/month/quarter
    # date_info = sys.argv[3]  # argument 3: year-week/year-month/year-quarter, e.g. 2022-1
    handle_obj = DwtFdAsinInfo(site_name=site_name)
    handle_obj.run()
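
# Example invocation (a sketch only; the script filename and any spark-submit
# options are assumptions based on the argument handling above, not taken from
# this file):
#
#   spark-submit dim_fd_asin_info.py us
#
# The single positional argument is the site (e.g. 'us'); the commented-out
# date_type/date_info arguments are not used by this job.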