# dim_asin_variation_info.py
import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# 导入udf公共方法
from yswg_utils.common_udf import udf_asin_to_number
# from ..yswg_utils.common_udf import udf_asin_to_number

# import importlib
# # 动态导入yswg_utils.common_udf模块
# # common_udf_module = importlib.import_module("yswg_utils.common_udf")
# # # 从模块中获取udf_asin_to_number函数
# # udf_asin_to_number = getattr(common_udf_module, "udf_asin_to_number")
# import sys
# sys.path.append('/opt/module/spark/demo/py_demo')
#
# from yswg_utils.common_udf import udf_asin_to_number
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils


class DimAsinVariationInfo(Templates):
    """Build the dim_asin_variation_info dimension table.

    Reads ods_asin_variation for one site/date batch (plus the '0000-00'
    full snapshot), keeps the newest variation record per asin and the rows
    of each parent_asin's newest crawl date, then overwrites the per-site
    HDFS partition. For month/month_week batches >= 2024-05 it also reads
    ods_asin_detail (see handle_data_drop_variation).
    """

    def __init__(self, site_name='us', date_type='week', date_info='2024-12'):
        """
        :param site_name: Amazon site code, e.g. 'us'
        :param date_type: batch granularity, e.g. 'week' / 'month' / 'month_week'
        :param date_info: batch value, e.g. '2024-12'
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_asin_variation_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder frames; the real ones are loaded in read_data().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_variation = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # Output is partitioned by site only (not by date_type/date_info).
        self.partitions_by = ['site_name']
        self.partitions_dict = {
            "site_name": site_name
        }
        self.reset_partitions(partitions_num=20)
        # Register UDFs: asin -> numeric id, numeric id -> 1-based bucket.
        self.u_asin_to_number = self.spark.udf.register("u_asin_to_number", udf_asin_to_number, IntegerType())
        self.u_part_int = self.spark.udf.register("u_part_int", self.udf_part_int, IntegerType())

    @staticmethod
    def udf_part_int(mapped_asin):
        """Map a numeric asin id to its 1-based bucket of 100,000,000 ids.

        Uses floor division instead of int(x / 10000_0000): true division
        goes through a float and can lose precision for large ids, while
        // stays exact (identical for the non-negative ids produced by
        udf_asin_to_number — presumably always >= 0; TODO confirm).
        """
        return mapped_asin // 10000_0000 + 1

    def read_data(self):
        """Load the variation rows (and, when applicable, the detail rows)."""
        # Current batch plus the '0000-00' full-snapshot partition.
        sql = f"SELECT * from ods_asin_variation where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"  # and date_info>='2023-15'
        print(f"1. 读取ods_asin_variation表: sql -- {sql}")
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).cache()
        # Extract the YYYY-MM-DD date part of created_time; handle_data uses
        # it to keep only each parent_asin's newest crawl date.
        self.df_asin_variation = self.df_asin_variation.withColumn(
            "created_date", F.substring(self.df_asin_variation["created_time"], 1, 10))
        self.df_asin_variation = self.df_asin_variation.drop("id")
        self.df_asin_variation.show(10, truncate=False)

        if self.date_type in ["month", "month_week"] and self.date_info >= '2024-05':
            sql = f"SELECT asin, variat_num, created_at as created_time from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"  # and date_info>='2023-15'
            # Fixed: the log previously named the wrong table (ods_asin_variation).
            print(f"2. 读取ods_asin_detail表: sql -- {sql}")
            self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_detail.show(10, truncate=False)
            self.handle_data_drop_variation()

    def handle_data(self):
        """Deduplicate the variation rows and stage them for saving.

        1. Drop rows whose parent_asin is not exactly 10 characters.
        2. Keep, per asin, only the newest record (by created_time).
        3. Keep, per parent_asin, only the rows of its newest created_date.
        4. Clear the target HDFS partition so the save is an overwrite.
        """
        # A valid (parent) ASIN is exactly 10 characters.
        self.df_asin_variation = self.df_asin_variation.filter(F.length("parent_asin") == 10)
        # Numeric id of the parent asin (used downstream for bucketing).
        self.df_asin_variation = self.df_asin_variation.withColumn('mapped_asin', self.u_asin_to_number('parent_asin'))
        # Per asin keep the newest record; nulls sort last so real
        # timestamps win over missing ones.
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_variation.created_time.desc_nulls_last()
        )
        # "id" was already dropped in read_data(), so only type_rank is dropped here.
        self.df_asin_variation = self.df_asin_variation.withColumn(
            "type_rank", F.row_number().over(window=window)).filter("type_rank=1").drop("type_rank")

        # Find each parent_asin's newest crawl date...
        window = Window.partitionBy(["parent_asin"]).orderBy(
            self.df_asin_variation.created_date.desc_nulls_last()
        )
        df = self.df_asin_variation.withColumn(
            "type_rank", F.row_number().over(window=window)).filter("type_rank=1").select("parent_asin", "created_date")

        # ...and keep only the rows belonging to that date.
        self.df_asin_variation = df.join(self.df_asin_variation, on=["parent_asin", "created_date"], how='inner')
        self.df_asin_variation.show(10, truncate=False)
        self.df_save = self.df_asin_variation
        print(f"self.df_save.count(): {self.df_save.count()}")

        # Clear the per-site partition directory so the subsequent save
        # effectively overwrites it.
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partitions_dict)
        print(f"当前存储的表名为:{self.db_save},分区为{self.partitions_dict}")
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def handle_data_drop_variation(self):
        """Keep, per asin, the newest detail row with no variation count.

        NOTE(review): the resulting df_asin_detail is currently unused —
        the left_anti join against df_save in handle_data is commented
        out. Confirm whether this step is still needed.
        """
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_detail.created_time.desc_nulls_last()
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "type_rank", F.row_number().over(window=window)).filter("type_rank=1").drop("type_rank", "id")
        self.df_asin_detail = self.df_asin_detail.filter("variat_num is null")
        self.df_asin_detail.show(10, truncate=False)



if __name__ == '__main__':
    # Fixed: all three comments previously said "参数1:站点" (copy-paste error).
    site_name = sys.argv[1]  # arg 1: site code, e.g. 'us'
    date_type = sys.argv[2]  # arg 2: date type, e.g. 'week' / 'month' / 'month_week'
    date_info = sys.argv[3]  # arg 3: date value, e.g. '2024-12'
    handle_obj = DimAsinVariationInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()