import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from pyspark.sql import functions as F, Window
from utils.common_util import CommonUtil, DateTypes
from utils.spark_util import SparkUtil
from pyspark.sql.types import BooleanType
from yswg_utils.common_df import get_self_asin_df
from yswg_utils.common_df import get_bsr_category_tree_df, get_asin_unlanuch_df
from yswg_utils.common_udf import category_craw_flag

"""
Aggregate NSR leaderboard ASINs
"""


class DwtBsrAsinDetailAll(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dwt_nsr_asin_detail_all"
        self.current_month = CommonUtil.reformat_date(self.date_info, "%Y-%m-%d", "%Y-%m")
        self.udf_category_craw_flag = F.udf(category_craw_flag, BooleanType())

    def run(self):
        # resolve the latest monthly partition of dwt_flow_asin for this site
        df_dwt_flow_asin_part = CommonUtil.select_partitions_df(self.spark, "dwt_flow_asin")
        dwt_flow_asin_last_month = df_dwt_flow_asin_part \
            .filter(f"date_type = '{DateTypes.month.name}' and site_name = '{self.site_name}'") \
            .selectExpr("max(date_info)").rdd.flatMap(lambda it: it).collect()[0]

        if self.date_info <= '2023-08-14':
            # before 2023-08-14 the detail columns come pre-aggregated from dwt_nsr_asin_detail
            sql = f"""
with asin_all as (
    select *,
           row_number() over (order by site_name) as id
    from dwd_nsr_asin_rank
    where site_name = '{self.site_name}'
      and date_type = 'last30day'
      and date_info = '{self.date_info}'
),
     asin_detail as (
         select *
         from dwt_nsr_asin_detail
         where site_name = '{self.site_name}'
           and date_info = '{self.current_month}'
     )
select asin_all.asin,
       category_id,
       bsr_rank,
       is_1_day_flag,
       is_7_day_flag,
       is_30_day_flag,
       bsr_count,
       is_asin_new,
       is_asin_bsr_new,
       last_bsr_day,
       title,
       img_url,
       img_type,
       ao_val,
       rating,
       total_comments,
       bsr_orders,
       bsr_orders_change,
       price,
       weight,
       launch_time,
       brand_name,
       buy_box_seller_type,
       account_name,
       volume,
       last_update_time,
       asin_air_freight_gross_margin,
       asin_ocean_freight_gross_margin
from asin_all
         left join asin_detail on asin_all.asin = asin_detail.asin
"""
        else:
            # from 2023-08-14 on, assemble the detail columns from the
            # individual dim/dwt tables instead
            sql = f"""
with asin_all as (
    select asin,
           category_id,
           bsr_rank,
           is_1_day_flag,
           is_7_day_flag,
           is_30_day_flag,
           bsr_count,
           is_asin_new,
           is_asin_bsr_new,
           last_bsr_day,
           row_number() over (order by site_name) as id
    from dwd_nsr_asin_rank
    where site_name = '{self.site_name}'
      and date_type = 'last30day'
      and date_info = '{self.date_info}'
),
     account_name_tb as (
         select asin,
                first(fd_account_name) as account_name
         from dim_fd_asin_info
         where site_name = '{self.site_name}'
         group by asin
     ),
     asin_his as (
         select asin,
                asin_title as title,
                asin_img_url as img_url,
                asin_img_type as img_type,
                asin_rating as rating,
                asin_total_comments as total_comments,
                asin_price as price,
                asin_weight as weight,
                asin_launch_time as launch_time,
                asin_volume as volume,
                asin_brand_name as brand_name,
                asin_buy_box_seller_type as buy_box_seller_type,
                asin_crawl_date as last_update_time
         from dim_cal_asin_history_detail
         where site_name = '{self.site_name}'
     ),
     flow_asin as (
         select asin,
                asin_ao_val as ao_val,
                bsr_orders,
                asin_bsr_orders_change as bsr_orders_change,
                asin_air_freight_gross_margin,
                asin_ocean_freight_gross_margin
         from dwt_flow_asin
         where site_name = '{self.site_name}'
           and date_type = '{DateTypes.month.name}'
           and date_info = '{dwt_flow_asin_last_month}'
     )
select asin_all.asin,
       category_id,
       bsr_rank,
       is_1_day_flag,
       is_7_day_flag,
       is_30_day_flag,
       bsr_count,
       is_asin_new,
       is_asin_bsr_new,
       last_bsr_day,
       title,
       img_url,
       img_type,
       ao_val,
       rating,
       total_comments,
       bsr_orders,
       bsr_orders_change,
       price,
       weight,
       launch_time,
       brand_name,
       buy_box_seller_type,
       account_name,
       volume,
       last_update_time,
       asin_air_freight_gross_margin,
       asin_ocean_freight_gross_margin
from asin_all
         left join account_name_tb on asin_all.asin = account_name_tb.asin
         left join flow_asin on asin_all.asin = flow_asin.asin
         left join asin_his on asin_all.asin = asin_his.asin
"""
        print("====================== query sql ======================")
        print(sql)
        df_all = self.spark.sql(sql)

        # in-house (self) ASINs, flagged with asin_type = 1
        df_self_asin = get_self_asin_df(self.site_name, self.spark).select(
            F.col("asin"),
            F.lit(1).alias("asin_type"),
        )
        # BSR category tree, used to resolve each ASIN's top-level category
        category_df = get_bsr_category_tree_df(self.site_name, self.spark).select(
            F.col("category_id"),
            F.col("category_first_id"),
        )
        # ASIN delisting (unlaunch) time
        df_unlanuch = get_asin_unlanuch_df(self.site_name, self.spark)

        df_all = df_all \
            .join(df_self_asin, on=["asin"], how='left') \
            .join(df_unlanuch, on=["asin"], how='left') \
            .join(category_df, on=["category_id"], how='left')

        # whether the ASIN's top-level category is crawled
        df_all = df_all.withColumn(
            "crawl_flag",
            self.udf_category_craw_flag(F.col("category_first_id"), F.col("asin"))
        )

        # generate a sequential id
        df_all = df_all.withColumn("id", F.row_number().over(window=Window.orderBy(F.lit(1))))

        df_all = df_all.select(
            F.col("id"),
            F.col('asin'),
            F.col('category_id'),
            F.col('bsr_rank'),
            F.col('is_1_day_flag'),
            F.col('is_7_day_flag'),
            F.col('is_30_day_flag'),
            F.col('bsr_count'),
            F.col('is_asin_new'),
            F.col('is_asin_bsr_new'),
            F.col('last_bsr_day'),
            F.col('title'),
            F.col('img_url'),
            F.col('img_type'),
            F.col('ao_val'),
            F.col('rating'),
            F.col('total_comments'),
            F.col('bsr_orders'),
            F.col('bsr_orders_change'),
            F.col('price'),
            F.col('weight'),
            F.col('launch_time'),
            F.trim(F.col('brand_name')).alias("brand_name"),
            F.col('buy_box_seller_type'),
            F.col('account_name'),
            F.col('volume'),
            F.col('last_update_time'),
            F.col('asin_air_freight_gross_margin'),
            F.col('asin_ocean_freight_gross_margin'),
            F.col("category_first_id"),
            # 1 = in-house ASIN, 2 = category not crawled, 0 = everything else
            F.when(F.col("asin_type").isNotNull(), F.lit(1))
            .when(F.col("crawl_flag") == False, F.lit(2))
            .otherwise(0).alias("asin_type"),
            # ASIN delisting time
            F.col("asin_unlaunch_time"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info")
        )

        df_all = df_all.repartition(15)

        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info
        }
        partition_by = list(partition_dict.keys())

        # align columns with the target table schema
        df_all = CommonUtil.format_df_with_template(self.spark, df_all, self.hive_tb, roundDouble=True)

        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        print(f"clearing hdfs dir: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

        hive_tb = self.hive_tb
        print(f"saving to table {hive_tb}, partitioned by {partition_by}")
        df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)

    obj = DwtBsrAsinDetailAll(site_name=site_name, date_info=date_info)
    obj.run()
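# A minimal invocation sketch. The script file name and the argument values
# below are assumptions, not taken from the repo; what the code does require
# is site_name as the first argument and date_info in "%Y-%m-%d" form as the
# second (it is reformatted to "%Y-%m" and compared against '2023-08-14'):
#
#   spark-submit dwt_nsr_asin_detail_all.py us 2023-09-01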