"""
@Author      : wangrui
@Description : Traffic-based product selection (builds the dwt_flow_asin table)
@SourceTable :
    dwd_asin_measure
    dim_asin_detail
    ods_bsr_end
    dim_asin_bs_category
    dim_fd_asin_info
    dim_asin_volume
@SinkTable   : dwt_flow_asin
@CreateTime  : 2023/01/10 07:55
@UpdateTime  : 2023/01/10 07:55

NOTE(review): the code below actually reads dim_asin_bs_info (not
dim_asin_bs_category), dwd_title_matching_degree and the previous period of
dwt_flow_asin; dim_asin_volume is not read here -- confirm the table list above.
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable
from utils.templates import Templates
# Window functions for grouped ranking
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *
from utils.DorisHelper import DorisHelper


class DwtFlowAsin(Templates):
    """Build one (site_name, date_type, date_info) partition of ``dwt_flow_asin``.

    The job loads ASIN measures, details, category ranks, seller info, the
    previous period's result and title-matching flags, derives a set of
    type/bucket columns and period-over-period changes, and appends the result
    to the Hive table (and, for recent months, to a Doris table).
    """

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """Create the Spark session and initialise partition/date state.

        :param site_name: site code used in every partition filter, e.g. ``us``.
        :param date_type: partition granularity (``week``, ``4_week``, ``month``,
            ``month_week``, ``last30day``, ``day``).
        :param date_info: partition value matching ``date_type``, e.g. ``2022-1``.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_flow_asin"
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        self.spark = self.create_spark_object(
            app_name=f"{self.hive_tb}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.previous_date = self.udf_get_previous_last_30_day(self)
        self.current_date = self.udf_get_current_time(self)
        # Doris settings
        # NOTE(review): test_flag is not set in this file; presumably provided by Templates.
        self.doris_db = "test" if self.test_flag == "test" else "selection"
        self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail"
        # Output / partition initialisation
        placeholder = self.spark.sql("select 1+1;")
        self.df_save = placeholder
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(60)
        self.launch_time_interval_dict = self.get_launch_time_interval_dict()
        # Placeholders for the DataFrames populated by read_data()
        self.df_asin_detail = placeholder
        self.df_asin_measure = placeholder
        self.df_bsr_end = placeholder
        self.df_asin_bs_category = placeholder
        self.df_fd_asin_info = placeholder
        self.df_flow_asin_last = placeholder
        self.df_title_matching_degree = placeholder

    def _seller_info_keyed_by_account(self):
        """Return True when seller info is joined on ``account_id`` rather than ``asin``.

        This is the case for month/month_week partitions from 2024-05 and
        4_week partitions from 2024-21 (plain string comparison on date_info).
        """
        return (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or \
               (self.date_type == '4_week' and self.date_info >= '2024-21')

    def _load_persisted(self, sql):
        """Print and run ``sql``, repartition to 60, persist on disk and preview 10 rows."""
        print("sql:" + sql)
        df = self.spark.sql(sqlQuery=sql)
        df = df.repartition(60).persist(StorageLevel.DISK_ONLY)
        df.show(10, truncate=False)
        return df

    @staticmethod
    def udf_get_previous_last_30_day(self):
        """Return the ``year_month`` of the period preceding the current partition.

        Declared as a staticmethod but called with the instance passed
        explicitly. Loads ``dim_date_20_to_30`` into ``self.df_date`` and looks
        the value up with pandas. Returns ``None`` for date types that are not
        handled (e.g. ``week``, ``day``).

        NOTE(review): the ``id - 1`` / ``id - 30`` / ``id - 21`` arithmetic
        presumably relies on ids being consecutive, one per calendar day --
        confirm against dim_date_20_to_30.
        """
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;")
        df = self.df_date.toPandas()

        def first(series):
            # First matching value; raises IndexError when nothing matched.
            return list(series)[0]

        def month_preceding(year_month):
            # Row of day 1 of `year_month`, then the row right before it.
            first_day_id = first(df.loc[(df.year_month == year_month) & (df.day == 1)].id)
            return first(df.loc[df.id == int(first_day_id) - 1].year_month)

        if self.date_type == 'last30day':
            current_date_id = first(df.loc[df.date == f'{self.date_info}'].id)
            window_start_month = first(df.loc[df.id == int(current_date_id) - 30].year_month)
            return month_preceding(window_start_month)
        if self.date_type in ['month', 'month_week']:
            return month_preceding(f'{self.date_info}')
        if self.date_type == '4_week':
            current_4_week_id = first(
                df.loc[(df.year_week == f'{self.date_info}') & (df.week_day == 1)].id)
            window_start_month = first(df.loc[df.id == int(current_4_week_id) - 21].year_month)
            return month_preceding(window_start_month)
        return None

    @staticmethod
    def udf_get_current_time(self):
        """Return the reference date of the current partition.

        Declared as a staticmethod but called with the instance passed
        explicitly. ``month`` resolves to the row with day 25 of that month,
        ``week``/``4_week`` to the row with week_day 7 of that week,
        ``last30day``/``day`` to ``date_info`` itself, and ``month_week`` to
        today's date. Returns ``None`` for any other date type.
        """
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;")
        df = self.df_date.toPandas()

        def date_of_first_match(mask):
            # Resolve the id of the first matching row, then the date stored for that id.
            current_id = list(df.loc[mask].id)[0]
            return list(df.loc[df.id == current_id].date)[0]

        if self.date_type == 'month':
            return date_of_first_match((df.year_month == f'{self.date_info}') & (df.day == 25))
        if self.date_type in ['week', '4_week']:
            return date_of_first_match((df.year_week == f'{self.date_info}') & (df.week_day == 7))
        if self.date_type in ['last30day', 'day']:
            return self.date_info
        if self.date_type in ['month_week']:
            return str(datetime.now().date())
        return None

    @staticmethod
    def get_launch_time_interval_dict():
        """Return cut-off dates (``YYYY-MM-DD``) for the launch-time buckets.

        Each value is today's date minus a fixed number of days (30 per "month").
        """
        cur_date = datetime.now().date()
        days_back = {
            "one_month": 30,
            "three_month": 90,
            "six_month": 180,
            "twelve_month": 360,
            "twenty_four_month": 720,
            "thirty_six_month": 1080,
        }
        return {
            key: (cur_date - timedelta(days=days)).strftime('%Y-%m-%d')
            for key, days in days_back.items()
        }

    @staticmethod
    def calculate_change(current_col, previous_col):
        """Build the absolute and relative change expressions for two columns.

        :param current_col: name of the current-period column.
        :param previous_col: name of the previous-period column.
        :return: ``(rise_col, change_col)`` where ``rise_col`` is
            ``current - previous`` and ``change_col`` is that difference divided
            by ``previous`` rounded to 4 decimals, or NULL when ``previous`` is
            NULL or 0.
        """
        current = F.col(current_col)
        previous = F.col(previous_col)
        rise_col = current - previous
        change_col = F.when(
            previous.isNotNull() & (previous != 0),
            F.round((current - previous) / previous, 4)
        ).otherwise(None)
        return rise_col, change_col

    def read_data(self):
        """Load every source DataFrame for the current partition into ``self.df_*``."""
        print("1.获取dwd_asin_measure,得到各种类型的统计、ao值、自然流量占比、月销信息、bsr销量信息")
        sql = f"""
            select asin,
                   round(asin_ao_val, 3) as asin_ao_val,
                   round(asin_ao_val_matrix, 3) as matrix_ao_val,
                   asin_zr_counts,
                   asin_sp_counts,
                   (asin_sb1_counts + asin_sb2_counts) as asin_sb_counts,
                   asin_sb3_counts as asin_vi_counts,
                   asin_bs_counts,
                   asin_ac_counts,
                   asin_tr_counts,
                   asin_er_counts,
                   asin_st_counts,
                   asin_bsr_orders as bsr_orders,
                   asin_amazon_orders,
                   round(asin_zr_flow_proportion, 3) as zr_flow_proportion,
                   round(asin_flow_proportion_matrix, 3) as matrix_flow_proportion
            from dwd_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'"""
        self.df_asin_measure = self._load_persisted(sql)

        print("2.获取dim_asin_detail,得到asin详情")
        sql = f"""
            select asin, asin_img_url, lower(asin_title) as asin_title, asin_title_len, asin_price,
                   asin_rating, asin_total_comments, asin_buy_box_seller_type, seller_json,
                   asin_page_inventory, asin_category_desc, asin_volume, asin_weight, asin_length,
                   asin_width, asin_height, asin_color, asin_size, asin_style, asin_is_sale,
                   asin_launch_time, asin_is_new, asin_img_num, asin_img_type, asin_material,
                   lower(asin_brand_name) as asin_brand_name, asin_activity_type, act_one_two_val,
                   act_three_four_val, act_five_six_val, act_eight_val, one_star,two_star,
                   three_star, four_star, five_star, low_star, together_asin, ac_name,
                   variation_num, account_name, account_id, parent_asin, asin_lob_info,
                   is_contains_lob_info, is_package_quantity_abnormal,
                   asin_quantity_variation_type, package_quantity,
                   asin_is_movie as is_movie_label, asin_is_brand as is_brand_label,
                   asin_is_alarm as is_alarm_brand, asin_is_self,
                   date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date,
                   asin_bought_month, asin_image_view,
                   case when product_description is not null then 1 else 0 end as is_with_product_description,
                   asin_describe, category_id as top_category_id,
                   category_first_id as top_category_first_id, customer_reviews_json,
                   img_list as img_info, asin_follow_sellers as follow_sellers_count
            from dim_asin_detail
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info='{self.date_info}'"""
        self.df_asin_detail = self._load_persisted(sql)

        print("3.获取ods_bsr_end,获取有效rank信息")
        sql = f"""select rank as limit_rank, category_id as category_first_id
                  from ods_bsr_end
                  where site_name='{self.site_name}'"""
        print("sql:" + sql)
        df_bsr_end = self.spark.sql(sqlQuery=sql)
        # Marked for broadcast so the later join on category_first_id avoids a shuffle.
        self.df_bsr_end = F.broadcast(df_bsr_end)
        self.df_bsr_end.show(10, truncate=False)

        print("4.获取dim_asin_bs_category,获取分类名称")
        sql = f"""
            select asin,
                   asin_bs_cate_1_rank as first_category_rank,
                   asin_bs_cate_current_rank as current_category_rank,
                   asin_bs_cate_1_id as category_first_id,
                   asin_bs_cate_current_id as category_id
            from dim_asin_bs_info
            where site_name='{self.site_name}'
              and date_type='{self.date_type}'
              and date_info = '{self.date_info}'"""
        self.df_asin_bs_category = self._load_persisted(sql)

        print("5.获取dim_fd_asin_info,得到卖家相关信息")
        if self._seller_info_keyed_by_account():
            # One row per (account, country); joined on account_id later.
            sql = f"""
                select fd_unique as account_id,
                       upper(fd_country_name) as seller_country_name
                from dim_fd_asin_info
                where site_name='{self.site_name}'
                  and fd_unique is not null
                group by fd_unique, fd_country_name"""
        else:
            # Latest seller record per asin; joined on asin later.
            sql = f"""
                select account_id, account_name, seller_country_name, asin
                from (select fd_unique as account_id,
                             fd_account_name as account_name,
                             upper(fd_country_name) as seller_country_name,
                             asin,
                             ROW_NUMBER() OVER (PARTITION BY asin ORDER BY updated_at DESC) AS t_rank
                      from dim_fd_asin_info
                      where site_name = '{self.site_name}'
                        and fd_unique is not null) tmp
                where tmp.t_rank = 1 """
        self.df_fd_asin_info = self._load_persisted(sql)

        print("6.获取上一个最近30天的整合结果")
        sql = f"""
            select asin,
                   round(asin_ao_val, 3) as previous_asin_ao_val,
                   asin_price as previous_asin_price,
                   sales as previous_sales,
                   variation_num as previous_variation_num,
                   asin_rating as previous_asin_rating,
                   bsr_orders as previous_bsr_orders,
                   asin_total_comments as previous_asin_total_comments,
                   first_category_rank as previous_first_category_rank
            from dwt_flow_asin
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.previous_date}' """
        self.df_flow_asin_last = self._load_persisted(sql)

        print("7.获取asin的标题匹配度")
        sql = f"""
            select asin, contains_flag
            from dwd_title_matching_degree
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}' """
        self.df_title_matching_degree = self._load_persisted(sql)

    # Basic ASIN attributes: weight bucket, volume/weight ratio, size bucket
    def handle_asin_basic_attribute(self):
        """Add ``asin_weight_type``, ``asin_weight_ratio`` and ``asin_size_type``.

        The length/width/height columns are dropped afterwards.
        """
        # Weight bucket (boundaries overlap; CASE keeps the first matching branch)
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_weight_type",
            F.expr("""CASE
                WHEN asin_weight BETWEEN 0 AND 0.2 THEN 1
                WHEN asin_weight BETWEEN 0.2 AND 0.4 THEN 2
                WHEN asin_weight BETWEEN 0.4 AND 0.6 THEN 3
                WHEN asin_weight BETWEEN 0.6 AND 1 THEN 4
                WHEN asin_weight BETWEEN 1 AND 2 THEN 5
                WHEN asin_weight >= 2 THEN 6
                ELSE 0 END"""))
        # Volume-to-weight ratio; -1 when any dimension is missing or weight is not positive
        has_dimensions = (
            F.col("asin_length").isNotNull()
            & F.col("asin_width").isNotNull()
            & F.col("asin_height").isNotNull()
            & (F.col("asin_weight") > 0)
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_weight_ratio",
            F.when(
                has_dimensions,
                F.round(F.col("asin_length") * F.col("asin_width") * F.col("asin_height") * 3.2774128
                        / (F.col("asin_weight") * 453.59), 4)
            ).otherwise(F.lit(-1))
        )
        # Size bucket; thresholds differ between the US site and all other sites
        if self.site_name == 'us':
            expr_str = """
                CASE
                WHEN asin_weight > 0 AND asin_weight * 16 <= 16 AND asin_length > 0 AND asin_length <= 15 AND asin_width > 0 AND asin_width <= 12 AND asin_height > 0 AND asin_height <= 0.75 THEN 1
                WHEN asin_weight > 0 AND asin_weight <= 20 AND asin_length > 0 AND asin_length <= 18 AND asin_width > 0 AND asin_width <= 14 AND asin_height > 0 AND asin_height <= 8 THEN 2
                WHEN asin_weight > 0 AND asin_weight <= 70 AND asin_length > 0 AND asin_length <= 60 AND asin_width > 0 AND asin_width <= 30 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 3
                WHEN asin_weight > 0 AND asin_weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 4
                WHEN asin_weight > 0 AND asin_weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 165 THEN 5
                WHEN asin_weight > 150 AND asin_length > 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 > 165 THEN 6
                ELSE 0 END"""
        else:
            # NOTE(review): branch 4 allows weight <= 12000 while branch 5 allows
            # only <= 2000 -- looks inconsistent; confirm the intended thresholds.
            expr_str = """
                CASE
                WHEN asin_weight > 0 AND asin_weight <= 100 AND asin_length > 0 AND asin_length <= 20 AND asin_width > 0 AND asin_width <= 15 AND asin_height > 0 AND asin_height <= 1 THEN 1
                WHEN asin_weight > 0 AND asin_weight <= 500 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 2.5 THEN 2
                WHEN asin_weight > 0 AND asin_weight <= 1000 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 5 THEN 3
                WHEN asin_weight > 0 AND asin_weight <= 12000 AND asin_length > 0 AND asin_length <= 45 AND asin_width > 0 AND asin_width <= 34 AND asin_height > 0 AND asin_height <= 26 THEN 4
                WHEN asin_weight > 0 AND asin_weight <= 2000 AND asin_length > 0 AND asin_length <= 61 AND asin_width > 0 AND asin_width <= 46 AND asin_height > 0 AND asin_height <= 46 THEN 5
                WHEN asin_length > 0 AND asin_length <= 150 AND asin_length + asin_length + (asin_width + asin_height) <= 300 THEN 6
                WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7
                ELSE 0 END"""
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_size_type", F.expr(expr_str)
        ).drop("asin_length", "asin_width", "asin_height")

    # Buckets derived from the ASIN page: rating, launch time, price
    def handle_asin_detail_all_type(self):
        """Add ``asin_rating_type``, ``asin_launch_time_type`` and ``asin_price_type``."""
        # Rating bucket
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_rating_type",
            F.expr("""CASE
                WHEN asin_rating >= 4.5 THEN 1
                WHEN asin_rating >= 4 AND asin_rating < 4.5 THEN 2
                WHEN asin_rating >= 3.5 AND asin_rating < 4 THEN 3
                WHEN asin_rating >= 3 AND asin_rating < 3.5 THEN 4
                WHEN asin_rating < 3 AND asin_rating >= 0 THEN 5
                ELSE 0 END"""))
        # Launch-time bucket, relative to the cut-off dates computed at start-up
        one_month = self.launch_time_interval_dict['one_month']
        three_month = self.launch_time_interval_dict['three_month']
        six_month = self.launch_time_interval_dict['six_month']
        twelve_month = self.launch_time_interval_dict['twelve_month']
        twenty_four_month = self.launch_time_interval_dict['twenty_four_month']
        thirty_six_month = self.launch_time_interval_dict['thirty_six_month']
        expr_str = f"""CASE
            WHEN asin_launch_time >= '{one_month}' THEN 1
            WHEN asin_launch_time >= '{three_month}' AND asin_launch_time < '{one_month}' THEN 2
            WHEN asin_launch_time >= '{six_month}' AND asin_launch_time < '{three_month}' THEN 3
            WHEN asin_launch_time >= '{twelve_month}' AND asin_launch_time < '{six_month}' THEN 4
            WHEN asin_launch_time >= '{twenty_four_month}' AND asin_launch_time < '{twelve_month}' THEN 5
            WHEN asin_launch_time >= '{thirty_six_month}' AND asin_launch_time < '{twenty_four_month}' THEN 6
            WHEN asin_launch_time < '{thirty_six_month}' THEN 7
            ELSE 0 END"""
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_launch_time_type", F.expr(expr_str))
        # Price bucket
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_price_type",
            F.expr("""CASE
                WHEN asin_price IS NOT NULL AND asin_price > 0 AND asin_price < 10 THEN 1
                WHEN asin_price >= 10 AND asin_price < 15 THEN 2
                WHEN asin_price >= 15 AND asin_price < 20 THEN 3
                WHEN asin_price >= 20 AND asin_price < 30 THEN 4
                WHEN asin_price >= 30 AND asin_price < 50 THEN 5
                WHEN asin_price >= 50 THEN 6
                ELSE 0 END"""))

    # Category, rank, rank bucket and "rank within valid range" flag
    def handle_asin_category_info(self):
        """Attach category ids/ranks and derive ``asin_rank_type`` and ``bsr_type``."""
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_bs_category, on=['asin'], how='left')
        # Prefer the BSR category ids; fall back to the ids from the detail page.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "category_id", F.coalesce(F.col("category_id"), F.col("top_category_id"))
        ).withColumn(
            "category_first_id", F.coalesce(F.col("category_first_id"), F.col("top_category_first_id"))
        ).drop("top_category_id", "top_category_first_id")  # fixed typo: was "top_categoty_first_id"
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_rank_type",
            F.expr("""CASE
                WHEN first_category_rank IS NOT NULL AND first_category_rank BETWEEN 0 AND 1000 THEN 1
                WHEN first_category_rank BETWEEN 1000 AND 5000 THEN 2
                WHEN first_category_rank BETWEEN 5000 AND 10000 THEN 3
                WHEN first_category_rank BETWEEN 10000 AND 20000 THEN 4
                WHEN first_category_rank BETWEEN 20000 AND 30000 THEN 5
                WHEN first_category_rank BETWEEN 30000 AND 50000 THEN 6
                WHEN first_category_rank BETWEEN 50000 AND 70000 THEN 7
                WHEN first_category_rank >= 70000 THEN 8
                ELSE 0 END"""))
        self.df_asin_detail = self.df_asin_detail.join(self.df_bsr_end, on=['category_first_id'], how='left')
        # bsr_type = 1 when the first-category rank is within the category's rank
        # limit (500000 when the category has no limit). The original compared
        # category_first_id -- a category identifier -- against the rank limit.
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "bsr_type",
            F.expr("""CASE
                WHEN limit_rank is null and first_category_rank <= 500000 THEN 1
                WHEN limit_rank is not null and first_category_rank <= limit_rank THEN 1
                ELSE 0 END""")).drop("limit_rank")
        self.df_asin_bs_category.unpersist()

    # BSR orders, Amazon monthly sales, AO values, organic-traffic share, AO bucket
    def handle_asin_measure(self):
        """Join the measure table and derive ``sales``, ``asin_bought_month`` and ``asin_ao_val_type``."""
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_measure, on=['asin'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "sales", F.round(F.col("bsr_orders") * F.col("asin_price"), 2)
        ).withColumn(
            # Fall back to the measured Amazon orders when the page value is missing.
            "asin_bought_month", F.coalesce(F.col("asin_bought_month"), F.col("asin_amazon_orders"))
        ).withColumn(
            "asin_ao_val_type",
            F.expr("""CASE
                WHEN asin_ao_val BETWEEN 0 AND 0.1 THEN 1
                WHEN asin_ao_val BETWEEN 0.1 AND 0.2 THEN 2
                WHEN asin_ao_val BETWEEN 0.2 AND 0.4 THEN 3
                WHEN asin_ao_val BETWEEN 0.4 AND 0.8 THEN 4
                WHEN asin_ao_val BETWEEN 0.8 AND 1.2 THEN 5
                WHEN asin_ao_val BETWEEN 1.2 AND 2 THEN 6
                WHEN asin_ao_val >= 2 THEN 7
                ELSE 0 END""")
        ).drop("asin_amazon_orders")
        self.df_asin_measure.unpersist()

    # Fulfilment type, seller country and seller-country bucket
    def handle_seller_country(self):
        """Join seller info and derive ``asin_site_name_type``."""
        if self._seller_info_keyed_by_account():
            self.df_asin_detail = self.df_asin_detail.join(self.df_fd_asin_info, on=['account_id'], how='left')
        else:
            # The per-asin seller table supplies account_id/account_name itself.
            self.df_asin_detail = self.df_asin_detail.drop("account_id", "account_name")
            self.df_asin_detail = self.df_asin_detail.join(self.df_fd_asin_info, on=['asin'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_site_name_type",
            F.expr("""CASE
                WHEN asin_buy_box_seller_type = 1 THEN 4
                WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1
                WHEN asin_buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2
                ELSE 3 END"""))
        self.df_fd_asin_info.unpersist()

    # LQS (listing quality) score
    def handle_asin_lqs_rating(self):
        """Compute the partial LQS scores, their JSON detail and their sum.

        Adds ``asin_lqs_rating_detail`` (JSON of all partial scores) and
        ``asin_lqs_rating`` (their sum); the partial score columns and the
        helper inputs are dropped afterwards.
        """
        # Number of highlight entries: asin_describe split on the literal "|-|"
        highlight_count = F.size(F.split(F.col("asin_describe"), '\\|-\\|'))
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "category_node_rating",
            F.when(F.col("category_id").isNotNull(), F.lit(1)).otherwise(F.lit(0))
        ).withColumn(
            "zr_rating",
            F.when(F.col("asin_zr_counts") > 0, F.lit(0.5)).otherwise(F.lit(0))
        ).withColumn(
            "sp_rating",
            F.when(F.col("asin_sp_counts") > 0, F.lit(1)).otherwise(F.lit(0))
        ).withColumn(
            "a_add_rating",
            F.when(F.col("asin_img_type").contains("3"), F.lit(1)).otherwise(F.lit(0))
        ).withColumn(
            "video_rating",
            F.when(F.col("asin_img_type").contains("2"), F.lit(0.5)).otherwise(F.lit(0))
        ).withColumn(
            "brand_rating",
            F.when(F.col("is_brand_label") == 1, F.lit(0.2)).otherwise(F.lit(0))
        ).withColumn(
            "product_describe_rating",
            F.when(F.col("is_with_product_description") == 1, F.lit(0.2)).otherwise(F.lit(0))
        ).withColumn(
            # 0.4 per highlight entry, capped at 1.6
            "highlight_rating",
            F.when(F.col("asin_describe").isNotNull() & (highlight_count <= 4), highlight_count * 0.4)
            .when(F.col("asin_describe").isNotNull() & (highlight_count > 4), F.lit(1.6))
            .otherwise(F.lit(0))
        ).withColumn(
            "title_len_rating",
            F.when((F.col("asin_title_len") >= 50) & (F.col("asin_title_len") <= 200), F.lit(0.5))
            .otherwise(F.lit(0))
        ).withColumn(
            # Title starts with the brand name once both are stripped by the regex
            "title_brand_rating",
            F.expr("""CASE WHEN asin_brand_name is not null AND lower(regexp_replace(asin_title, '[^a-zA-Z0-9\\s]', '')) LIKE lower(regexp_replace(asin_brand_name, '[^a-zA-Z0-9\\s]', '')) || '%' THEN 0.5 ELSE 0 END""")
        ).withColumn(
            # 0.5 per image, capped at 2
            "img_num_rating",
            F.when(F.col("asin_img_num") <= 4, F.col("asin_img_num") * 0.5)
            .when(F.col("asin_img_num") > 4, F.lit(2))
            .otherwise(F.lit(0))
        ).withColumn(
            "img_enlarge_rating",
            F.when(F.col("asin_image_view") == 1, F.lit(0.5)).otherwise(F.lit(0))
        )
        rating_cols = [
            "category_node_rating", "zr_rating", "sp_rating", "a_add_rating", "video_rating",
            "brand_rating", "product_describe_rating", "highlight_rating", "title_len_rating",
            "title_brand_rating", "img_num_rating", "img_enlarge_rating",
        ]
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_lqs_rating_detail",
            F.to_json(F.struct(*[F.col(name) for name in rating_cols]))
        )
        total_rating = F.col(rating_cols[0])
        for name in rating_cols[1:]:
            total_rating = total_rating + F.col(name)
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_lqs_rating", total_rating)
        self.df_asin_detail = self.df_asin_detail.drop(
            "is_with_product_description", "asin_describe", "asin_image_view", *rating_cols)

    # Hidden-category flag (same list used for US/UK/DE) and asin_type
    def handle_asin_is_hide(self):
        """Derive ``asin_type`` (1 = self, 2 = "need" rule matched, 3 = hidden category, 0 = none).

        Hidden categories are read from the MySQL table ``us_bs_category_hide``.
        """
        mysql_con = DBUtil.get_connection_info("mysql", "us")
        sql = f"""
            select category_id_base as category_id, 1 as hide_flag
            from us_bs_category_hide
            group by category_id_base """
        df_hide_category = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con['url'],
                                                     pwd=mysql_con['pwd'], username=mysql_con['username'],
                                                     query=sql)
        self.df_asin_detail = self.df_asin_detail.join(df_hide_category, on=['category_id'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_is_hide",
            F.expr("""CASE
                WHEN hide_flag = 1 THEN 1
                WHEN category_first_id = 'grocery' and category_id != '6492272011' THEN 1
                WHEN category_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1
                ELSE 0 END""")).drop("hide_flag")
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_is_need",
            F.expr("""CASE
                WHEN category_first_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1
                WHEN asin NOT LIKE 'B0%' THEN 1
                ELSE 0 END"""))
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_type",
            F.expr("""CASE
                WHEN asin_is_self=1 THEN 1
                WHEN asin_is_need=1 THEN 2
                WHEN asin_is_hide=1 THEN 3
                ELSE 0 END""")
        ).drop("asin_is_self", "asin_is_need", "asin_is_hide")

    # Title matching degree
    def handle_title_matching_degree(self):
        """Add ``title_matching_degree``: per asin, sum(contains_flag) / row count, rounded to 4."""
        self.df_title_matching_degree = self.df_title_matching_degree.groupBy("asin").agg(
            F.round(F.sum("contains_flag") / F.count("asin"), 4).alias("title_matching_degree")
        )
        self.df_asin_detail = self.df_asin_detail.join(self.df_title_matching_degree, on=['asin'], how='left')
        self.df_title_matching_degree.unpersist()

    # Period-over-period change of ASIN attributes
    def handle_asin_attribute_change(self):
        """Add ``<suffix>_rise`` and ``<suffix>_change`` columns versus the previous period.

        The rise is rounded to 3 decimals for AO, 2 for price and sales, 1 for
        rating, and cast to integer otherwise; the change ratio is rounded to 4
        decimals. The joined ``previous_*`` helper columns are dropped.
        """
        self.df_asin_detail = self.df_asin_detail.join(self.df_flow_asin_last, on=['asin'], how='left')
        columns_to_change = [
            ("first_category_rank", "previous_first_category_rank", "asin_rank"),
            ("bsr_orders", "previous_bsr_orders", "asin_bsr_orders"),
            ("asin_rating", "previous_asin_rating", "asin_rating"),
            ("asin_total_comments", "previous_asin_total_comments", "asin_comments"),
            ("variation_num", "previous_variation_num", "asin_variation"),
            ("asin_ao_val", "previous_asin_ao_val", "asin_ao"),
            ("asin_price", "previous_asin_price", "asin_price"),
            ("sales", "previous_sales", "asin_sales")
        ]
        # Decimal places of the rise column per suffix. The original tested for
        # 'sales', which never matched the 'asin_sales' suffix, so the sales rise
        # was truncated to an integer.
        rise_decimals = {"asin_ao": 3, "asin_price": 2, "asin_sales": 2, "asin_rating": 1}
        for current_col, previous_col, suffix in columns_to_change:
            rise_col, change_col = self.calculate_change(current_col, previous_col)
            if suffix in rise_decimals:
                rise_value = F.round(rise_col, rise_decimals[suffix])
            else:
                rise_value = rise_col.cast(IntegerType())
            self.df_asin_detail = self.df_asin_detail.withColumn(
                f"{suffix}_rise", rise_value
            ).withColumn(
                f"{suffix}_change", F.round(change_col, 4)
            ).drop(previous_col)  # DataFrame.drop returns a new frame; the result must be kept
        self.df_flow_asin_last.unpersist()

    # Column standardisation
    def handle_column(self):
        """Project the sink-table column list, fill defaults and deduplicate by asin.

        Columns not produced by this job are emitted as NULL literals. Rows
        whose asin is longer than 10 characters are filtered out.
        """
        self.df_save = self.df_asin_detail.select(
            "asin", "asin_ao_val", "asin_zr_counts", "asin_sp_counts", "asin_sb_counts",
            "asin_vi_counts", "asin_bs_counts", "asin_ac_counts", "asin_tr_counts", "asin_er_counts",
            "bsr_orders", F.lit(None).alias("orders"), "sales",
            F.lit(None).alias("cate_current_id"), F.lit(None).alias("cate_1_id"),
            "asin_img_url", "asin_title", "asin_title_len", "asin_price", "asin_rating",
            "asin_total_comments", "asin_buy_box_seller_type", "asin_page_inventory",
            "asin_category_desc", "asin_volume", "asin_weight", "asin_color", "asin_size",
            "asin_style", "asin_is_sale", F.lit(None).alias("asin_rank"), "asin_launch_time",
            "asin_is_new", "asin_img_num", "asin_img_type", "asin_material", "asin_brand_name",
            "asin_activity_type", "act_one_two_val", "act_three_four_val", "act_five_six_val",
            "act_eight_val", F.lit(None).alias("qa_num"), "one_star", "two_star", "three_star",
            "four_star", "five_star", "low_star", "together_asin", "ac_name", "variation_num",
            "account_name", "account_id", "seller_country_name", "bsr_type",
            F.lit(-1).alias("bsr_best_orders_type"), F.lit(None).alias("zr_best_orders_type"),
            "parent_asin", "asin_rank_rise", "asin_rank_change", "asin_ao_rise", "asin_ao_change",
            "asin_price_rise", "asin_price_change", F.lit(None).alias("asin_orders_rise"),
            F.lit(None).alias("asin_orders_change"), "asin_rating_rise", "asin_rating_change",
            "asin_comments_rise", "asin_comments_change", "asin_bsr_orders_rise",
            "asin_bsr_orders_change", "asin_sales_rise", "asin_sales_change",
            "asin_variation_rise", "asin_variation_change", "asin_size_type", "asin_rating_type",
            "asin_site_name_type", "asin_weight_type", "asin_launch_time_type", "asin_ao_val_type",
            "asin_rank_type", "asin_price_type", F.lit(None).alias("created_time"),
            F.lit(None).alias("updated_time"), "asin_lob_info", "customer_reviews_json", "img_info",
            "is_contains_lob_info", "is_package_quantity_abnormal", "asin_st_counts",
            "asin_quantity_variation_type", "package_quantity", F.lit(None).alias("sp_type1"),
            F.lit(None).alias("sp_type2"), F.lit(None).alias("sp_type3"), "is_movie_label",
            "is_brand_label", "is_alarm_brand", "asin_type", F.lit(None).alias("asin_cost_fee"),
            F.lit(None).alias("asin_refund_fee"), F.lit(None).alias("asin_adv_fee"),
            F.lit(None).alias("asin_commission_fee"), F.lit(None).alias("asin_fba_fee"),
            F.lit(None).alias("asin_freight_air_fee"), F.lit(None).alias("asin_freight_ocean_fee"),
            F.lit(None).alias("asin_operate_fee"), F.lit(None).alias("asin_air_freight_gross_margin"),
            F.lit(None).alias("asin_ocean_freight_gross_margin"), "asin_crawl_date",
            F.lit(None).alias("asin_package_quantity"), F.lit(None).alias("asin_pattern_name"),
            "category_first_id", "category_id", "first_category_rank", "current_category_rank",
            "asin_weight_ratio", "asin_bought_month", F.lit(None).alias("buy_data_bought_week"),
            F.lit(None).alias("buy_data_viewed_month"), F.lit(None).alias("buy_data_viewed_week"),
            F.lit(None).alias("theme_en"), F.lit(None).alias("theme_label_en"), "asin_lqs_rating",
            "asin_lqs_rating_detail", "title_matching_degree", "zr_flow_proportion",
            "matrix_flow_proportion", "matrix_ao_val", "follow_sellers_count", "seller_json",
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info"))
        self.df_save = self.df_save.na.fill(
            {"asin_zr_counts": 0, "asin_sp_counts": 0, "asin_sb_counts": 0, "asin_vi_counts": 0,
             "asin_bs_counts": 0, "asin_ac_counts": 0, "asin_tr_counts": 0, "asin_er_counts": 0,
             "asin_title_len": 0, "asin_total_comments": 0, "variation_num": 0, "asin_img_num": 0,
             "act_one_two_val": 0.0, "act_three_four_val": 0.0, "act_five_six_val": 0.0,
             "act_eight_val": 0.0, "one_star": 0, "two_star": 0, "three_star": 0, "four_star": 0,
             "five_star": 0, "low_star": 0, "asin_size_type": 0, "asin_rating_type": 0,
             "asin_site_name_type": 0, "asin_weight_type": 0, "asin_launch_time_type": 0,
             "asin_ao_val_type": 0, "asin_rank_type": 0, "asin_price_type": 0,
             "asin_quantity_variation_type": 0, "package_quantity": 1, "is_movie_label": 0,
             "is_brand_label": 0, "is_alarm_brand": 0, "title_matching_degree": 0.0,
             "asin_lqs_rating": 0.0, "follow_sellers_count": -1})
        self.df_save = self.df_save.repartition(60).persist(StorageLevel.DISK_ONLY)
        self.df_save = self.df_save.drop_duplicates(['asin']).filter(F.length(F.col("asin")) <= 10)
        print("数据量为:", self.df_save.count())
        self.df_save.show(10, truncate=False)

    # Save the data
    def save_data(self):
        """Write the partition to Hive and, for recent months, export to Doris.

        The HDFS partition folder is emptied first and the result (without
        ``seller_json``) is appended to the Hive table. For month/month_week
        partitions from 2024-06, the newest completed report date is read from
        MySQL and the Doris export runs only when this partition is newer (or
        when no completed report exists).
        """
        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"当前存储的表名为:{self.hive_tb},分区为{self.partitions_by}")
        df_hive = self.df_save.drop("seller_json")
        df_hive.write.saveAsTable(name=self.hive_tb, format='hive', mode='append',
                                  partitionBy=self.partitions_by)
        print("save asin_detail success")

        if not (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-06'):
            return

        max_report_sql = f"""
            select max(report_date) as completed_date_info
            from workflow_everyday
            where site_name='us' and date_type='month' and page='流量选品' and status_val=14 """
        mysql_con = DBUtil.get_connection_info("mysql", "us")
        df_date_info = SparkUtil.read_jdbc_query(session=self.spark, url=mysql_con['url'],
                                                 pwd=mysql_con['pwd'], username=mysql_con['username'],
                                                 query=max_report_sql)
        # max() without GROUP BY always yields one row; its value is NULL/None
        # when no row matched, which previously raised TypeError in the comparison.
        completed_date_info = df_date_info.take(1)[0]['completed_date_info']
        if completed_date_info is not None and not (self.date_info > completed_date_info):
            print("不用导出旧数据到doris中")
            return

        print("往doris存储最新asin详情信息:")
        df_doris = self.df_save.select(
            "asin", "asin_ao_val", "asin_title", "asin_title_len", "asin_category_desc",
            "asin_volume", "asin_weight", "asin_launch_time", "asin_brand_name", "one_star",
            "two_star", "three_star", "four_star", "five_star", "low_star", "account_name",
            "account_id", "seller_country_name", "category_first_id", "parent_asin",
            "variation_num", "img_info", "asin_crawl_date", "asin_price", "asin_rating",
            "asin_total_comments", "matrix_ao_val", "zr_flow_proportion", "matrix_flow_proportion",
            "date_info",
            F.col("asin_img_url").alias("img_url"),
            F.col("category_id").alias("category_current_id"),
            F.col("first_category_rank").alias("category_first_rank"),
            F.col("current_category_rank").alias("category_current_rank"),
            "asin_type", "bsr_orders",
            F.col("sales").alias("bsr_orders_sale"),
            F.col("asin_page_inventory").alias("page_inventory"),
            "asin_bought_month", "seller_json",
            F.col("asin_buy_box_seller_type").alias("buy_box_seller_type")
        )
        # Derived from the projection above so the two lists cannot drift apart.
        table_columns = ", ".join(df_doris.columns)
        DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db,
                                              table_name=self.asin_latest_detail_table,
                                              table_columns=table_columns)
        print("save asin_latest_detail success")

    def handle_data(self):
        """Run every transformation step in dependency order."""
        self.handle_asin_basic_attribute()
        self.handle_asin_detail_all_type()
        self.handle_asin_category_info()
        self.handle_asin_measure()
        self.handle_seller_country()
        self.handle_asin_lqs_rating()
        self.handle_asin_is_hide()
        self.handle_title_matching_degree()
        self.handle_asin_attribute_change()
        self.handle_column()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwtFlowAsin(site_name=site_name, date_type=date_type, date_info=date_info)
    # NOTE(review): run() is not defined in this file; presumably inherited from Templates.
    handle_obj.run()