import os import sys from pyspark.sql.types import DoubleType, StringType sys.path.append(os.path.dirname(sys.path[0])) from utils.hdfs_utils import HdfsUtils from utils.spark_util import SparkUtil from utils.common_util import CommonUtil, DateTypes # 上级目录 from pyspark.sql.window import Window from pyspark.sql import functions as F from yswg_utils.common_udf import parse_weight_str from yswg_utils.common_udf import udf_handle_string_null_value from yswg_utils.common_df import get_node_first_id_df from utils.redis_utils import RedisUtils class DimCalAsinDetail(object): def __init__(self, site_name, date_type, date_info): self.site_name = site_name self.date_type = date_type self.date_info = date_info # 初始化参数 self.partitions_by = ['site_name'] self.partitions_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80) app_name = f"{self.__class__.__name__}:{self.site_name}" self.spark = SparkUtil.get_spark_session(app_name) self.hive_table = f'dim_cal_asin_history_detail' self.partition_dict = { "site_name": site_name } self.udf_parse_weight_str_reg = self.spark.udf.register("udf_parse_weight_str_reg", self.udf_parse_weight_str, DoubleType()) self.udf_handle_null_value = self.spark.udf.register("udf_handle_null_value", udf_handle_string_null_value, StringType()) @staticmethod def udf_parse_weight_str(weight_str: str, site_name: str): """ 解析重量 :param weight_str: :param site_name: :return: """ if weight_str is None: return None weight_val, unit = parse_weight_str(weight_str, site_name) if weight_val != 'none' and weight_val is not None: return float(weight_val) def run(self): print(f"读取数据中.....") if self.date_type == 'all': # 读取dim_asin_detail sql = f"""select asin, asin_img_url, asin_title, asin_title_len, asin_category_desc, asin_rank, asin_volume, asin_weight, asin_color, asin_size, asin_style, asin_price, asin_rating, asin_total_comments, asin_material, asin_brand_name, asin_page_inventory, asin_buy_box_seller_type, asin_launch_time, asin_img_num, asin_img_type, asin_is_sale, bsr_cate_1_id, bsr_cate_current_id, asin_is_amazon, asin_is_FBA, asin_is_FBM, asin_is_other, udf_handle_null_value(node_id) as node_id, asin_is_picture, asin_is_video, asin_is_aadd, date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date from dim_asin_detail where site_name='{self.site_name}' and date_type='month' ; """ self.date_type = 'day_all' elif self.date_type in (DateTypes.week.name, DateTypes.month.name, DateTypes.month_week.name): sql = f"""select asin, asin_img_url, asin_title, asin_title_len, asin_category_desc, asin_rank, asin_volume, asin_weight, asin_color, asin_size, asin_style, asin_price, asin_rating, asin_total_comments,asin_material, asin_brand_name, asin_page_inventory, asin_buy_box_seller_type, asin_launch_time, asin_img_num, asin_img_type, asin_is_sale, bsr_cate_1_id, bsr_cate_current_id, asin_is_amazon, asin_is_FBA, asin_is_FBM, asin_is_other, udf_handle_null_value(node_id) as node_id, asin_is_picture, asin_is_video, asin_is_aadd, date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}'; """ else: sql = f""" select asin, asin_img_url, asin_title, asin_title_len, asin_category_desc, asin_rank, asin_volume, asin_weight, asin_color, asin_size, asin_style, asin_price, asin_rating, asin_total_comments,asin_material, asin_brand_name, asin_page_inventory, asin_buy_box_seller_type, asin_launch_time, asin_img_num, asin_img_type, asin_is_sale, bsr_cate_1_id, bsr_cate_current_id, asin_is_amazon, asin_is_FBA, asin_is_FBM, asin_is_other, node_id, asin_is_picture, asin_is_video, asin_is_aadd, date_format(created_time,'{CommonUtil._date_time_format}') as asin_crawl_date from dim_asin_detail where site_name='{self.site_name}' limit 0 """ print("======================整合搜索词 day asin 中... sql如下======================") print(sql) df_asin_detail = self.spark.sql(sqlQuery=sql) self_asin_sql = None if self.date_type == DateTypes.day.name: self_asin_sql = f""" select asin as asin, img_url as asin_img_url, title as asin_title, title_len as asin_title_len, category as asin_category_desc, rank as asin_rank, volume as asin_volume, udf_parse_weight_str_reg(weight_str,'{self.site_name}') as asin_weight, null as asin_color, null as asin_size, null as asin_style, price as asin_price, rating as asin_rating, total_comments as asin_total_comments, material as asin_material, brand as asin_brand_name, page_inventory as asin_page_inventory, buy_box_seller_type as asin_buy_box_seller_type, launch_time as asin_launch_time, img_num as asin_img_num, img_type as asin_img_type, null as asin_is_sale, null as bsr_cate_1_id, null as bsr_cate_current_id, if(buy_box_seller_type == 1, 1, 0) as asin_is_amazon, if(buy_box_seller_type == 2, 1, 0) as asin_is_FBA, if(buy_box_seller_type == 3, 1, 0) as asin_is_FBM, if(buy_box_seller_type == 4, 1, 0) as asin_is_other, node_id, if(locate(1, img_type) > 0, 1, 0) as asin_is_picture, if(locate(2, img_type) > 0, 1, 0) as asin_is_video, if(locate(3, img_type) > 0, 1, 0) as asin_is_aadd, date_format(created_at, '{CommonUtil._date_time_format}') as asin_crawl_date from ods_self_asin_detail where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'; """ elif self.date_type == "day_all": self_asin_sql = f""" select asin as asin, img_url as asin_img_url, title as asin_title, title_len as asin_title_len, category as asin_category_desc, rank as asin_rank, volume as asin_volume, udf_parse_weight_str_reg(weight_str, '{self.site_name}') as asin_weight, null as asin_color, null as asin_size, null as asin_style, price as asin_price, rating as asin_rating, total_comments as asin_total_comments, material as asin_material, brand as asin_brand_name, page_inventory as asin_page_inventory, buy_box_seller_type as asin_buy_box_seller_type, launch_time as asin_launch_time, img_num as asin_img_num, img_type as asin_img_type, null as asin_is_sale, null as bsr_cate_1_id, null as bsr_cate_current_id, if(buy_box_seller_type == 1, 1, 0) as asin_is_amazon, if(buy_box_seller_type == 2, 1, 0) as asin_is_FBA, if(buy_box_seller_type == 3, 1, 0) as asin_is_FBM, if(buy_box_seller_type == 4, 1, 0) as asin_is_other, if(locate(1, img_type) > 0, 1, 0) as asin_is_picture, node_id, if(locate(2, img_type) > 0, 1, 0) as asin_is_video, if(locate(3, img_type) > 0, 1, 0) as asin_is_aadd, date_format(created_at, '{CommonUtil._date_time_format}') as asin_crawl_date from ( select *, row_number() over (partition by asin order by date_info desc) as row_number from ods_self_asin_detail where site_name = '{self.site_name}' and date_type = '{DateTypes.day.name}' ) where row_number = 1; """ if self_asin_sql is not None: print("======================整合day asin 中sql如下======================") print(self_asin_sql) df_self_asin_detail = self.spark.sql(sqlQuery=self_asin_sql) # 合并要更新的数据 df_asin_detail = df_asin_detail.unionByName(df_self_asin_detail, allowMissingColumns=False) pass # 判断是否有数据需要整合 if df_asin_detail.first() == None: print("============================无数据跳过===================================") return print("======================获取dim_bsr_category_tree first_id======================") df_node_cate = get_node_first_id_df(self.site_name, self.spark) df_asin_detail = df_asin_detail.join( df_node_cate, on=['node_id'], how='left' ) # 读取dim_cal_asin_history_detail sql = f"""select asin, asin_img_url, asin_title, asin_title_len, asin_category_desc, asin_rank, asin_volume, asin_weight, asin_color, asin_size, asin_style, asin_price, asin_rating, asin_total_comments, asin_material, asin_brand_name, asin_page_inventory, asin_buy_box_seller_type, asin_launch_time, asin_img_num, asin_img_type, asin_is_sale, bsr_cate_1_id, bsr_cate_current_id, asin_is_amazon, asin_is_FBA, asin_is_FBM, asin_is_other, asin_is_picture, asin_is_video, asin_is_aadd, asin_crawl_date, node_id, category_first_id from dim_cal_asin_history_detail where site_name='{self.site_name}' ; """ df_asin_cal_detail = self.spark.sql(sqlQuery=sql) print("======================查询sql如下======================") print(sql) df_asin_detail = self.handle_df_duplicated(df_asin_detail, df_asin_cal_detail) df_save = self.handle_column(df_asin_detail).repartition(self.partitions_num) # print("self.df_save", df_save.show(10, truncate=False)) # quit() CommonUtil.save_or_update_table(spark_session=self.spark, hive_tb_name=self.hive_table, partition_dict=self.partition_dict, df_save=df_save, drop_exist_tmp_flag=False) print("success") # 根据asin去重,取dt最大的asin保留 def handle_df_duplicated(self, df_asin_detail, df_asin_cal_detail): print("针对asin进行数据去重...") # 将新老数据进行合并 # df_asin_detail = df_asin_detail.union(df_asin_cal_detail) df_asin_detail = df_asin_detail.unionByName(df_asin_cal_detail, allowMissingColumns=True) # asin窗口内排序,按照dt降序 window = Window.partitionBy(['asin']).orderBy( df_asin_detail.asin_crawl_date.desc_nulls_last() ) df_asin_detail = df_asin_detail.withColumn("sort_top", F.row_number().over(window=window)) # 取按asin分组的组内第一条,就是去重后的最新asin_detail df_asin_detail = df_asin_detail.filter("sort_top=1") # 去除掉排序字段 df_asin_detail = df_asin_detail.drop("asin_dt_top", "dt") return df_asin_detail def handle_column(self, df_asin_detail): df_save = df_asin_detail.select("asin", "asin_title", F.col("asin_title_len").cast('int').alias('asin_title_len'), "asin_category_desc", F.col("asin_rank").cast('int').alias('asin_rank'), self.udf_handle_null_value("asin_volume").alias("asin_volume"), F.col("asin_weight").cast('double').alias('asin_weight'), self.udf_handle_null_value("asin_color").alias("asin_color"), self.udf_handle_null_value("asin_size").alias("asin_size"), self.udf_handle_null_value("asin_style").alias("asin_style"), "asin_price", "asin_rating", F.col("asin_total_comments").cast('int').alias('asin_total_comments'), self.udf_handle_null_value("asin_material").alias("asin_material"), self.udf_handle_null_value("asin_brand_name").alias("asin_brand_name"), "bsr_cate_1_id", "bsr_cate_current_id", F.col("asin_page_inventory").cast('int').alias('asin_page_inventory'), F.col("asin_buy_box_seller_type").cast('int').alias('asin_buy_box_seller_type'), "asin_is_amazon", "asin_is_fba", "asin_is_fbm", "asin_is_other", F.col("asin_is_sale").cast('int').alias('asin_is_sale'), "asin_launch_time", F.col("asin_img_num").cast('int').alias('asin_img_num'), "asin_img_type", "asin_is_picture", F.col("asin_is_video").cast('int').alias('asin_is_video'), F.col("asin_is_aadd").cast('int').alias('asin_is_aadd'), self.udf_handle_null_value("asin_img_url").alias("asin_img_url"), "asin_crawl_date", "node_id", "category_first_id" ) # 预留字段补全 df_save = df_save.withColumn("re_string_field1", F.lit(None)) df_save = df_save.withColumn("re_string_field2", F.lit(None)) df_save = df_save.withColumn("re_string_field3", F.lit(None)) df_save = df_save.withColumn("re_int_field1", F.lit(None)) df_save = df_save.withColumn("re_int_field2", F.lit(None)) df_save = df_save.withColumn("re_int_field3", F.lit(None)) # 分区字段补全 df_save = df_save.withColumn("site_name", F.lit(self.site_name)) return df_save if __name__ == '__main__': site_name = CommonUtil.get_sys_arg(1, None) # 参数1:站点 date_type = CommonUtil.get_sys_arg(2, None) # 参数2:类型:week/4_week/month/quarter/day date_info = CommonUtil.get_sys_arg(3, None) # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1 lock_name = "dim_cal_asin_history_detail" if date_type == "all": # 如果执行数据为all的情况,非自然解锁情况,则需锁设定该表90分钟 lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=90 * 60, retry_flag=True, retry_time=10 * 60) else: lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=30 * 60, retry_flag=True, retry_time=10 * 60) if lock_flag: try: obj = DimCalAsinDetail(site_name, date_type, date_info) obj.run() finally: # 执行完成后释放锁 RedisUtils.release_redis_lock(lock_name)