""" @Author : wangrui @Description : 流量选品 @SourceTable : dwd_asin_measure dim_asin_detail ods_bsr_end dim_asin_bs_category dim_fd_asin_info dim_asin_volume @SinkTable : dwt_flow_asin @CreateTime : 2023/01/10 07:55 @UpdateTime : 2023/01/10 07:55 """ import os import sys import re from functools import reduce sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F import datetime from pyspark.sql.types import StringType, IntegerType, DoubleType, MapType # from utils import common_udf # common_udf.udf_get_package_quantity(title='xxpiec') # quit() from utils.db_util import DBUtil from utils.spark_util import SparkUtil from sqlalchemy import create_engine import pandas as pd class DwtFlowAsin(Templates): def __init__(self, site_name="us", date_type="week", date_info="2022-1"): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f"dwt_flow_asin" self.spark = self.create_spark_object( app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}") self.previous_date = self.udf_get_previous_last_30_day(self) self.current_date = self.udf_get_current_time(self) self.get_date_info_tuple() self.get_year_month_days_dict(year=int(self.year)) self.orders_transform_rate = self.get_orders_transform_rate() # 写入、分区初始化 self.df_save = self.spark.sql(f"select 1+1;") self.partitions_by = ['site_name', 'date_type', 'date_info'] if self.date_type in ["week"]: self.reset_partitions(80) elif self.date_type in ["month", "last30day", '4_week']: self.reset_partitions(140) elif self.date_type in ["quarter"]: self.reset_partitions(200) # 初始化全局df self.df_asin_detail = self.spark.sql(f"select 1+1;") self.df_asin_measure = self.spark.sql(f"select 1+1;") self.df_bsr_end = self.spark.sql(f"select 1+1;") self.df_one_category_report = self.spark.sql(f"select 
1+1;") self.df_fd_asin_info = self.spark.sql(f"select 1+1;") self.df_asin_variat = self.spark.sql(f"select 1+1;") self.df_flow_asin_last = self.spark.sql(f"select 1+1;") self.df_asin_volume = self.spark.sql(f"select 1+1;") self.df_asin_counts = self.spark.sql(f"select 1+1;") self.df_st_asin_info = self.spark.sql(f"select 1+1;") self.df_bs_report = self.spark.sql(f"select 1+1;") self.u_judge_us_size_type = self.spark.udf.register('u_judge_us_size_type', self.udf_get_us_size_type, IntegerType()) self.u_judge_other_size_type = self.spark.udf.register('u_judge_other_size_type', self.udf_get_other_size_type, IntegerType()) self.u_judge_rating_type = self.spark.udf.register('u_judge_rating_type', self.udf_get_rating_type, IntegerType()) self.u_judge_site_name_type = self.spark.udf.register('u_judge_site_name_type', self.udf_get_site_name_type, IntegerType()) self.u_judge_weight_type = self.spark.udf.register('u_judge_weight_type', self.udf_get_weight_type, IntegerType()) self.u_judge_quantity_variartion_type = self.spark.udf.register('u_judge_quantity_variation_type', self.udf_get_quantity_variation_type, IntegerType()) self.u_judget_bsr_type = self.spark.udf.register('u_judge_bsr_type', self.udf_get_bsr_type, IntegerType()) self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType()) self.u_judge_package_quantity = self.spark.udf.register('u_judge_package_quantity', common_udf.udf_get_package_quantity, IntegerType()) self.u_judge_sp_type = self.spark.udf.register('u_judge_sp_type', self.udf_get_sp_type, MapType(StringType(), IntegerType(), False)) self.u_judge_brand_type = self.spark.udf.register('u_judge_brand_type', self.udf_get_brand_type, IntegerType()) def get_orders_transform_rate(self): month_days = self.year_month_days_dict[int(self.month)] if self.date_type in ['day', 'week']: if self.date_type == 'day': return 1 / month_days if self.date_type == 'week': return 7 / month_days else: return 1 @staticmethod def udf_year_month(self): 
self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;") df = self.df_date.toPandas() if self.date_type == 'day' or self.date_type == 'last30day': df_today = df.loc[df.date == f'{self.date_info}'] year_month = list(df_today.year_month)[0] print(year_month) return year_month elif self.date_type == 'month': year_month = self.date_info return year_month @staticmethod def udf_year_week(dt): year, week = dt.split("-")[0], dt.split("-")[1] if int(week) < 10: return f"{year}-0{week}" else: return f"{year}-{week}" @staticmethod def udf_get_previous_last_30_day(self): self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;") df = self.df_date.toPandas() if self.date_type == 'last30day': df_loc = df.loc[df.date == f'{self.date_info}'] current_date_id = list(df_loc.id)[0] original_date_id = int(current_date_id) - 30 df_loc = df.loc[df.id == original_date_id] original_year_month = list(df_loc.year_month)[0] df_loc = df.loc[(df.year_month == original_year_month) & (df.day == 1)] original_year_month_id = list(df_loc.id)[0] previous_year_month_id = int(original_year_month_id) - 1 df_loc = df.loc[df.id == previous_year_month_id] previous_date = list(df_loc.year_month)[0] return previous_date elif self.date_type == 'month': df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)] current_month_id = list(df_loc.id)[0] previous_month_id = int(current_month_id) - 1 df_loc = df.loc[df.id == previous_month_id] previous_date = list(df_loc.year_month)[0] return previous_date elif self.date_type == '4_week': df_loc = df.loc[(df.year_week == f'{self.date_info}') & (df.week_day == 1)] current_4_week_id = list(df_loc.id)[0] df_loc = df.loc[df.id == int(current_4_week_id) - 21] current_4_week_month = list(df_loc.year_month)[0] df_loc = df.loc[(df.year_month == current_4_week_month) & (df.day == 1)] current_4_week_month_id = list(df_loc.id)[0] previous_4_week_month_id = int(current_4_week_month_id) - 1 df_loc = df.loc[df.id == previous_4_week_month_id] 
previous_date = list(df_loc.year_month)[0] return previous_date @staticmethod def udf_get_current_time(self): self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;") df = self.df_date.toPandas() if self.date_type == 'month': df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 25)] current_id = list(df_loc.id)[0] df_loc = df.loc[df.id == current_id] current_date = list(df_loc.date)[0] return current_date elif self.date_type in ['week', '4_week']: df_loc = df.loc[(df.year_week == f'{self.date_info}') & (df.week_day == 7)] current_id = list(df_loc.id)[0] df_loc = df.loc[df.id == current_id] current_date = list(df_loc.date)[0] return current_date elif self.date_type in ['last30day', 'day']: return self.date_info @staticmethod def udf_get_us_size_type(asin_weight, asin_length, asin_width, asin_high, asin_element_unit): if (asin_weight is not None) and (asin_length is not None) and (asin_width is not None) and ( asin_high is not None): if asin_element_unit == 'cm': asin_length = asin_length * 0.39 asin_width = asin_width * 0.39 asin_high = asin_high * 0.39 asin_girths = asin_length + (asin_width + asin_high) * 2 if (asin_weight > 0 and asin_weight * 16 <= 16) and (asin_length > 0 and asin_length <= 15) and ( asin_width > 0 and asin_width <= 12) and (asin_high > 0 and asin_high <= 0.75): return 1 elif (asin_weight * 16 > 16 and asin_weight <= 20) and (asin_length > 15 and asin_length <= 18) and ( asin_width > 12 and asin_width <= 14) and (asin_high > 0.75 and asin_high <= 8): return 2 elif (asin_weight > 20 and asin_weight <= 70) and (asin_length > 18 and asin_length <= 60) and ( asin_width > 14 and asin_width <= 30) and (asin_girths <= 130): return 3 elif (asin_weight > 70 and asin_weight <= 150) and (asin_length > 60 and asin_length <= 108) and ( asin_girths <= 130): return 4 elif (asin_weight > 70 and asin_weight <= 150) and (asin_length > 60 and asin_length <= 108) and ( asin_girths > 130) and (asin_girths <= 165): return 5 elif 
(asin_weight > 150) and (asin_length > 108) and (asin_girths > 165): return 6 else: return 0 else: return 0 @staticmethod def udf_get_other_size_type(asin_weight, asin_length, asin_width, asin_high, asin_element_unit): if (asin_weight is not None) and (asin_length is not None) and (asin_width is not None) and ( asin_high is not None): if asin_element_unit == 'inches': asin_length = asin_length * 2.54 asin_width = asin_width * 2.54 asin_high = asin_high * 2.54 asin_girths = asin_length + (asin_width + asin_high) * 2 if (asin_weight > 0 and asin_weight <= 100) and (asin_length > 0 and asin_length <= 20) and ( asin_width > 0 and asin_width <= 15) and (asin_high > 0 and asin_high <= 1): return 1 elif (asin_weight > 100 and asin_weight <= 500) and (asin_length > 20 and asin_length <= 33) and ( asin_width > 15 and asin_width <= 23) and (asin_high > 1 and asin_high <= 2.5): return 2 elif (asin_weight > 500 and asin_weight <= 1000) and (asin_length > 20 and asin_length <= 33) and ( asin_width > 15 and asin_width <= 23) and (asin_high > 2.5 and asin_high <= 5): return 3 elif (asin_weight > 1000 and asin_weight <= 12000) and (asin_length > 33 and asin_length <= 45) and ( asin_width > 23 and asin_width <= 34) and (asin_high > 5 and asin_high <= 26): return 4 elif (asin_weight > 1000 and asin_weight <= 2000) and (asin_length > 45 and asin_length <= 61) and ( asin_width > 34 and asin_width <= 46) and (asin_high > 26 and asin_high <= 46): return 5 elif (asin_length > 0 and asin_length <= 150) and (asin_girths > 0 and asin_girths <= 300): return 6 elif (asin_length > 150) and (asin_girths > 300): return 7 else: return 0 else: return 0 @staticmethod def udf_get_rating_type(asin_rating): if asin_rating is not None: if asin_rating >= 4.5: return 1 elif (asin_rating >= 4 and asin_rating < 4.5): return 2 elif (asin_rating >= 3.5 and asin_rating < 4): return 3 elif (asin_rating >= 3 and asin_rating < 3.5): return 4 elif (asin_rating < 3): return 5 else: return 0 else: return 0 
    @staticmethod
    def udf_get_quantity_variation_type(size):
        # Flag 1 when the variation's size text contains the word "quantity"
        # (i.e. the variant varies by pack quantity); otherwise 0.
        size = str(size).lower()
        if size not in ['null', 'none']:
            sign_word = 'quantity'
            if sign_word in size:
                return 1
            else:
                return 0
        else:
            return 0

    @staticmethod
    def udf_get_site_name_type(asin_buy_box_seller_type, seller_country_name):
        # Classify the buy-box seller: 4 = seller type 1 (presumably Amazon
        # self-fulfilled — confirm against upstream encoding), 1 = US seller,
        # 2 = CN seller, 3 = other country, 0 = country unknown.
        if str(seller_country_name).lower() not in ['none', 'null']:
            if asin_buy_box_seller_type == 1:
                return 4
            elif asin_buy_box_seller_type != 1 and str(seller_country_name).upper().find('US') != -1:
                return 1
            elif asin_buy_box_seller_type != 1 and str(seller_country_name).upper().find('CN') != -1:
                return 2
            else:
                return 3
        else:
            return 0

    @staticmethod
    def udf_get_weight_type(asin_weight):
        # Bucket weight into tiers 1-6; 0 for None or non-positive weight.
        if asin_weight is not None:
            if (asin_weight > 0 and asin_weight < 0.2):
                return 1
            if (asin_weight >= 0.2 and asin_weight < 0.4):
                return 2
            if (asin_weight >= 0.4 and asin_weight < 0.6):
                return 3
            if (asin_weight >= 0.6 and asin_weight < 1):
                return 4
            if (asin_weight >= 1 and asin_weight < 2):
                return 5
            if (asin_weight >= 2):
                return 6
            else:
                return 0
        return 0

    @staticmethod
    def udf_get_bsr_type(asin_rank, limit_rank):
        # 1 when the ASIN's rank is within the valid BSR limit for its
        # category, else 0.
        if asin_rank is not None:
            if asin_rank <= limit_rank:
                return 1
            else:
                return 0
        else:
            return 0

    # Superseded by common_udf.udf_get_package_quantity (registered in
    # __init__); kept for reference.
    # @staticmethod
    # def udf_get_package_quantity(title):
    #     if title != '':
    #         title = str(title).lower()
    #         eligible_list = []
    #         if title not in ['null', 'none']:
    #             pattern1 = r'\b[^\d\s]*(\d+)[^\d\s]* (pcs|piece|pieces|set|pack|packs|pairs|pk)\b'
    #             pattern2 = r'\b(set|pack) of [^\d\s]*(\d+)[^\d\s]*\b'
    #             pattern3 = r'\b[^\d\s]*(\d+)[^\d\s]*-(pack|piece)\b'
    #             pattern4 = r'\b[^\d\s]*(\d+)pcs\b'
    #             title_list1 = re.findall(pattern1, title)
    #             title_list2 = re.findall(pattern2, title)
    #             title_list3 = re.findall(pattern3, title)
    #             title_list4 = re.findall(pattern4, title)
    #             if len(title_list1) > 0:
    #                 for title_element in title_list1:
    #                     eligible_element = title_element[0]
    #                     if (not str(eligible_element).startswith('0')) and (int(eligible_element) < 100000) and (
    #                             int(eligible_element) >= 0):
    #                         eligible_list.append(int(eligible_element))
    #             if len(title_list2) > 0:
    #                 for title_element in title_list2:
    #                     eligible_element = title_element[1]
    #                     if (not str(eligible_element).startswith('0')) and (int(eligible_element) < 100000) and (
    #                             int(eligible_element) >= 0):
    #                         eligible_list.append(int(eligible_element))
    #             if len(title_list3) > 0:
    #                 for title_element in title_list3:
    #                     eligible_element = title_element[0]
    #                     if (not str(eligible_element).startswith('0')) and (int(eligible_element) < 100000) and (
    #                             int(eligible_element) >= 0):
    #                         eligible_list.append(int(eligible_element))
    #             if len(title_list4) > 0:
    #                 for title_element in title_list4:
    #                     if (not str(title_element).startswith('0')) and (int(title_element) < 100000) and (
    #                             int(title_element) >= 0):
    #                         eligible_list.append(int(title_element))
    #             if len(eligible_list) > 0:
    #                 max_eligible_element = max(eligible_list)
    #                 min_eligible_element = min(eligible_list)
    #                 if max_eligible_element - min_eligible_element >= 1000:
    #                     return min_eligible_element
    #                 else:
    #                     return max_eligible_element

    @staticmethod
    def udf_get_sp_type(sp_num):
        # Split a comma-separated "n1,n2,n3" SP counter string into a map of
        # three ints. A single value fills sp_type1 and zeroes the others.
        # Returns None (null map) for other shapes or None input.
        if sp_num is not None:
            sp_type_num_list = str(sp_num).split(",")
            if len(sp_type_num_list) == 3:
                return {"sp_type1": int(sp_type_num_list[0]), "sp_type2": int(sp_type_num_list[1]),
                        "sp_type3": int(sp_type_num_list[2])}
            elif len(sp_type_num_list) == 1:
                return {"sp_type1": int(sp_type_num_list[0]), "sp_type2": 0, "sp_type3": 0}

    @staticmethod
    def udf_get_brand_type(asin_brand_name):
        # 1 when a real brand name is present; implicitly None otherwise
        # (filled later by handle_data_group's na.fill).
        if asin_brand_name is not None:
            asin_brand_name = str(asin_brand_name).lower()
            if asin_brand_name not in ['null', 'none']:
                return 1

    def read_data(self):
        """Load every source table for this partition into the df_* members."""
        # 1. dwd_asin_measure: per-ASIN type counts, ao_val, estimated orders
        # and BSR orders. Non-US sites read counts and orders from two
        # separate tables instead.
        if self.site_name == 'us':
            sql = f"select " \
                  f"asin, " \
                  f"asin_zr_counts, " \
                  f"asin_sp_counts, " \
                  f"asin_sb_counts, " \
                  f"asin_sb1_counts, " \
                  f"asin_ac_counts, " \
                  f"asin_bs_counts, " \
                  f"asin_er_counts, " \
                  f"asin_tr_counts, " \
                  f"asin_ao_val, " \
                  f"asin_zr_orders_sum, " \
                  f"asin_bsr_orders " \
                  f"from dwd_asin_measure " \
                  f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
            print("sql:" + sql)
            self.df_asin_measure = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_measure = self.df_asin_measure.withColumnRenamed("asin_sb1_counts", "asin_vi_counts")
            self.df_asin_measure = self.df_asin_measure.withColumnRenamed("asin_zr_orders_sum", "orders")
            self.df_asin_measure = self.df_asin_measure.withColumnRenamed("asin_bsr_orders", "bsr_orders")
        else:
            sql = f"select " \
                  f"asin, " \
                  f"asin_zr_counts, " \
                  f"asin_sp_counts, " \
                  f"asin_sb_counts, " \
                  f"asin_sb1_counts, " \
                  f"asin_ac_counts, " \
                  f"asin_bs_counts, " \
                  f"asin_er_counts, " \
                  f"asin_tr_counts, " \
                  f"asin_ao_val " \
                  f"from dwd_asin_counts " \
                  f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
            print("sql:" + sql)
            self.df_asin_counts = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_counts = self.df_asin_counts.withColumnRenamed("asin_sb1_counts", "asin_vi_counts")
            sql = f"select " \
                  f"asin, " \
                  f"asin_st_zr_orders_sum as orders " \
                  f"from dwd_st_asin_info " \
                  f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
            print("sql:" + sql)
            self.df_st_asin_info = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin_info = self.df_st_asin_info.drop_duplicates(['asin'])
        # Category-level BSR report used to translate rank -> estimated orders.
        # NOTE(review): for periods before 2022-03 this falls back to the
        # '2022-12' monthly report — confirm that period is the intended
        # baseline.
        if int(self.year) == 2022 and int(self.month) < 3:
            sql = f"select " \
                  f"cate_1_id, " \
                  f"rank as asin_rank, " \
                  f"ceil(orders*{self.orders_transform_rate}) as bsr_orders " \
                  f"from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';"
        else:
            sql = f"select " \
                  f"cate_1_id, " \
                  f"rank as asin_rank, " \
                  f"ceil(orders*{self.orders_transform_rate}) as bsr_orders " \
                  f"from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';"
        print("orders_transform_rate:" + str(self.orders_transform_rate))
        print("sql:" + sql)
        self.df_bs_report = self.spark.sql(sqlQuery=sql).cache()
        # 2. dim_asin_detail: full ASIN detail, excluding our own ASINs
        # (ods_self_asin). Missing launch times are normalised to the epoch.
        sql = f"select " \
              f"asin, " \
              f"bsr_cate_1_id, " \
              f"bsr_cate_current_id, " \
              f"asin_img_url, " \
              f"asin_title, " \
              f"asin_title_len, " \
              f"asin_price, " \
              f"asin_rating, " \
              f"asin_total_comments, " \
              f"asin_buy_box_seller_type, " \
              f"asin_page_inventory, " \
              f"asin_category_desc, " \
              f"asin_volume, " \
              f"asin_weight, " \
              f"asin_color, " \
              f"asin_size, " \
              f"asin_style, " \
              f"asin_is_sale, " \
              f"asin_rank, " \
              f"if(((lower(asin_launch_time) == 'none') or (lower(asin_launch_time) == 'null')), '1970-01-01 00:00:00', asin_launch_time) as asin_launch_time, " \
              f"asin_is_new, " \
              f"asin_img_num, " \
              f"asin_img_type, " \
              f"asin_material, " \
              f"asin_brand_name, " \
              f"asin_activity_type, " \
              f"act_one_two_val, " \
              f"act_three_four_val, " \
              f"act_five_six_val, " \
              f"act_eight_val, " \
              f"qa_num, " \
              f"one_star," \
              f"two_star, " \
              f"three_star, " \
              f"four_star, " \
              f"five_star, " \
              f"low_star, " \
              f"together_asin, " \
              f"ac_name, " \
              f"parent_asin," \
              f"sp_num," \
              f"asin_label_type as is_movie_label " \
              f"from dim_asin_detail " \
              f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}' " \
              f"and asin not in " \
              f"(select asin from ods_self_asin " \
              f"where site_name = '{self.site_name}' )"
        print("sql:" + sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_detail = self.df_asin_detail.withColumnRenamed("bsr_cate_1_id", "cate_1_id")
        self.df_asin_detail = self.df_asin_detail.withColumnRenamed("bsr_cate_current_id", "cate_current_id")
        # NOTE(review): asin_label_type was already aliased to is_movie_label in
        # the SQL above, so this rename is a no-op — confirm it can be removed.
        self.df_asin_detail = self.df_asin_detail.withColumnRenamed("asin_label_type", "label_type")
        # self.df_asin_bs_category.show(10, truncate=False)
        # 3. ods_bsr_end: per-category valid-rank cutoffs.
        sql = f"select rank,bsr_name " \
              f"from ods_bsr_end " \
              f"where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_bsr_end = self.spark.sql(sqlQuery=sql).cache()
        self.df_bsr_end = self.df_bsr_end.withColumnRenamed("bsr_name", "name")
        self.df_bsr_end = self.df_bsr_end.withColumnRenamed("rank", "limit_rank")
        # self.df_bsr_end.show(10, truncate=False)
        # 4. dim_asin_bs_category: top-level category id -> English name
        # (deduplicated to the latest snapshot in get_asin_bsr_type).
        sql = f"select asin_bs_cate_1_id as cate_1_id, asin_bs_cate_1_en_name as name, " \
              f" date_info as dt from dim_asin_bs_category " \
              f" where site_name='{self.site_name}';"
        print("sql:" + sql)
        self.df_one_category_report = self.spark.sql(sqlQuery=sql)
        # 5. dim_fd_asin_info: seller (account) info per ASIN.
        sql = f"select " \
              f"fd_account_id, " \
              f"fd_account_name, " \
              f"fd_country_name, " \
              f"asin " \
              f" from dim_fd_asin_info " \
              f" where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_fd_asin_info = self.spark.sql(sqlQuery=sql).cache()
        self.df_fd_asin_info = self.df_fd_asin_info.drop_duplicates(['asin'])
        self.df_fd_asin_info = self.df_fd_asin_info.withColumnRenamed("fd_account_id", "account_id")
        self.df_fd_asin_info = self.df_fd_asin_info.withColumnRenamed("fd_account_name", "account_name")
        self.df_fd_asin_info = self.df_fd_asin_info.withColumnRenamed("fd_country_name", "seller_country_name")
        self.df_fd_asin_info = self.df_fd_asin_info.na.fill({"seller_country_name": 'none'})
        # 6. ods_asin_variat: variation info, latest record per ASIN.
        sql = f"select " \
              f"asin, " \
              f"parent_asin, " \
              f"size, " \
              f"updated_time " \
              f" from ods_asin_variat " \
              f" where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_asin_variat = self.spark.sql(sqlQuery=sql)
        self.df_asin_variat = self.df_asin_variat.orderBy(self.df_asin_variat.updated_time.desc_nulls_last())
        self.df_asin_variat = self.df_asin_variat.drop_duplicates(["asin"])
        self.df_asin_variat = self.df_asin_variat.drop("updated_time")
        self.df_asin_variat = self.df_asin_variat.na.fill({"size": "none"})
        # 7. Previous period's dwt_flow_asin (always the previous *month*
        # partition) for period-over-period change metrics.
        sql = f"select " \
              f"asin, " \
              f"asin_rank as previous_asin_rank, " \
              f"asin_ao_val as previous_asin_ao_val, " \
              f"asin_price as previous_asin_price, " \
              f"orders as previous_orders, " \
              f"bsr_orders as previous_bsr_orders, " \
              f"asin_rating as previous_asin_rating, " \
              f"asin_total_comments as previous_asin_total_comments, " \
              f"sales as previous_sales, " \
              f"variation_num as previous_variation_num " \
              f"from dwt_flow_asin " \
              f"where site_name='{self.site_name}' and date_type = 'month' and date_info = '{self.previous_date}'"
        print("sql:" + sql)
        self.df_flow_asin_last = self.spark.sql(sqlQuery=sql)
        # 8. dim_asin_volume: package dimensions (missing dims filled with 0).
        sql = f"select " \
              f"asin, " \
              f"asin_length, " \
              f"asin_width, " \
              f"asin_high," \
              f"asin_element_unit " \
              f"from dim_asin_volume where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_asin_volume = self.spark.sql(sqlQuery=sql)
        self.df_asin_volume = self.df_asin_volume.na.fill({"asin_length": 0.0, "asin_width": 0.0, "asin_high": 0.0})

    def get_asin_detail(self):
        """Assemble the main ASIN DataFrame by left-joining all sources.

        Adds `sales` (= bsr_orders * price) and `is_brand_label`.
        """
        if self.site_name == 'us':
            self.df_asin_detail = self.df_asin_detail.join(
                self.df_asin_measure, on=['asin'], how='left'
            )
        else:
            self.df_asin_detail = self.df_asin_detail.join(
                self.df_asin_counts, on=['asin'], how='left'
            ).join(
                self.df_st_asin_info, on=['asin'], how='left'
            )
        # map (rank, top-level category) to the report's estimated bsr_orders
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_bs_report, on=['asin_rank', 'cate_1_id'], how='left'
        )
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_fd_asin_info, on=['asin'], how='left'
        ).join(
            self.df_asin_volume, on=['asin'], how='left'
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("sales", F.col("bsr_orders") * F.col("asin_price"))
        self.df_asin_detail = self.df_asin_detail.withColumn("is_brand_label", self.u_judge_brand_type(
            self.df_asin_detail.asin_brand_name))

    def get_package_quantity(self):
        """Derive package_quantity from the title via the registered UDF."""
        self.df_asin_detail = self.df_asin_detail.withColumn("package_quantity", self.u_judge_package_quantity(
            self.df_asin_detail.asin_title))
        # NOTE(review): quit() terminates the whole job right here — this looks
        # like leftover debugging and would prevent all later steps (and the
        # final save) from ever running. Confirm and remove.
        quit()

    def get_volume_detail(self):
        """Compute asin_size_type (site-specific tiers), then drop raw dims."""
        if self.site_name == 'us':
            self.df_asin_detail = self.df_asin_detail.withColumn("asin_size_type", self.u_judge_us_size_type(
                self.df_asin_detail.asin_weight, self.df_asin_detail.asin_length, self.df_asin_detail.asin_width,
                self.df_asin_detail.asin_high, self.df_asin_detail.asin_element_unit))
        else:
            self.df_asin_detail = self.df_asin_detail.withColumn("asin_size_type", self.u_judge_other_size_type(
                self.df_asin_detail.asin_weight, self.df_asin_detail.asin_length, self.df_asin_detail.asin_width,
                self.df_asin_detail.asin_high, self.df_asin_detail.asin_element_unit))
        # else:
        #     self.df_asin_detail = self.df_asin_detail.withColumn("asin_size_type",
        #                                                          self.u_judge_size_type(self.df_asin_detail.asin_weight,
        #                                                                                 self.df_asin_detail.asin_length,
        #                                                                                 self.df_asin_detail.asin_width,
        #                                                                                 self.df_asin_detail.asin_high))
        self.df_asin_detail = self.df_asin_detail.drop("asin_length")
        self.df_asin_detail = self.df_asin_detail.drop("asin_width")
        self.df_asin_detail = self.df_asin_detail.drop("asin_high")
        self.df_asin_detail = self.df_asin_detail.drop("asin_element_unit")

    def get_rating_type(self):
        """Add asin_rating_type bucket column."""
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_rating_type", self.u_judge_rating_type(
            self.df_asin_detail.asin_rating))

    def get_site_name_type(self):
        """Add asin_site_name_type (buy-box seller country classification)."""
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_site_name_type", self.u_judge_site_name_type(
            self.df_asin_detail.asin_buy_box_seller_type, self.df_asin_detail.seller_country_name))

    def get_weight_type(self):
        """Add asin_weight_type bucket column."""
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_weight_type",
                                                             self.u_judge_weight_type(self.df_asin_detail.asin_weight))

    def get_launch_time_type(self):
        """Bucket launch time into age tiers 1-7 relative to current_date.

        NOTE(review): "years" are approximated as 360/720/1080 days — confirm
        this is intentional rather than calendar years.
        """
        time = datetime.datetime.strptime(self.current_date, '%Y-%m-%d')
        one_month_time = time + datetime.timedelta(days=-30)
        three_months_time = time + datetime.timedelta(days=-90)
        six_months_time = time + datetime.timedelta(days=-180)
        twelve_months_time = time + datetime.timedelta(days=-360)
        twenty_four_months_time = time + datetime.timedelta(days=-720)
        thirty_six_months_time = time + datetime.timedelta(days=-1080)
        self.df_asin_detail = self.df_asin_detail.withColumn('asin_launch_time_type', F.when(
            F.col("asin_launch_time") >= one_month_time, F.lit(1)). \
            when(
            (F.col("asin_launch_time") >= three_months_time) & (F.col("asin_launch_time") < one_month_time),
            F.lit(2)). \
            when(
            (F.col("asin_launch_time") >= six_months_time) & (F.col("asin_launch_time") < three_months_time),
            F.lit(3)). \
            when(
            (F.col("asin_launch_time") >= twelve_months_time) & (F.col("asin_launch_time") < six_months_time),
            F.lit(4)). \
            when(
            (F.col("asin_launch_time") >= twenty_four_months_time) & (
                    F.col("asin_launch_time") < twelve_months_time), F.lit(5)). \
            when(
            (F.col("asin_launch_time") >= thirty_six_months_time) & (
                    F.col("asin_launch_time") < twenty_four_months_time), F.lit(6)). \
            when(
            (F.col("asin_launch_time") < thirty_six_months_time), F.lit(7)).otherwise(F.lit(0)))

    def get_ao_val_type(self):
        """Bucket asin_ao_val into tiers 1-7 (0 for null/out-of-range)."""
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_ao_val_type",
                                                             F.when(F.col("asin_ao_val").isNotNull(), F.when(
                                                                 (F.col("asin_ao_val") > 0) & (
                                                                         F.col("asin_ao_val") < 0.1),
                                                                 F.lit(1)).when(
                                                                 (F.col("asin_ao_val") >= 0.1) & (
                                                                         F.col("asin_ao_val") < 0.2),
                                                                 F.lit(2)).when(
                                                                 (F.col("asin_ao_val") >= 0.2) & (
                                                                         F.col("asin_ao_val") < 0.4),
                                                                 F.lit(3)).when(
                                                                 (F.col("asin_ao_val") >= 0.4) & (
                                                                         F.col("asin_ao_val") < 0.8),
                                                                 F.lit(4)).when(
                                                                 (F.col("asin_ao_val") >= 0.8) & (
                                                                         F.col("asin_ao_val") < 1.2),
                                                                 F.lit(5)).when(
                                                                 (F.col("asin_ao_val") >= 1.2) & (
                                                                         F.col("asin_ao_val") < 2),
                                                                 F.lit(6)).when(
                                                                 (F.col("asin_ao_val") >= 2), F.lit(7)).otherwise(
                                                                 F.lit(0))).otherwise(
                                                                 F.lit(0)))

    def get_rank_type(self):
        """Bucket asin_rank into tiers 1-8 (0 for null/out-of-range)."""
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_rank_type",
                                                             F.when(F.col("asin_rank").isNotNull(), F.when(
                                                                 (F.col("asin_rank") > 0) & (F.col("asin_rank") < 1000),
                                                                 F.lit(1)).when(
                                                                 (F.col("asin_rank") >= 1000) & (
                                                                         F.col("asin_rank") < 5000),
                                                                 F.lit(2)).when(
                                                                 (F.col("asin_rank") >= 5000) & (
                                                                         F.col("asin_rank") < 10000),
                                                                 F.lit(3)).when(
                                                                 (F.col("asin_rank") >= 10000) & (
                                                                         F.col("asin_rank") < 20000),
                                                                 F.lit(4)).when(
                                                                 (F.col("asin_rank") >= 20000) & (
                                                                         F.col("asin_rank") < 30000),
                                                                 F.lit(5)).when(
                                                                 (F.col("asin_rank") >= 30000) & (
                                                                         F.col("asin_rank") < 50000),
                                                                 F.lit(6)).when(
                                                                 (F.col("asin_rank") >= 50000) & (
                                                                         F.col("asin_rank") < 70000),
                                                                 F.lit(7)).when(
                                                                 (F.col("asin_rank") >= 70000), F.lit(8)).otherwise(
                                                                 F.lit(0))).otherwise(
                                                                 F.lit(0)))

    def get_price_type(self):
        """Bucket asin_price into tiers 1-6 (0 for null/out-of-range)."""
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_price_type",
                                                             F.when(F.col("asin_price").isNotNull(), F.when(
                                                                 (F.col("asin_price") > 0) & (F.col("asin_price") < 10),
                                                                 F.lit(1)).when(
                                                                 (F.col("asin_price") >= 10) & (
                                                                         F.col("asin_price") < 15),
                                                                 F.lit(2)).when(
                                                                 (F.col("asin_price") >= 15) & (
                                                                         F.col("asin_price") < 20),
                                                                 F.lit(3)).when(
                                                                 (F.col("asin_price") >= 20) & (
                                                                         F.col("asin_price") < 30),
                                                                 F.lit(4)).when(
                                                                 (F.col("asin_price") >= 30) & (
                                                                         F.col("asin_price") < 50),
                                                                 F.lit(5)).when(
                                                                 (F.col("asin_price") >= 50), F.lit(6)).otherwise(
                                                                 F.lit(0))).otherwise(
                                                                 F.lit(0)))

    def get_asin_index_change(self):
        """Join the previous period's snapshot and compute absolute (_rise)
        and relative (_change) deltas for rank, AO, price, orders, rating,
        comments, bsr_orders, sales and variation count; relative change is
        null when the previous value is 0 or null.
        """
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_flow_asin_last, on=['asin'], how='left'
        )
        # 1. rank change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_rank_rise",
                                                             F.col("asin_rank") - F.col("previous_asin_rank"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_rank_change",
                                                             F.when((F.col("previous_asin_rank") != 0) & (
                                                                 F.col("previous_asin_rank").isNotNull()),
                                                                    (F.col("asin_rank") - F.col(
                                                                        "previous_asin_rank")) / F.col(
                                                                        "previous_asin_rank")).otherwise(F.lit(None)))
        # 2. AO value change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_ao_rise",
                                                             F.col("asin_ao_val") - F.col("previous_asin_ao_val"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_ao_change",
                                                             F.when((F.col("previous_asin_ao_val") != 0.0) & (
                                                                 F.col("previous_asin_ao_val").isNotNull()),
                                                                    (F.col("asin_ao_val") - F.col(
                                                                        "previous_asin_ao_val")) / F.col(
                                                                        "previous_asin_ao_val")).otherwise(F.lit(None)))
        # 3. price change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_price_rise",
                                                             F.col("asin_price") - F.col("previous_asin_price"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_price_change",
                                                             F.when((F.col("previous_asin_price") != 0.0) & (
                                                                 F.col("previous_asin_price").isNotNull()),
                                                                    (F.col("asin_price") - F.col(
                                                                        "previous_asin_price")) / F.col(
                                                                        "previous_asin_price")).otherwise(F.lit(None)))
        # 4. estimated-orders change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_orders_rise",
                                                             F.col("orders") - F.col("previous_orders"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_orders_change",
                                                             F.when((F.col("previous_orders") != 0) & (
                                                                 F.col("previous_orders").isNotNull()),
                                                                    (F.col("orders") - F.col(
                                                                        "previous_orders")) / F.col(
                                                                        "previous_orders")).otherwise(F.lit(None)))
        # 5. rating change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_rating_rise",
                                                             F.col("asin_rating") - F.col("previous_asin_rating"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_rating_change",
                                                             F.when((F.col("previous_asin_rating") != 0.0) & (
                                                                 F.col("previous_asin_rating").isNotNull()),
                                                                    (F.col("asin_rating") - F.col(
                                                                        "previous_asin_rating")) / F.col(
                                                                        "previous_asin_rating")).otherwise(F.lit(None)))
        # 6. comment-count change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_comments_rise",
                                                             F.col("asin_total_comments") - F.col(
                                                                 "previous_asin_total_comments"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_comments_change",
                                                             F.when((F.col("previous_asin_total_comments") != 0) & (
                                                                 F.col("previous_asin_total_comments").isNotNull()),
                                                                    (F.col("asin_total_comments") - F.col(
                                                                        "previous_asin_total_comments")) / F.col(
                                                                        "previous_asin_total_comments")).otherwise(
                                                                 F.lit(None)))
        # 7. BSR-orders change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_bsr_orders_rise",
                                                             F.col("bsr_orders") - F.col("previous_bsr_orders"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_bsr_orders_change",
                                                             F.when((F.col("previous_bsr_orders") != 0) & (
                                                                 F.col("previous_bsr_orders").isNotNull()),
                                                                    (F.col("bsr_orders") - F.col(
                                                                        "previous_bsr_orders")) / F.col(
                                                                        "previous_bsr_orders")).otherwise(F.lit(None)))
        # 7. sales change (numbering duplicated in the original comments)
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_sales_rise",
                                                             F.col("sales") - F.col("previous_sales"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_sales_change",
                                                             F.when((F.col("previous_sales") != 0) & (
                                                                 F.col("previous_sales").isNotNull()),
                                                                    (F.col("sales") - F.col(
                                                                        "previous_sales")) / F.col(
                                                                        "previous_sales")).otherwise(F.lit(None)))
        # 8. variation-count change
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_variation_rise",
                                                             F.col("variation_num") - F.col("previous_variation_num"))
        self.df_asin_detail = self.df_asin_detail.withColumn("asin_variation_change",
                                                             F.when((F.col("previous_variation_num") != 0) & (
                                                                 F.col("previous_variation_num").isNotNull()),
                                                                    (F.col("variation_num") - F.col(
                                                                        "previous_variation_num")) / F.col(
                                                                        "previous_variation_num")).otherwise(
                                                                 F.lit(None)))
        # drop the previous-period helper columns
        self.df_asin_detail = self.df_asin_detail.drop("previous_asin_rank")
        self.df_asin_detail = self.df_asin_detail.drop("previous_asin_ao_val")
        self.df_asin_detail = self.df_asin_detail.drop("previous_asin_price")
        self.df_asin_detail = self.df_asin_detail.drop("previous_orders")
        self.df_asin_detail = self.df_asin_detail.drop("previous_bsr_orders")
        self.df_asin_detail = self.df_asin_detail.drop("previous_asin_rating")
        self.df_asin_detail = self.df_asin_detail.drop("previous_asin_total_comments")
        self.df_asin_detail = self.df_asin_detail.drop("previous_sales")
        self.df_asin_detail = self.df_asin_detail.drop("previous_variation_num")

    def get_asin_bsr_type(self):
        """Flag bsr_type=1 when the ASIN's rank is within its top-level
        category's valid BSR cutoff (default cutoff 500000 when unknown).
        """
        # sortable key for the category snapshot date (zero-padded year-week)
        self.df_one_category_report = self.df_one_category_report.withColumn(
            "dt_sort", self.u_year_week(self.df_one_category_report.dt)
        )
        # keep only the latest snapshot per category
        window = Window.partitionBy(['cate_1_id']).orderBy(
            self.df_one_category_report.dt_sort.desc()
        )
        self.df_one_category_report = self.df_one_category_report.withColumn("asin_dt_top",
                                                                             F.row_number().over(window=window))
        self.df_one_category_report = self.df_one_category_report.filter("asin_dt_top=1")
        self.df_one_category_report = self.df_one_category_report.drop("asin_dt_top", "dt", "dt_sort")
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_one_category_report, on=['cate_1_id'], how='left'
        )
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_bsr_end, on=['name'], how='left'
        )
        self.df_asin_detail = self.df_asin_detail.na.fill({"limit_rank": 500000})
        self.df_asin_detail = self.df_asin_detail.withColumn("bsr_type",
                                                             self.u_judget_bsr_type(self.df_asin_detail.asin_rank,
                                                                                    self.df_asin_detail.limit_rank))
        self.df_asin_detail = self.df_asin_detail.drop("name")
        self.df_asin_detail = self.df_asin_detail.drop("limit_rank")

    def get_asin_zr_best_orders_type(self):
        """Flag zr_best_orders_type=1 for the best-selling (by `orders`) child
        under each parent_asin; ASINs without a parent get 0 (no rank match).
        """
        df_parent_asin_group = self.df_asin_detail.filter("parent_asin is not null")
        orders_window = Window.partitionBy(["parent_asin"]).orderBy(
            df_parent_asin_group.orders.desc_nulls_last()
        )
        df_parent_asin_group = df_parent_asin_group.withColumn("orders_rank",
                                                               F.row_number().over(window=orders_window))
        df_parent_asin_group = df_parent_asin_group.select("asin", "orders_rank")
        self.df_asin_detail = self.df_asin_detail.join(
            df_parent_asin_group, on='asin', how='left'
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("zr_best_orders_type",
                                                             F.when(F.col("orders_rank") == 1, F.lit(1)).otherwise(
                                                                 F.lit(0)))
        self.df_asin_detail = self.df_asin_detail.drop("orders_rank")

    def get_asin_bsr_best_orders_type(self):
        """Flag bsr_best_orders_type=1 for the best bsr_orders child per
        parent_asin; standalone ASINs (no parent) are always 1.
        """
        df_parent_asin_group = self.df_asin_detail.filter("parent_asin is not null")
        df_no_parent_asin_group = self.df_asin_detail.filter("parent_asin is null")
        df_no_parent_asin_group = df_no_parent_asin_group.withColumn("bsr_best_orders_type", F.lit(1))
        bsr_orders_window = Window.partitionBy(["parent_asin"]).orderBy(
            df_parent_asin_group.bsr_orders.desc_nulls_last()
        )
        df_parent_asin_group = df_parent_asin_group.withColumn("bsr_orders_rank",
                                                               F.row_number().over(window=bsr_orders_window))
        df_parent_asin_group = df_parent_asin_group.withColumn("bsr_best_orders_type",
                                                               F.when(F.col("bsr_orders_rank") == 1,
                                                                      F.lit(1)).otherwise(F.lit(0)))
        df_parent_asin_group = df_parent_asin_group.drop("bsr_orders_rank")
        self.df_asin_detail = df_parent_asin_group.unionByName(df_no_parent_asin_group)

    def get_asin_variation_number(self):
        """Add variation_num: count of child ASINs per parent_asin."""
        df_variat_group = self.df_asin_variat.groupby(["parent_asin"]).agg(
            F.count("asin").alias("variation_num")
        )
        self.df_asin_detail = self.df_asin_detail.join(
            df_variat_group, on=['parent_asin'], how='left'
        )

    def get_asin_quantity_variation_type(self):
        """Add asin_quantity_variation_type from the variation size text."""
        df_asin_quantity_variation = self.df_asin_variat.withColumn("asin_quantity_variation_type",
                                                                    self.u_judge_quantity_variartion_type(
                                                                        self.df_asin_variat.size))
        df_asin_quantity_variation = df_asin_quantity_variation.drop("parent_asin")
        df_asin_quantity_variation = df_asin_quantity_variation.drop("size")
        self.df_asin_detail = self.df_asin_detail.join(
            df_asin_quantity_variation, on=['asin'], how='left'
        )

    def get_asin_sp_type_num(self):
        """Explode the sp_num map (from u_judge_sp_type) into sp_type1/2/3
        columns, then drop the raw sp_num column.
        """
        sp_type_num_map = self.u_judge_sp_type(self.df_asin_detail.sp_num)
        self.df_asin_detail = self.df_asin_detail.withColumn("sp_type1", sp_type_num_map["sp_type1"])
        self.df_asin_detail = self.df_asin_detail.withColumn("sp_type2", sp_type_num_map["sp_type2"])
        self.df_asin_detail = self.df_asin_detail.withColumn("sp_type3", sp_type_num_map["sp_type3"])
        self.df_asin_detail = self.df_asin_detail.drop("sp_num")

    def get_alarm_brand_type(self):
        """US only: mark brands present in the PostgreSQL brand_alert_erp
        table with is_alarm_brand=1 (left join on brand name).
        """
        if self.site_name == 'us':
            pg_sql = f"select brand_name as asin_brand_name, 1 as is_alarm_brand from brand_alert_erp where brand_name is not null group by brand_name"
            db_type = "postgresql"
            con_info = DBUtil.get_connection_info(db_type=db_type, site_name=self.site_name)
            if con_info is not None:
                df_alarm_brand = SparkUtil.read_jdbc_query(session=self.spark, url=con_info['url'],
                                                           pwd=con_info['pwd'], username=con_info['username'],
                                                           query=pg_sql)
                self.df_asin_detail = self.df_asin_detail.join(
                    df_alarm_brand, on=['asin_brand_name'], how='left'
                )

    def handle_data_group(self):
        """Finalize the save DataFrame: audit timestamps, reserved fields,
        partition columns, and null back-filling for all derived columns.

        NOTE(review): 'yyyy-MM-dd HH:mm:SS' — in Spark's datetime pattern 'S'
        is fraction-of-second, so the seconds field here is wrong; 'ss' was
        probably intended. Also note is_brand_label defaults to 1 when null —
        confirm that is deliberate (udf_get_brand_type only ever returns 1 or
        null).
        """
        self.df_save = self.df_asin_detail
        self.df_save = self.df_save.withColumn("created_time",
                                               F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS')). \
            withColumn("updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))
        # reserved/spare columns
        self.df_save = self.df_save.withColumn("re_string_field1", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_string_field2", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_string_field3", F.lit("null"))
        self.df_save = self.df_save.withColumn("re_int_field1", F.lit(0))
        self.df_save = self.df_save.withColumn("re_int_field2", F.lit(0))
        self.df_save = self.df_save.withColumn("re_int_field3", F.lit(0))
        # partition columns
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save = self.df_save.na.fill(
            {"asin_zr_counts": 0, "asin_sp_counts": 0, "asin_sb_counts": 0, "asin_vi_counts": 0, "asin_bs_counts": 0,
             "asin_ac_counts": 0, "asin_tr_counts": 0, "asin_er_counts": 0, "asin_title_len": 0,
             "asin_total_comments": 0, "variation_num": 0, "asin_img_num": 0, "act_one_two_val": 0.0,
             "act_three_four_val": 0.0, "act_five_six_val": 0.0, "act_eight_val": 0.0, "qa_num": 0, "one_star": 0,
             "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0, "asin_size_type": 0,
             "asin_rating_type": 0, "asin_site_name_type": 0, "asin_weight_type": 0, "asin_launch_time_type": 0,
             "asin_ao_val_type": 0, "asin_rank_type": 0, "asin_price_type": 0, "asin_quantity_variation_type": 0,
             "package_quantity": 1, "is_movie_label": 0, "is_brand_label": 1, "is_alarm_brand": 0})

    # Pipeline driver (continues beyond this chunk).
    def handle_data(self):
        self.get_asin_detail()
        self.get_asin_sp_type_num()
        self.get_package_quantity()
        self.get_volume_detail()
        self.get_site_name_type()
        self.get_rating_type()
        self.get_weight_type()
        self.get_launch_time_type()
        self.get_ao_val_type()
        self.get_rank_type()
        self.get_price_type()
        self.get_asin_bsr_type()
        self.get_asin_zr_best_orders_type()
        self.get_asin_variation_number()
        self.get_asin_quantity_variation_type()
self.get_asin_bsr_best_orders_type() self.get_alarm_brand_type() self.get_asin_index_change() self.handle_data_group() if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwtFlowAsin(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run()