import os import sys import time import traceback sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/") sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates from pyspark.sql import functions as F from pyspark.sql.types import * from utils.db_util import DBUtil from utils.common_util import CommonUtil from utils.spark_util import SparkUtil from datetime import datetime, timedelta from functools import reduce from utils.es_util import EsUtils from pyspark.sql import Window from pyspark.storagelevel import StorageLevel from utils.DorisHelper import DorisHelper from yswg_utils.common_df import get_node_first_id_df, get_first_id_from_category_desc_df from yswg_utils.common_udf import udf_parse_bs_category, parse_weight_str, udf_extract_volume_dimensions, udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json class KafkaFlowAsinDetail(Templates): def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='latest', test_flag='normal', batch_size=100000): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.consumer_type = consumer_type # 消费实时还是消费历史 self.test_flag = test_flag # 正式环境跟测试环境 self.year = str(self.date_info).split('-')[0] self.year_month = str(self.date_info).replace("-", "_") self.repartition_num = 80 # kafka相关参数 self.topic_name = f"{self.site_name}_asin_detail_month_{self.year_month}" self.batch_size = batch_size self.schema = self.init_schema() self.batch_size_history = 20000 self.processing_time = 900 if self.site_name == 'us' else 600 self.history_batch_id = 0 # doris相关参数 self.doris_db = "test" if self.test_flag == "test" else "selection" self.max_bought_month_table = f"{self.site_name}_asin_max_bought_month_info" self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail" self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail" # elasticsearch相关参数 self.client = EsUtils.get_es_client() self.es_index_name = f"{self.topic_name}_test" if self.test_flag == 'test' else f"{self.topic_name}" self.es_index_alias_name = f"{self.site_name}_st_detail_last_4_week_test" if self.test_flag == 'test' else f"{self.site_name}_st_detail_last_4_week" self.es_index_body = EsUtils.get_es_body() self.es_options = EsUtils.get_es_options(self.es_index_name) self.db_save = 'kafka_flow_asin_detail' self.app_name = self.get_app_name() print(f"任务名称:{self.app_name}") # Spark实时消费相关参数 self.spark = SparkUtil.get_stream_spark(app_name=self.app_name) self.check_path = f"/tmp/wangrui/{self.topic_name}_{self.consumer_type}_test" if self.test_flag == 'test' else f"/tmp/wangrui/{self.topic_name}_{self.consumer_type}" self.previous_date = self.get_previous_date(self) self.previous_two_date = self.get_previous_two_date(self) self.launch_time_interval_dict = self.get_launch_time_interval_dict() print("日期字典:", self.launch_time_interval_dict) self.initial_batch_id = self.get_initial_batch_id(self) print("当前消费的起始批次为: ", self.initial_batch_id) self.history_batch_id = self.initial_batch_id + 1 # BSR分类解析模板 self.pattern1_dict = { "us": "See Top 100 in ".lower(), "uk": "See Top 100 in ".lower(), "de": "Siehe Top 100 in ".lower(), "es": "Ver el Top 100 en ".lower(), "fr": "Voir les 100 premiers en ".lower(), "it": "Visualizza i Top 100 nella categoria ".lower(), } self.pattern_current_dict = { "us": "#(\d+) ", "uk": "(\d+) in ", "de": "(\d+) in ", "es": "(\d+) en ", "fr": "(\d+) en ", "it": "(\d+) in ", } # DataFrame初始化 self.df_previous_flow_asin = self.spark.sql("select 1+1;") self.df_seller_info = self.spark.sql("select 1+1;") self.df_self_asin_info = self.spark.sql("select 1+1;") self.df_alarm_brand_info = self.spark.sql("select 1+1;") self.df_asin_label_info = self.spark.sql("select 1+1;") self.df_asin_measure = self.spark.sql("select 1+1;") self.df_bs_report = self.spark.sql("select 1+1;") self.df_asin_keep_date = self.spark.sql("select 1+1;") self.df_asin_bsr_end = self.spark.sql("select 1+1;") self.df_hide_category = self.spark.sql("select 1+1;") self.df_asin_new_cate = self.spark.sql("select 1+ 1;") self.df_user_package_num = self.spark.sql("select 1+1;") self.df_asin_category = self.spark.sql("select 1+1;") self.df_max_bought_month_info_update = self.spark.sql("select 1+1;") # udf函数注册 package_schema = StructType([ StructField("parse_package_quantity", IntegerType(), True), StructField("is_package_quantity_abnormal", IntegerType(), True), ]) self.u_parse_package_quantity = self.spark.udf.register('u_parse_package_quantity', udf_get_package_quantity, package_schema) bs_category_schema = StructType([ StructField('asin_bs_cate_1_id', StringType(), True), StructField('asin_bs_cate_current_id', StringType(), True), StructField('asin_bs_cate_1_rank', IntegerType(), True), StructField('asin_bs_cate_current_rank', IntegerType(), True), ]) self.u_parse_bs_category = self.spark.udf.register('u_parse_bs_category', udf_parse_bs_category, bs_category_schema) weight_schema = StructType([ StructField('weight', FloatType(), True), StructField('weight_type', StringType(), True) ]) self.u_parse_weight = self.spark.udf.register('u_parse_weight', parse_weight_str, weight_schema) volume_schema = StructType([ StructField("length", FloatType(), True), StructField("width", FloatType(), True), StructField("height", FloatType(), True), StructField("asin_volume_type", StringType(), True) ]) self.u_parse_volume = self.spark.udf.register('u_parse_volume', udf_extract_volume_dimensions, volume_schema) seller_schema = StructType([ StructField("buy_box_seller_type", IntegerType(), True), StructField("account_name", StringType(), True), StructField("account_id", StringType(), True) ]) self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema) @staticmethod def init_schema(): schema = StructType([ StructField("asin", StringType(), True), StructField("title", StringType(), True), StructField("img_url", StringType(), True), StructField("rating", DoubleType(), True), StructField("total_comments", IntegerType(), True), StructField("price", FloatType(), True), StructField("category", StringType(), True), StructField("launch_time", StringType(), True), StructField("volume", StringType(), True), StructField("page_inventory", IntegerType(), True), StructField("asin_vartion_list", ArrayType(ArrayType(StringType()), True), True), StructField("title_len", IntegerType(), True), StructField("img_num", IntegerType(), True), StructField("img_type", StringType(), True), StructField("activity_type", StringType(), True), StructField("one_two_val", StringType(), True), StructField("three_four_val", StringType(), True), StructField("five_six_val", StringType(), True), StructField("eight_val", StringType(), True), StructField("node_id", StringType(), True), StructField("five_star", IntegerType(), True), StructField("four_star", IntegerType(), True), StructField("three_star", IntegerType(), True), StructField("two_star", IntegerType(), True), StructField("one_star", IntegerType(), True), StructField("low_star", IntegerType(), True), StructField("together_asin", StringType(), True), StructField("brand", StringType(), True), StructField("ac_name", StringType(), True), StructField("material", StringType(), True), StructField("data_type", IntegerType(), True), StructField("weight_str", StringType(), True), StructField("seller_id", StringType(), True), StructField("variat_num", IntegerType(), True), StructField("best_sellers_rank", StringType(), True), StructField("best_sellers_herf", StringType(), True), StructField("account_name", StringType(), True), StructField("parentAsin", StringType(), True), StructField("asinUpdateTime", StringType(), True), StructField("all_best_sellers_herf", StringType(), True), StructField("image_view", IntegerType(), True), StructField("product_description", StringType(), True), StructField("describe", StringType(), True), StructField("buy_sales", StringType(), True), StructField("lob_asin_json", StringType(), True), StructField("seller_json", StringType(), True), StructField("customer_reviews_json", StringType(), True), StructField("img_list", StringType(), True), StructField("follow_sellers", IntegerType(), True) ]) return schema @staticmethod def get_previous_date(self): self.df_date = self.spark.sql(f"select * from dim_date_20_to_30") df = self.df_date.toPandas() df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)] cur_month_id = int(list(df_loc.id)[0]) previous_date_id = cur_month_id - 1 df_loc = df.loc[df.id == previous_date_id] previous_date = str(list(df_loc.year_month)[0]) return previous_date @staticmethod def get_previous_two_date(self): self.df_date = self.spark.sql(f"select * from dim_date_20_to_30") df = self.df_date.toPandas() df_loc = df.loc[(df.year_month == f'{self.date_info}') & (df.day == 1)] cur_month_id = int(list(df_loc.id)[0]) previous_two_date_id = cur_month_id - 40 df_loc = df.loc[df.id == previous_two_date_id] prvious_two_date = str(list(df_loc.year_month)[0]) return prvious_two_date @staticmethod def get_launch_time_interval_dict(): cur_date = datetime.now().date() return { "one_month": (cur_date + timedelta(days=-30)).strftime('%Y-%m-%d'), "three_month": (cur_date + timedelta(days=-90)).strftime('%Y-%m-%d'), "six_month": (cur_date + timedelta(days=-180)).strftime('%Y-%m-%d'), "twelve_month": (cur_date + timedelta(days=-360)).strftime('%Y-%m-%d'), "twenty_four_month": (cur_date + timedelta(days=-720)).strftime('%Y-%m-%d'), "thirty_six_month": (cur_date + timedelta(days=-1080)).strftime('%Y-%m-%d') } @staticmethod def get_initial_batch_id(self): max_bought_month_batch_id_sql = f""" SELECT MAX(batch_id) as initial_batch_id from {self.doris_db}.{self.max_bought_month_table} WHERE date_info='{self.date_info}' AND consumer_type='{self.consumer_type}' """ df_max_bought_month_batch_id = DorisHelper.spark_import_with_sql( self.spark, query=max_bought_month_batch_id_sql) max_bought_month_batch_id = 0 if df_max_bought_month_batch_id.take(1)[0]['initial_batch_id'] is None else \ df_max_bought_month_batch_id.take(1)[0]['initial_batch_id'] return max_bought_month_batch_id # 1. 处理asin分类及排名以及排名类型字段 def handle_asin_bs_category_info(self, df): df = df.withColumnRenamed("parentAsin", "parent_asin") cate_current_pattern = self.pattern_current_dict[self.site_name] cate_1_pattern = self.pattern1_dict[self.site_name] df = df.withColumn("asin_bs_sellers_rank_lower", F.lower("best_sellers_rank")) df = df.withColumn("asin_bs", self.u_parse_bs_category( "asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", F.lit(cate_current_pattern), F.lit(cate_1_pattern))) df = df.withColumn("asin_bs_cate_1_id", df.asin_bs.getField("asin_bs_cate_1_id")) \ .withColumn("asin_bs_cate_current_id", df.asin_bs.getField("asin_bs_cate_current_id")) \ .withColumn("asin_bs_cate_1_rank", df.asin_bs.getField("asin_bs_cate_1_rank")) \ .withColumn("asin_bs_cate_current_rank", df.asin_bs.getField("asin_bs_cate_current_rank")) \ .drop("asin_bs", "asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", "best_sellers_rank") df = df.withColumn("rank_type", F.expr(""" CASE WHEN asin_bs_cate_1_rank IS NOT NULL AND asin_bs_cate_1_rank BETWEEN 0 AND 1000 THEN 1 WHEN asin_bs_cate_1_rank BETWEEN 1000 AND 5000 THEN 2 WHEN asin_bs_cate_1_rank BETWEEN 5000 AND 10000 THEN 3 WHEN asin_bs_cate_1_rank BETWEEN 10000 AND 20000 THEN 4 WHEN asin_bs_cate_1_rank BETWEEN 20000 AND 30000 THEN 5 WHEN asin_bs_cate_1_rank BETWEEN 30000 AND 50000 THEN 6 WHEN asin_bs_cate_1_rank BETWEEN 50000 AND 70000 THEN 7 WHEN asin_bs_cate_1_rank >= 70000 THEN 8 ELSE 0 END""")) return df # 2. 利用node_id以及分类描述进行分类补充(此时无排名信息) def handle_asin_category_supplement(self, df): df = df.join(self.df_asin_new_cate, on=['node_id'], how='left') df = df.withColumn("asin_bs_cate_current_id", F.coalesce(F.col("asin_bs_cate_current_id"), F.col("node_id"))). \ withColumn("asin_bs_cate_1_id", F.coalesce(F.col("asin_bs_cate_1_id"), F.col("category_first_id"))). \ drop("category_first_id", "node_id") df_with_category = df.filter("asin_bs_cate_1_id is null and category is not null").select("asin", "category") df_with_category = df_with_category.withColumn( "category_split", F.split(F.col("category"), "›") ).withColumn( "category_first_name", F.lower(F.col("category_split").getItem(0)) ).drop("category_split", "category") df_with_category = df_with_category.join(self.df_asin_category, on=['category_first_name'], how='inner') df_with_category = df_with_category.withColumnRenamed("category_first_id", "category_first_id_with_name").drop("category_first_name") df = df.join(df_with_category, on=['asin'], how='left') df = df.withColumn("asin_bs_cate_1_id", F.coalesce(F.col("asin_bs_cate_1_id"), F.col("category_first_id_with_name"))).drop("category_first_id_with_name") return df # 3. 处理bsr销量、价格类型字段以及BSR销售额信息 def handle_asin_bsr_orders(self, df): df = df.join(self.df_bs_report, on=['asin_bs_cate_1_id', 'asin_bs_cate_1_rank'], how='left') df = df.withColumn("price_type", F.expr(""" CASE WHEN price IS NOT NULL AND price > 0 AND price < 10 THEN 1 WHEN price >= 10 AND price < 15 THEN 2 WHEN price >= 15 AND price < 20 THEN 3 WHEN price >= 20 AND price < 30 THEN 4 WHEN price >= 30 AND price < 50 THEN 5 WHEN price >= 50 THEN 6 ELSE 0 END""")).\ withColumn("bsr_orders_sale", F.round(F.col("bsr_orders") * F.col("price"), 2)) return df # 4.解析Make-It-A-Bundle信息 def handle_asin_lob_info(self, df): df = df.withColumn("is_contains_lob_info", F.when(F.col("lob_asin_json").isNotNull(), F.lit(1)).otherwise(F.lit(0))) df_parsed = df.withColumn("parse_asin_lob", F.when(F.col("is_contains_lob_info") == 1, F.from_json("lob_asin_json", "array<struct<lob_asin:string>>"))) df_result = df_parsed.withColumn("asin_lob_info", F.expr("transform(parse_asin_lob, x -> x.lob_asin)")) df = df_result.withColumn( "asin_lob_info", F.regexp_replace(F.concat_ws(",", "asin_lob_info"), "[{}]", "")).drop( "parse_asin_lob", "lob_asin_json") return df # 5. 处理配送方式、卖家所在地以及卖家所在地类型 def handle_asin_buy_box_seller_type(self, df): df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json)) df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn( "account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed") df = df.join(self.df_seller_info, on=['seller_id'], how='left') df = df.withColumn("site_name_type", F.expr(""" CASE WHEN buy_box_seller_type = 1 THEN 4 WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1 WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2 ELSE 3 END""")) return df # 6. 处理asin基础属性信息(长宽高重量等) def handle_asin_basic_attribute_info(self, df): # 1.解析ASIN重量相关信息 df = df.withColumn("weight_str", F.lower(F.col("weight_str"))).withColumn("asin_weight", self.u_parse_weight("weight_str", F.lit(self.site_name))).drop("weight_str") df = df.withColumn( "weight", F.when(df.asin_weight.getField("weight_type") == 'pounds', df.asin_weight.getField("weight")).otherwise(F.lit(0))).drop("asin_weight") # 2.处理重量类型 df = df.withColumn("weight_type", F.expr(""" CASE WHEN weight BETWEEN 0 AND 0.2 THEN 1 WHEN weight BETWEEN 0.2 AND 0.4 THEN 2 WHEN weight BETWEEN 0.4 AND 0.6 THEN 3 WHEN weight BETWEEN 0.6 AND 1 THEN 4 WHEN weight BETWEEN 1 AND 2 THEN 5 WHEN weight >= 2 THEN 6 ELSE 0 END""")) # 3.解析ASIN体积相关信息 df = df.withColumn("asin_volume", self.u_parse_volume("volume")) df = df.withColumn("asin_volume_type", df.asin_volume.getField("asin_volume_type")) \ .withColumn("asin_length", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("length"))) \ .withColumn("asin_width", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("width"))) \ .withColumn("asin_height", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("height"))) \ .drop("asin_volume", "asin_volume_type") # 4.获取体积重/毛重相关信息 df = df.withColumn( "asin_weight_ratio", F.when( F.col("asin_length").isNotNull() & (F.col("asin_width").isNotNull()) & (F.col("asin_height").isNotNull()) & (F.col("weight") > 0), F.round(F.col("asin_length") * F.col("asin_width") * F.col("asin_height") * 3.2774128 / (F.col("weight") * 453.59), 3)) .otherwise(F.lit(-1))) # 5.处理尺寸类型 if self.site_name == 'us': expr_str = f""" CASE WHEN weight > 0 AND weight * 16 <= 16 AND asin_length > 0 AND asin_length <= 15 AND asin_width > 0 AND asin_width <= 12 AND asin_height > 0 AND asin_height <= 0.75 THEN 1 WHEN weight > 0 AND weight <= 20 AND asin_length > 0 AND asin_length <= 18 AND asin_width > 0 AND asin_width <= 14 AND asin_height > 0 AND asin_height <= 8 THEN 2 WHEN weight > 0 AND weight <= 70 AND asin_length > 0 AND asin_length <= 60 AND asin_width > 0 AND asin_width <= 30 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 3 WHEN weight > 0 AND weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 4 WHEN weight > 0 AND weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 165 THEN 5 WHEN weight > 150 AND asin_length > 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 > 165 THEN 6 ELSE 0 END""" else: expr_str = f""" CASE WHEN weight > 0 AND weight <= 100 AND asin_length > 0 AND asin_length <= 20 AND asin_width > 0 AND asin_width <= 15 AND asin_height > 0 AND asin_height <= 1 THEN 1 WHEN weight > 0 AND weight <= 500 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 2.5 THEN 2 WHEN weight > 0 AND weight <= 1000 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 5 THEN 3 WHEN weight > 0 AND weight <= 12000 AND asin_length > 0 AND asin_length <= 45 AND asin_width > 0 AND asin_width <= 34 AND asin_height > 0 AND asin_height <= 26 THEN 4 WHEN weight > 0 AND weight <= 2000 AND asin_length > 0 AND asin_length <= 61 AND asin_width > 0 AND asin_width <= 46 AND asin_height > 0 AND asin_height <= 46 THEN 5 WHEN asin_length > 0 AND asin_length <= 150 AND asin_length + asin_length + (asin_width + asin_height) <= 300 THEN 6 WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END""" df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height") return df # 7. 处理asin图片信息 def handle_asin_img_info(self, df): img_schema = ArrayType(ArrayType(StringType())) df = df.withColumn("img_list", F.from_json(F.col("img_list"), img_schema)) df_with_img = df.filter(F.size("img_list") > 0).select("asin", "img_list") df_with_img_attribute = df_with_img.select( "asin", F.explode("img_list").alias("img_attributes") ).select( "asin", F.col("img_attributes")[1].alias("img_url"), F.col("img_attributes")[2].alias("img_order_by"), F.col("img_attributes")[3].alias("data_type") ) df_with_img_attribute_agg = df_with_img_attribute.groupby("asin").agg( F.to_json(F.collect_list(F.struct(F.col("img_url"), F.col("img_order_by"), F.col("data_type")))).alias( "img_info") ) df = df.drop("img_list") df = df.join(df_with_img_attribute_agg, on=['asin'], how='left') return df # 8. 处理变体相关(ao及母体相关,自然占比及母体自然占比,各类型数量,月销信息等) def handle_asin_measure(self, df): df = CommonUtil.get_asin_variant_attribute(df_asin_detail=df, df_asin_measure=self.df_asin_measure, partition_num=self.repartition_num, use_type=1) # 是否数量变体类型和ao的类型 df = df.withColumn("quantity_variation_type", F.expr(""" CASE WHEN size is not null and size != '' and lower(size) like '%quantity%' THEN 1 ELSE 0 END""")).withColumn( "ao_val_type", F.expr(""" CASE WHEN asin_ao_val BETWEEN 0 AND 0.1 THEN 1 WHEN asin_ao_val BETWEEN 0.1 AND 0.2 THEN 2 WHEN asin_ao_val BETWEEN 0.2 AND 0.4 THEN 3 WHEN asin_ao_val BETWEEN 0.4 AND 0.8 THEN 4 WHEN asin_ao_val BETWEEN 0.8 AND 1.2 THEN 5 WHEN asin_ao_val BETWEEN 1.2 AND 2 THEN 6 WHEN asin_ao_val >= 2 THEN 7 ELSE 0 END""")) df = df.withColumnRenamed("asin_zr_counts", "zr_counts").withColumnRenamed("asin_ao_val", "ao_val") \ .withColumnRenamed("asin_zr_flow_proportion", "zr_flow_proportion") \ .withColumnRenamed("asin_amazon_orders", "asin_bought_month").drop("asin_st_counts", "asin_adv_counts") # 获取parent_asin下最新ASIN信息 df_parent_asin_info = df.filter("parent_asin is not null").select("parent_asin", "asin_vartion_list", "asinUpdateTime") parent_asin_window = Window.partitionBy(['parent_asin']).orderBy( F.desc_nulls_last("asinUpdateTime") ) df_parent_asin_info = df_parent_asin_info.withColumn("u_rank", F.row_number().over(window=parent_asin_window)) df_parent_asin_info = df_parent_asin_info.repartition(self.repartition_num) df_parent_asin_info = df_parent_asin_info.filter("u_rank = 1").drop("u_rank") df_asin_variat = df_parent_asin_info.filter(F.size("asin_vartion_list") > 0).\ select("parent_asin", "asinUpdateTime", F.explode("asin_vartion_list").alias("variant_attribute")).\ select("parent_asin", F.col("asinUpdateTime").alias("asin_crawl_date"), F.col("variant_attribute")[0].alias("asin"), F.col("variant_attribute")[1].alias("color"), F.col("variant_attribute")[3].alias("size"), F.col("variant_attribute")[5].alias("style")) df_asin_variat_agg = df_asin_variat.groupby(['parent_asin']).agg( F.first("asin_crawl_date").alias("asin_crawl_date"), F.concat_ws(',', F.collect_list("asin")).alias("variation_info"), F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info") ) print("导出父ASIN最新变体信息到doris:") df_doris = df_asin_variat_agg.select( "parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info") table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info" DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns) df_doris.unpersist() return df # 9. 提取打包数量字段 def handle_asin_package_quantity(self, df): df = df.withColumn( "variat_attribute", F.concat_ws("&&&%", F.col("color"), F.col("style"), F.col("size"), F.col("material"))) df = df.withColumn("title_parse", self.u_parse_package_quantity(df.title)).withColumn( "variat_parse", self.u_parse_package_quantity(df.variat_attribute)) df = df.withColumn("title_package_quantity", df.title_parse.getField("parse_package_quantity")). \ withColumn("variat_package_quantity", df.variat_parse.getField("parse_package_quantity")). \ withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \ withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \ drop("title_parse", "variat_parse", "variat_attribute") df = df.withColumn( "package_quantity", F.expr(""" CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity WHEN title_package_quantity is not null THEN title_package_quantity ELSE 1 END""") ).withColumn( "is_package_quantity_abnormal", F.expr(""" CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal ELSE 2 END""") ).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal") df = df.withColumn("title", F.lower(F.col("title"))) df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left') df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \ withColumn( "is_package_quantity_abnormal", F.coalesce(F.col("user_is_package_quantity_abnormal"), F.col("is_package_quantity_abnormal")) ).drop("user_package_num", "user_is_package_quantity_abnormal") return df # 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息 def handle_asin_lqs_and_brand(self, df): # 1.品牌标签以及是否告警品牌 df = df.withColumn("is_brand_label", F.expr("""CASE WHEN brand is not null THEN 1 ELSE 0 END""")) df = df.withColumn("brand", F.lower("brand")) df = df.join(self.df_alarm_brand_info, on=['brand'], how='left') df = df.withColumn("is_alarm_brand", F.when(F.col("is_alarm_brand").isNotNull(), F.col("is_alarm_brand")).otherwise(F.lit(0))) # 2. lqs评分 df = df.withColumn("category_node_rating", F.expr(f"""CASE WHEN asin_bs_cate_current_id is not null THEN 1 ELSE 0 END""")) \ .withColumn("zr_rating", F.expr(f"""CASE WHEN zr_counts > 0 THEN 0.5 ELSE 0 END""")) \ .withColumn("sp_rating", F.expr(f"""CASE WHEN sp_counts > 0 THEN 1 ELSE 0 END""")) \ .withColumn("a_add_rating", F.expr(f"""CASE WHEN img_type like '%3%' THEN 1 ELSE 0 END""")) \ .withColumn("video_rating", F.expr(f"""CASE WHEN img_type like '%2%' THEN 0.5 ELSE 0 END""")) \ .withColumn("brand_rating", F.expr(f"""CASE WHEN is_brand_label = 1 THEN 0.2 ELSE 0 END""")) \ .withColumn("product_describe_rating", F.expr(f"""CASE WHEN product_description is not null THEN 0.2 ELSE 0 END""")) \ .withColumn("highlight_rating", F.expr(f""" CASE WHEN describe is not null AND size(split(describe, '\\|-\\|')) <= 4 THEN size(split(describe, '\\|-\\|')) * 0.4 WHEN describe is not null AND size(split(describe, '\\|-\\|')) > 4 THEN 1.6 ELSE 0 END""")) \ .withColumn("title_len_rating", F.expr(f"""CASE WHEN title_len >= 50 AND title_len <=200 THEN 0.5 ELSE 0 END""")) \ .withColumn("title_brand_rating", F.expr(f""" CASE WHEN brand is not null AND lower(regexp_replace(title, '[^a-zA-Z0-9\\s]', '')) LIKE lower(regexp_replace(brand, '[^a-zA-Z0-9\\s]', '')) || '%' THEN 0.5 ELSE 0 END""")) \ .withColumn("img_num_rating", F.expr(f""" CASE WHEN img_num <= 4 THEN img_num * 0.5 WHEN img_num >4 THEN 2 ELSE 0 END""")) \ .withColumn("img_enlarge_rating", F.expr(f"""CASE WHEN image_view = 1 THEN 0.5 ELSE 0 END""")) df = df.withColumn( "asin_lqs_rating", (F.col("category_node_rating") + F.col("zr_rating") + F.col("sp_rating") + F.col("a_add_rating") + F.col("video_rating") + F.col("brand_rating") + F.col("product_describe_rating") + F.col("highlight_rating") + F.col("title_len_rating") + F.col("title_brand_rating") + F.col("img_num_rating") + F.col("img_enlarge_rating")).cast("double")).withColumn( "asin_lqs_rating_detail", F.to_json( F.struct(F.col("category_node_rating"), F.col("zr_rating"), F.col("sp_rating"), F.col("a_add_rating"), F.col("video_rating"), F.col("brand_rating"), F.col("product_describe_rating"), F.col("highlight_rating"), F.col("title_len_rating"), F.col("title_brand_rating"), F.col("img_num_rating"), F.col("img_enlarge_rating"))) ) df = df.drop("product_description", "describe", "image_view", "category_node_rating", "zr_rating", "sp_rating", "a_add_rating", "video_rating", "brand_rating", "product_describe_rating", "highlight_rating", "title_len_rating", "title_brand_rating", "img_num_rating", "img_enlarge_rating") return df # 11. 通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、是否内部asin、是否隐藏分类、有效类型、必需ASIN、asin_type) def handle_asin_detail_all_type(self, df): # 1. 评分类型 df = df.withColumn("rating_type", F.expr(""" CASE WHEN rating >= 4.5 THEN 1 WHEN rating >= 4 AND rating < 4.5 THEN 2 WHEN rating >= 3.5 AND rating < 4 THEN 3 WHEN rating >= 3 AND rating < 3.5 THEN 4 WHEN rating < 3 AND rating >= 0 THEN 5 ELSE 0 END""")) # 2. 上架时间类型 df = df.join(self.df_asin_keep_date, on=['asin'], how='left') df = df.withColumn("launch_time", F.when(F.col("launch_time").isNull(), F.col("new_launch_time")).otherwise( F.col("launch_time"))) one_month = self.launch_time_interval_dict['one_month'] three_month = self.launch_time_interval_dict['three_month'] six_month = self.launch_time_interval_dict['six_month'] twelve_month = self.launch_time_interval_dict['twelve_month'] twenty_four_month = self.launch_time_interval_dict['twenty_four_month'] thirty_six_month = self.launch_time_interval_dict['thirty_six_month'] expr_str = f""" CASE WHEN launch_time >= '{one_month}' THEN 1 WHEN launch_time >= '{three_month}' AND launch_time < '{one_month}' THEN 2 WHEN launch_time >= '{six_month}' AND launch_time < '{three_month}' THEN 3 WHEN launch_time >= '{twelve_month}' AND launch_time < '{six_month}' THEN 4 WHEN launch_time >= '{twenty_four_month}' AND launch_time < '{twelve_month}' THEN 5 WHEN launch_time >= '{thirty_six_month}' AND launch_time < '{twenty_four_month}' THEN 6 WHEN launch_time < '{thirty_six_month}' THEN 7 ELSE 0 END""" df = df.withColumn("launch_time_type", F.expr(expr_str)) # 3. 电影标签 movie_label_list = ['prime video', 'dvd', 'blu-ray', 'kindle', 'app', 'paperback', 'audible audiobook', 'kindle edition', 'kindle & comixology', 'hardcover', 'comic', 'multi-format', '4k', 'library binding', 'vinyl', 'audio cd', 'mp3 music', 'single issue magazine', 'print magazine', 'unknown binding'] df = df.join(self.df_asin_label_info, on=['asin'], how='left') condition = reduce( lambda acc, keyword: acc | F.expr(f"exists(asin_label_list, x -> x like '%{keyword}%')"), movie_label_list, F.lit(False) ) df = df.withColumn("is_movie_label", condition.cast("int")).drop("asin_label_list") # 4. 是否内部asin、是否隐藏分类 df = df.join(self.df_self_asin_info, on=['asin'], how='left') df = df.withColumn( "is_self_asin", F.when(F.col("is_self_asin").isNotNull(), F.col("is_self_asin")).otherwise(F.lit(0))) df = df.join(self.df_hide_category, on=['asin_bs_cate_current_id'], how='left') df = df.na.fill({"hide_flag": 0}) df = df.withColumn("is_hide_asin", F.expr(""" CASE WHEN hide_flag = 1 THEN 1 WHEN asin_bs_cate_1_id = 'grocery' and asin_bs_cate_current_id != '6492272011' THEN 1 WHEN asin_bs_cate_current_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1 ELSE 0 END""")).drop("hide_flag") # 5. 有效类型 df = df.join(self.df_asin_bsr_end, on=['asin_bs_cate_1_id'], how='left') df = df.withColumn("bsr_type", F.expr(""" CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END""" )).drop("limit_rank") # 5. 是否必需ASIN df = df.withColumn("is_need_asin", F.expr(""" CASE WHEN asin_bs_cate_1_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 WHEN asin NOT LIKE 'B0%' THEN 1 ELSE 0 END""")) # 6. asin_type df = df.withColumn("asin_type", F.expr(""" CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END""" )).drop("is_self_asin", "is_need_asin", "is_hide_asin") return df # 12. 处理变化率相关字段 def handle_asin_attribute_change(self, df): # 处理ASIN维度的变化率信息 df = df.join(self.df_previous_flow_asin, on=['asin'], how='left') columns_to_change = [ ("ao_val", "previous_asin_ao_val", "ao"), ("price", "previous_asin_price", "price"), ("asin_bs_cate_1_rank", "previous_first_category_rank", "rank"), ("bsr_orders", "previous_asin_bsr_orders", "bsr_orders"), ("rating", "previous_asin_rating", "rating"), ("total_comments", "previous_asin_total_comments", "comments"), ("variat_num", "previous_asin_variation_num", "variation"), ("bsr_orders_sale", "previous_sales", "sales") ] def calculate_change(current_col, previous_col): rise_col = F.col(current_col) - F.col(previous_col) change_col = F.when((F.col(previous_col).isNotNull()) & (F.col(previous_col) != 0), F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4) ).otherwise(None) return rise_col, change_col for current_col, previous_col, suffix in columns_to_change: rise_col, change_col = calculate_change(current_col, previous_col) if suffix == 'ao': df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 3)) elif suffix in ['price', 'sales']: df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 2)) elif suffix == 'rating': df = df.withColumn(f"{suffix}_rise", F.round(rise_col, 1)) else: df = df.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType())) df = df.withColumn(f"{suffix}_change", F.round(change_col, 4)) df = df.drop(previous_col) return df # 13. 字段标准化 def handle_column_name(self, df): df = df.withColumnRenamed("asin_bs_cate_1_id", "category_first_id")\ .withColumnRenamed("asin_bs_cate_current_id", "category_id") \ .withColumnRenamed("asin_bs_cate_1_rank", "first_category_rank")\ .withColumnRenamed("asin_bs_cate_current_rank", "current_category_rank") \ .withColumnRenamed("variat_num", "variation_num")\ .withColumnRenamed("seller_id", "account_id").withColumnRenamed("seller_country_name", "site_name") \ .withColumnRenamed("asinUpdateTime", "asin_crawl_date")\ .withColumnRenamed("customer_reviews_json", "product_features")\ .withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\ .withColumn("bsr_best_orders_type", F.lit(-1)) df_save = df.select("asin", "ao_val", "zr_counts", "sp_counts", "sb_counts", "vi_counts", "bs_counts", "ac_counts", "tr_counts", "er_counts", "bsr_orders", "bsr_orders_sale", "title", "title_len", "price", "rating", "total_comments", "buy_box_seller_type", "page_inventory", "volume", "weight", "color", "size", "style", "material", "launch_time", "img_num", "parent_asin", "img_type", "img_url", "activity_type", "one_two_val", "three_four_val", "five_six_val", "eight_val", "brand", "variation_num", "one_star", "two_star", "three_star", "four_star", "five_star", "low_star", "together_asin", "account_name", "account_id", "rank_rise", "rank_change", "ao_rise", "ao_change", "price_rise", "price_change", "rating_rise", "rating_change", "comments_rise", "comments_change", "bsr_orders_rise", "bsr_orders_change", "sales_rise", "sales_change", "variation_rise", "variation_change", "size_type", "rating_type", "site_name_type", "weight_type", "launch_time_type", "ao_val_type", "rank_type", "price_type", "bsr_type", "bsr_best_orders_type", "quantity_variation_type", "package_quantity", "is_movie_label", "is_brand_label", "is_alarm_brand", "asin_type", "asin_crawl_date", "category_first_id", "category_id", "first_category_rank", "current_category_rank", "asin_weight_ratio", "site_name", "asin_bought_month", "asin_lqs_rating", "asin_lqs_rating_detail", "asin_lob_info", "is_contains_lob_info", "is_package_quantity_abnormal", "category", "zr_flow_proportion", "matrix_flow_proportion", "matrix_ao_val", "product_features", "img_info", "collapse_asin", F.col("follow_sellers").alias("follow_sellers_count"), "seller_json") df_save = df_save.na.fill( {"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0, "tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0, "one_two_val": 0.0, "three_four_val": 0.0, "five_six_val": 0.0, "eight_val": 0.0, "one_star": 0, "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0, "size_type": 0, "rating_type": 0, "site_name_type": 0, "weight_type": 0, "launch_time_type": 0, "ao_val_type": 0, "rank_type": 0, "price_type": 0, "quantity_variation_type": 0, "package_quantity": 1, "is_movie_label": 0, "is_brand_label": 0, "is_alarm_brand": 0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1} ) print("asin的标准信息:") df_save.show(10, truncate=False) return df_save def read_data(self): print("1. 读取上个维度的flow_asin") sql = f""" select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price, variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating, asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank, bsr_orders as previous_asin_bsr_orders, sales as previous_sales from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.previous_date}' """ print("sql=", sql) self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql) if self.df_previous_flow_asin.count() <= 1: print("该历史节点数据不全,调整到上上个月") sql = f""" select asin, first_category_rank as previous_first_category_rank, round(asin_ao_val, 3) as previous_asin_ao_val, asin_price as previous_asin_price, bsr_orders as previous_bsr_orders, asin_rating as previous_asin_rating, asin_total_comments as previous_asin_total_comments, sales as previous_sales, variation_num as previous_variation_num from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.previous_two_date}' """ print("sql=", sql) self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql) self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_previous_flow_asin.show(10, truncate=False) print("2. 获取卖家相关信息") sql = f""" select fd_unique as seller_id, upper(fd_country_name) as seller_country_name from dim_fd_asin_info where site_name='{self.site_name}' and fd_unique is not null group by fd_unique, fd_country_name""" print("sql=", sql) self.df_seller_info = self.spark.sql(sqlQuery=sql) self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_seller_info.show(10, truncate=False) print("3. 读取内部asin信息") sql = f"""select asin, 1 as is_self_asin from {self.site_name}_self_asin group by asin""" print("sql=", sql) mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name=self.site_name) if mysql_con_info is not None: df_self_asin_info = SparkUtil.read_jdbc_query( session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'], username=mysql_con_info['username'], query=sql) self.df_self_asin_info = F.broadcast(df_self_asin_info) self.df_self_asin_info.show(10, truncate=False) print("4. 读取告警品牌信息") sql = f""" select brand, 1 as is_alarm_brand from (select lower(trim(brand_name)) as brand from brand_alert_erp where brand_name is not null) t group by brand""" print("sql=", sql) if self.site_name == 'us': pg_cluster_con_info = DBUtil.get_connection_info(db_type="postgresql_cluster", site_name=self.site_name) if pg_cluster_con_info is not None: df_alarm_brand_info = SparkUtil.read_jdbc_query( session=self.spark, url=pg_cluster_con_info['url'], pwd=pg_cluster_con_info['pwd'], username=pg_cluster_con_info['username'], query=sql) self.df_alarm_brand_info = F.broadcast(df_alarm_brand_info) self.df_alarm_brand_info.show(10, truncate=False) else: schema = StructType([ StructField("brand", StringType(), True), StructField("is_alarm_brand", IntegerType(), True) ]) self.df_alarm_brand_info = self.spark.createDataFrame([], schema) print("5. 读取隐藏分类信息") sql = f""" select category_id_base as asin_bs_cate_current_id, 1 as hide_flag from us_bs_category_hide group by category_id_base """ print("sql=", sql) us_mysql_con_info = DBUtil.get_connection_info(db_type='mysql', site_name='us') if us_mysql_con_info is not None: df_hide_category = SparkUtil.read_jdbc_query( session=self.spark, url=us_mysql_con_info['url'], pwd=us_mysql_con_info['pwd'], username=us_mysql_con_info['username'], query=sql) self.df_hide_category = F.broadcast(df_hide_category) self.df_hide_category.show(10, truncate=False) print("6. 读取asin_label信息") sql = f""" select asin, label from (select asin, lower(label) as label, created_time,row_number() over(partition by asin,label order by updated_time desc) as crank from ods_other_search_term_data where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and trim(label) not in ('null','') and label is not null) t where t.crank=1 """ print("sql=", sql) self.df_asin_label_info = self.spark.sql(sqlQuery=sql) if self.df_asin_label_info.count() <= 1: print("该历史节点数据不全,调整到上上个月") sql = f""" select asin, label from (select asin, lower(label) as label, created_time,row_number() over(partition by asin,label order by updated_time desc) as crank from ods_other_search_term_data where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.previous_date}' and trim(label) not in ('null','') and label is not null) t where t.crank=1 """ print("sql=", sql) self.df_asin_label_info = self.spark.sql(sqlQuery=sql) self.df_asin_label_info = self.df_asin_label_info.groupby(['asin']).agg( F.collect_set("label").alias("asin_label_list")) self.df_asin_label_info = self.df_asin_label_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_asin_label_info.show(10, truncate=False) print("7. 读取dwd_asin_measure拿取ao及各类型数量") sql = f""" select asin, asin_sp_counts as sp_counts, (asin_sb1_counts + asin_sb2_counts) as sb_counts, asin_sb3_counts as vi_counts, asin_bs_counts as bs_counts, asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts, asin_st_counts, asin_zr_counts, asin_adv_counts, round(asin_zr_flow_proportion, 3) as asin_zr_flow_proportion, round(asin_ao_val, 3) as asin_ao_val, asin_amazon_orders from dwd_asin_measure where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' """ print("sql=", sql) self.df_asin_measure = self.spark.sql(sqlQuery=sql) self.df_asin_measure = self.df_asin_measure.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_asin_measure.show(10, truncate=False) print("8. 读取one_category_report表") if int(self.year) == 2022 and int(self.month) < 3: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='2022-12'" else: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'" print("sql=", sql) self.df_bs_report = self.spark.sql(sqlQuery=sql) self.df_bs_report = self.df_bs_report.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_bs_report.show(10, truncate=False) print("9. 读取keep_date获取上架时间") sql = f""" select asin, new_launch_time from (select asin, launch_time as new_launch_time, row_number() over(partition by asin order by updated_at desc) as trank from ods_asin_keep_date where site_name='{self.site_name}' and state=3) t where t.trank=1 """ print("sql=", sql) self.df_asin_keep_date = self.spark.sql(sqlQuery=sql) self.df_asin_keep_date = self.df_asin_keep_date.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_asin_keep_date.show(10, truncate=False) print("10. 读取bsr有效排名信息") sql = f"""select rank as limit_rank, category_id as asin_bs_cate_1_id from {self.site_name}_bsr_end""" print("sql=", sql) if mysql_con_info is not None: df_asin_bsr_end = SparkUtil.read_jdbc_query( session=self.spark, url=mysql_con_info['url'], pwd=mysql_con_info['pwd'], username=mysql_con_info['username'], query=sql) self.df_asin_bsr_end = F.broadcast(df_asin_bsr_end) self.df_asin_bsr_end.show(10, truncate=False) print("11. 通过node_id获取一级分类进行分类补充") df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark) self.df_asin_new_cate = F.broadcast(df_asin_new_cate) self.df_asin_new_cate.show(10, truncate=False) print("12. 获取用户修改打包数量信息") pg_con_info = DBUtil.get_connection_info("postgresql", "us") sql = f""" WITH ranked_edit_logs AS (SELECT edit_key_id, lower(val_related_info) as val_related_info, val_after, ROW_NUMBER() OVER (PARTITION BY edit_key_id ORDER BY create_time DESC) AS rn FROM sys_edit_log WHERE module = '流量选品' AND filed = 'package_quantity' AND site_name='{self.site_name}') SELECT edit_key_id as asin, val_related_info as title, cast(val_after as int) as user_package_num, 0 as user_is_package_quantity_abnormal FROM ranked_edit_logs WHERE rn = 1""" if pg_con_info is not None: df_user_package_num = SparkUtil.read_jdbc_query( session=self.spark, url=pg_con_info['url'], pwd=pg_con_info['pwd'], username=pg_con_info['username'], query=sql) self.df_user_package_num = F.broadcast(df_user_package_num) self.df_user_package_num.show(10, truncate=False) print("14. 获取分类ID与分类名称的对应关系") self.df_asin_category = get_first_id_from_category_desc_df(self.site_name, self.spark) self.df_asin_category = self.df_asin_category.withColumn( "category_first_name", F.lower("category_first_name") ).repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_asin_category.show(10, truncate=False) # 字段处理逻辑综合 def handle_all_field(self, df): # 1. 处理asin分类及排名以及排名类型字段 df = self.handle_asin_bs_category_info(df) # 2. 利用node_id进行分类补充 df = self.handle_asin_category_supplement(df) # 3. 处理bsr销量及销售额信息以及价格类型字段 df = self.handle_asin_bsr_orders(df) # 4. 解析Make-It-A-Bundle信息 df = self.handle_asin_lob_info(df) # 5. 处理配送方式、卖家所在地以及卖家所在地类型 df = self.handle_asin_buy_box_seller_type(df) # 6. 处理asin基础属性信息(长宽高重量等) df = self.handle_asin_basic_attribute_info(df) # 7. 处理asin图片信息 df = self.handle_asin_img_info(df) # 8. 处理变体相关(ao及母体相关,自然占比及母体自然占比,各类型数量,月销信息等) df = self.handle_asin_measure(df) # 9. 提取打包数量字段 df = self.handle_asin_package_quantity(df) # 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息 df = self.handle_asin_lqs_and_brand(df) # 11.通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、ASIN类型、有效类型) df = self.handle_asin_detail_all_type(df) # 12. 处理变化率相关字段 df = self.handle_asin_attribute_change(df) # 13. 字段标准化 df_save = self.handle_column_name(df) return df_save # 写入es前的准备工作 def es_prepare(self): # 创建对应es索引 EsUtils.create_index(self.es_index_name, self.client, self.es_index_body) print("索引名称为:", self.es_index_name) if not EsUtils.exist_index_alias(self.es_index_alias_name, self.client): EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client) else: index_name_list = EsUtils.get_index_names_associated_alias(self.es_index_alias_name, self.client) if self.es_index_name not in index_name_list: EsUtils.delete_index_alias(self.es_index_alias_name, self.client) EsUtils.create_index_alias(self.es_index_name, self.es_index_alias_name, self.client) else: pass # 写入elasticsearch逻辑 def save_to_es(self, df, batch_num): print("插入当前批次数据, 插入的数量量为: " + str(batch_num)) start_time = time.time() df_asin_latest_detail = df. \ select("asin", F.col("ao_val").alias("asin_ao_val"), F.col("title").alias("asin_title"), F.col("title_len").alias("asin_title_len"), F.col("category").alias("asin_category_desc"), F.col("volume").alias("asin_volume"), F.col("weight").alias("asin_weight"), F.col("launch_time").alias("asin_launch_time"), F.col("brand").alias("asin_brand_name"), "one_star", "two_star", "three_star", "four_star", "five_star", "low_star", "account_name", "account_id", F.col("site_name").alias("seller_country_name"), "category_first_id", "parent_asin", "variation_num", "img_info", "asin_crawl_date", F.col("price").alias("asin_price"), F.col("rating").alias("asin_rating"), F.col("total_comments").alias("asin_total_comments"), "matrix_ao_val", "zr_flow_proportion", "matrix_flow_proportion", F.lit(self.date_info).alias("date_info"), "img_url", F.col("category_id").alias("category_current_id"), F.col("first_category_rank").alias("category_first_rank"), F.col("current_category_rank").alias("category_current_rank"), "asin_type", "bsr_orders", "bsr_orders_sale", "page_inventory", "asin_bought_month", "seller_json", "buy_box_seller_type") df = df.drop("category", "seller_json") df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save() end_time = time.time() elapsed_time = end_time - start_time print("当前插入时长为:" + str(elapsed_time)) # ASIN最新详情的信息 if self.consumer_type == 'latest' and self.test_flag == 'normal': print("导出ASIN最新详情信息到doris:") table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume, asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star, account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info, asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion, date_info, img_url, category_current_id, category_first_rank, category_current_rank, asin_type, bsr_orders, bsr_orders_sale, page_inventory, asin_bought_month, seller_json, buy_box_seller_type""" DorisHelper.spark_export_with_columns(df_save=df_asin_latest_detail, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns) df_asin_latest_detail.unpersist() # 实时消费中批次数据的处理逻辑 def handle_kafka_stream(self, df, batch_id): try: batch_num = df.count() if batch_num > 0: start_time = time.time() print("当前批次:" + str(batch_id) + "; 该批次数据量为:" + str(batch_num)) df = df.repartition(self.repartition_num) batch_id = int(batch_id) + self.initial_batch_id df_save = self.handle_all_field(df) self.es_prepare() self.save_to_es(df_save, batch_num) df_save.unpersist() end_time = time.time() print("当前批次:" + str(batch_id) + "执行完毕, 执行时长为:" + str(end_time - start_time)) else: print("当前批次没有数据") except Exception as e: print(e, traceback.format_exc()) # 消费主题下的所有历史数据 def handle_kafka_history(self, kafka_df): print("处理kafka历史数据") batch_num = kafka_df.count() if batch_num > 0: self.history_batch_id = self.history_batch_id + 1 start_time = time.time() kafka_df = kafka_df.repartition(self.repartition_num) kafka_df = self.handle_all_field(kafka_df) self.es_prepare() self.save_to_es(kafka_df, batch_num) end_time = time.time() print("该批次数据处理完毕, 执行时长为:" + str(end_time - start_time)) else: raise ValueError("当前主题中没有数据,请注意检查!") if __name__ == '__main__': arguments = sys.argv[1:] site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter/day date_info = sys.argv[3] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1 consumer_type = sys.argv[4] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1 if len(arguments) == 5: test_flag = sys.argv[5] else: test_flag = 'normal' handle_obj = KafkaFlowAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info, consumer_type=consumer_type, test_flag=test_flag, batch_size=200000) handle_obj.run_kafka()