"""
   @Author      : HuangJian
   @SourceTable :
                  ①ods_seller_account_syn
                  ②ods_seller_asin_account
                  ③ods_seller_account_feedback
   @SinkTable   :
                  ①dim_fd_asin_info
   @CreateTime  : 2022/12/19 9:56
   @UpdateTime  : 2022/12/19 9:56
"""

import os
import sys
import re
from functools import reduce

sys.path.append(os.path.dirname(sys.path[0]))  # add parent directory to the import path
from utils.templates import Templates
# window functions for grouping / sorting
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import StringType, IntegerType, DoubleType
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil


class DwtFdAsinInfo(object):
    def __init__(self, site_name='us'):
        super().__init__()
        self.hive_tb = "dim_fd_asin_info"
        self.site_name = site_name
        self.partition_dict = {
            "site_name": site_name,
        }
        # HDFS path of the target table partition (cleared before the data is re-written)
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        app_name = f"{self.hive_tb}:{self.site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_num = CommonUtil.reset_partitions(self.site_name, 80)


        # instance DataFrames -- raw data read from the ods layer
        self.df_seller_account_syn = self.spark.sql("select 1+1;")
        self.df_seller_account_feedback = self.spark.sql("select 1+1;")
        self.df_fd_asin = self.spark.sql("select 1+1;")

        # instance DataFrame -- transformed output written to the dim table
        self.df_save = self.spark.sql("select 1+1;")
        # register the user-defined function (UDF)
        self.u_handle_seller_unique = F.udf(self.udf_handle_seller_unique, StringType())
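        # usage note (the udf is not applied anywhere in this script): it can be used in a
        # DataFrame expression, e.g. self.u_handle_seller_unique(F.col('fd_url'))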

    @staticmethod
    def udf_handle_seller_unique(fd_url):
        # extract the seller id, i.e. the value of the "seller=" query parameter in the feedback url
        return fd_url.split("seller=")[-1].split('&')[0]
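    # illustrative sketch only -- the URL below is a made-up example of the assumed "seller=" query-parameter shape:
    #   udf_handle_seller_unique("https://www.amazon.com/sp?seller=A1B2C3EXAMPLE&ref_=abc")  # -> "A1B2C3EXAMPLE"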

    # 1. read the source data
    def read_data(self):
        # read the ods_seller source tables
        print("reading ods_seller_account_syn")
        sql = f"""
            select id as fd_account_id, seller_id as unique_id, account_name as fd_account_name,
                   lower(account_name) as fd_account_name_lower, url as fd_url, created_at, updated_at
            from ods_seller_account_syn
            where site_name = '{self.site_name}'
        """
        # group by id,account_name
        self.df_seller_account_syn = self.spark.sql(sqlQuery=sql)
        print(sql)
        # de-duplicate: keep one row per unique_id (rows sorted by fd_account_id desc so the newest record is preferred)
        self.df_seller_account_syn = self.df_seller_account_syn.orderBy(self.df_seller_account_syn.fd_account_id.desc())
        self.df_seller_account_syn = self.df_seller_account_syn.drop_duplicates(['unique_id'])
        # print("self.df_seller_account_syn:", self.df_seller_account_syn.show(10, truncate=False))


        print("获取 ods_seller_account_feedback")
        # 抓取会有重复,因此需要全局去重,count取最大即是最近的一个数据
        sql = f"select unique_id, " \
              f"       country_name as fd_country_name " \
              f"from (select seller_id as unique_id,country_name, " \
              f"         row_number() over (partition by seller_id order by created_at desc) sort_flag " \
              f"         from ods_seller_account_feedback  " \
              f"         where site_name = '{self.site_name}'" \
              f"    ) t1 " \
              f"where sort_flag = 1;  "
        self.df_seller_account_feedback = self.spark.sql(sqlQuery=sql)
        print(sql)
        # print("self.df_seller_account_feedback", self.df_seller_account_feedback.show(10, truncate=False))

        # read our internal seller/asin relationship data (crawled via search terms: mapping of seller to asin)
        print("reading ods_seller_asin_account")
        sql = f"""
                    select seller_id as unique_id,asin from ods_seller_asin_account 
                    where site_name='{self.site_name}'
                """
        self.df_fd_asin = self.spark.sql(sqlQuery=sql)
        self.df_fd_asin = self.df_fd_asin.drop_duplicates(['unique_id', 'asin'])
        print(sql)


    def save_data(self):
        # join seller account info with the feedback (country) data and the seller/asin mapping, all on unique_id
        df_save = self.df_seller_account_syn.join(self.df_seller_account_feedback, on='unique_id', how='left')

        df_save = df_save.join(
            self.df_fd_asin, on='unique_id', how='left'
        )
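        # after the two left joins the grain is one row per (unique_id, asin) pair: seller-level
        # attributes (name, country, url) repeat for every asin that belongs to the seller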

        df_save = df_save.select(
            F.col('fd_account_id'),
            F.col('unique_id').alias('fd_unique'),
            F.col('fd_account_name'),
            F.col('fd_account_name_lower'),
            F.col('fd_country_name'),
            F.col('fd_url'),
            F.col('asin'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('created_at'),
            F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss').alias('updated_at'),
            F.lit(self.site_name).alias('site_name'),
        )

        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        self.read_data()
        self.save_data()

if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    # date_type = sys.argv[2]  # arg 2: date granularity: week/4_week/month/quarter
    # date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwtFdAsinInfo(site_name=site_name)
    handle_obj.run()
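
# A minimal invocation sketch (assumes the Spark/Python environment expected by SparkUtil is already
# configured; "us" is just an example site code, matching the constructor default):
#   python dim_fd_asin_info.py us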