import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from yswg_utils.common_udf import udf_title_number_parse_reg
from utils.db_util import DBUtil

"""
Parse ASIN titles from the historical ASIN details and
extract the numeric information they contain.
"""


class DwdAsinTitleNumber(object):

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwd_asin_title_number"
        self.udf_title_number_parse_reg = udf_title_number_parse_reg()

    def run(self):
        # Load ASIN titles for the target partition from the detail dimension table.
        sql = f"""
        select asin_title as title, asin
        from dim_asin_detail
        where site_name = '{self.site_name}'
          and date_type = '{CommonUtil.get_rel_date_type('dim_asin_detail', self.date_type)}'
          and date_info = '{self.date_info}'
        """
        print(sql)
        df_asin_detail = self.spark.sql(sql)

        # Pick the latest user-edited package quantity per ASIN from the edit log.
        # ('流量选品' is the module name stored in the log; 'filed' is kept verbatim,
        # matching the column name as it exists in sys_edit_log.)
        sql = f"""
        WITH ranked_edit_logs AS (
            SELECT edit_key_id,
                   val_after,
                   ROW_NUMBER() OVER (PARTITION BY edit_key_id ORDER BY create_time DESC) AS rn
            FROM sys_edit_log
            WHERE module = '流量选品'
              AND filed = 'package_quantity'
              AND site_name = '{self.site_name}'
        )
        SELECT edit_key_id AS asin,
               cast(val_after as int) AS user_package_num
        FROM ranked_edit_logs
        WHERE rn = 1
        """
        print(sql)
        pg_con_info = DBUtil.get_connection_info("postgresql", "us")
        # Fail fast here: the join below needs this dataframe, so silently
        # skipping the read (as the original did) would raise a NameError later.
        if pg_con_info is None:
            raise RuntimeError("failed to get postgresql connection info!")
        df_user_package_num = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=pg_con_info['url'],
            pwd=pg_con_info['pwd'],
            username=pg_con_info['username'],
            query=sql
        )
        # The edit log is small; broadcast it to avoid a shuffle join.
        df_user_package_num = F.broadcast(df_user_package_num)

        # udf_title_number_parse_reg returns an array of structs
        # (label / value / match); explode it into one row per parsed number.
        df_asin_detail = df_asin_detail.withColumn(
            "split_detail",
            F.explode(self.udf_title_number_parse_reg(F.col("title")))
        )
        df_all = df_asin_detail.join(df_user_package_num, on='asin', how='left')
        df_all = df_all.select(
            F.col("asin"),
            F.col("title"),
            F.col("split_detail").getField("label").alias("label"),
            # A user-edited value takes priority over the parsed one;
            # note the explicit cast to an integer type.
            F.coalesce(
                F.col("user_package_num"),
                F.col("split_detail").getField("value").cast(IntegerType())
            ).alias("value"),
            F.col("split_detail").getField("match").alias("match"),
            F.current_date().alias("update_time"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )

        # Adjust the number of output partitions.
        df_save = df_all.repartition(5)

        partition_dict = {
            "site_name": self.site_name,
            "date_type": self.date_type,
            "date_info": self.date_info,
        }
        partition_by = list(partition_dict.keys())
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        print(f"clearing hdfs dir: {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        print(f"saving to table {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwdAsinTitleNumber(site_name, date_type, date_info)
    obj.run()
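
# Usage sketch (an assumption, not part of the original file): the __main__
# block reads three positional args mapping to site_name / date_type / date_info,
# so a typical launch might look like the line below. The script filename and
# argument values here are hypothetical; actual date_type/date_info conventions
# come from CommonUtil and the dim_asin_detail partitioning.
#
#   spark-submit dwd_asin_title_number.py us month 2023-11
#
# This would parse titles from dim_asin_detail for site 'us', partition
# '2023-11', and append the results into the dwd_asin_title_number Hive table.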