""" ABA搜索词统计报表 """ """ author: 汪瑞 description: 基于dwd层等表,计算出search_term维度的报表信息 table_read_name: dim_cal_asin_history_detail系列表,dwd_st_measure系列表,dws_top100_asin_info系列表,ods_st_key系列表, dwd_asin_measure系列表, dwd_st_asin_measure系列表 table_save_name: dwt_aba_st_analytics_report table_save_level: dwt version: 1.0 created_date: 2022-11-17 updated_date: 2022-11-17 """ import os import sys sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from pyspark.sql.types import StringType, IntegerType, MapType from datetime import datetime, timedelta from yswg_utils.common_udf import udf_get_package_quantity import math class DwtAbaStAnalyticsReport(Templates): def __init__(self, site_name="us", date_type="day", date_info="2022-11-02"): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f"dwt_aba_st_analytics_report" self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}") self.df_save = self.spark.sql(f"select 1+1;") self.df_asin_history_detail = self.spark.sql(f"select 1+1;") self.df_st_asin_measure = self.spark.sql(f"select 1+1;") self.df_st_asin_detail = self.spark.sql(f"select 1+1;") self.df_st_key = self.spark.sql(f"select 1+1;") self.df_top100_asin = self.spark.sql(f"select 1+1;") self.df_st_measure = self.spark.sql(f"select 1+1;") self.df_asin_measure = self.spark.sql(f"select 1+1;") self.df_asin_count = self.spark.sql(f"select 1+1;") self.df_st_buy_box = self.spark.sql(f"select 1+1;") self.df_st_color = self.spark.sql(f"select 1+1;") self.df_st_seller = self.spark.sql(f"select 1+1;") self.df_st_top20 = self.spark.sql(f"select 1+1;") self.df_st_attribute = self.spark.sql(f"select 1+1;") self.df_st_img_type_group = self.spark.sql(f"select 1+1;") self.df_st_launch_time_group = self.spark.sql(f"select 1+1;") self.df_st_price_group = 
self.spark.sql(f"select 1+1;") self.df_st_ao_group = self.spark.sql(f"select 1+1;") self.df_st_comments_group = self.spark.sql("select 1+1;") self.df_st_rating_group = self.spark.sql("select 1+1;") self.df_st_detail = self.spark.sql("select 1+1;") self.df_seller_asin = self.spark.sql("select 1+1;") self.df_st_size = self.spark.sql("select 1+1;") self.df_st_package_num = self.spark.sql("select 1+1;") self.partitions_by = ['site_name', 'date_type', 'date_info'] self.reset_partitions(65) self.u_get_buy_box_num = self.spark.udf.register("u_get_buy_box_num", self.udf_get_buy_box_num, StringType()) self.u_get_buy_box = self.spark.udf.register("u_get_buy_box", self.udf_get_buy_box_type, StringType()) self.u_get_img_type = self.spark.udf.register("u_get_img_type", self.udf_get_img_type, StringType()) self.u_get_seller_num = self.spark.udf.register('u_get_seller_num', self.udf_get_seller_num, StringType()) self.u_get_seller_bsr_orders = self.spark.udf.register('u_get_seller_bsr_orders', self.udf_get_seller_bsr_orders, StringType()) self.u_judge_color = self.spark.udf.register('u_judge_color', self.udf_judge_color, IntegerType()) self.u_judge_size = self.spark.udf.register('u_judge_size', self.udf_judge_multi_size, StringType()) self.u_get_package_num = F.udf(udf_get_package_quantity, IntegerType()) self.u_get_package_num_trend = self.spark.udf.register('u_get_package_num_trend', self.udf_reverse_package_num_array, StringType()) self.u_get_package_num_trend_market_share = self.spark.udf.register('u_get_package_num_trend_market_share', self.udf_package_num_counts, StringType()) self.u_get_package_num_corresponding_asin = self.spark.udf.register('u_get_package_num_corresponding_asin', self.udf_package_num_corresponding_asin_list, StringType()) self.u_get_price_interval = self.spark.udf.register('u_get_price_interval', self.udf_price_interval_info, MapType(StringType(), StringType(), False)) @staticmethod def udf_get_img_type(img_type): all_img_type = str(img_type).split(",") 
        # Code '2' == A+ (aadd) content present, code '3' == video present.
        if ('2' in all_img_type) & ('3' in all_img_type):
            return 'aadd_video_num'
        elif ('2' not in all_img_type) & ('3' in all_img_type):
            return 'aadd_no_video_num'
        elif ('2' not in all_img_type) & ('3' not in all_img_type):
            return 'no_aadd_no_video_num'
        elif ('2' in all_img_type) & ('3' not in all_img_type):
            return 'no_aadd_video_num'

    @staticmethod
    def udf_get_buy_box_type(buy_box):
        """Map a buy-box seller-type code to its label (1=Amazon, 2=FBA, 3=FBM, else other)."""
        if "1" == str(buy_box):
            return 'Amazon'
        elif "2" == str(buy_box):
            return 'FBA'
        elif "3" == str(buy_box):
            return 'FBM'
        else:
            return 'other'

    @staticmethod
    def udf_get_seller_num(seller_type, all_seller):
        """Count, per distinct seller type, its occurrences in the seller CSV; returns a CSV string."""
        all_seller_type = str(seller_type).split(",")
        seller_num = ''
        for i in range(len(all_seller_type)):
            # NOTE(review): substring count on the raw CSV string, not an exact element match — confirm intended.
            splits = all_seller.count(all_seller_type[i])
            if (i < len(all_seller_type) - 1):
                seller_num = seller_num + str(splits) + ','
            else:
                seller_num = seller_num + str(splits)
        return seller_num

    @staticmethod
    def udf_get_seller_bsr_orders(seller_type, all_seller, all_seller_bsr_orders):
        """Sum bsr orders per distinct seller type (positionally aligned lists); returns a CSV string."""
        all_seller_type = str(seller_type).split(",")
        all_seller_list = str(all_seller).split(",")
        all_seller_bsr_orders = str(all_seller_bsr_orders).split(",")
        seller_bsr_orders = ""
        for seller in all_seller_type:
            one_seller_bsr_orders = 0
            for (i, j) in zip(all_seller_list, all_seller_bsr_orders):
                if seller == i:
                    one_seller_bsr_orders = int(one_seller_bsr_orders) + int(j)
            if (seller != all_seller_type[-1]):
                seller_bsr_orders = seller_bsr_orders + str(one_seller_bsr_orders) + ','
            else:
                seller_bsr_orders = seller_bsr_orders + str(one_seller_bsr_orders)
        return seller_bsr_orders

    @staticmethod
    def udf_get_buy_box_num(buy_box_type, buy_box_list):
        """Count, per distinct buy-box type, its occurrences in the buy-box CSV; returns a CSV string."""
        all_buy_box_type = str(buy_box_type).split(",")
        buy_box_num = ''
        for i in range(len(all_buy_box_type)):
            splits = buy_box_list.count(all_buy_box_type[i])
            if (i < len(all_buy_box_type) - 1):
                buy_box_num = buy_box_num + str(splits) + ','
            else:
                buy_box_num = buy_box_num + str(splits)
        return buy_box_num

    @staticmethod
    def udf_judge_color(color):
        """Return 1 when color is usable (not none/null and longer than one char), else 0."""
        color_len = len(str(color))
        if str(color).lower() not in ['none', 'null'] and color_len > 1:
            return 1
        else:
            return 0

    @staticmethod
    def udf_judge_multi_size(size, style):
        """Pick the value to count size-variants by; None when neither size nor style is usable."""
        size = str(size).lower()
        style = str(style).lower()
        # When the variant table has both size and style, count by size; only when
        # size is absent do we fall back to counting by style.
        if size not in ['none', 'null']:
            return size
        elif style not in ['none', 'null']:
            return style

    @staticmethod
    def udf_reverse_package_num_array(package_num_list):
        """Render a collected package_num array as a bare CSV string (strip brackets/quotes/spaces)."""
        if str(package_num_list) != '':
            package_num_trend = str(package_num_list).strip("[").strip("]").replace("\'", '').replace(' ', '')
            return package_num_trend

    @staticmethod
    def udf_package_num_counts(package_num_list):
        """Convert a package_num array into each element's share of the total, as a CSV of ratios."""
        if str(package_num_list) != '':
            package_num_list = str(package_num_list).strip("[").strip("]").replace(' ', '')
            package_num_trend_market_share = ''
            package_num_arr = str(package_num_list).split(",")
            package_num_int_list = list(map(int, package_num_arr))
            total_package_num = sum(package_num_int_list)
            for i in range(len(package_num_int_list)):
                if (i < len(package_num_int_list) - 1):
                    package_num_trend_market_share = package_num_trend_market_share + str(
                        round(package_num_int_list[i] / total_package_num, 3)) + ','
                else:
                    package_num_trend_market_share = package_num_trend_market_share + str(
                        round(package_num_int_list[i] / total_package_num, 3))
            return package_num_trend_market_share

    @staticmethod
    def udf_package_num_corresponding_asin_list(asin_list):
        """Render a collected asin array as a bare CSV string (strip brackets/quotes/spaces)."""
        if str(asin_list) != '':
            package_num_corresponding_asin = str(asin_list).strip("[").strip("]").replace("\'", '').replace(' ', '')
            return package_num_corresponding_asin

    @staticmethod
    def udf_price_interval_info(st_price_avg, asin_price):
        """Bucket an asin price into price intervals derived from the term's average price.

        Returns a map with keys price_interval (CSV of "lo-hi" labels), price_type
        (1-based bucket index of asin_price) and interval_num (bucket count); returns
        None when either price is non-positive.
        """

        def get_price_interval(interval_num, interval_span, interval_boundary=0):
            # Build "lo-hi" labels for interval_num consecutive buckets of width interval_span.
            price_interval_list = []
            for i in range(interval_num):
                lower_bound = interval_boundary + interval_span * i
                upper_bound = interval_boundary + interval_span * (i + 1)
                price_interval_list.append(f"{lower_bound}-{upper_bound}")
            price_interval = ','.join(price_interval_list)
            return price_interval

        if st_price_avg > 0 and asin_price > 0:
            # (avg-price lower bound, upper bound, bucket width, bucket count);
            # the final (200, 0, ...) row is the open-ended ">= 200" catch-all.
            price_range = [
                (0, 25, 5, 10), (25, 30, 10, 6), (30, 40, 10, 8), (40, 50, 10, 10), (50, 60, 10, 12),
                (60, 70, 10, 14), (70, 80, 10, 16), (80, 90, 10, 18), (90, 100, 10, 20), (100, 105, 15, 14),
                (105, 120, 15, 16), (120, 135, 15, 18), (135, 150, 15, 20), (150, 160, 20, 16),
                (160, 180, 20, 18), (180, 200, 20, 20), (200, 0, 20, 20)
            ]
            for price_lower, price_upper, interval_span, interval_num_item in price_range:
                if price_lower <= st_price_avg < price_upper:
                    interval_num = interval_num_item
                elif price_lower > price_upper:
                    # Catch-all row: lower(200) > upper(0) means "st_price_avg >= 200".
                    interval_num = interval_num_item
                else:
                    continue
                # Buckets start at 0 until the average exceeds 200, then shift up with it.
                interval_boundary = max(int(math.ceil(float(st_price_avg) / 10.0) * 10) - 200, 0)
                price_interval = get_price_interval(interval_num, interval_span, interval_boundary)
                # Clamp the asin's bucket index into [1, interval_num].
                price_type = min(max(int((asin_price - interval_boundary) // interval_span) + 1, 1), interval_num)
                return {
                    'price_interval': price_interval,
                    'price_type': str(price_type),
                    'interval_num': str(interval_num)
                }

    def hadle_cols(self, col_list1, col_list2, col_list3, df):
        # (sic: "hadle" — kept for callers.) For each aligned triple, add col1 = round(col2 / col3, 4).
        for col1, col2, col3 in zip(col_list1, col_list2, col_list3):
            df = df.withColumn(
                col1,
                F.round(df[col2] / df[col3], 4)
            )
        return df

    def check_cols(self, actual_col_list, schema_col_list, df):
        # After a pivot, add any schema column that did not materialize as a NULL int column.
        for schema_col in schema_col_list:
            if schema_col not in actual_col_list:
                df = df.withColumn(
                    schema_col,
                    F.lit(None).astype('int')
                )
        return df

    def read_data(self):
        """Load all source tables for the current site/date partition into cached DataFrames."""
        print("1.1 读取dwd_st_asin_measure系列表")
        sql = f"""
            select search_term, asin from dwd_st_asin_measure
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_st_asin_measure = self.spark.sql(sql).repartition(80, 'asin').cache()
        print("1.2 读取dim_asin_detail系列表")
        sql = f"""
            select asin, asin_title, asin_price as price, asin_rating as rating, asin_is_new,
                asin_img_type, asin_is_aadd, asin_launch_time, asin_total_comments, asin_buy_box_seller_type,
                lower(asin_color) as asin_color, asin_brand_name,
                lower(asin_size) as asin_size, lower(asin_style) as asin_style
            from dim_asin_detail
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_asin_history_detail = self.spark.sql(sql).repartition(80, 'asin').cache()
        # Defaults: missing img info counted as code "3"; size/style normalized to the literal "none".
        self.df_asin_history_detail = self.df_asin_history_detail.na.fill({
            "asin_img_type": "3",
            "asin_size": "none",
            "asin_style": "none"
        })
        print("1.3 读取dwd_st_measure系列表")
        sql = f"""
            select search_term, st_zr_orders as orders, st_price_std, st_price_avg from dwd_st_measure
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_st_measure = self.spark.sql(sql).repartition(80, 'search_term').cache()
        self.df_st_measure = self.df_st_measure.na.fill({
            "orders": 0
        })
        print("1.4 读取ods_st_key系列表")
        sql = f"""
            select cast(st_key as int) as search_term_id, search_term from ods_st_key
            where site_name='{self.site_name}';
        """
        print("sql:", sql)
        self.df_st_key = self.spark.sql(sql).repartition(80, 'search_term').cache()
        print("1.5 读取dws_top100_asin_info系列表")
        sql = f"""
            select search_term_id, top100_asin, top100_orders, top100_market_share, top100_is_new
            from dws_top100_asin_info
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_top100_asin = self.spark.sql(sql).repartition(80, 'search_term_id').cache()
        print("1.6 读取dwd_asin_measure系列表")
        sql = f"""
            select asin, cast(asin_bsr_orders as int) as asin_bsr_orders, asin_ao_val from dwd_asin_measure
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_asin_measure = self.spark.sql(sql).repartition(80, 'asin').cache()
        self.df_asin_measure = self.df_asin_measure.na.fill({"asin_bsr_orders": 0})
        print("1.7 读取dim_seller_asin_history_info系列表")
        sql = f"""
            select asin,
                case when upper(fd_country_name) = 'US' then 'US'
                     when upper(fd_country_name) = 'CN' then 'CN'
                     when upper(fd_country_name) = 'HK' then 'HK'
                     when upper(fd_country_name) = 'FR' then 'FR'
                     when upper(fd_country_name) = 'DE' then 'DE'
                     else 'OTHER' end as seller_name
            from dim_fd_asin_info where site_name = '{self.site_name}';
        """
        print("sql:", sql)
        self.df_seller_asin = self.spark.sql(sql).repartition(80, 'asin').cache()
        self.df_seller_asin = self.df_seller_asin.drop_duplicates(['asin'])
        print("1.8 读取dim_st_detail系列表")
        sql = f"""
            select search_term from dim_st_detail
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';
        """
        print("sql:", sql)
        self.df_st_detail = self.spark.sql(sql).repartition(80, 'search_term').cache()

    def get_st_asin_detail(self):
        # Attach asin detail and asin measures to the (search_term, asin) pairs.
        self.df_st_asin_detail = self.df_st_asin_measure.join(
            self.df_asin_history_detail, on=['asin'], how='left'
        ).join(
            self.df_asin_measure, on=['asin'], how='left'
        ).repartition(80, 'search_term')
        # Per-asin bsr orders restricted to A+ asins / new asins (0 otherwise).
        self.df_st_asin_detail = self.df_st_asin_detail.withColumn(
            'aadd_asin_bsr_orders',
            F.when(
                F.col("asin_is_aadd") == 1, F.lit(self.df_st_asin_detail.asin_bsr_orders)
            ).otherwise(F.lit(0))
        ).withColumn(
            'new_asin_bsr_orders_detail',
            F.when(
                F.col("asin_is_new") == 1, F.lit(self.df_st_asin_detail.asin_bsr_orders)
            ).otherwise(F.lit(0))
        )

    def get_st_attribute(self):
        # Per search_term: new-asin count, total bsr orders, A+ bsr orders, new-asin bsr orders.
        self.df_st_attribute = self.df_st_asin_detail.groupby(["search_term"]).agg(
            F.sum("asin_is_new").alias('new_asin_num'),
            F.sum("asin_bsr_orders").alias('bsr_orders'),
            F.sum("aadd_asin_bsr_orders").alias('aadd_bsr_orders'),
            F.sum("new_asin_bsr_orders_detail").alias("new_asin_bsr_orders")
        )

    def get_img_type(self):
        # Bucket each asin by its A+/video combination, then pivot counts per search_term.
        self.df_st_asin_detail = self.df_st_asin_detail.withColumn(
            'img_type', self.u_get_img_type(self.df_st_asin_detail.asin_img_type)
        )
        self.df_st_img_type_group = self.df_st_asin_detail.groupby(["search_term"]).pivot(f"img_type").agg(
            F.count(f"search_term")
        )
        img_type_group_list = self.df_st_img_type_group.columns
        img_type_schema_list = ['aadd_video_num', 'aadd_no_video_num',
                                'no_aadd_no_video_num', 'no_aadd_video_num']
        self.df_st_img_type_group = self.check_cols(
            actual_col_list=img_type_group_list, schema_col_list=img_type_schema_list, df=self.df_st_img_type_group
        )

    def get_asin_count(self):
        # Total asin count per search_term (denominator for the market-share columns).
        self.df_asin_count = self.df_st_asin_detail.groupby(["search_term"]).agg(
            F.count(f"asin").alias('total_asin_num')
        )

    def get_price_range(self):
        # Distribute each term's asin prices into dynamic buckets (see udf_price_interval_info).
        df_st_asin_price_info = self.df_st_asin_detail.select("search_term", "price").filter("price > 0")
        df_st_price_agg = self.df_st_measure.select("search_term", "st_price_std", "st_price_avg")
        self.df_st_measure = self.df_st_measure.drop("st_price_std", "st_price_avg")
        df_st_asin_price_agg = df_st_asin_price_info.join(
            df_st_price_agg, on=['search_term'], how='left'
        )
        # Drop price outliers above the std-derived cap before bucketing.
        df_st_asin_price_agg = df_st_asin_price_agg.filter("price <= st_price_std")
        asin_pirce_info_map = self.u_get_price_interval(
            df_st_asin_price_agg.st_price_avg, df_st_asin_price_agg.price
        )
        df_st_asin_price_agg = df_st_asin_price_agg.withColumn(
            "price_interval", asin_pirce_info_map["price_interval"]
        ).withColumn(
            "price_type", asin_pirce_info_map["price_type"]
        ).withColumn(
            "interval_num", asin_pirce_info_map['interval_num']
        )
        # Interval labels/count are identical within a term, so first() is safe.
        df_st_price_interval = df_st_asin_price_agg.groupby('search_term').agg(
            F.first('price_interval').alias('price_interval'),
            F.first('interval_num').alias('interval_num')
        )
        self.df_st_price_group = df_st_asin_price_agg.groupby(['search_term']).pivot("price_type").agg(
            F.count(F.col("search_term"))
        )
        self.df_st_price_group = self.df_st_price_group.join(
            df_st_price_interval, on=['search_term'], how='left'
        )
        price_type_list = [f"{i}" for i in range(1, 21)]
        price_group_list = self.df_st_price_group.columns
        self.df_st_price_group = self.check_cols(
            actual_col_list=price_group_list, schema_col_list=price_type_list, df=self.df_st_price_group
        )
        self.df_st_price_group = self.df_st_price_group.select(
            "search_term", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10",
            "11", "12", "13", "14", "15", "16", "17", "18", "19", "20",
            "price_interval", "interval_num"
        ).na.fill({
            "1": 0, "2": 0, "3": 0, "4": 0, "5": 0, "6": 0, "7": 0, "8": 0, "9": 0, "10": 0,
            "11": 0, "12": 0, "13": 0, "14": 0, "15": 0, "16": 0, "17": 0, "18": 0, "19": 0, "20": 0
        })
        self.df_st_price_group = self.df_st_price_group.join(
            self.df_asin_count, on=['search_term'], how='left'
        )
        # interval_num -> which pivot columns participate for that bucket count.
        column_conditions = {
            20: list(range(1, 21)), 18: list(range(1, 19)), 16: list(range(1, 17)), 14: list(range(1, 15)),
            12: list(range(1, 13)), 10: list(range(1, 11)), 8: list(range(1, 9)), 6: list(range(1, 7))
        }
        asin_count_case_expr = "CASE " + " ".join([
            f"WHEN interval_num = '{interval}' THEN CONCAT_WS(',', {', '.join([f'`{col}`' for col in columns])})"
            for interval, columns in column_conditions.items()
        ]) + " ELSE NULL END"
        self.df_st_price_group = self.df_st_price_group.withColumn(
            "price_interval_asin_count", F.expr(asin_count_case_expr)
        )
        asin_market_share_case_expr = "CASE " + " ".join([
            f"WHEN interval_num = '{interval}' THEN CONCAT_WS(',', {', '.join([f'ROUND(`{col}`/`total_asin_num`,4)' for col in columns])})"
            for interval, columns in column_conditions.items()
        ]) + " ELSE NULL END"
        self.df_st_price_group = self.df_st_price_group.withColumn(
            "price_interval_asin_market_share", F.expr(asin_market_share_case_expr)
        )
        self.df_st_price_group = self.df_st_price_group.select(
            'search_term', 'price_interval', 'price_interval_asin_count', 'price_interval_asin_market_share'
        )

    def get_ao_range(self):
        # Bucket asins by AO value into 12 fixed ranges, then pivot counts per term.
        df_st_asin_detail_ao = self.df_st_asin_detail.withColumn(
            'ao_type',
            F.when(
                (F.col("asin_ao_val") >= 0) & (F.col("asin_ao_val") < 0.05), "ao_range_val1"
            ).when(
                (F.col("asin_ao_val") >= 0.05) & (F.col("asin_ao_val") < 0.1), "ao_range_val2"
            ).when(
                (F.col("asin_ao_val") >= 0.1) & (F.col("asin_ao_val") < 0.2), "ao_range_val3"
            ).when(
                (F.col("asin_ao_val") >= 0.2) & (F.col("asin_ao_val") < 0.3), "ao_range_val4"
            ).when(
                (F.col("asin_ao_val") >= 0.3) & (F.col("asin_ao_val") < 0.4), "ao_range_val5"
            ).when(
                (F.col("asin_ao_val") >= 0.4) &
(F.col("asin_ao_val") < 0.5), "ao_range_val6" ).when( (F.col("asin_ao_val") >= 0.5) & (F.col("asin_ao_val") < 0.6), "ao_range_val7" ).when( (F.col("asin_ao_val") >= 0.6) & (F.col("asin_ao_val") < 0.7), "ao_range_val8" ).when( (F.col("asin_ao_val") >= 0.7) & (F.col("asin_ao_val") < 0.8), "ao_range_val9" ).when( (F.col("asin_ao_val") >= 0.8) & (F.col("asin_ao_val") < 0.9), "ao_range_val10" ).when( (F.col("asin_ao_val") >= 0.9) & (F.col("asin_ao_val") < 1), "ao_range_val11" ).when( F.col("asin_ao_val") >= 1, "ao_range_val12" ) ) self.df_st_ao_group = df_st_asin_detail_ao.groupby(["search_term"]).pivot(f"ao_type").agg( F.count(f"search_term") ) ao_range_val_list = [f"ao_range_val{i}" for i in range(1, 13)] ao_group_list = self.df_st_ao_group.columns self.df_st_ao_group = self.check_cols( actual_col_list=ao_group_list, schema_col_list=ao_range_val_list, df=self.df_st_ao_group ) self.df_st_ao_group = self.df_st_ao_group.select( "search_term", "ao_range_val1", "ao_range_val2", "ao_range_val3", "ao_range_val4", "ao_range_val5", "ao_range_val6", "ao_range_val7", "ao_range_val8", "ao_range_val9", "ao_range_val10", "ao_range_val11", "ao_range_val12" ) self.df_st_ao_group = self.df_st_ao_group.join( self.df_asin_count, on=['search_term'], how='left' ) ao_range_market_share_list = [f"ao_range_market_share{i}" for i in range(1, 13)] total_asin_num_list = [f"total_asin_num" for i in range(1, 13)] self.df_st_ao_group = self.hadle_cols( col_list1=ao_range_market_share_list, col_list2=ao_range_val_list, col_list3=total_asin_num_list, df=self.df_st_ao_group ) # self.df_st_ao_group = self.df_st_ao_group.drop("total_asin_num") def get_st_buy_box(self): # 获取配送方式信息 df_st_asin_buy_box_detail = self.df_st_asin_detail.withColumn( "buy_box_seller_type", self.u_get_buy_box(self.df_st_asin_detail.asin_buy_box_seller_type) ) df_st_asin_buy_box_detail_agg = df_st_asin_buy_box_detail.groupby(['search_term']).agg( F.concat_ws(",", 
F.collect_set(df_st_asin_buy_box_detail.buy_box_seller_type)).alias("buy_box_name"), F.concat_ws(",", F.collect_list(df_st_asin_buy_box_detail.buy_box_seller_type)).alias("buy_box_list") ) self.df_st_buy_box = self.df_st_measure.join( df_st_asin_buy_box_detail_agg, on=['search_term'], how='left' ) self.df_st_buy_box = self.df_st_buy_box.withColumn( "buy_box_num", self.u_get_buy_box_num(self.df_st_buy_box.buy_box_name, self.df_st_buy_box.buy_box_list) ) self.df_st_buy_box = self.df_st_buy_box.drop("buy_box_list", "orders") def get_st_color(self): df_st_asin_color_detail = self.df_st_asin_detail.withColumn( "asin_is_color_flag", self.u_judge_color(F.col("asin_color")) ) df_st_asin_color_detail = df_st_asin_color_detail.filter("asin_is_color_flag = 1") df_st_color_agg = df_st_asin_color_detail.groupby(['search_term', 'asin_color']).agg( F.sum("asin_bsr_orders").alias("color_bsr_orders"), F.count("asin_color").alias("color_quantity") ) df_st_color_agg = df_st_color_agg.join( self.df_st_attribute, on=['search_term'], how='left' ) df_st_color_agg = df_st_color_agg.withColumn( "color_bsr_orders_percent", F.round(F.col("color_bsr_orders") / F.col("bsr_orders"), 3) ) df_st_color_agg = df_st_color_agg.select( "search_term", "asin_color", "color_quantity", "color_bsr_orders_percent" ) self.df_st_color = df_st_color_agg.groupby(['search_term']).agg( F.concat_ws("&&&&", F.collect_list(df_st_color_agg.asin_color)).alias("color_name"), F.concat_ws(",", F.collect_list(df_st_color_agg.color_quantity)).alias("color_num"), F.concat_ws(",", F.collect_list(df_st_color_agg.color_bsr_orders_percent)).alias("color_bsr_orders_percent"), ) def get_launch_time_range(self): # 获取上架时间信息 time = datetime.now().date() one_month_time = time + timedelta(days=-30) three_months_time = time + timedelta(days=-90) six_months_time = time + timedelta(days=-180) twelve_months_time = time + timedelta(days=-360) fifteen_months_time = time + timedelta(days=-450) twenty_four_months_time = time + 
            timedelta(days=-720)
        thirty_six_months_time = time + timedelta(days=-1080)
        df_st_asin_detail_launch_time = self.df_st_asin_detail.withColumn(
            'launch_time_type',
            F.when(
                F.col("asin_launch_time") >= one_month_time, "launch_time_num1"
            ).when(
                (F.col("asin_launch_time") >= three_months_time) & (F.col("asin_launch_time") < one_month_time), "launch_time_num2"
            ).when(
                (F.col("asin_launch_time") >= six_months_time) & (F.col("asin_launch_time") < three_months_time), "launch_time_num3"
            ).when(
                (F.col("asin_launch_time") >= twelve_months_time) & (F.col("asin_launch_time") < six_months_time), "launch_time_num4"
            ).when(
                (F.col("asin_launch_time") >= fifteen_months_time) & (F.col("asin_launch_time") < twelve_months_time), "launch_time_num5"
            ).when(
                (F.col("asin_launch_time") >= twenty_four_months_time) & (F.col("asin_launch_time") < fifteen_months_time), "launch_time_num6"
            ).when(
                (F.col("asin_launch_time") >= thirty_six_months_time) & (F.col("asin_launch_time") < twenty_four_months_time), "launch_time_num7"
            ).when(
                (F.col("asin_launch_time") < thirty_six_months_time), "launch_time_num8"
            )
        )
        self.df_st_launch_time_group = df_st_asin_detail_launch_time.groupby(["search_term"]).pivot(f"launch_time_type").agg(
            F.count(f"search_term")
        )
        launch_time_num_list = [f"launch_time_num{i}" for i in range(1, 9)]
        launch_time_group = self.df_st_launch_time_group.columns
        self.df_st_launch_time_group = self.check_cols(
            actual_col_list=launch_time_group, schema_col_list=launch_time_num_list, df=self.df_st_launch_time_group
        )
        self.df_st_launch_time_group = self.df_st_launch_time_group.select(
            "search_term", "launch_time_num1", "launch_time_num2", "launch_time_num3", "launch_time_num4",
            "launch_time_num5", "launch_time_num6", "launch_time_num7", "launch_time_num8"
        )
        self.df_st_launch_time_group = self.df_st_launch_time_group.join(
            self.df_asin_count, on=["search_term"], how='left'
        )
        launch_time_market_share_list = [f"launch_time_market_share{i}" for i in range(1, 9)]
        total_asin_num_list = [f"total_asin_num" for i in range(1, 9)]
        self.df_st_launch_time_group = self.hadle_cols(
            col_list1=launch_time_market_share_list, col_list2=launch_time_num_list,
            col_list3=total_asin_num_list, df=self.df_st_launch_time_group
        )
        self.df_st_launch_time_group = self.df_st_launch_time_group.drop("total_asin_num")

    def get_comments_num_range(self):
        # Review-count distribution across 13 fixed buckets, pivoted per term.
        df_st_asin_detail_comments = self.df_st_asin_detail.withColumn(
            "comments_type",
            F.when(
                (F.col("asin_total_comments") >= 0) & (F.col("asin_total_comments") < 50), "comments_num1"
            ).when(
                (F.col("asin_total_comments") >= 50) & (F.col("asin_total_comments") < 100), "comments_num2"
            ).when(
                (F.col("asin_total_comments") >= 100) & (F.col("asin_total_comments") < 150), "comments_num3"
            ).when(
                (F.col("asin_total_comments") >= 150) & (F.col("asin_total_comments") < 200), "comments_num4"
            ).when(
                (F.col("asin_total_comments") >= 200) & (F.col("asin_total_comments") < 300), "comments_num5"
            ).when(
                (F.col("asin_total_comments") >= 300) & (F.col("asin_total_comments") < 400), "comments_num6"
            ).when(
                (F.col("asin_total_comments") >= 400) & (F.col("asin_total_comments") < 500), "comments_num7"
            ).when(
                (F.col("asin_total_comments") >= 500) & (F.col("asin_total_comments") < 600), "comments_num8"
            ).when(
                (F.col("asin_total_comments") >= 600) & (F.col("asin_total_comments") < 700), "comments_num9"
            ).when(
                (F.col("asin_total_comments") >= 700) & (F.col("asin_total_comments") < 800), "comments_num10"
            ).when(
                (F.col("asin_total_comments") >= 800) & (F.col("asin_total_comments") < 900), "comments_num11"
            ).when(
                (F.col("asin_total_comments") >= 900) & (F.col("asin_total_comments") < 1000), "comments_num12"
            ).when(
                F.col("asin_total_comments") >= 1000, "comments_num13"
            )
        )
        self.df_st_comments_group = df_st_asin_detail_comments.groupby(["search_term"]).pivot(f"comments_type").agg(
            F.count(f"search_term")
        )
        comments_num_list = [f"comments_num{i}" for i in range(1, 14)]
        comments_group_list = self.df_st_comments_group.columns
        self.df_st_comments_group = self.check_cols(
            actual_col_list=comments_group_list, schema_col_list=comments_num_list, df=self.df_st_comments_group
        )
        self.df_st_comments_group = self.df_st_comments_group.select(
            "search_term", "comments_num1", "comments_num2", "comments_num3", "comments_num4", "comments_num5",
            "comments_num6", "comments_num7", "comments_num8", "comments_num9", "comments_num10",
            "comments_num11", "comments_num12", "comments_num13"
        )
        self.df_st_comments_group = self.df_st_comments_group.join(
            self.df_asin_count, on=["search_term"], how='left'
        )
        comments_num_market_share_list = [f"comments_num_market_share{i}" for i in range(1, 14)]
        total_asin_num_list = [f"total_asin_num" for i in range(1, 14)]
        self.df_st_comments_group = self.hadle_cols(
            col_list1=comments_num_market_share_list, col_list2=comments_num_list,
            col_list3=total_asin_num_list, df=self.df_st_comments_group
        )
        self.df_st_comments_group = self.df_st_comments_group.drop("total_asin_num")

    def get_rating_group(self):
        # Rating distribution across 7 fixed buckets, emitted as two CSV columns.
        df_st_asin_detail_rating = self.df_st_asin_detail.withColumn(
            "rating_type",
            F.when(
                (F.col("rating") >= 0) & (F.col("rating") < 1), "rating_section1"
            ).when(
                (F.col("rating") >= 1) & (F.col("rating") < 2), "rating_section2"
            ).when(
                (F.col("rating") >= 2) & (F.col("rating") < 3), "rating_section3"
            ).when(
                (F.col("rating") >= 3) & (F.col("rating") < 4), "rating_section4"
            ).when(
                (F.col("rating") >= 4) & (F.col("rating") < 4.3), "rating_section5"
            ).when(
                (F.col("rating") >= 4.3) & (F.col("rating") < 4.5), "rating_section6"
            ).when(
                F.col("rating") >= 4.5, "rating_section7"
            )
        )
        self.df_st_rating_group = df_st_asin_detail_rating.groupby(["search_term"]).pivot(f"rating_type").agg(
            F.count(f"search_term")
        )
        rating_section_list = [f"rating_section{i}" for i in range(1, 8)]
        rating_group_list = self.df_st_rating_group.columns
        self.df_st_rating_group = self.check_cols(
            actual_col_list=rating_group_list, schema_col_list=rating_section_list, df=self.df_st_rating_group
        )
        self.df_st_rating_group = self.df_st_rating_group.select(
            "search_term", "rating_section1", "rating_section2", "rating_section3", "rating_section4",
            "rating_section5", "rating_section6", "rating_section7"
        ).na.fill({
            "rating_section1": 0, "rating_section2": 0, "rating_section3": 0, "rating_section4": 0,
            "rating_section5": 0, "rating_section6": 0, "rating_section7": 0
        })
        self.df_st_rating_group = self.df_st_rating_group.join(
            self.df_asin_count, on=["search_term"], how='left'
        )
        # NOTE(review): 13 entries but only 7 are consumed (hadle_cols zips to the shortest list).
        total_asin_num_list = [f"total_asin_num" for i in range(1, 14)]
        rating_market_share_section_list = [f"rating_market_share_section{i}" for i in range(1, 8)]
        self.df_st_rating_group = self.hadle_cols(
            col_list1=rating_market_share_section_list, col_list2=rating_section_list,
            col_list3=total_asin_num_list, df=self.df_st_rating_group
        )
        self.df_st_rating_group = self.df_st_rating_group.na.fill({
            "rating_market_share_section1": 0.0, "rating_market_share_section2": 0.0,
            "rating_market_share_section3": 0.0, "rating_market_share_section4": 0.0,
            "rating_market_share_section5": 0.0, "rating_market_share_section6": 0.0,
            "rating_market_share_section7": 0.0
        })
        self.df_st_rating_group = self.df_st_rating_group.withColumn(
            "rating_section",
            F.concat_ws(",",
                        F.col("rating_section1"), F.col("rating_section2"), F.col("rating_section3"),
                        F.col("rating_section4"), F.col("rating_section5"), F.col("rating_section6"),
                        F.col("rating_section7")
                        )
        )
        self.df_st_rating_group = self.df_st_rating_group.withColumn(
            "rating_market_share_section",
            F.concat_ws(",",
                        F.col("rating_market_share_section1"), F.col("rating_market_share_section2"),
                        F.col("rating_market_share_section3"), F.col("rating_market_share_section4"),
                        F.col("rating_market_share_section5"), F.col("rating_market_share_section6"),
                        F.col("rating_market_share_section7")
                        )
        )
        # Keep only the two CSV summary columns.
        self.df_st_rating_group = self.df_st_rating_group.drop(
            "total_asin_num", "rating_section1", "rating_section2", "rating_section3", "rating_section4",
            "rating_section5", "rating_section6", "rating_section7",
"rating_market_share_section1", "rating_market_share_section2", "rating_market_share_section3", "rating_market_share_section4", "rating_market_share_section5", "rating_market_share_section6", "rating_market_share_section7" ) def get_st_asin_seller(self): # 获取卖家相关信息 df_seller_asin_detail = self.df_st_asin_measure.join( self.df_seller_asin, on=['asin'], how='left' ).join( self.df_asin_measure, on=['asin'], how='left' ) df_seller_asin_detail = df_seller_asin_detail.na.fill({ "seller_name": 'OTHER' }) df_seller_asin_agg = df_seller_asin_detail.groupBy(['search_term']).agg( F.concat_ws(",", F.collect_list("seller_name")).alias("seller_name_list"), F.concat_ws(",", F.collect_list("asin_bsr_orders")).alias("seller_bsr_orders_list"), F.concat_ws(",", F.collect_set("seller_name")).alias("seller_name_type") ) self.df_st_seller = self.df_st_measure.join( df_seller_asin_agg, on=['search_term'], how='left' ) self.df_st_seller = self.df_st_seller.withColumn( "seller_num", self.u_get_seller_num(self.df_st_seller.seller_name_type, self.df_st_seller.seller_name_list) ).withColumn( "seller_bsr_orders", self.u_get_seller_bsr_orders(self.df_st_seller.seller_name_type, self.df_st_seller.seller_name_list, self.df_st_seller.seller_bsr_orders_list) ) self.df_st_seller = self.df_st_seller.drop("st_zr_orders", "orders") self.df_st_seller = self.df_st_seller.withColumnRenamed("seller_name_type", "seller_name") def get_top20_asin(self): # 获取特定asin下某品牌的总商品数,新品数,bsr销量 df_st_brand_asin_agg = self.df_st_asin_detail.groupBy(["search_term", "asin_brand_name"]).agg( F.count("asin").alias("brand_total_asin"), F.sum("asin_is_new").alias("brand_new_asin"), F.sum("asin_bsr_orders").alias("brand_bsr_orders") ) df_st_brand_asin_agg = df_st_brand_asin_agg.na.fill({ "brand_new_asin": 0, "brand_bsr_orders": 0.0, "asin_brand_name": "None" }) df_st_brand_asin_agg = df_st_brand_asin_agg.filter("asin_brand_name not in ('null', 'None', 'none', 'NULL')") brand_bsr_orders_window = 
Window.partitionBy(["search_term"]).orderBy( df_st_brand_asin_agg.brand_bsr_orders.desc_nulls_last() ) df_st_top20_asin_detail_agg = df_st_brand_asin_agg.withColumn( "brand_bsr_orders_rank", F.row_number().over(window=brand_bsr_orders_window) ) df_st_top20_asin_detail_agg = df_st_top20_asin_detail_agg.filter("brand_bsr_orders_rank<=20") df_st_top20_agg = df_st_top20_asin_detail_agg.select("search_term", "asin_brand_name") df_st_top20_brand = df_st_top20_agg.groupBy(["search_term"]).agg( F.concat_ws("&&&&", F.collect_list(df_st_top20_agg.asin_brand_name)).alias("top20_brand") ) df_st_top20_brand_asin_agg = df_st_top20_asin_detail_agg.join( self.df_st_attribute, on=["search_term"], how='left' ) df_st_top20_brand_asin_agg = df_st_top20_brand_asin_agg.withColumn( "brand_new_num_proportion", df_st_top20_brand_asin_agg.brand_new_asin / df_st_top20_brand_asin_agg.brand_total_asin ).withColumn( "brand_market_share", F.when( F.col("bsr_orders") == 0, F.lit(0) ).otherwise( df_st_top20_brand_asin_agg.brand_bsr_orders / df_st_top20_brand_asin_agg.bsr_orders ) ) df_st_top20_brand_agg = df_st_top20_brand_asin_agg.groupBy(["search_term"]).agg( F.concat_ws(",", F.collect_list(df_st_top20_brand_asin_agg.brand_new_num_proportion)).alias("top20_brand_new_num_proportion"), F.concat_ws(",", F.collect_list(df_st_top20_brand_asin_agg.brand_bsr_orders)).alias("top20_brand_bsr_oders"), F.concat_ws(",", F.collect_list(df_st_top20_brand_asin_agg.brand_market_share)).alias("top20_brand_market_share") ) self.df_st_top20 = self.df_st_measure.join( df_st_top20_brand_agg, on=["search_term"], how='left' ).join( df_st_top20_brand, on=['search_term'], how='left' ) # 弃用字段 self.df_st_top20 = self.df_st_top20.withColumn( "top20_asin", F.lit(None) ).withColumn( "top20_orders", F.lit(None) ) self.df_st_top20 = self.df_st_top20.select( "search_term", "top20_asin", "top20_orders", "top20_brand", "top20_brand_new_num_proportion", "top20_brand_bsr_oders", "top20_brand_market_share" ) def get_st_size(self): 
df_st_asin_size = self.df_st_asin_detail.withColumn( "asin_size_flag", self.u_judge_size(F.col("asin_size"), F.col("asin_style")) ) df_st_asin_size = df_st_asin_size.filter("asin_size_flag not in ('none','null')") df_st_size_agg = df_st_asin_size.groupby(['search_term', 'asin_size_flag']).agg( F.sum("asin_bsr_orders").alias("size_bsr_orders"), F.count("asin_size_flag").alias("size_quantity") ) df_st_size_agg = df_st_size_agg.join( self.df_st_attribute, on=['search_term'], how='left' ) df_st_size_agg = df_st_size_agg.withColumn( "size_bsr_orders_percent", F.round(F.col("size_bsr_orders") / F.col("bsr_orders"), 3) ) df_st_size_agg = df_st_size_agg.select( "search_term", "asin_size_flag", "size_quantity", "size_bsr_orders_percent" ) self.df_st_size = df_st_size_agg.groupby(['search_term']).agg( F.concat_ws("&&&&", F.collect_list(df_st_size_agg.asin_size_flag)).alias("size_name"), F.concat_ws(",", F.collect_list(df_st_size_agg.size_quantity)).alias("size_num"), F.concat_ws(",", F.collect_list(df_st_size_agg.size_bsr_orders_percent)).alias("size_bsr_orders_percent") ) def get_asin_package_num(self): self.df_st_asin_detail = self.df_st_asin_detail.withColumn( "package_num", self.u_get_package_num(self.df_st_asin_detail.asin_title) ) self.df_st_asin_detail = self.df_st_asin_detail.drop("asin_title") self.df_st_asin_detail = self.df_st_asin_detail.na.fill({ "package_num": 1 }) self.df_st_package_num = self.df_st_asin_detail.groupby(["search_term"]).agg( F.sort_array(F.collect_list(F.struct(F.col("package_num"), F.col("asin"))), False).alias("value") ) self.df_st_package_num = self.df_st_package_num.select( "search_term", F.expr("transform(value, x -> x.asin)").alias("asin_list"), F.expr("transform(value, x -> x.package_num)").alias("package_num_list") ) # self.df_st_package_num.show(10, truncate=False) self.df_st_package_num = self.df_st_package_num.withColumn( "package_num_trend", self.u_get_package_num_trend(self.df_st_package_num.package_num_list) ).withColumn( 
"package_num_trend_market_share", self.u_get_package_num_trend_market_share(self.df_st_package_num.package_num_list) ).withColumn( "package_num_corresponding_asin", self.u_get_package_num_corresponding_asin(self.df_st_package_num.asin_list) ) self.df_st_package_num = self.df_st_package_num.drop("package_num_array", "asin_list") def handle_data_group(self): self.df_st_measure = self.df_st_measure.join( self.df_st_key, on=['search_term'], how='inner' ).join( self.df_st_detail, on=['search_term'], how='inner' ) self.df_save = self.df_st_measure.join( self.df_st_price_group, on=['search_term'], how='left' ).join( self.df_st_ao_group, on=['search_term'], how='left' ).join( self.df_st_attribute, on=['search_term'], how='left' ).join( self.df_st_img_type_group, on=['search_term'], how='left' ).join( self.df_top100_asin, on=['search_term_id'], how='left' ).join( self.df_st_launch_time_group, on=['search_term'], how='left' ).join( self.df_st_comments_group, on=['search_term'], how='left' ).join( self.df_st_buy_box, on=['search_term'], how='left' ).join( self.df_st_seller, on=['search_term'], how='left' ).join( self.df_st_color, on=['search_term'], how='left' ).join( self.df_st_top20, on=['search_term'], how='left' ).join( self.df_st_rating_group, on=['search_term'], how='left' ).join( self.df_st_size, on=['search_term'], how='left' ).join( self.df_st_package_num, on=['search_term'], how='left' ) for i in range(1, 19): self.df_save = self.df_save.withColumn(f"price_range_num{i}_deprecated", F.lit(-1)) self.df_save = self.df_save.withColumn(f"price_range_market_share{i}_deprecated", F.lit(-1.0)) self.df_save = self.df_save.select( "search_term_id", "search_term", "price_range_num1_deprecated", "price_range_num2_deprecated", "price_range_num3_deprecated", "price_range_num4_deprecated", "price_range_num5_deprecated", "price_range_num6_deprecated", "price_range_num7_deprecated", "price_range_num8_deprecated", "price_range_num9_deprecated", "price_range_num10_deprecated", 
"price_range_num11_deprecated", "price_range_num12_deprecated", "price_range_num13_deprecated", "price_range_num14_deprecated", "price_range_num15_deprecated", "price_range_num16_deprecated", "price_range_num17_deprecated", "price_range_num18_deprecated", "price_range_market_share1_deprecated", "price_range_market_share2_deprecated", "price_range_market_share3_deprecated", "price_range_market_share4_deprecated", "price_range_market_share5_deprecated", "price_range_market_share6_deprecated", "price_range_market_share7_deprecated", "price_range_market_share8_deprecated", "price_range_market_share9_deprecated", "price_range_market_share10_deprecated", "price_range_market_share11_deprecated", "price_range_market_share12_deprecated", "price_range_market_share13_deprecated", "price_range_market_share14_deprecated", "price_range_market_share15_deprecated", "price_range_market_share16_deprecated", "price_range_market_share17_deprecated", "price_range_market_share18_deprecated", "ao_range_val1", "ao_range_val2", "ao_range_val3", "ao_range_val4", "ao_range_val5", "ao_range_val6", "ao_range_val7", "ao_range_val8", "ao_range_val9", "ao_range_val10", "ao_range_val11", "ao_range_val12", "ao_range_market_share1", "ao_range_market_share2", "ao_range_market_share3", "ao_range_market_share4", "ao_range_market_share5", "ao_range_market_share6", "ao_range_market_share7", "ao_range_market_share8", "ao_range_market_share9", "ao_range_market_share10", "ao_range_market_share11", "ao_range_market_share12", "total_asin_num", "new_asin_num", "orders", "bsr_orders", "aadd_bsr_orders", "aadd_video_num", "aadd_no_video_num", "no_aadd_no_video_num", "no_aadd_video_num", "top100_asin", "top100_orders", "top100_market_share", "top100_is_new", "launch_time_num1", "launch_time_num2", "launch_time_num3", "launch_time_num4", "launch_time_num5", "launch_time_num6", "launch_time_num7", "launch_time_num8", "launch_time_market_share1", "launch_time_market_share2", "launch_time_market_share3", 
"launch_time_market_share4", "launch_time_market_share5", "launch_time_market_share6", "launch_time_market_share7", "launch_time_market_share8", "top20_asin", "top20_orders", "top20_brand", "top20_brand_new_num_proportion", "top20_brand_bsr_oders", "top20_brand_market_share", "comments_num1", "comments_num2", "comments_num3", "comments_num4", "comments_num5", "comments_num6", "comments_num7", "comments_num8", "comments_num9", "comments_num10", "comments_num11", "comments_num12", "comments_num13", "comments_num_market_share1", "comments_num_market_share2", "comments_num_market_share3", "comments_num_market_share4", "comments_num_market_share5", "comments_num_market_share6", "comments_num_market_share7", "comments_num_market_share8", "comments_num_market_share9", "comments_num_market_share10", "comments_num_market_share11", "comments_num_market_share12", "comments_num_market_share13", "buy_box_name", "buy_box_num", "seller_name", "seller_num", "seller_bsr_orders", "color_name", "color_num", "new_asin_bsr_orders", "rating_section", "rating_market_share_section", "size_name", "size_num", "package_num_trend", "package_num_trend_market_share", "package_num_corresponding_asin", "price_interval", "price_interval_asin_count", "price_interval_asin_market_share", "color_bsr_orders_percent", "size_bsr_orders_percent" ) self.df_save = self.df_save.withColumn( "created_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS') ).withColumn( "updated_time", F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS') ) self.df_save = self.df_save.na.fill({ "ao_range_val1": 0, "ao_range_val2": 0, "ao_range_val3": 0, "ao_range_val4": 0, "ao_range_val5": 0, "ao_range_val6": 0, "ao_range_val7": 0, "ao_range_val8": 0, "ao_range_val9": 0, "ao_range_val10": 0, "ao_range_val11": 0, "ao_range_val12": 0, "ao_range_market_share1": 0.0, "ao_range_market_share2": 0.0, "ao_range_market_share3": 0.0, "ao_range_market_share4": 0.0, "ao_range_market_share5": 0.0, "ao_range_market_share6": 
0.0, "ao_range_market_share7": 0.0, "ao_range_market_share8": 0.0, "ao_range_market_share9": 0.0, "ao_range_market_share10": 0.0, "ao_range_market_share11": 0.0, "ao_range_market_share12": 0.0, "total_asin_num": 0, "new_asin_num": 0, "orders": 0, "bsr_orders": 0, "aadd_video_num": 0, "aadd_no_video_num": 0, "no_aadd_no_video_num": 0, "no_aadd_video_num": 0, "launch_time_num1": 0, "launch_time_num2": 0, "launch_time_num3": 0, "launch_time_num4": 0, "launch_time_num5": 0, "launch_time_num6": 0, "launch_time_num7": 0, "launch_time_num8": 0, "launch_time_market_share1": 0.0, "launch_time_market_share2": 0.0, "launch_time_market_share3": 0.0, "launch_time_market_share4": 0.0, "launch_time_market_share5": 0.0, "launch_time_market_share6": 0.0, "launch_time_market_share7": 0.0, "launch_time_market_share8": 0.0, "comments_num1": 0, "comments_num2": 0, "comments_num3": 0, "comments_num4": 0, "comments_num5": 0, "comments_num6": 0, "comments_num7": 0, "comments_num8": 0, "comments_num9": 0, "comments_num10": 0, "comments_num11": 0, "comments_num12": 0, "comments_num13": 0, "comments_num_market_share1": 0.0, "comments_num_market_share2": 0.0, "comments_num_market_share3": 0.0, "comments_num_market_share4": 0.0, "comments_num_market_share5": 0.0, "comments_num_market_share6": 0.0, "comments_num_market_share7": 0.0, "comments_num_market_share8": 0.0, "comments_num_market_share9": 0.0, "comments_num_market_share10": 0.0, "comments_num_market_share11": 0.0, "comments_num_market_share12": 0.0, "comments_num_market_share13": 0.0 }) # 预留字段补全 self.df_save = self.df_save.withColumn( "re_string_field1", F.lit("null") ).withColumn( "re_string_field2", F.lit("null") ).withColumn( "re_string_field3", F.lit("null") ).withColumn( "re_int_field1", F.lit(0) ).withColumn( "re_int_field2", F.lit(0) ).withColumn( "re_int_field3", F.lit(0) ).withColumn( "site_name", F.lit(self.site_name) ).withColumn( "date_type", F.lit(self.date_type) ).withColumn( "date_info", F.lit(self.date_info) ) def 
handle_data(self): self.get_st_asin_detail() self.get_st_attribute() self.get_img_type() self.get_asin_count() self.get_price_range() self.get_ao_range() self.get_st_buy_box() self.get_st_color() self.get_launch_time_range() self.get_comments_num_range() self.get_rating_group() self.get_st_asin_seller() self.get_top20_asin() self.get_st_size() self.get_asin_package_num() self.handle_data_group() if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwtAbaStAnalyticsReport(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run()