"""
@Author      : HuangJian
@Description : Dimension table of keyword-related ASIN details
@SourceTable :
    1. ods_asin_keep_date
    2. ods_asin_variat
    3. ods_asin_detail
    4. dwd_bs_category_asin
@SinkTable   : dim_asin_detail
@CreateTime  : 2022/11/14 15:56
@UpdateTime  : 2022/11/14 15:56
"""
import os
import sys

# Must run before the project-local imports below: adds the parent directory to the import path.
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from pyspark.sql.types import *
from yswg_utils.common_udf import udf_handle_string_null_value as myUDF
from yswg_utils.common_df import get_node_first_id_df, get_first_id_from_category_desc_df
from utils.common_util import CommonUtil, DateTypes
from yswg_utils.common_udf import udf_new_asin_flag
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil
from datetime import datetime
from pyspark.storagelevel import StorageLevel
from utils.DorisHelper import DorisHelper
from yswg_utils.common_udf import udf_get_package_quantity_with_flag as udf_get_package_quantity, \
    udf_parse_seller_json, udf_parse_amazon_orders


class DimAsinDetail(object):
    """Build one (site_name, date_type, date_info) partition of ``dim_asin_detail``.

    The job reads crawled ASIN details from ``ods_asin_detail``, de-duplicates
    them per ASIN, enriches them with launch time, weight/volume, variation,
    seller, category, label and package-quantity information, and appends the
    result to the Hive table ``dim_asin_detail``. For monthly runs from
    ``2024-06`` onwards it also exports the latest variation list per parent
    ASIN to Doris.
    """

    def __init__(self, site_name, date_type, date_info):
        """Create the Spark session, derive date filters and register UDFs.

        Args:
            site_name: Site code, e.g. ``us``, ``uk`` or ``de``.
            date_type: Partition granularity (``week``, ``4_week``, ``month``,
                ``month_week``, ...).
            date_info: Partition value matching ``date_type``
                (e.g. ``2024-05`` for a month).
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = 'dim_asin_detail'
        self.partition_dict = {
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
        # Target HDFS path of the partition that will be (re)written
        self.hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=self.partition_dict)
        # Spark session
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.complete_date_info_tuple = self.get_complete_week_tuple()
        self.date_sql = self.date_sql_padding()
        self.launch_time_upper_limit = self.get_effective_launch_time()
        self.launch_time_lower_limit = "1995-01-01"
        self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.partition_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80)
        # Doris parameters
        self.doris_db = "selection"
        self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail"
        # Placeholder DataFrames; the real ones are assigned in read_data()
        self.df_asin_keep_date = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_asin_label = self.spark.sql("select 1+1;")
        self.df_asin_weight_and_volume = self.spark.sql("select 1+1;")
        self.df_asin_new_cate = self.spark.sql("select 1+1;")
        self.df_user_package_num = self.spark.sql("select 1+1;")
        self.df_self_asin = self.spark.sql("select 1+1;")
        self.df_asin_category = self.spark.sql("select 1+1;")
        self.df_asin_variat = self.spark.sql("select 1+1;")
        # Shared UDFs
        self.udf_new_asin_flag = F.udf(udf_new_asin_flag, IntegerType())
        self.handle_string_num_value = F.udf(myUDF, StringType())
        seller_schema = StructType([
            StructField("buy_box_seller_type", IntegerType(), True),
            StructField("account_name", StringType(), True),
            StructField("account_id", StringType(), True)
        ])
        self.u_parse_seller_info = self.spark.udf.register(
            'u_parse_seller_info', udf_parse_seller_json, seller_schema)
        package_schema = StructType([
            StructField("parse_package_quantity", IntegerType(), True),
            StructField("is_package_quantity_abnormal", IntegerType(), True),
        ])
        self.u_judge_package_quantity = self.spark.udf.register(
            'u_judge_package_quantity', udf_get_package_quantity, package_schema)
        self.u_parse_amazon_orders = F.udf(udf_parse_amazon_orders, IntegerType())

    def get_complete_week_tuple(self):
        """Return the ``year_week`` values covered by this run.

        Returns:
            * ``week``: the string ``"('<date_info>')"`` (already SQL-rendered).
            * ``4_week``: a 4-tuple with the given week and the three weeks
              found at id offsets -7, -14 and -21 in ``dim_date_20_to_30``.
            * ``month`` / ``month_week``: a tuple of every ``year_week`` whose
              ``week_day == 1`` row falls in the month.
            * any other ``date_type``: ``None``.

        Raises:
            ValueError: if a ``4_week`` lookup finds no matching date row.
        """
        complete_date_info_tuple = None
        if self.date_type == 'week':
            # No date-dimension lookup is needed for a single week.
            complete_date_info_tuple = f"('{self.date_info}')"
        elif self.date_type in ('4_week', 'month', 'month_week'):
            df = self.spark.sql("select * from dim_date_20_to_30 ;").toPandas()
            if self.date_type == '4_week':
                print(self.date_info)
                anchor_rows = df.loc[(df.year_week == f"{self.date_info}") & (df.week_day == 1)]
                if anchor_rows.empty:
                    raise ValueError(
                        f"year_week '{self.date_info}' not found in dim_date_20_to_30")
                cur_id = int(list(anchor_rows.id)[0])
                weeks = []
                # NOTE(review): stepping ids by 7 presumes one row per day with
                # consecutive ids in dim_date_20_to_30 - confirm.
                for offset in (0, 7, 14, 21):
                    rows = df.loc[df.id == cur_id - offset]
                    if rows.empty:
                        raise ValueError(
                            f"no dim_date_20_to_30 row with id {cur_id - offset} "
                            f"({offset} days before '{self.date_info}')")
                    weeks.append(list(rows.year_week)[0])
                complete_date_info_tuple = tuple(weeks)
            else:
                month_rows = df.loc[(df.year_month == f"{self.date_info}") & (df.week_day == 1)]
                complete_date_info_tuple = tuple(month_rows.year_week)
        print("self.complete_date_info_tuple:", complete_date_info_tuple)
        return complete_date_info_tuple

    def _format_week_in_clause(self):
        """Render ``self.complete_date_info_tuple`` as a SQL ``IN`` list.

        Interpolating a Python tuple directly yields ``('2024-01',)`` for a
        single element, which is invalid SQL, so the list is built explicitly.

        Raises:
            ValueError: if there are no weeks to filter on.
        """
        weeks = self.complete_date_info_tuple
        if isinstance(weeks, str):
            # date_type 'week' already stores the rendered "('<week>')" string.
            return weeks
        if not weeks:
            raise ValueError(
                f"no weeks resolved for date_type='{self.date_type}', date_info='{self.date_info}'")
        return "(" + ", ".join(f"'{week}'" for week in weeks) + ")"

    def date_sql_padding(self):
        """Build the date predicate appended to the ``ods_asin_detail`` query.

        ``us`` reads its own partition for ``month_week``, and for ``month``
        from ``2023-10``; ``uk``/``de`` do so for ``month``/``month_week`` from
        ``2024-05``. Every other case reads the weekly partitions listed in
        ``self.complete_date_info_tuple``.

        Returns:
            A SQL fragment starting with `` and``.

        Raises:
            ValueError: for a site other than us/uk/de, or when the weekly
                fallback has no weeks to filter on.
        """
        month_types = [DateTypes.month.name, DateTypes.month_week.name]
        if self.site_name == 'us':
            use_own_partition = (
                self.date_type == DateTypes.month_week.name
                or (self.date_type == DateTypes.month.name and self.date_info >= '2023-10')
            )
        elif self.site_name in ['uk', 'de']:
            use_own_partition = self.date_type in month_types and self.date_info >= '2024-05'
        else:
            # Previously this fell through and failed with UnboundLocalError.
            raise ValueError(f"unsupported site_name for dim_asin_detail: '{self.site_name}'")

        if use_own_partition:
            date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
        else:
            date_sql = f" and date_type='week' and date_info in {self._format_week_in_clause()}"
        print(date_sql)
        return date_sql

    def get_effective_launch_time(self):
        """Return January 1st of next year (``YYYY-01-01``) as the launch-time upper bound."""
        year_upper_limit = datetime.now().year + 1
        return str(year_upper_limit) + "-01-01"

    def _read_hive_cached(self, sql, log_prefix=""):
        """Run ``sql``, repartition to 100, persist on disk, preview and return the DataFrame."""
        print(log_prefix + sql)
        df = self.spark.sql(sqlQuery=sql)
        df = df.repartition(100).persist(StorageLevel.DISK_ONLY)
        df.show(10, truncate=False)
        return df

    # Read source data
    def read_data(self):
        """Load every source DataFrame used by the later ``handle_*`` steps."""
        print("1. 获取dim层的dim_asin_launchtime_info填充的上架日期")
        sql = f"""
            select asin, asin_launch_time as populated_asin_launch_time
            from dim_asin_launchtime_info
            where site_name='{self.site_name}' and asin_launch_time is not null"""
        self.df_asin_keep_date = self._read_hive_cached(sql)

        print("2. 获取ods_asin_detail")
        sql = f"""
            select
                asin,
                img_url as asin_img_url,
                lower(title) as asin_title,
                title_len as asin_title_len,
                price as asin_price,
                rating as asin_rating,
                total_comments as asin_total_comments,
                page_inventory as asin_page_inventory,
                category as asin_category_desc,
                launch_time as crawl_asin_launch_time,
                img_num as asin_img_num,
                img_type as asin_img_type,
                category_state as asin_category_state,
                material as asin_material,
                lower(brand) as asin_brand_name,
                activity_type as asin_activity_type,
                one_two_val as act_one_two_val,
                three_four_val as act_three_four_val,
                five_six_val as act_five_six_val,
                eight_val as act_eight_val,
                one_star,
                two_star,
                three_star,
                four_star,
                five_star,
                low_star,
                together_asin,
                ac_name,
                node_id,
                data_type as asin_data_type,
                variat_list,
                `describe` as asin_describe,
                follow_sellers as asin_follow_sellers,
                product_description,
                image_view as asin_image_view,
                spider_int as asin_spider_num,
                buy_sales,
                lob_asin_json as asin_lob_info,
                REGEXP_REPLACE(seller_json, chr(10), '') as seller_json,
                buy_box_seller_type as asin_buy_box_seller_type,
                customer_reviews_json,
                parent_asin,
                img_list,
                created_at as created_time,
                updated_at as updated_time,
                updated_at as dt,
                variat_num as variation_num
            from ods_asin_detail
            where site_name='{self.site_name}' {self.date_sql}"""
        self.df_asin_detail = self._read_hive_cached(sql)

        print("3. 读取asin体积重量相关信息")
        sql = f"""
            SELECT
                asin,
                asin_volume,
                asin_volume_type,
                asin_length_sorted as asin_length,
                asin_width_sorted as asin_width,
                asin_height_sorted as asin_height,
                asin_weight,
                asin_weight_str,
                asin_weight_type
            FROM dim_asin_stable_info
            WHERE site_name='{self.site_name}'"""
        self.df_asin_weight_and_volume = self._read_hive_cached(sql)

        print("4. 获取用户修改打包数量信息")
        pg_con_info = DBUtil.get_connection_info("postgresql", "us")
        sql = f"""
            WITH ranked_edit_logs AS (
                SELECT
                    edit_key_id,
                    lower(val_related_info) AS val_related_info,
                    val_after,
                    ROW_NUMBER() OVER (PARTITION BY edit_key_id ORDER BY create_time DESC) AS rn
                FROM sys_edit_log
                WHERE module = '流量选品' AND filed = 'package_quantity' and site_name='{self.site_name}')
            SELECT
                edit_key_id AS asin,
                val_related_info AS asin_title,
                cast(val_after as int) AS user_package_num,
                0 AS user_is_package_quantity_abnormal
            FROM ranked_edit_logs
            WHERE rn = 1"""
        print(sql)
        if pg_con_info is not None:
            df_user_package_num = SparkUtil.read_jdbc_query(
                session=self.spark, url=pg_con_info['url'], pwd=pg_con_info['pwd'],
                username=pg_con_info['username'], query=sql)
            self.df_user_package_num = F.broadcast(df_user_package_num)
        else:
            # Without connection info, fall back to an empty frame with the
            # expected columns so the join in get_package_quantity() resolves.
            user_package_schema = StructType([
                StructField("asin", StringType(), True),
                StructField("asin_title", StringType(), True),
                StructField("user_package_num", IntegerType(), True),
                StructField("user_is_package_quantity_abnormal", IntegerType(), True),
            ])
            self.df_user_package_num = self.spark.createDataFrame([], user_package_schema)
        self.df_user_package_num.show(10, truncate=False)

        print("5.读取ods_self_asin,获得公司内部asin信息")
        sql = f"""select asin, 1 as asin_is_self from ods_self_asin where site_name='{self.site_name}' group by asin"""
        print("sql:" + sql)
        df_self_asin = self.spark.sql(sqlQuery=sql)
        self.df_self_asin = F.broadcast(df_self_asin)
        self.df_self_asin.show(10, truncate=False)

        print("6. node_id对应的头部分类信息")
        self.df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark)
        self.df_asin_new_cate = self.df_asin_new_cate.filter('node_id is not null').persist(StorageLevel.DISK_ONLY)
        self.df_asin_new_cate.show(10, truncate=False)

        print("7. 获取asin的标签信息")
        sql = f"""
            select asin, asin_label_list, asin_label_type as asin_is_movie
            from dim_asin_label
            where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
        """
        self.df_asin_label = self._read_hive_cached(sql, log_prefix="sql:")

        print("8. 获取分类id和分类名称的对应关系")
        self.df_asin_category = get_first_id_from_category_desc_df(self.site_name, self.spark)
        self.df_asin_category = self.df_asin_category.withColumn(
            "category_first_name", F.lower("category_first_name")
        ).repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_category.show(10, truncate=False)

        # Historic monthly runs take variation attributes from dim_asin_variation_info;
        # newer runs parse them from variat_list in handle_asin_variation_attribute().
        if self.date_type in ['month', 'month_week'] and self.date_info < '2024-06':
            sql = f"""
                SELECT
                    asin,
                    parent_asin,
                    color as asin_color,
                    `size` as asin_size,
                    style as asin_style,
                    CASE WHEN state = 1 THEN 1 WHEN state = 2 THEN 0 ElSE NULL END as asin_is_sale,
                    updated_time
                FROM dim_asin_variation_info
                where site_name='{self.site_name}' and created_date >='2024-05-21'
            """
            self.df_asin_variat = self._read_hive_cached(sql, log_prefix="sql:")

    # De-duplicate ASIN detail rows
    def handle_df_duplicated(self):
        """Keep the newest row per ASIN and collect all of its ``asin_data_type`` values.

        The per-ASIN ``asin_data_type`` values are concatenated with ``,``
        before de-duplication so the crawl scenarios are not lost. For
        ``month`` / ``2024-03`` the result is additionally restricted to ASINs
        present in ``dwd_asin_measure``.
        """
        df_asin_datatype_handle = self.df_asin_detail.select("asin", "asin_data_type")
        df_asin_datatype_handle_agg = df_asin_datatype_handle.groupby(["asin"]).agg(
            F.concat_ws(",", F.collect_set("asin_data_type")).alias("asin_data_type"))
        # Rank rows within each ASIN by dt, newest first
        window = Window.partitionBy(['asin']).orderBy(self.df_asin_detail.dt.desc_nulls_last())
        self.df_asin_detail = self.df_asin_detail.withColumn("dt_rank", F.row_number().over(window=window))
        # The first row per ASIN is the latest detail record
        self.df_asin_detail = self.df_asin_detail.filter("dt_rank=1").drop("dt", "dt_rank", "asin_data_type")
        self.df_asin_detail = self.df_asin_detail.repartition(100)
        df_asin_datatype_handle_agg = df_asin_datatype_handle_agg.repartition(100)
        self.df_asin_detail = self.df_asin_detail.join(
            df_asin_datatype_handle_agg, on=['asin'], how='left').persist(StorageLevel.MEMORY_ONLY)
        print("asin详情去重后的数量为: ", self.df_asin_detail.count())
        if self.date_type == 'month' and self.date_info == '2024-03':
            sql = f"""
                select asin from dwd_asin_measure
                where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'
                group by asin"""
            df_effective_asin = self.spark.sql(sql)
            self.df_asin_detail = self.df_asin_detail.join(df_effective_asin, on=['asin'], how='inner')

    # Amazon "bought in past month" information
    def handle_asin_bought_month(self):
        """Add ``asin_bought_month`` by applying the amazon-orders UDF to ``buy_sales``."""
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_bought_month", self.u_parse_amazon_orders(self.df_asin_detail.buy_sales))

    # lob_info handling
    def handle_asin_lob_info(self):
        """Flag rows that carry lob info and flatten it to a comma-separated ASIN list."""
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "is_contains_lob_info",
            F.when(F.col("asin_lob_info").isNotNull(), F.lit(1)).otherwise(F.lit(0)))
        df_asin_detail_parsed = self.df_asin_detail.withColumn(
            "parse_asin_lob",
            F.when(F.col("is_contains_lob_info") == 1,
                   F.from_json("asin_lob_info", "array<struct<lob_asin:string>>")))
        df_asin_detail_result = df_asin_detail_parsed.withColumn(
            "asin_lob_info", F.expr("transform(parse_asin_lob, x -> x.lob_asin)"))
        self.df_asin_detail = df_asin_detail_result.withColumn(
            "asin_lob_info",
            F.regexp_replace(F.concat_ws(",", "asin_lob_info"), "[{}]", "")).drop("parse_asin_lob")

    # Variation attributes
    def handle_asin_variation_attribute(self):
        """Attach ``asin_color``, ``asin_size``, ``asin_style`` and ``asin_is_sale``.

        * Monthly runs from ``2024-06``: parsed from the JSON ``variat_list``
          column (kept as ``variat_list_change`` for later steps).
        * Monthly runs before ``2024-06``: taken from ``dim_asin_variation_info``,
          which also replaces ``parent_asin`` and ``variation_num``.
        * Other date types: typed NULL columns are added, because
          ``get_package_quantity`` and ``df_save`` reference these columns.
        """
        if self.date_type not in ['month', 'month_week']:
            typed_null_columns = (
                ("asin_color", "string"),
                ("asin_size", "string"),
                ("asin_style", "string"),
                ("asin_is_sale", "int"),
            )
            for col_name, col_type in typed_null_columns:
                if col_name not in self.df_asin_detail.columns:
                    self.df_asin_detail = self.df_asin_detail.withColumn(
                        col_name, F.lit(None).cast(col_type))
            return

        if self.date_info >= '2024-06':
            print("执行新版的变体信息整合")
            variat_schema = ArrayType(ArrayType(StringType()))
            self.df_asin_detail = self.df_asin_detail.withColumn(
                "variat_list_change", F.from_json(F.col("variat_list"), variat_schema))
            df_asin_with_variation = self.df_asin_detail.filter(F.size("variat_list_change") > 0) \
                .select("asin", F.explode("variat_list_change").alias("variant_attribute")) \
                .select("asin",
                        F.col("variant_attribute")[0].alias("son_asin"),
                        F.col("variant_attribute")[1].alias("asin_color"),
                        F.col("variant_attribute")[3].alias("asin_size"),
                        F.col("variant_attribute")[4].alias("asin_is_sale"),
                        F.col("variant_attribute")[5].alias("asin_style"))
            # Keep only the variation entry that describes the ASIN itself
            df_asin_with_variation = df_asin_with_variation.filter(
                F.col("asin") == F.col("son_asin")).drop("son_asin", "variant_attribute")
            df_asin_with_variation = df_asin_with_variation.withColumn(
                "asin_is_sale",
                F.when(F.col("asin_is_sale") == 1, F.lit(1))
                .when(F.col("asin_is_sale") == 2, F.lit(0))
                .otherwise(F.lit(None))
            )
            self.df_asin_detail = self.df_asin_detail.join(
                df_asin_with_variation, on=['asin'], how='left'
            )
        else:
            print("执行历史数据的变体信息整合")
            window = Window.partitionBy(self.df_asin_variat.asin).orderBy(
                self.df_asin_variat.updated_time.desc_nulls_last())
            self.df_asin_variat = self.df_asin_variat.withColumn("t_rank", F.row_number().over(window=window))
            self.df_asin_variat = self.df_asin_variat.filter("t_rank = 1").drop("updated_time", "t_rank")
            self.df_asin_variat = self.df_asin_variat.repartition(100)
            self.df_asin_detail = self.df_asin_detail.drop("variation_num", "parent_asin")
            df_asin_variat = self.df_asin_variat.filter("parent_asin is not null").select("asin", "parent_asin")
            df_parent_asin_agg = df_asin_variat.groupby(['parent_asin']).agg(
                F.count("asin").alias("variation_num"))
            df_parent_asin_agg = df_parent_asin_agg.repartition(100)
            self.df_asin_variat = self.df_asin_variat.join(df_parent_asin_agg, on=['parent_asin'], how='left')
            self.df_asin_detail = self.df_asin_detail.join(self.df_asin_variat, on=['asin'], how='left').cache()
            self.df_asin_detail.show(10, truncate=False)
            self.df_asin_variat.unpersist()

    # Buy-box seller / fulfilment type
    def handle_asin_buy_box_seller_type(self):
        """Derive seller type and account fields from ``seller_json`` where available.

        For monthly runs from ``2024-05`` and ``4_week`` runs from ``2024-21``
        the crawled ``asin_buy_box_seller_type`` is replaced by the value parsed
        from ``seller_json``. Otherwise ``account_id``, ``account_name`` and
        ``seller_json`` are set to NULL.
        """
        has_seller_json = (
            (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05')
            or (self.date_type == '4_week' and self.date_info >= '2024-21')
        )
        if has_seller_json:
            cleaned_seller_json = F.regexp_replace('seller_json', chr(10), '')
            self.df_asin_detail = self.df_asin_detail.withColumn(
                'seller_json',
                F.when(F.length(F.trim(cleaned_seller_json)) == 0, F.lit('')).otherwise(cleaned_seller_json))
            self.df_asin_detail = self.df_asin_detail.drop("asin_buy_box_seller_type")
            self.df_asin_detail = self.df_asin_detail.withColumn(
                "seller_json_parsed", self.u_parse_seller_info(self.df_asin_detail.seller_json))
            self.df_asin_detail = self.df_asin_detail.withColumn(
                "asin_buy_box_seller_type", self.df_asin_detail.seller_json_parsed.buy_box_seller_type
            ).withColumn(
                "account_name", self.df_asin_detail.seller_json_parsed.account_name
            ).withColumn(
                "account_id", self.df_asin_detail.seller_json_parsed.account_id
            ).drop("seller_json_parsed")
        else:
            self.df_asin_detail = self.df_asin_detail \
                .withColumn("account_id", F.lit(None)) \
                .withColumn("seller_json", F.lit(None)) \
                .withColumn("account_name", F.lit(None))

    # Weight and volume
    def handle_asin_basic_attribute(self):
        """Join weight and volume columns from ``dim_asin_stable_info``.

        Volume rows are skipped when ``asin_volume_type`` is null or ``none``;
        ``us`` additionally skips ``cm`` rows and other sites skip ``inches`` rows.
        """
        self.df_asin_detail = self.df_asin_detail.repartition(100)
        df_weight = self.df_asin_weight_and_volume.filter("asin_weight is not null").select(
            "asin", "asin_weight", "asin_weight_str", "asin_weight_type")
        self.df_asin_detail = self.df_asin_detail.join(df_weight, on=['asin'], how='left')
        if self.site_name == 'us':
            volume_filter = "asin_volume_type is not null and asin_volume_type not in ('cm', 'none')"
        else:
            volume_filter = "asin_volume_type is not null and asin_volume_type not in ('inches', 'none')"
        df_volume = self.df_asin_weight_and_volume.filter(volume_filter).select(
            "asin", "asin_volume", "asin_length", "asin_width", "asin_height")
        self.df_asin_detail = self.df_asin_detail.join(df_volume, on=['asin'], how='left').drop("asin_volume_type")
        self.df_asin_weight_and_volume.unpersist()

    # Package quantity
    def get_package_quantity(self):
        """Compute ``package_quantity`` and ``is_package_quantity_abnormal``.

        The quantity is parsed from the title first and from the variation
        attributes second; when neither yields a value it defaults to 1 with
        abnormal flag 2. A user edit matching both ``asin`` and ``asin_title``
        overrides the parsed values.
        """
        self.df_asin_detail = self.df_asin_detail.repartition(100)
        self.df_user_package_num = self.df_user_package_num.repartition(100)
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "variat_attribute",
            F.concat_ws("&&&%", F.col("asin_color"), F.col("asin_style"),
                        F.col("asin_size"), F.col("asin_material")))
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "title_parse", self.u_judge_package_quantity(self.df_asin_detail.asin_title)
        ).withColumn(
            "variat_parse", self.u_judge_package_quantity(self.df_asin_detail.variat_attribute))
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "title_package_quantity", self.df_asin_detail.title_parse.getField("parse_package_quantity")
        ).withColumn(
            "variat_package_quantity", self.df_asin_detail.variat_parse.getField("parse_package_quantity")
        ).withColumn(
            "title_package_quantity_is_abnormal",
            self.df_asin_detail.title_parse.getField("is_package_quantity_abnormal")
        ).withColumn(
            "variat_package_quantity_is_abnormal",
            self.df_asin_detail.variat_parse.getField("is_package_quantity_abnormal")
        ).drop("title_parse", "variat_parse", "variat_attribute")
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "package_quantity",
            F.expr("""
                CASE
                    WHEN title_package_quantity is null and variat_package_quantity is not null
                        THEN variat_package_quantity
                    WHEN title_package_quantity is not null THEN title_package_quantity
                    ELSE 1
                END""")
        ).withColumn(
            "is_package_quantity_abnormal",
            F.expr("""
                CASE
                    WHEN title_package_quantity is null and variat_package_quantity is not null
                        THEN variat_package_quantity_is_abnormal
                    WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal
                    ELSE 2
                END""")
        ).drop("title_package_quantity", "variat_package_quantity",
               "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_user_package_num, on=['asin', 'asin_title'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))
        ).withColumn(
            "is_package_quantity_abnormal",
            F.coalesce(F.col("user_is_package_quantity_abnormal"), F.col("is_package_quantity_abnormal"))
        ).drop("user_package_num", "user_is_package_quantity_abnormal").persist(StorageLevel.MEMORY_ONLY)
        print("打包数量解析完毕")
        self.df_asin_detail.select(
            "asin", "asin_title", "package_quantity", "is_package_quantity_abnormal").show(10, truncate=False)
        self.df_user_package_num.unpersist()

    # Top-level category id (from node_id, then from the category description)
    def handle_asin_top_category(self):
        """Fill ``category_id`` and ``category_first_id``.

        ``category_first_id`` comes from the ``node_id`` join; where that is
        missing, the first segment of ``asin_category_desc`` (split on ``›``,
        lower-cased) is matched against the category-name mapping.
        """
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_new_cate, on=['node_id'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn("category_id", F.col("node_id"))
        df_asin_with_category_desc = self.df_asin_detail.filter(
            "category_first_id is null and asin_category_desc is not null"
        ).select("asin", "asin_category_desc")
        df_asin_with_category_desc = df_asin_with_category_desc.withColumn(
            "asin_category_split", F.split(F.col("asin_category_desc"), "›")
        ).withColumn(
            "category_first_name", F.lower(F.col("asin_category_split").getItem(0))
        ).drop("asin_category_split", "asin_category_desc")
        df_asin_with_category_desc = df_asin_with_category_desc.join(
            self.df_asin_category, on=['category_first_name'], how='inner')
        df_asin_with_category_desc = df_asin_with_category_desc.withColumnRenamed(
            "category_first_id", "category_first_id_with_name").drop("category_first_name")
        self.df_asin_detail = self.df_asin_detail.join(df_asin_with_category_desc, on=['asin'], how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "category_first_id",
            F.coalesce(F.col("category_first_id"), F.col("category_first_id_with_name"))
        ).drop("category_first_id_with_name")
        self.df_asin_new_cate.unpersist()
        self.df_asin_category.unpersist()

    # Launch time
    def handle_asin_launch_time(self):
        """Produce ``asin_launch_time`` from the crawled value, falling back to the populated one.

        Values outside ``[launch_time_lower_limit, launch_time_upper_limit]``
        are set to NULL, both for the crawled value and for the final result.
        """
        self.df_asin_detail = self.df_asin_detail.repartition(100)
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_keep_date, on='asin', how='left')
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "crawl_asin_launch_time",
            F.when(
                (F.col("crawl_asin_launch_time") <= self.launch_time_upper_limit)
                & (F.col("crawl_asin_launch_time") >= self.launch_time_lower_limit),
                F.col("crawl_asin_launch_time")
            ).otherwise(F.lit(None))
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_launch_time",
            F.when(
                (F.isnull("crawl_asin_launch_time")) | (F.col("crawl_asin_launch_time") == 'null'),
                F.col("populated_asin_launch_time")
            ).otherwise(F.col("crawl_asin_launch_time")))
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_launch_time",
            F.when(
                (F.col("asin_launch_time") <= self.launch_time_upper_limit)
                & (F.col("asin_launch_time") >= self.launch_time_lower_limit),
                F.col("asin_launch_time")
            ).otherwise(F.lit(None))
        )
        self.df_asin_keep_date.unpersist()

    # Flag columns
    def handle_asin_flag(self):
        """Add the 0/1 flag columns (new, media, seller type, brand, alarm, self)."""
        size_lower = F.lower(F.col("asin_size").cast("string"))
        brand_str = F.col("asin_brand_name").cast("string")
        self.df_asin_detail = self.df_asin_detail \
            .withColumn("asin_is_new",
                        self.udf_new_asin_flag(F.col('asin_launch_time'), F.lit(self.cal_date))) \
            .withColumn("asin_is_aadd",
                        F.expr("""CASE WHEN INSTR(asin_img_type, '3') > 0 THEN 1 ELSE 0 END""")) \
            .withColumn("asin_is_video",
                        F.expr("""CASE WHEN INSTR(asin_img_type, '2') > 0 THEN 1 ELSE 0 END""")) \
            .withColumn("asin_is_picture",
                        F.expr("""CASE WHEN INSTR(asin_img_type, '1') > 0 THEN 1 ELSE 0 END""")) \
            .withColumn("asin_is_amazon",
                        F.expr("""CASE WHEN asin_buy_box_seller_type == 1 THEN 1 ELSE 0 END""")) \
            .withColumn("asin_is_fba",
                        F.expr("""CASE WHEN asin_buy_box_seller_type == 2 THEN 1 ELSE 0 END""")) \
            .withColumn("asin_is_fbm",
                        F.expr("""CASE WHEN asin_buy_box_seller_type == 3 THEN 1 ELSE 0 END""")) \
            .withColumn("asin_is_other",
                        F.expr("""CASE WHEN asin_buy_box_seller_type == 4 THEN 1 ELSE 0 END""")) \
            .withColumn("asin_is_brand",
                        F.when((brand_str != 'null') & (brand_str != 'none'), 1).otherwise(F.lit(0))) \
            .withColumn("asin_quantity_variation_type",
                        F.when((size_lower != 'null') & (size_lower != 'none')
                               & (size_lower.contains('quantity')), 1).otherwise(F.lit(0)))

        # Alarm brands are only looked up for the us site
        df_alarm_brand = None
        if self.site_name == 'us':
            pg_sql = """
                select asin_brand_name, 1 as asin_is_alarm
                from (select lower(trim(brand_name)) as asin_brand_name
                      from brand_alert_erp
                      where brand_name is not null) t
                group by asin_brand_name"""
            db_type = "postgresql_cluster"
            con_info = DBUtil.get_connection_info(db_type=db_type, site_name=self.site_name)
            if con_info is not None:
                df_alarm_brand = SparkUtil.read_jdbc_query(
                    session=self.spark, url=con_info['url'], pwd=con_info['pwd'],
                    username=con_info['username'], query=pg_sql)
                df_alarm_brand = df_alarm_brand.repartition(100)
        if df_alarm_brand is not None:
            self.df_asin_detail = self.df_asin_detail.join(df_alarm_brand, on=['asin_brand_name'], how='left')
        else:
            # Also covers us without connection info, so df_save() always finds the column.
            self.df_asin_detail = self.df_asin_detail.withColumn("asin_is_alarm", F.lit(0))
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_is_alarm": 0})

        # Company-internal ASIN flag
        self.df_asin_detail = self.df_asin_detail.join(self.df_self_asin, on=['asin'], how='left')
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_is_self": 0})
        self.df_self_asin.unpersist()

    # Label / movie flag
    def handle_asin_label(self):
        """Left-join ``asin_label_list`` and ``asin_is_movie`` from ``dim_asin_label``."""
        self.df_asin_detail = self.df_asin_detail.join(self.df_asin_label, on=['asin'], how='left')
        self.df_asin_label.unpersist()

    # Thumbnail images
    def handle_asin_img_info(self):
        """Rebuild ``img_list`` as a JSON array of ``{img_url, img_order_by, data_type}`` structs.

        Only applied to monthly runs from ``2024-06``; otherwise ``img_list``
        is left as crawled.
        """
        if not (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-06'):
            return
        img_schema = ArrayType(ArrayType(StringType()))
        df_asin_with_img = self.df_asin_detail \
            .withColumn("img_list", F.from_json(F.col("img_list"), img_schema)) \
            .filter(F.size("img_list") > 0) \
            .select("asin", F.explode("img_list").alias("img_attributes")) \
            .select("asin",
                    F.col("img_attributes")[1].alias("img_url"),
                    F.col("img_attributes")[2].alias("img_order_by"),
                    F.col("img_attributes")[3].alias("data_type"))
        df_asin_with_img_agg = df_asin_with_img.groupby("asin").agg(
            F.to_json(F.collect_list(
                F.struct(F.col("img_url"), F.col("img_order_by"), F.col("data_type")))).alias("img_list")
        )
        self.df_asin_detail = self.df_asin_detail.drop("img_list")
        self.df_asin_detail = self.df_asin_detail.join(df_asin_with_img_agg, on=['asin'], how='left')

    # Latest variation info per parent ASIN
    def handle_latest_variation_info(self):
        """Export the newest variation list per ``parent_asin`` to Doris.

        Only runs for monthly partitions from ``2024-06``, and only when
        ``date_info`` is not older than the newest ``date_info`` already in the
        Doris table. An empty Doris table (``MAX`` is NULL) counts as "export".
        Relies on ``variat_list_change`` created by
        ``handle_asin_variation_attribute``.
        """
        if not (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-06'):
            return
        max_report_sql = f"""
            SELECT MAX(date_info) as table_date_info
            FROM {self.doris_db}.{self.parent_asin_latest_detail_table}
        """
        df_date_info = DorisHelper.spark_import_with_sql(self.spark, query=max_report_sql)
        table_date_info = df_date_info.take(1)[0]['table_date_info']
        print("doris中记录最新的日期为:", table_date_info)
        # Comparing str >= None raises TypeError, so handle the empty table first.
        if table_date_info is not None and self.date_info < table_date_info:
            print("不用导出旧数据到doris中")
            return

        df_asin_variat = self.df_asin_detail.filter("parent_asin is not null").select(
            "parent_asin", "variat_list_change", "created_time")
        latest_asin_window = Window.partitionBy('parent_asin').orderBy(
            F.desc_nulls_last("created_time")
        )
        df_asin_variat = df_asin_variat.withColumn("p_rank", F.row_number().over(window=latest_asin_window))
        df_asin_variat = df_asin_variat.filter("p_rank = 1").drop("p_rank")
        df_asin_variat = df_asin_variat.filter(F.size("variat_list_change") > 0) \
            .select("parent_asin", "created_time", F.explode("variat_list_change").alias("variant_attribute")) \
            .select("parent_asin", "created_time",
                    F.col("variant_attribute")[0].alias("asin"),
                    F.col("variant_attribute")[1].alias("color"),
                    F.col("variant_attribute")[3].alias("size"),
                    F.col("variant_attribute")[5].alias("style"))
        df_asin_variat_agg = df_asin_variat.groupby(['parent_asin']).agg(
            F.first("created_time").alias("asin_crawl_date"),
            F.concat_ws(',', F.collect_list("asin")).alias("variation_info"),
            F.to_json(F.collect_list(
                F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info")
        )
        print("导出父ASIN最新变体信息到doris:")
        df_doris = df_asin_variat_agg.select(
            "parent_asin", F.lit(self.date_info).alias("date_info"),
            "asin_crawl_date", "variation_info", "attr_info")
        table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info"
        DorisHelper.spark_export_with_columns(
            df_save=df_doris, db_name=self.doris_db,
            table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)

    # Column normalisation and storage
    def df_save(self):
        """Project the final column list and append it to the Hive table.

        String columns are passed through the null-string-cleaning UDF,
        negative prices become NULL, rows whose ``asin`` is longer than 10
        characters are dropped, and the files under ``self.hdfs_path`` are
        deleted before the append. The column order below is kept exactly as
        it was.
        """
        clean = self.handle_string_num_value

        def cleaned(col_name):
            """Apply the null-string-cleaning UDF and keep the column name."""
            return clean(col_name).alias(col_name)

        def null_col(col_name):
            """Emit a NULL placeholder column named ``col_name``."""
            return F.lit(None).alias(col_name)

        output_columns = [
            "asin",
            cleaned('asin_title'),
            "asin_title_len",
            F.when(F.col('asin_price') < 0, F.lit(None)).otherwise(F.col('asin_price')).alias('asin_price'),
            "asin_rating",
            "asin_total_comments",
            "asin_buy_box_seller_type",
            "asin_page_inventory",
            cleaned('asin_category_desc'),
            cleaned('asin_volume'),
            "asin_weight",
            "asin_color",
            "asin_size",
            "asin_style",
            "asin_is_sale",
            null_col("asin_rank"),
            cleaned('asin_launch_time'),
            "asin_is_new",
            "asin_img_num",
            cleaned('asin_img_type'),
            "asin_category_state",
            cleaned('asin_material'),
            cleaned('asin_brand_name'),
            null_col("bsr_cate_1_id"),
            null_col("bsr_cate_current_id"),
            cleaned('asin_activity_type'),
            "act_one_two_val",
            "act_three_four_val",
            "act_five_six_val",
            "act_eight_val",
            null_col("qa_num"),
            "one_star",
            "two_star",
            "three_star",
            "four_star",
            "five_star",
            "low_star",
            cleaned('together_asin'),
            cleaned('ac_name'),
            cleaned('node_id'),
            "asin_data_type",
            null_col('sp_num'),
            cleaned('asin_describe'),
            "asin_is_amazon",
            "asin_is_fba",
            "asin_is_fbm",
            "asin_is_other",
            "asin_is_picture",
            "asin_is_video",
            "asin_is_aadd",
            cleaned('asin_img_url'),
            "account_id",
            "account_name",
            "variation_num",
            "asin_is_brand",
            "asin_is_alarm",
            "created_time",
            "updated_time",
            "parent_asin",
            "asin_is_movie",
            cleaned('asin_label_list'),
            cleaned('asin_weight_type'),
            cleaned('asin_weight_str'),
            null_col('asin_package_quantity'),
            null_col('asin_pattern_name'),
            "category_id",
            "category_first_id",
            null_col("buy_data_bought_month"),
            null_col("buy_data_bought_week"),
            null_col("buy_data_viewed_month"),
            null_col("buy_data_viewed_week"),
            "crawl_asin_launch_time",
            "populated_asin_launch_time",
            "asin_follow_sellers",
            "asin_image_view",
            "product_description",
            "asin_spider_num",
            "asin_lob_info",
            "is_contains_lob_info",
            "package_quantity",
            "is_package_quantity_abnormal",
            "asin_quantity_variation_type",
            "seller_json",
            "asin_bought_month",
            "asin_length",
            "asin_width",
            "asin_height",
            "asin_is_self",
            "customer_reviews_json",
            "img_list",
            "variat_list",
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info'),
        ]
        df_save = self.df_asin_detail.select(*output_columns).persist(StorageLevel.MEMORY_ONLY)
        print("dim_asin_detail处理完毕, 最后的数据量为: ", df_save.count())
        df_save = df_save.filter(F.length(F.col("asin")) <= 10)
        df_save = df_save.repartition(100)
        df_save.show(10, truncate=False)
        print(f"清除hdfs目录中:{self.hdfs_path}")
        HdfsUtils.delete_file_in_folder(self.hdfs_path)
        print(f"当前存储的表名为:{self.hive_tb},分区为{self.partitions_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=self.partitions_by)
        print("success")

    def run(self):
        """Execute the pipeline steps in their required order."""
        # Read source data
        self.read_data()
        # De-duplicate ASIN detail rows
        self.handle_df_duplicated()
        # Amazon monthly sales information
        self.handle_asin_bought_month()
        # lob_info
        self.handle_asin_lob_info()
        # Variation attributes
        self.handle_asin_variation_attribute()
        # Buy-box seller / fulfilment type
        self.handle_asin_buy_box_seller_type()
        # Weight and volume
        self.handle_asin_basic_attribute()
        # Package quantity
        self.get_package_quantity()
        # Top-level category for node_id
        self.handle_asin_top_category()
        # Launch time
        self.handle_asin_launch_time()
        # Flag columns
        self.handle_asin_flag()
        # Label / movie flag
        self.handle_asin_label()
        # Thumbnail images
        self.handle_asin_img_info()
        # Latest variation info per parent ASIN
        self.handle_latest_variation_info()
        # Column normalisation and storage
        self.df_save()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    handle_obj = DimAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()