import os
import sys

# Make the project root importable before pulling in the project-local packages below.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql.types import BooleanType, IntegerType, StructType, StructField, StringType
from yswg_utils.common_df import get_self_asin_df, get_asin_unlanuch_df, get_node_first_id_df
from yswg_utils.common_udf import category_craw_flag, udf_get_package_quantity


# Aggregates the ASINs of the NSR ranking list.
class DwtNsrAsinDetail(object):
    """Build the ``dwt_nsr_asin_detail`` Hive table for one site and one month.

    The job collects every ASIN that appeared in ``dwd_nsr_asin_rank`` during
    the 30 days up to ``date_info``, enriches it with seller, detail and flow
    attributes, derives a few classification columns and writes the result
    into the ``(site_name, date_info=<month>)`` partition.
    """

    # Upper edges (in days before ``date_info``) of the launch-time buckets.
    # Bucket codes are 1-based positions in this tuple; 0 means "no bucket".
    LAUNCH_TIME_WINDOW_DAYS = (30, 90, 180, 360, 450, 720, 1080)

    def __init__(self, site_name, date_info):
        """Create the Spark session and register the UDFs used by the job.

        :param site_name: site identifier, used as a partition value and as a
            filter on every source table.
        :param date_info: run date; it is handed to ``CommonUtil.reformat_date``
            with the formats ``"%Y-%m-%d"`` and ``"%Y-%m"`` to derive the
            month that names the output partition.
        """
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_nsr_asin_detail"
        self.current_month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m")

        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())
        # NOTE: not referenced by run() at the moment; kept so that existing
        # users of the attribute keep working.
        self.udf_cal_first_category_rank = F.udf(
            self.cal_first_category_rank,
            StructType([
                StructField("first_category_rank", IntegerType(), True),
                StructField("first_category_rank_date", StringType(), True),
            ]),
        )
        self.udf_cal_package_quantity = F.udf(self.cal_package_quantity, IntegerType())
        self.udf_cal_seller_country_type = F.udf(self.cal_seller_country_type, IntegerType())

    @staticmethod
    def cal_first_category_rank(category_id, category_first_id, bsr_date_info, current_bsr_rank,
                                flow_update_time, first_category_rank):
        """Pick the first-level category rank and the date it refers to.

        :return: ``(current_bsr_rank, bsr_date_info)`` when the category is
            itself the first-level category, otherwise
            ``(first_category_rank, flow_update_time)``.
        """
        if category_id == category_first_id:
            # Top-level category: the BSR rank can be used directly.
            return (current_bsr_rank, bsr_date_info)
        # Sub-category: fall back to the rank from the flow data.
        return (first_category_rank, flow_update_time)

    @staticmethod
    def cal_package_quantity(title):
        """Return the package quantity parsed from ``title``, defaulting to 1.

        :param title: ASIN title passed to ``udf_get_package_quantity``.
        :return: the parsed quantity, or 1 when the helper returns ``None``.
        """
        quantity = udf_get_package_quantity(title)
        return 1 if quantity is None else quantity

    @staticmethod
    def cal_seller_country_type(asin_buy_box_seller_type, seller_country_name):
        """Classify the seller's country into a numeric type.

        :param asin_buy_box_seller_type: buy-box seller type of the ASIN.
        :param seller_country_name: seller country value (may be ``None``).
        :return: ``0`` when the country is missing (``None`` or the strings
            ``'none'`` / ``'null'``, case-insensitive), ``4`` when the buy-box
            seller type is ``1``, ``1`` when the country contains ``'US'``,
            ``2`` when it contains ``'CN'``, otherwise ``3``.
        """
        country = str(seller_country_name)
        if country.lower() in ('none', 'null'):
            return 0
        if asin_buy_box_seller_type == 1:
            return 4
        country_upper = country.upper()
        # NOTE(review): substring match - a full country name such as
        # "AUSTRALIA" or "RUSSIA" would also count as 'US'. Presumably the
        # field holds country codes; confirm against dim_fd_asin_info.
        if 'US' in country_upper:
            return 1
        if 'CN' in country_upper:
            return 2
        return 3

    def _latest_flow_asin_month(self):
        """Return the newest monthly ``date_info`` partition of ``dwt_flow_asin`` for this site."""
        df_partitions = CommonUtil.select_partitions_df(self.spark, "dwt_flow_asin")
        # A global aggregate always yields exactly one row, so first() is safe.
        # NOTE(review): the value is None when no monthly partition exists; the
        # SQL built from it then filters on the literal 'None' and the flow
        # columns simply stay NULL.
        return df_partitions \
            .filter(f"date_type = '{DateTypes.month.name}' and site_name = '{self.site_name}' ") \
            .selectExpr("max(date_info)") \
            .first()[0]

    def _build_source_sql(self, dwt_flow_asin_last_month):
        """Build the query that joins the ranked ASINs with their attributes.

        :param dwt_flow_asin_last_month: ``date_info`` of the monthly
            ``dwt_flow_asin`` partition to read.
        :return: the SQL text.
        """
        # ASINs ranked within the 30 days up to date_info.
        day_30_before = CommonUtil.get_day_offset(self.date_info, -30)
        # NOTE(review): the seller sub-query groups by (asin, fd_unique), so an
        # ASIN with several fd_unique values yields several output rows -
        # confirm this is intended.
        return f"""
            select tmp.asin,
                   title,
                   img_url,
                   img_type,
                   ao_val,
                   rating,
                   total_comments,
                   bsr_orders,
                   bsr_orders_change,
                   price,
                   weight,
                   launch_time,
                   brand_name,
                   buy_box_seller_type,
                   account_name,
                   volume,
                   last_update_time,
                   asin_air_freight_gross_margin,
                   asin_ocean_freight_gross_margin,
                   category_first_id,
                   first_category_rank,
                   seller_id,
                   seller_country_name,
                   asin_bought_month,
                   tmp_category_id
            from (
                select asin,
                       first(category_id) as tmp_category_id
                from dwd_nsr_asin_rank
                where site_name = '{self.site_name}'
                  and date_info <= "{self.date_info}"
                  and date_info >= "{day_30_before}"
                group by asin
            ) tmp
            left join (
                select asin,
                       fd_unique              as seller_id,
                       first(fd_account_name) as account_name,
                       first(fd_country_name) as seller_country_name
                from dim_fd_asin_info
                where site_name = '{self.site_name}'
                group by asin, fd_unique
            ) tmp1 on tmp.asin = tmp1.asin
            left join (
                select asin,
                       asin_title               as title,
                       asin_img_url             as img_url,
                       asin_img_type            as img_type,
                       asin_rating              as rating,
                       asin_total_comments      as total_comments,
                       asin_price               as price,
                       asin_weight              as weight,
                       asin_launch_time         as launch_time,
                       asin_volume              as volume,
                       asin_brand_name          as brand_name,
                       asin_buy_box_seller_type as buy_box_seller_type,
                       asin_crawl_date          as last_update_time,
                       asin_rank                as first_category_rank,
                       category_first_id        as category_first_id
                from dim_cal_asin_history_detail
                where site_name = '{self.site_name}'
            ) tmp2 on tmp.asin = tmp2.asin
            left join (
                select asin,
                       asin_ao_val                      as ao_val,
                       bsr_orders                       as bsr_orders,
                       asin_bsr_orders_change           as bsr_orders_change,
                       asin_air_freight_gross_margin    as asin_air_freight_gross_margin,
                       asin_ocean_freight_gross_margin  as asin_ocean_freight_gross_margin,
                       cast(asin_bought_month as int)   as asin_bought_month
                from dwt_flow_asin
                where site_name = '{self.site_name}'
                  and date_type = '{DateTypes.month.name}'
                  and date_info = '{dwt_flow_asin_last_month}'
            ) tmp3 on tmp.asin = tmp3.asin
        """

    def _add_launch_time_type(self, df):
        """Add ``asin_launch_time_type``: the bucket of ``launch_time`` relative to ``date_info``.

        Bucket ``k`` (1-based) means the launch time is on or after
        ``date_info - LAUNCH_TIME_WINDOW_DAYS[k - 1]`` days and before the
        previous bucket's edge; rows matching no bucket (including a NULL
        launch time) get ``0``.
        """
        launch_time = F.col("launch_time")
        bucket = None
        for type_code, days in enumerate(self.LAUNCH_TIME_WINDOW_DAYS, start=1):
            # Branches are evaluated in order, so a row only reaches this one
            # after failing every more recent window; an explicit upper bound
            # would be redundant.
            reached = launch_time >= CommonUtil.get_day_offset(self.date_info, -days)
            if bucket is None:
                bucket = F.when(reached, F.lit(type_code))
            else:
                bucket = bucket.when(reached, F.lit(type_code))
        return df.withColumn('asin_launch_time_type', bucket.otherwise(F.lit(0)))

    def _save(self, df):
        """Align ``df`` with the Hive table, clear the target partition folder and append the data."""
        # Align columns with the table definition.
        df = CommonUtil.format_df_with_template(self.spark, df, self.hive_tb, roundDouble=True)
        # Write the partition as 6 files.
        df = df.repartition(6)

        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.current_month,
        }
        partition_by = list(partition_dict)
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        print(f"清除hdfs目录中:{hdfs_path}")
        # Files are removed first so that the append below replaces the partition content.
        HdfsUtils.delete_file_in_folder(hdfs_path)

        hive_tb = self.hive_tb
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        df.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

    def run(self):
        """Execute the job: query, enrich, derive columns and write the partition."""
        # Latest monthly partition of dwt_flow_asin.
        dwt_flow_asin_last_month = self._latest_flow_asin_month()

        sql = self._build_source_sql(dwt_flow_asin_last_month)
        print("======================查询sql如下======================")
        print(sql)
        df_all = self.spark.sql(sql)

        # Internal (own) ASINs, flagged with asin_type = 1.
        df_self_asin = get_self_asin_df(self.site_name, self.spark).select(
            F.col("asin"),
            F.lit(1).alias("asin_type"),
        )
        # Delisted ASINs.
        df_unlanuch = get_asin_unlanuch_df(self.site_name, self.spark)
        # Category node id -> first-level category id.
        df_node_first_id = get_node_first_id_df(self.site_name, self.spark).select(
            F.col("node_id").alias("tmp_category_id"),
            F.col("category_first_id").alias("tmp_category_first_id"),
        )

        df_all = df_all \
            .join(df_self_asin, on=["asin"], how='left') \
            .join(df_unlanuch, on=["asin"], how='left') \
            .join(df_node_first_id, on=["tmp_category_id"], how='left')

        # New products without a first-level category id get it from the node mapping.
        df_all = df_all.withColumn(
            "category_first_id",
            F.coalesce(F.col("category_first_id"), F.col("tmp_category_first_id")),
        )
        df_all = df_all.withColumn(
            "crawl_flag",
            self.udf_category_craw_flag(F.col("category_first_id"), F.col("asin")),
        )

        # Launch-time bucket.
        df_all = self._add_launch_time_type(df_all)
        # Package quantity.
        df_all = df_all.withColumn("package_quantity", self.udf_cal_package_quantity(F.col("title")))
        # Seller country type.
        df_all = df_all.withColumn(
            "seller_country_type",
            self.udf_cal_seller_country_type(F.col("buy_box_seller_type"), F.col("seller_country_name")),
        )

        df_all = df_all.select(
            F.col('asin'),
            F.col('title'),
            F.col('img_url'),
            F.col('img_type'),
            F.col('ao_val'),
            F.col('rating'),
            # Number of reviews.
            F.col('total_comments'),
            F.col('bsr_orders'),
            F.col('bsr_orders_change'),
            F.col('price'),
            F.col('weight'),
            F.col('launch_time'),
            F.trim(F.col('brand_name')).alias("brand_name"),
            F.col('buy_box_seller_type'),
            F.col('account_name'),
            F.col('volume'),
            F.col('last_update_time'),
            F.col('asin_air_freight_gross_margin'),
            F.col('asin_ocean_freight_gross_margin'),
            # First-level category defaults to an empty string.
            F.coalesce('category_first_id', F.lit("")).alias("category_first_id"),
            # 1 = internal ASIN, 2 = crawl flag is false, 0 = everything else.
            F.when(F.col("asin_type").isNotNull(), F.lit(1))
            .when(F.col("crawl_flag") == False, F.lit(2))
            .otherwise(0).alias("asin_type"),
            # Delisting time of the ASIN.
            F.col("asin_unlaunch_time"),
            # Unique shop id.
            F.col("seller_id"),
            # Seller country.
            F.col("seller_country_name"),
            # First-level category rank.
            F.col("first_category_rank").alias('first_category_rank'),
            # Date of the first-level category rank.
            F.col("last_update_time").alias('first_category_rank_date'),
            # Package quantity.
            F.col("package_quantity"),
            # Launch-time bucket.
            F.col("asin_launch_time_type"),
            # Seller country type.
            F.col("seller_country_type"),
            # Amazon monthly sales, -1 when unknown.
            F.coalesce('asin_bought_month', F.lit(-1)).alias("asin_bought_month"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.current_month).alias("date_info"),
        )

        self._save(df_all)


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtNsrAsinDetail(site_name=site_name, date_info=date_info)
    obj.run()