import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the module search path
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
# import the shared UDF helpers
from yswg_utils.common_udf import udf_asin_to_number
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils


class DimAsinVariationInfo(Templates):
    def __init__(self, site_name='us', date_type='week', date_info='2024-12'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_asin_variation_info'
        # initialize the self.spark session object
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # placeholder DataFrames, overwritten in read_data()
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_variation = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.partitions_by = ['site_name']
        self.partitions_dict = {
            "site_name": site_name
        }
        self.reset_partitions(partitions_num=20)
        self.u_asin_to_number = self.spark.udf.register("u_asin_to_number", udf_asin_to_number, IntegerType())
        self.u_part_int = self.spark.udf.register("u_part_int", self.udf_part_int, IntegerType())

    @staticmethod
    def udf_part_int(mapped_asin):
        # bucket the numeric ASIN into partitions of 100,000,000 ids each, numbered
        # from 1 (e.g. 0-99,999,999 -> 1, 100,000,000-199,999,999 -> 2, ...)
        return mapped_asin // 100_000_000 + 1

    def read_data(self):
        # old per-site branch, kept for reference:
        # if self.site_name == 'us':
        #     sql = f"SELECT * from ods_asin_variation where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"  # and date_info>='2023-15'
        # else:
        #     sql = f"SELECT * from ods_asin_variation where site_name='{self.site_name}';"
        sql = f"SELECT * from ods_asin_variation where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"  # and date_info>='2023-15'
        print(f"1. Reading the ods_asin_variation table: sql -- {sql}")
        self.df_asin_variation = self.spark.sql(sqlQuery=sql).cache()
        # extract the date part of created_time, formatted as YYYY-MM-DD
        self.df_asin_variation = self.df_asin_variation.withColumn(
            "created_date", F.substring(self.df_asin_variation["created_time"], 1, 10))
        self.df_asin_variation = self.df_asin_variation.drop("id")
        self.df_asin_variation.show(10, truncate=False)
        if self.date_type in ["month", "month_week"] and self.date_info >= '2024-05':
            sql = f"SELECT asin, variat_num, created_at as created_time from ods_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in ('0000-00', '{self.date_info}');"  # and date_info>='2023-15'
            print(f"2. Reading the ods_asin_detail table: sql -- {sql}")
            self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_detail.show(10, truncate=False)
            self.handle_data_drop_variation()

    def handle_data(self):
        # drop rows whose parent_asin is not exactly 10 characters long
        self.df_asin_variation = self.df_asin_variation.filter(F.length("parent_asin") == 10)
        # add the mapped_asin column (numeric form of parent_asin)
        self.df_asin_variation = self.df_asin_variation.withColumn('mapped_asin', self.u_asin_to_number('parent_asin'))
        # add the part partition column (currently unused):
        # self.df_asin_variation = self.df_asin_variation.withColumn('part', self.u_part_int('mapped_asin'))
        # group by asin and keep the row with the latest created_time -- old key: ["asin", "parent_asin"]
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_variation.created_time.desc_nulls_last()
        )
        self.df_asin_variation = self.df_asin_variation.withColumn("type_rank", F.row_number().over(window=window)). \
            filter("type_rank=1").drop("type_rank", "id")
        # per parent_asin, keep only the rows carrying the latest created_date
        window = Window.partitionBy(["parent_asin"]).orderBy(
            self.df_asin_variation.created_date.desc_nulls_last()
        )
        df = self.df_asin_variation.withColumn("type_rank", F.row_number().over(window=window)). \
            filter("type_rank=1").select("parent_asin", "created_date")
        self.df_asin_variation = df.join(self.df_asin_variation, on=["parent_asin", "created_date"], how='inner')
        self.df_asin_variation.show(10, truncate=False)
        self.df_save = self.df_asin_variation
        # note: when testing uk/de, these lines do not need to be commented out
        # self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        # self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        # self.df_save = self.df_save.drop("date_type", "date_info")
        print(f"self.df_save.count(): {self.df_save.count()}")
        # optionally drop ASINs that no longer report variations (see handle_data_drop_variation):
        # self.df_save = self.df_save.join(self.df_asin_detail, self.df_save.asin == self.df_asin_detail.asin,
        #                                  "left_anti")
        print(f"self.df_save.count(): {self.df_save.count()}")
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partitions_dict)
        print(f"Target table: {self.db_save}, partition: {self.partitions_dict}")
        print(f"Clearing the HDFS directory... {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def handle_data_drop_variation(self):
        # keep the latest detail row per asin, then keep only the ASINs whose
        # variat_num is null (i.e. ASINs that no longer report variations)
        window = Window.partitionBy(["asin"]).orderBy(
            self.df_asin_detail.created_time.desc_nulls_last()
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("type_rank", F.row_number().over(window=window)) \
            .filter("type_rank=1").drop("type_rank")
        self.df_asin_detail = self.df_asin_detail.filter("variat_num is null")
        self.df_asin_detail.show(10, truncate=False)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site (e.g. us, uk, de)
    date_type = sys.argv[2]  # arg 2: date type (e.g. week, month, month_week)
    date_info = sys.argv[3]  # arg 3: date period (e.g. 2024-05)
    handle_obj = DimAsinVariationInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
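
# ---------------------------------------------------------------------------
# Illustrative sketch (not called by the job): the "keep the newest row per
# key" window pattern that handle_data() and handle_data_drop_variation()
# rely on, demonstrated on a tiny in-memory DataFrame. The local SparkSession
# and the sample rows below are assumptions made purely for demonstration.
# ---------------------------------------------------------------------------
def _demo_keep_latest_per_key():
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").appName("window_demo").getOrCreate()
    df = spark.createDataFrame(
        [
            ("B000000001", "B000000010", "2024-05-01 10:00:00"),
            ("B000000001", "B000000010", "2024-05-03 09:00:00"),  # newest row for this asin
            ("B000000002", "B000000020", "2024-04-30 08:00:00"),
        ],
        ["asin", "parent_asin", "created_time"],
    )
    # rank rows within each asin by created_time descending (nulls last),
    # then keep only the first-ranked (newest) row -- same as handle_data()
    w = Window.partitionBy("asin").orderBy(F.col("created_time").desc_nulls_last())
    latest = df.withColumn("type_rank", F.row_number().over(w)).filter("type_rank = 1").drop("type_rank")
    latest.show(truncate=False)  # exactly one row per asin remains
    spark.stop()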
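
# ---------------------------------------------------------------------------
# Example invocation (a sketch -- the script path, deploy mode, and resource
# flags are assumptions and depend on the actual cluster setup):
#
#   spark-submit dim_asin_variation_info.py us month 2024-05
#
# positional args: site, date type, date period (see the __main__ block above)
# ---------------------------------------------------------------------------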