# author : wangrui
# data : 2024/5/29 17:27
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add parent directory to import path
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.storagelevel import StorageLevel
from pyspark.sql import Window
from pyspark.sql import functions as F
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
from pyspark.sql.types import *


class DwsLatestAsinGeneralAttributes(object):
    """Build the dws_latest_asin_general_attributes Hive table.

    For every parent_asin, picks the most recently updated child ASIN and
    assembles its general attributes (stars, rating, variation count, seller
    info, BS category/rank, BSR orders, AO/flow-matrix metrics, hidden-category
    flag, derived type buckets), then writes one row per parent_asin into the
    partitioned Hive table.
    """

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = f'dws_latest_asin_general_attributes'
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Resolve the HDFS path backing this table partition (used for cleanup before write).
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Create the SparkSession.
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.partition_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80)
        # Initialize global DataFrame placeholders (replaced in read_data()).
        self.df_asin_detail = self.spark.sql(f"select 1+1")
        self.df_asin_bs_category = self.spark.sql(f"select 1+1")
        self.df_fd_asin_info = self.spark.sql(f"select 1+1")
        self.df_asin_measure = self.spark.sql(f"select 1+1")
        self.df_main = self.spark.sql(f"select 1+1")
        self.df_hide_category = self.spark.sql(f"select 1+1")
        self.df_bsr_end = self.spark.sql(f"select 1+1")

    # Read all source data.
    def read_data(self):
        """Load every upstream table into the instance DataFrames.

        Sources: dim_asin_detail (variation info), dim_asin_bs_info (BS
        category/rank), dim_fd_asin_info (seller info — schema differs by
        period, see branch below), dwd_asin_measure (BSR orders / AO / flow
        matrix), us_bs_category_hide via MySQL JDBC (hidden categories), and
        ods_bsr_end (valid rank limits per first-level category).
        """
        print("1. 读取dim_asin_detail获取变体信息")
        sql = f"""
        select asin, parent_asin, one_star as asin_one_star, two_star as asin_two_star, asin_brand_name,
               asin_is_brand, asin_is_alarm, three_star as asin_three_star, four_star as asin_four_star,
               five_star as asin_five_star, low_star as asin_low_star, variation_num as asin_variation_num,
               asin_rating, asin_total_comments, asin_buy_box_seller_type, account_name, account_id,
               updated_time, category_id as top_category_id, category_first_id as top_category_first_id
        from dim_asin_detail
        where site_name='{self.site_name}'
          and date_type='{self.date_type}'
          and date_info='{self.date_info}'
          and parent_asin is not null"""
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql)
        # Treat missing brand/alarm flags as "not set" (0).
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_is_brand": 0, "asin_is_alarm": 0})
        self.df_asin_detail = self.df_asin_detail.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_asin_detail.show(10, truncate=False)

        print("2. 读取dim_asin_bs_category获取分类信息")
        sql = f"""
        select asin,
               asin_bs_cate_1_rank as first_category_rank,
               asin_bs_cate_1_id as category_first_id,
               asin_bs_cate_current_rank as current_category_rank,
               asin_bs_cate_current_id as category_id
        from dim_asin_bs_info
        where site_name='{self.site_name}'
          and date_type='{self.date_type}'
          and date_info = '{self.date_info}'"""
        print("sql:", sql)
        self.df_asin_bs_category = self.spark.sql(sqlQuery=sql)
        self.df_asin_bs_category = self.df_asin_bs_category.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_asin_bs_category.show(10, truncate=False)

        print("3. 读取dim_fd_asin_info, 获取卖家信息")
        # From 2024-05 (month/month_week) or 2024-21 (4_week) onward the seller
        # table is joined by account_id; earlier periods join per-ASIN, keeping
        # only the latest record per ASIN.
        if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (
                self.date_type == '4_week' and self.date_info >= '2024-21'):
            sql = f"""
            select fd_unique as account_id, upper(fd_country_name) as asin_seller_country_name
            from dim_fd_asin_info
            where site_name='{self.site_name}'
              and fd_unique is not null
            group by fd_unique, fd_country_name"""
        else:
            sql = f"""
            select asin, account_id, account_name, asin_seller_country_name
            from (select fd_unique as account_id,
                         fd_account_name as account_name,
                         upper(fd_country_name) as asin_seller_country_name,
                         asin,
                         ROW_NUMBER() OVER (PARTITION BY asin ORDER BY updated_at DESC) AS t_rank
                  from dim_fd_asin_info
                  where site_name = '{self.site_name}'
                    and fd_unique is not null) tmp
            where tmp.t_rank = 1"""
        print("sql:", sql)
        self.df_fd_asin_info = self.spark.sql(sqlQuery=sql)
        self.df_fd_asin_info = self.df_fd_asin_info.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_fd_asin_info.show(10, truncate=False)

        print("4. 读取dwd_asin_measure, 获取bsr销量、母体ao、母体自然流量占比等信息")
        sql = f"""
        select asin,
               cast(asin_bsr_orders as int) as asin_bsr_orders,
               round(asin_flow_proportion_matrix, 3) as asin_flow_proportion_matrix,
               round(asin_ao_val_matrix, 3) as asin_ao_val_matrix
        from dwd_asin_measure
        where site_name='{self.site_name}'
          and date_type='{self.date_type}'
          and date_info='{self.date_info}'"""
        print("sql:" + sql)
        self.df_asin_measure = self.spark.sql(sqlQuery=sql)
        self.df_asin_measure = self.df_asin_measure.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_asin_measure.show(10, truncate=False)

        print("5. 读取隐藏分类信息")
        sql = f"""
        select category_id_base as category_id, 1 as hide_flag
        from us_bs_category_hide
        group by category_id_base"""
        print("sql:", sql)
        # Hidden categories live in MySQL; read through JDBC and broadcast
        # (the table is small) for the later join.
        mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name='us')
        if mysql_con_info is not None:
            df_hide_category = SparkUtil.read_jdbc_query(
                session=self.spark,
                url=mysql_con_info['url'],
                pwd=mysql_con_info['pwd'],
                username=mysql_con_info['username'],
                query=sql)
            self.df_hide_category = F.broadcast(df_hide_category)
            self.df_hide_category.show(10, truncate=False)

        print("6.获取ods_bsr_end,获取有效rank信息")
        sql = f"""select rank as limit_rank, category_id as category_first_id
                  from ods_bsr_end
                  where site_name='{self.site_name}'"""
        print("sql:", sql)
        df_bsr_end = self.spark.sql(sqlQuery=sql)
        self.df_bsr_end = F.broadcast(df_bsr_end)
        self.df_bsr_end.show(10, truncate=False)

    # Keep only the most recently updated ASIN within each variation group.
    def handle_latest_asin(self):
        """Reduce df_asin_detail to one row per parent_asin (latest updated_time)."""
        latest_asin_window = Window.partitionBy('parent_asin').orderBy(
            F.desc_nulls_last("updated_time")
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "u_rank", F.row_number().over(window=latest_asin_window))
        self.df_asin_detail = self.df_asin_detail.filter("u_rank=1").drop("u_rank", "updated_time")
        self.df_asin_detail = self.df_asin_detail.repartition(60)

    # Join the latest ASIN with seller, category and measure data.
    def handle_latest_asin_detail(self):
        """Assemble df_main by joining seller, BS-category and measure data.

        The seller join key mirrors the period branch in read_data():
        account_id for new periods, asin for old ones. BS category/first-id
        fall back to the values carried from dim_asin_detail when missing.
        """
        if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (
                self.date_type == '4_week' and self.date_info >= '2024-21'):
            self.df_main = self.df_asin_detail.join(self.df_fd_asin_info, on=['account_id'], how='left')
        else:
            # Old-period seller data carries its own account columns; drop the
            # detail-table copies to avoid ambiguous columns after the join.
            self.df_asin_detail = self.df_asin_detail.drop("account_id", "account_name")
            self.df_main = self.df_asin_detail.join(self.df_fd_asin_info, on=['asin'], how='left')
        self.df_main = self.df_main.join(
            self.df_asin_bs_category, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        )
        self.df_main = self.df_main.withColumn(
            "category_id", F.coalesce(F.col("category_id"), F.col("top_category_id"))
        ).withColumn(
            "category_first_id",
            F.coalesce(F.col("category_first_id"), F.col("top_category_first_id"))
        ).drop("top_category_id", "top_category_first_id")  # fixed typo: was "top_categoty_first_id", which silently dropped nothing

    # Flag ASINs whose category is hidden.
    def handle_asin_category_is_hide(self):
        """Set asin_is_hide=1 for hidden/blacklisted categories, else 0."""
        self.df_main = self.df_main.join(self.df_hide_category, on=['category_id'], how='left')
        self.df_main = self.df_main.withColumn("asin_is_hide", F.expr("""
            CASE WHEN hide_flag = 1 THEN 1
                 WHEN category_first_id = 'grocery' and category_id != '6492272011' THEN 1
                 WHEN category_first_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text',
                                            'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices',
                                            'boost', 'us-live-explorations', 'amazon-renewed') THEN 1
                 WHEN category_id in ('21393128011', '21377129011', '21377127011', '21377130011',
                                      '21388218011', '21377132011') THEN 1
                 ELSE 0 END """))

    # Derive bucketed type columns for the latest ASIN.
    def handle_asin_basic_type(self):
        """Derive asin_rank_type, asin_site_name_type, asin_rating_type, bsr_type."""
        self.df_main = self.df_main.join(self.df_bsr_end, on=['category_first_id'], how='left')
        self.df_main = self.df_main.withColumn(
            "asin_rank_type",
            # Bucket the first-category rank into 8 bands; NULL ranks fall to ELSE 0
            # (BETWEEN with NULL is NULL, i.e. not matched).
            F.expr("""
            CASE WHEN first_category_rank IS NOT NULL AND first_category_rank BETWEEN 0 AND 1000 THEN 1
                 WHEN first_category_rank BETWEEN 1000 AND 5000 THEN 2
                 WHEN first_category_rank BETWEEN 5000 AND 10000 THEN 3
                 WHEN first_category_rank BETWEEN 10000 AND 20000 THEN 4
                 WHEN first_category_rank BETWEEN 20000 AND 30000 THEN 5
                 WHEN first_category_rank BETWEEN 30000 AND 50000 THEN 6
                 WHEN first_category_rank BETWEEN 50000 AND 70000 THEN 7
                 WHEN first_category_rank >= 70000 THEN 8
                 ELSE 0 END""")
        ).withColumn(
            "asin_site_name_type",
            # 4 = Amazon buy-box; 1/2 = US/CN third-party seller; 3 = other/unknown.
            F.expr("""CASE WHEN asin_buy_box_seller_type = 1 THEN 4
                           WHEN asin_buy_box_seller_type != 1 AND asin_seller_country_name is not null AND asin_seller_country_name like '%US%' THEN 1
                           WHEN asin_buy_box_seller_type != 1 AND asin_seller_country_name is not null AND asin_seller_country_name like '%CN%' THEN 2
                           ELSE 3 END""")
        ).withColumn(
            "asin_rating_type",
            F.expr("""
            CASE WHEN asin_rating >= 4.5 THEN 1
                 WHEN asin_rating >= 4 AND asin_rating < 4.5 THEN 2
                 WHEN asin_rating >= 3.5 AND asin_rating < 4 THEN 3
                 WHEN asin_rating >= 3 AND asin_rating < 3.5 THEN 4
                 WHEN asin_rating < 3 AND asin_rating >= 0 THEN 5
                 ELSE 0 END""")
        ).withColumn(
            "bsr_type",
            # NOTE(review): this compares category_first_id (a category identifier)
            # against 500000 / limit_rank; it looks like first_category_rank may have
            # been intended — confirm with the data owner before changing.
            F.expr("""CASE WHEN limit_rank is null and category_first_id <= 500000 THEN 1
                           WHEN limit_rank is not null and category_first_id <= limit_rank THEN 1
                           ELSE 0 END""")
        ).drop("limit_rank")

    # Normalize columns and persist the result.
    def df_save(self):
        """Select the final column set, dedupe by parent_asin, and write to Hive."""
        df_save = self.df_main\
            .select("parent_asin", "asin", "asin_one_star", "asin_two_star", "asin_three_star",
                    "asin_four_star", "asin_five_star", "asin_low_star", "asin_variation_num",
                    "asin_rating", "asin_rating_type", "asin_total_comments", "asin_buy_box_seller_type",
                    "account_name", "account_id", "asin_brand_name", "asin_is_brand", "asin_is_alarm",
                    "asin_is_hide", "first_category_rank", "current_category_rank", "category_first_id",
                    "category_id", "asin_rank_type", "bsr_type", "asin_seller_country_name",
                    "asin_site_name_type", "asin_bsr_orders", "asin_flow_proportion_matrix",
                    "asin_ao_val_matrix",
                    F.lit(self.site_name).alias('site_name'),
                    F.lit(self.date_type).alias('date_type'),
                    F.lit(self.date_info).alias('date_info')).cache()
        df_save = df_save.drop_duplicates(['parent_asin'])
        df_save = df_save.repartition(60)
        print("dws_latest_asin_detail处理完毕, 最后的数据量为: ", df_save.count())
        df_save.show(10, truncate=False)
        # Clear the target partition directory before appending, so a re-run
        # does not duplicate data.
        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append',
                                  partitionBy=self.partitions_by)
        print("success")

    def run(self):
        """Execute the full pipeline: read, reduce, join, derive, save."""
        self.read_data()
        self.handle_latest_asin()
        self.handle_latest_asin_detail()
        self.handle_asin_category_is_hide()
        self.handle_asin_basic_type()
        self.df_save()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    latest_asin_obj = DwsLatestAsinGeneralAttributes(site_name, date_type, date_info)
    latest_asin_obj.run()