# dim_asin_variation_info.py
import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# 导入udf公共方法
from yswg_utils.common_udf import udf_asin_to_number
# from ..yswg_utils.common_udf import udf_asin_to_number

# import importlib
# # 动态导入yswg_utils.common_udf模块
# # common_udf_module = importlib.import_module("yswg_utils.common_udf")
# # # 从模块中获取udf_asin_to_number函数
# # udf_asin_to_number = getattr(common_udf_module, "udf_asin_to_number")
# import sys
# sys.path.append('/opt/module/spark/demo/py_demo')
#
# from yswg_utils.common_udf import udf_asin_to_number
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils


class DimAsinVariationInfo(Templates):
    """Build the ``dim_asin_variation_info`` dimension table.

    Reads ``ods_asin_variation`` for one site/date partition, keeps the
    latest variation record per asin and per parent_asin, and stores the
    result partitioned by ``site_name``.  For ``month`` / ``month_week``
    runs from 2024-05 onward it additionally reads ``ods_asin_detail``
    (see :meth:`handle_data_drop_variation`).
    """

    def __init__(self, site_name='us', date_type='week', date_info='2024-12'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Target table name; also used as the Spark app-name prefix.
        self.db_save = 'dim_asin_variation_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrames; replaced by real data in read_data()/handle_data().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_variation = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # Output is partitioned by site only (date partitions deliberately not used).
        self.partitions_by = ['site_name']
        self.partitions_dict = {
            "site_name": site_name
        }
        self.reset_partitions(partitions_num=20)
        # Register UDFs for SQL use and keep Python handles for DataFrame use:
        # asin string -> numeric id, numeric id -> partition bucket.
        self.u_asin_to_number = self.spark.udf.register("u_asin_to_number", udf_asin_to_number, IntegerType())
        self.u_part_int = self.spark.udf.register("u_part_int", self.udf_part_int, IntegerType())

    @staticmethod
    def udf_part_int(mapped_asin):
        """Return the 1-based bucket of ``mapped_asin`` in blocks of 100,000,000.

        Uses integer floor division rather than ``int(x / 10000_0000)`` so that
        very large ids are not subject to float rounding error (floats lose
        integer precision above 2**53).
        """
        return mapped_asin // 10000_0000 + 1

    def read_data(self):
        """Load the source tables into ``df_asin_variation`` / ``df_asin_detail``."""
        sql = f"SELECT * from ods_asin_variation where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"
        print(f"1. 读取ods_asin_variation表: sql -- {sql}")
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).cache()
        # Derive a YYYY-MM-DD date column from the full created_time timestamp.
        self.df_asin_variation = self.df_asin_variation.withColumn("created_date", F.substring(self.df_asin_variation["created_time"], 1, 10))
        self.df_asin_variation = self.df_asin_variation.drop("id")
        self.df_asin_variation.show(10, truncate=False)

        # ods_asin_detail carries variat_num only for month/month_week runs from 2024-05 on.
        if self.date_type in ["month", "month_week"] and self.date_info >= '2024-05':
            sql = f"SELECT asin, variat_num, created_at as created_time from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"
            print(f"2. 读取ods_asin_detail表: sql -- {sql}")
            self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_detail.show(10, truncate=False)
            self.handle_data_drop_variation()

    def handle_data(self):
        """Deduplicate variation rows and prepare ``df_save`` for storage."""
        # Drop rows whose parent_asin is not a valid 10-character ASIN.
        self.df_asin_variation = self.df_asin_variation.filter(F.length("parent_asin") == 10)
        # Numeric id derived from parent_asin (used downstream for bucketing).
        self.df_asin_variation = self.df_asin_variation.withColumn('mapped_asin', self.u_asin_to_number('parent_asin'))
        # Keep only the most recent record per asin (previously grouped by
        # ["asin", "parent_asin"]; now by asin alone).
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_variation.created_time.desc_nulls_last()
        )
        self.df_asin_variation = self.df_asin_variation.withColumn("type_rank", F.row_number().over(window=window)). \
            filter("type_rank=1").drop("type_rank", "id")

        # Find the most recent created_date per parent_asin ...
        window = Window.partitionBy(["parent_asin"]).orderBy(
            self.df_asin_variation.created_date.desc_nulls_last()
        )
        df = self.df_asin_variation.withColumn("type_rank", F.row_number().over(window=window)). \
            filter("type_rank=1").select("parent_asin", "created_date")

        # ... and keep only the rows from that latest date for each parent_asin.
        self.df_asin_variation = df.join(self.df_asin_variation, on=["parent_asin", "created_date"], how='inner')
        self.df_asin_variation.show(10, truncate=False)
        self.df_save = self.df_asin_variation
        print(f"self.df_save.count(): {self.df_save.count()}")
        # NOTE(review): an anti-join against df_asin_detail used to run here
        # (removing asins whose variat_num is null); it is currently disabled.

        # Clear the target HDFS partition before the framework writes df_save.
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partitions_dict)
        print(f"当前存储的表名为:{self.db_save},分区为{self.partitions_dict}")
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def handle_data_drop_variation(self):
        """Reduce ``df_asin_detail`` to the latest record per asin where variat_num is null.

        NOTE(review): the resulting DataFrame is not consumed anywhere at the
        moment (the anti-join in handle_data is commented out) — confirm before
        relying on it.
        """
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_detail.created_time.desc_nulls_last()
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("type_rank", F.row_number().over(window=window)).filter("type_rank=1").drop("type_rank", "id")
        self.df_asin_detail = self.df_asin_detail.filter("variat_num is null")
        self.df_asin_detail.show(10, truncate=False)



if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site (e.g. 'us')
    date_type = sys.argv[2]  # arg 2: date type (e.g. 'week', 'month', 'month_week')
    date_info = sys.argv[3]  # arg 3: date value for that type (e.g. '2024-12')
    handle_obj = DimAsinVariationInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()