Commit 1cdb9181 by chenyuanjie

流量选品-实时消费toDoris

parent 62fb9745
import os
import re
import sys
import time
import traceback
sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import *
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from functools import reduce
from pyspark.sql import Window
from pyspark.storagelevel import StorageLevel
from utils.DorisHelper import DorisHelper
from yswg_utils.common_df import get_node_first_id_df, get_first_id_from_category_desc_df
from yswg_utils.common_udf import udf_parse_bs_category, parse_weight_str, udf_extract_volume_dimensions, udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json
class KafkaFlowAsinDetail(Templates):
def __init__(self, site_name='us', date_type="month", date_info='2026-03', consumer_type='history', test_flag='test', batch_size=100000):
super().__init__()
self.site_name = site_name
self.date_type = date_type
self.date_info = date_info
self.consumer_type = consumer_type
self.test_flag = test_flag
# day 模式 date_info 形如 2026-05-12,month 模式形如 2026-03;统一替换 - 为 _ 拼 topic
self.year_month = str(self.date_info).replace("-", "_")
# date_info_last_year 取月份级别同比:day 模式截取前 7 位(2026-05-12 → 2026-05),month 模式直接用
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info[:7], -12)
# spark相关参数
self.app_name = self.get_app_name()
self.spark = SparkUtil.get_stream_spark(app_name=self.app_name)
self.processing_time = 900 if self.site_name == 'us' else 600
self.repartition_num = 80
# kafka相关参数(topic 按 date_type 动态:day → {site}_asin_detail_day_{yyyy_MM_dd},month → {site}_asin_detail_month_{yyyy_MM})
self.topic_name = f"{self.site_name}_asin_detail_{self.date_type}_{self.year_month}"
self.batch_size = batch_size
self.batch_size_history = 20000
self.check_path = f"/home/big_data_selection/tmp/kafka_checkpoint/{self.topic_name}_{self.consumer_type}_test" if self.test_flag == 'test' else f"/home/big_data_selection/tmp/kafka_checkpoint/{self.topic_name}_{self.consumer_type}"
self.schema = self.init_schema()
# doris相关参数
self.doris_db = "test" if self.test_flag == "test" else "selection"
self.doris_30day_table = f"{self.site_name}_flow_asin_30day_test" if self.test_flag == "test" else f"{self.site_name}_flow_asin_30day"
self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail_test" if self.test_flag == "test" else f"{self.site_name}_parent_asin_latest_detail"
self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail_test" if self.test_flag == "test" else f"{self.site_name}_asin_latest_detail"
# 分类解析模板
self.pattern1_dict = {
"us": "See Top 100 in ".lower(),
"uk": "See Top 100 in ".lower(),
"de": "Siehe Top 100 in ".lower(),
"es": "Ver el Top 100 en ".lower(),
"fr": "Voir les 100 premiers en ".lower(),
"it": "Visualizza i Top 100 nella categoria ".lower(),
}
self.pattern_current_dict = {
"us": "#(\d+) ",
"uk": "(\d+) in ",
"de": "(\d+) in ",
"es": "(\d+) en ",
"fr": "(\d+) en ",
"it": "(\d+) in ",
}
# DataFrame初始化
self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_previous_flow_asin_lastyear = self.spark.sql("select 1+1;")
self.df_seller_info = self.spark.sql("select 1+1;")
self.df_seller_country = self.spark.sql("select 1+1;")
self.df_asin_seller = self.spark.sql("select 1+1;")
self.df_asin_label_info = self.spark.sql("select 1+1;")
self.df_asin_measure = self.spark.sql("select 1+1;")
self.df_bs_report = self.spark.sql("select 1+1;")
self.df_asin_new_cate = self.spark.sql("select 1+ 1;")
self.df_asin_category = self.spark.sql("select 1+1;")
self.color_set = set()
# udf函数注册
package_schema = StructType([
StructField("parse_package_quantity", IntegerType(), True),
StructField("is_package_quantity_abnormal", IntegerType(), True),
])
self.u_parse_package_quantity = self.spark.udf.register('u_parse_package_quantity', udf_get_package_quantity, package_schema)
bs_category_schema = StructType([
StructField('asin_bs_cate_1_id', StringType(), True),
StructField('asin_bs_cate_current_id', StringType(), True),
StructField('asin_bs_cate_1_rank', IntegerType(), True),
StructField('asin_bs_cate_current_rank', IntegerType(), True),
])
self.u_parse_bs_category = self.spark.udf.register('u_parse_bs_category', udf_parse_bs_category, bs_category_schema)
weight_schema = StructType([
StructField('weight', FloatType(), True),
StructField('weight_type', StringType(), True)
])
self.u_parse_weight = self.spark.udf.register('u_parse_weight', parse_weight_str, weight_schema)
volume_schema = StructType([
StructField("length", FloatType(), True),
StructField("width", FloatType(), True),
StructField("height", FloatType(), True),
StructField("asin_volume_type", StringType(), True)
])
self.u_parse_volume = self.spark.udf.register('u_parse_volume', udf_extract_volume_dimensions, volume_schema)
seller_schema = StructType([
StructField("buy_box_seller_type", IntegerType(), True),
StructField("account_name", StringType(), True),
StructField("account_id", StringType(), True)
])
self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema)
@staticmethod
def init_schema():
schema = StructType([
StructField("asin", StringType(), True),
StructField("current_asin", StringType(), True),
StructField("title", StringType(), True),
StructField("img_url", StringType(), True),
StructField("rating", DoubleType(), True),
StructField("total_comments", IntegerType(), True),
StructField("price", FloatType(), True),
StructField("category", StringType(), True),
StructField("launch_time", StringType(), True),
StructField("volume", StringType(), True),
StructField("page_inventory", IntegerType(), True),
StructField("asin_vartion_list", ArrayType(ArrayType(StringType()), True), True),
StructField("title_len", IntegerType(), True),
StructField("img_num", IntegerType(), True),
StructField("img_type", StringType(), True),
StructField("activity_type", StringType(), True),
StructField("one_two_val", StringType(), True),
StructField("three_four_val", StringType(), True),
StructField("five_six_val", StringType(), True),
StructField("eight_val", StringType(), True),
StructField("node_id", StringType(), True),
StructField("five_star", IntegerType(), True),
StructField("four_star", IntegerType(), True),
StructField("three_star", IntegerType(), True),
StructField("two_star", IntegerType(), True),
StructField("one_star", IntegerType(), True),
StructField("low_star", IntegerType(), True),
StructField("together_asin", StringType(), True),
StructField("brand", StringType(), True),
StructField("ac_name", StringType(), True),
StructField("material", StringType(), True),
StructField("data_type", IntegerType(), True),
StructField("weight_str", StringType(), True),
StructField("seller_id", StringType(), True),
StructField("variat_num", IntegerType(), True),
StructField("best_sellers_rank", StringType(), True),
StructField("best_sellers_herf", StringType(), True),
StructField("account_name", StringType(), True),
StructField("parentAsin", StringType(), True),
StructField("asinUpdateTime", StringType(), True),
StructField("all_best_sellers_herf", StringType(), True),
StructField("image_view", IntegerType(), True),
StructField("product_description", StringType(), True),
StructField("describe", StringType(), True),
StructField("buy_sales", StringType(), True),
StructField("lob_asin_json", StringType(), True),
StructField("seller_json", StringType(), True),
StructField("customer_reviews_json", StringType(), True),
StructField("img_list", StringType(), True),
StructField("follow_sellers", IntegerType(), True),
StructField("fbm_delivery_price", FloatType(), True),
StructField("product_json", StringType(), True),
StructField("amazon_label", StringType(), True)
])
return schema
# 0. 处理跳转asin:current_asin 非空时用其替换 asin(在去重前执行,保证跳转后的重复 asin 能被正确合并)
def handle_asin_jump(self, df):
df = df.withColumn(
"asin",
F.when(
F.col("current_asin").isNotNull() & (F.col("current_asin") != ""),
F.col("current_asin")
).otherwise(F.col("asin"))
).drop("current_asin")
return df
# 覆写模板去重方法:去重前先做 current_asin 替换,确保跳转后同 asin 多条记录在去重时被合并
def deduplication_kafka_data(self, kafka_df, deduplicaiton_key_field, deduplication_time_field):
if deduplicaiton_key_field == "asin" and "current_asin" in kafka_df.columns:
kafka_df = self.handle_asin_jump(kafka_df)
return super().deduplication_kafka_data(kafka_df, deduplicaiton_key_field, deduplication_time_field)
# 1. 处理asin分类及排名以及排名类型字段
def handle_asin_bs_category_info(self, df):
df = df.withColumnRenamed("parentAsin", "parent_asin")
cate_current_pattern = self.pattern_current_dict[self.site_name]
cate_1_pattern = self.pattern1_dict[self.site_name]
df = df.withColumn("asin_bs_sellers_rank_lower", F.lower("best_sellers_rank"))
df = df.withColumn("asin_bs", self.u_parse_bs_category(
"asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", F.lit(cate_current_pattern), F.lit(cate_1_pattern), "node_id"))
df = df.withColumn("asin_bs_cate_1_id", df.asin_bs.getField("asin_bs_cate_1_id")) \
.withColumn("asin_bs_cate_current_id", df.asin_bs.getField("asin_bs_cate_current_id")) \
.withColumn("asin_bs_cate_1_rank", df.asin_bs.getField("asin_bs_cate_1_rank")) \
.withColumn("asin_bs_cate_current_rank", df.asin_bs.getField("asin_bs_cate_current_rank")) \
.drop("asin_bs", "asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", "best_sellers_rank")
df = df.withColumn("rank_type", F.expr("""
CASE WHEN asin_bs_cate_1_rank IS NOT NULL AND asin_bs_cate_1_rank BETWEEN 0 AND 1000 THEN 1
WHEN asin_bs_cate_1_rank BETWEEN 1000 AND 5000 THEN 2 WHEN asin_bs_cate_1_rank BETWEEN 5000 AND 10000 THEN 3
WHEN asin_bs_cate_1_rank BETWEEN 10000 AND 20000 THEN 4 WHEN asin_bs_cate_1_rank BETWEEN 20000 AND 30000 THEN 5
WHEN asin_bs_cate_1_rank BETWEEN 30000 AND 50000 THEN 6 WHEN asin_bs_cate_1_rank BETWEEN 50000 AND 70000 THEN 7
WHEN asin_bs_cate_1_rank >= 70000 THEN 8 ELSE 0 END"""))
return df
# 2. 利用node_id进行分类补充(此时无排名信息)及解析全路径分类
def handle_asin_category_supplement(self, df):
df = df.join(self.df_asin_new_cate, on=['node_id'], how='left')
df = df.withColumn("asin_bs_cate_current_id", F.coalesce(F.col("asin_bs_cate_current_id"), F.col("node_id"))). \
withColumn("asin_bs_cate_1_id", F.coalesce(F.col("asin_bs_cate_1_id"), F.col("category_first_id"))). \
drop("category_first_id", "node_id")
df = df.withColumn("desc_category_first_name", F.lower(F.trim(F.split(F.col("category"), "›").getItem(0))))
df = df.join(self.df_asin_category, on=['desc_category_first_name'], how='left')
df = df.withColumn("asin_bs_cate_1_id", F.coalesce(F.col("asin_bs_cate_1_id"), F.col("desc_category_first_id"))). \
drop("desc_category_first_name")
return df
# 3. 处理bsr销量、价格类型字段以及BSR销售额信息
def handle_asin_bsr_orders(self, df):
df = df.join(self.df_bs_report, on=['asin_bs_cate_1_id', 'asin_bs_cate_1_rank'], how='left')
df = df.withColumn("price_type", F.expr("""
CASE WHEN price IS NOT NULL AND price > 0 AND price < 10 THEN 1 WHEN price >= 10 AND price < 15 THEN 2
WHEN price >= 15 AND price < 20 THEN 3 WHEN price >= 20 AND price < 30 THEN 4
WHEN price >= 30 AND price < 50 THEN 5 WHEN price >= 50 THEN 6 ELSE 0 END""")).\
withColumn("bsr_orders_sale", F.round(F.col("bsr_orders") * F.col("price"), 2))
return df
# 4.解析Make-It-A-Bundle信息
def handle_asin_lob_info(self, df):
df = df.withColumn("is_contains_lob_info",
F.when(F.col("lob_asin_json").isNotNull(), F.lit(1)).otherwise(F.lit(0)))
df_parsed = df.withColumn("parse_asin_lob",
F.when(F.col("is_contains_lob_info") == 1, F.from_json("lob_asin_json", "array<struct<lob_asin:string>>")))
df_result = df_parsed.withColumn("asin_lob_info", F.expr("transform(parse_asin_lob, x -> x.lob_asin)"))
df = df_result.withColumn(
"asin_lob_info", F.regexp_replace(F.concat_ws(",", "asin_lob_info"), "[{}]", "")).drop(
"parse_asin_lob", "lob_asin_json")
return df
# 5. 处理配送方式、卖家所在地以及卖家所在地类型
def handle_asin_buy_box_seller_type(self, df):
df = df.withColumn("seller_json_parsed", self.u_parse_seller_info(df.seller_json))
df = df.withColumn("buy_box_seller_type", df.seller_json_parsed.buy_box_seller_type).withColumn(
"account_name", df.seller_json_parsed.account_name).drop("seller_json_parsed")
# 1. 关联全局缓存的df_asin_seller,用于填充seller_id为空的情况
df = df.join(self.df_asin_seller, on=['asin'], how='left')
# 2. 优先使用kafka消息中的seller_id,为空则使用df_asin_seller的数据
df = df.withColumn(
"seller_id", F.coalesce(F.col("seller_id"), F.col("fd_seller_id"))
).withColumn(
"account_name", F.coalesce(F.col("account_name"), F.col("fd_account_name"))
).drop("fd_seller_id", "fd_account_name")
# 3. 关联全局缓存的df_seller_country获取seller_country_name
df = df.join(self.df_seller_country, on=['seller_id'], how='left')
df = df.withColumn("site_name_type", F.expr("""
CASE WHEN buy_box_seller_type = 1 THEN 4
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%US%' THEN 1
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%CN%' THEN 2
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%HK%' THEN 5
WHEN buy_box_seller_type != 1 AND seller_country_name is not null AND seller_country_name like '%TW%' THEN 6
ELSE 3 END"""))
return df
# 6. 处理asin基础属性信息(长宽高重量等)
def handle_asin_basic_attribute_info(self, df):
# 1.解析ASIN重量相关信息
df = df.withColumn("weight_str", F.lower(F.col("weight_str"))).withColumn("asin_weight", self.u_parse_weight("weight_str", F.lit(self.site_name))).drop("weight_str")
df = df.withColumn(
"weight", F.when(df.asin_weight.getField("weight_type") == 'pounds', df.asin_weight.getField("weight")).otherwise(F.lit(0))).drop("asin_weight")
# 2.处理重量类型
df = df.withColumn("weight_type", F.expr("""
CASE WHEN weight BETWEEN 0 AND 0.2 THEN 1 WHEN weight BETWEEN 0.2 AND 0.4 THEN 2
WHEN weight BETWEEN 0.4 AND 0.6 THEN 3 WHEN weight BETWEEN 0.6 AND 1 THEN 4
WHEN weight BETWEEN 1 AND 2 THEN 5 WHEN weight >= 2 THEN 6 ELSE 0 END"""))
# 3.解析ASIN体积相关信息
df = df.withColumn("asin_volume", self.u_parse_volume("volume"))
df = df.withColumn("asin_volume_type", df.asin_volume.getField("asin_volume_type")) \
.withColumn("asin_length", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("length"))) \
.withColumn("asin_width", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("width"))) \
.withColumn("asin_height", F.when(F.col("asin_volume_type") == 'inches', df.asin_volume.getField("height"))) \
.drop("asin_volume", "asin_volume_type")
# 4.获取体积重/毛重相关信息
df = df.withColumn(
"asin_weight_ratio", F.when(
F.col("asin_length").isNotNull() & (F.col("asin_width").isNotNull()) &
(F.col("asin_height").isNotNull()) &
(F.col("weight") > 0), F.round(F.col("asin_length") * F.col("asin_width") * F.col("asin_height") * 3.2774128 / (F.col("weight") * 453.59), 4))
.otherwise(F.lit(-1)))
# 5.处理尺寸类型
if self.site_name == 'us':
expr_str = f"""
CASE WHEN weight > 0 AND weight * 16 <= 16 AND asin_length > 0 AND asin_length <= 15 AND asin_width > 0 AND asin_width <= 12 AND asin_height > 0 AND asin_height <= 0.75 THEN 1
WHEN weight > 0 AND weight <= 20 AND asin_length > 0 AND asin_length <= 18 AND asin_width > 0 AND asin_width <= 14 AND asin_height > 0 AND asin_height <= 8 THEN 2
WHEN weight > 0 AND weight <= 70 AND asin_length > 0 AND asin_length <= 60 AND asin_width > 0 AND asin_width <= 30 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 3
WHEN weight > 0 AND weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 130 THEN 4
WHEN weight > 0 AND weight <= 150 AND asin_length > 0 AND asin_length <= 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 <= 165 THEN 5
WHEN weight > 150 AND asin_length > 108 AND asin_length + asin_length + (asin_width + asin_height) * 2 > 165 THEN 6 ELSE 0 END"""
else:
expr_str = f"""
CASE WHEN weight > 0 AND weight <= 100 AND asin_length > 0 AND asin_length <= 20 AND asin_width > 0 AND asin_width <= 15 AND asin_height > 0 AND asin_height <= 1 THEN 1
WHEN weight > 0 AND weight <= 500 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 2.5 THEN 2
WHEN weight > 0 AND weight <= 1000 AND asin_length > 0 AND asin_length <= 33 AND asin_width > 0 AND asin_width <= 23 AND asin_height > 0 AND asin_height <= 5 THEN 3
WHEN weight > 0 AND weight <= 12000 AND asin_length > 0 AND asin_length <= 45 AND asin_width > 0 AND asin_width <= 34 AND asin_height > 0 AND asin_height <= 26 THEN 4
WHEN weight > 0 AND weight <= 2000 AND asin_length > 0 AND asin_length <= 61 AND asin_width > 0 AND asin_width <= 46 AND asin_height > 0 AND asin_height <= 46 THEN 5
WHEN asin_length > 0 AND asin_length <= 150 AND asin_length + asin_length + (asin_width + asin_height) <= 300 THEN 6
WHEN asin_length > 150 AND asin_length + asin_length + (asin_width + asin_height) > 300 THEN 7 ELSE 0 END"""
df = df.withColumn("size_type", F.expr(expr_str)).drop("asin_length", "asin_width", "asin_height")
# 6.处理五点描述长度
df = df.withColumn("describe_len", F.length(F.regexp_replace(F.col("describe"), "\\|-\\|", "")))
return df
# 7. 处理asin图片信息
def handle_asin_img_info(self, df):
img_schema = ArrayType(ArrayType(StringType()))
df = df.withColumn("img_list", F.from_json(F.col("img_list"), img_schema))
df_with_img = df.filter(F.size("img_list") > 0).select("asin", "img_list")
df_with_img_attribute = df_with_img.select(
"asin", F.explode("img_list").alias("img_attributes")
).select(
"asin", F.col("img_attributes")[1].alias("img_url"), F.col("img_attributes")[2].alias("img_order_by"),
F.col("img_attributes")[3].alias("data_type")
)
df_with_img_attribute_agg = df_with_img_attribute.groupby("asin").agg(
F.to_json(F.collect_list(F.struct(F.col("img_url"), F.col("img_order_by"), F.col("data_type")))).alias(
"img_info")
)
df = df.drop("img_list")
df = df.join(df_with_img_attribute_agg, on=['asin'], how='left')
df = df.withColumn("img_type", F.split(F.col("img_type"), ",")) \
.withColumn("img_type", F.expr("transform(img_type, x -> cast(x as int))"))
return df
# 8. 处理变体相关(ao及母体相关,自然占比及母体自然占比,各类型数量,月销信息等)
def handle_asin_measure(self, df):
df = CommonUtil.get_asin_variant_attribute(df_asin_detail=df, df_asin_measure=self.df_asin_measure,
partition_num=self.repartition_num, use_type=1)
# 是否数量变体类型和ao的类型
df = df.withColumn("quantity_variation_type", F.expr("""
CASE WHEN size is not null and size != '' and lower(size) like '%quantity%' THEN 1 ELSE 0 END""")).withColumn(
"ao_val_type", F.expr("""
CASE WHEN asin_ao_val BETWEEN 0 AND 0.1 THEN 1 WHEN asin_ao_val BETWEEN 0.1 AND 0.2 THEN 2
WHEN asin_ao_val BETWEEN 0.2 AND 0.4 THEN 3 WHEN asin_ao_val BETWEEN 0.4 AND 0.8 THEN 4
WHEN asin_ao_val BETWEEN 0.8 AND 1.2 THEN 5 WHEN asin_ao_val BETWEEN 1.2 AND 2 THEN 6
WHEN asin_ao_val >= 2 THEN 7 ELSE 0 END"""))
df = df.withColumnRenamed("asin_zr_counts", "zr_counts").withColumnRenamed("asin_ao_val", "ao_val") \
.withColumnRenamed("asin_zr_flow_proportion", "zr_flow_proportion") \
.withColumnRenamed("asin_amazon_orders", "asin_bought_month").drop("asin_st_counts", "asin_adv_counts")
# 获取parent_asin下最新ASIN信息,导出到 doris 父ASIN最新详情表(仅 latest+normal 模式)
if self.consumer_type == 'latest' and self.test_flag == 'normal':
df_parent_asin_info = df.filter("parent_asin is not null").select("parent_asin", "asin_vartion_list", "asinUpdateTime")
parent_asin_window = Window.partitionBy(['parent_asin']).orderBy(F.desc_nulls_last("asinUpdateTime"))
df_parent_asin_info = df_parent_asin_info.withColumn("u_rank", F.row_number().over(window=parent_asin_window))
df_parent_asin_info = df_parent_asin_info.repartition(self.repartition_num)
df_parent_asin_info = df_parent_asin_info.filter("u_rank = 1").drop("u_rank")
df_asin_variat = df_parent_asin_info.filter(F.size("asin_vartion_list") > 0).\
select("parent_asin", "asinUpdateTime", F.explode("asin_vartion_list").alias("variant_attribute")).\
select("parent_asin", F.col("asinUpdateTime").alias("asin_crawl_date"),
F.col("variant_attribute")[0].alias("asin"), F.col("variant_attribute")[1].alias("color"),
F.col("variant_attribute")[3].alias("size"), F.col("variant_attribute")[5].alias("style"))
df_asin_variat_agg = df_asin_variat.groupby(['parent_asin']).agg(
F.first("asin_crawl_date").alias("asin_crawl_date"),
F.concat_ws(',', F.collect_list("asin")).alias("variation_info"),
F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info")
)
print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select(
"parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info",
F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
return df
# 9. 提取打包数量字段
def handle_asin_package_quantity(self, df):
df = df.withColumn("material",
F.when(F.lower(F.col("material")) == 'nan', F.lit(None)).otherwise(F.col("material"))
)
df = df.withColumn(
"variat_attribute", F.concat_ws("&&&%", F.col("color"), F.col("style"), F.col("size"), F.col("material")))
df = df.withColumn("title_parse", self.u_parse_package_quantity(df.title)).withColumn(
"variat_parse", self.u_parse_package_quantity(df.variat_attribute))
df = df.withColumn("title_package_quantity", df.title_parse.getField("parse_package_quantity")). \
withColumn("variat_package_quantity", df.variat_parse.getField("parse_package_quantity")). \
withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \
withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \
drop("title_parse", "variat_parse", "variat_attribute")
# 从 product_json 额外提取 Color 字段作为颜色来源备用
df = df.withColumn(
"_product_json_color",
F.lower(F.get_json_object(F.col("product_json"), "$.Color"))
)
# Number of Items:从 product_json 提取,cast 失败(脏数据)自动为 null,提取后立即 drop
df = df.withColumn(
"number_of_items",
F.get_json_object(F.col("product_json"), "$.Number of Items").cast("int")
).drop("product_json")
# 优先级:Number of Items > 属性字段 > 标题解析 > 默认1
df = df.withColumn(
"package_quantity", F.expr("""
CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN number_of_items
WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity
WHEN title_package_quantity IS NOT NULL THEN title_package_quantity
ELSE 1 END""")
).withColumn(
"is_package_quantity_abnormal", F.expr("""
CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN 0
WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity_is_abnormal
WHEN title_package_quantity IS NOT NULL THEN title_package_quantity_is_abnormal
ELSE 2 END""")
).drop("number_of_items", "title_package_quantity", "variat_package_quantity",
"title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
df = df.withColumn("title", F.lower(F.col("title")))
# color 优先使用变体属性颜色,fallback 到 product_json 中的 Color 字段
df = df.withColumn("color", F.coalesce(F.col("color"), F.col("_product_json_color"))).drop("_product_json_color")
return df
# 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息
def handle_asin_lqs_and_brand(self, df):
# 1.品牌标签以及是否告警品牌
df = df.withColumn("is_brand_label", F.expr("""CASE WHEN brand is not null THEN 1 ELSE 0 END"""))
df = df.withColumn("brand", F.lower("brand"))
# 2. lqs评分
df = df.withColumn("category_node_rating",
F.expr(f"""CASE WHEN asin_bs_cate_current_id is not null THEN 1 ELSE 0 END""")) \
.withColumn("zr_rating", F.expr(f"""CASE WHEN zr_counts > 0 THEN 0.5 ELSE 0 END""")) \
.withColumn("sp_rating", F.expr(f"""CASE WHEN sp_counts > 0 THEN 1 ELSE 0 END""")) \
.withColumn("a_add_rating", F.expr(f"""CASE WHEN array_contains(img_type, 3) THEN 1 ELSE 0 END""")) \
.withColumn("video_rating", F.expr(f"""CASE WHEN array_contains(img_type, 2) THEN 0.5 ELSE 0 END""")) \
.withColumn("brand_rating", F.expr(f"""CASE WHEN is_brand_label = 1 THEN 0.2 ELSE 0 END""")) \
.withColumn("product_describe_rating",
F.expr(f"""CASE WHEN product_description is not null THEN 0.2 ELSE 0 END""")) \
.withColumn("highlight_rating", F.expr(f"""
CASE WHEN describe is not null AND size(split(describe, '\\|-\\|')) <= 4 THEN size(split(describe, '\\|-\\|')) * 0.4
WHEN describe is not null AND size(split(describe, '\\|-\\|')) > 4 THEN 1.6 ELSE 0 END""")) \
.withColumn("title_len_rating", F.expr(f"""CASE WHEN title_len >= 50 AND title_len <=200 THEN 0.5 ELSE 0 END""")) \
.withColumn("title_brand_rating", F.expr(f"""
CASE WHEN brand is not null AND lower(regexp_replace(title, '[^a-zA-Z0-9\\s]', '')) LIKE lower(regexp_replace(brand, '[^a-zA-Z0-9\\s]', '')) || '%' THEN 0.5
ELSE 0 END""")) \
.withColumn("img_num_rating", F.expr(f"""
CASE WHEN img_num <= 4 THEN img_num * 0.5 WHEN img_num >4 THEN 2 ELSE 0 END""")) \
.withColumn("img_enlarge_rating", F.expr(f"""CASE WHEN image_view = 1 THEN 0.5 ELSE 0 END"""))
df = df.withColumn(
"asin_lqs_rating",
(F.col("category_node_rating") + F.col("zr_rating") + F.col("sp_rating") + F.col("a_add_rating") +
F.col("video_rating") + F.col("brand_rating") + F.col("product_describe_rating") +
F.col("highlight_rating") + F.col("title_len_rating") + F.col("title_brand_rating") +
F.col("img_num_rating") + F.col("img_enlarge_rating")).cast("double")).withColumn(
"asin_lqs_rating_detail", F.to_json(
F.struct(F.col("category_node_rating"), F.col("zr_rating"), F.col("sp_rating"), F.col("a_add_rating"),
F.col("video_rating"), F.col("brand_rating"), F.col("product_describe_rating"),
F.col("highlight_rating"), F.col("title_len_rating"), F.col("title_brand_rating"),
F.col("img_num_rating"), F.col("img_enlarge_rating")))
)
df = df.drop("product_description", "image_view", "category_node_rating", "zr_rating", "sp_rating",
"a_add_rating", "video_rating", "brand_rating", "product_describe_rating", "highlight_rating",
"title_len_rating", "title_brand_rating", "img_num_rating", "img_enlarge_rating")
return df
def handle_multi_color_flag(self, df):
"""判断 ASIN 是否为颜色组合产品
multi_color_flag:
0 = 非多色
1 = 从颜色变体属性字段(color)解析为多色
2 = 从标题或五点描述解析为多色(降级 fallback)
颜色变体属性三层判断:
1. 命中颜色组合关键词
2. 含分隔符 (/ + & ; , and) 或 "数字 color"
3. 含 2 个及以上颜色表中的单色词
"""
# ── 第1层:关键词正则(支持 color/colors/colour/colours)──
KEYWORD_PATTERN = (
r"(?i)("
r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
r"\bassorted\b|\bmorandi\b|\bpastel\b|\bvibrant\b|\bvintage\b|\bboho\b|\bgradient\b|"
r"\bcandy\b|\bdusty\b|\bfluorescent\b|\bgentle\b|\bneutral\b|\bsoft\b|\bmuted\b|"
r"\brainbow\b|\bmaillard\b|\bcolorful\b|\bcolourful\b|\bmulti\b|"
r"\baesthetic colou?rs?\b|\bdreamy colou?rs?\b|\bearthy colou?rs?\b|\bshades of\b|"
r"\bvarious colou?rs?\b|\bsolid colou?rs?\b|\bmix colou?rs?\b|\bmixed colou?rs?\b|"
r"\bbasic colou?rs?\b|\blightcolor\b|\bdarkcolor\b|\battractive colou?rs?\b|"
r"\bmultiple colou?rs?\b|\bbright colorful\b|\bdifferent colou?rs?\b|\bclassic colou?rs?\b|"
r"\bfriendly colou?rs?\b|\bwarm colou?rs?\b|\bfun colou?rs?\b|\bmetallic colou?rs?\b|"
r"\bbright colou?rs?\b|\bdark colou?rs?\b|\blight colou?rs?\b|"
r"\bautumn colou?rs?\b|\bsummer colou?rs?\b|\bwinter colou?rs?\b|\bspring colou?rs?\b"
r")"
)
# ── 第2层:分隔符正则(/ + & ; 逗号 and 数字+color)──
SEPARATOR_PATTERN = r"(?i)[/+&;;,,]|\band\b|\d+\s*colou?rs?"
# ── 第3层:颜色表多色词检测 UDF ──
single_colors = sorted(
[c for c in self.color_set
if c.strip() and not re.search(r'[/+&;;,,]|\band\b', c)],
key=len, reverse=True
)
if single_colors:
color_regex = re.compile(
r'(?i)\b(' + '|'.join(re.escape(c) for c in single_colors) + r')\b'
)
else:
color_regex = None
def _get_matched_colors(color_str):
if not color_str or color_regex is None:
return None
matched = sorted({m.group(1).lower() for m in color_regex.finditer(color_str)})
return "/".join(matched) if len(matched) >= 2 else None
udf_matched_colors = F.udf(_get_matched_colors, StringType())
# ── 降级:标题 + 五点描述关键词正则 ──
FALLBACK_PATTERN = (
r"(?i)("
r"\bmulticolou?rs?\b|\bmulti[\s\-]colou?rs?\b|\bmulti[\s\-]colored\b|\bmulticolored\b|"
r"\bassorted colou?rs?\b|\bfluorescent colou?rs?\b|\bdifferent colou?rs?\b|"
r"\bbright colou?rs?\b|\bcolorful\b|\bcolourful\b|\battractive colou?rs?\b|"
r"\bvibrant colou?rs?\b|\d+\s*colou?rs?"
r")"
)
# 提前计算 UDF 结果,避免 multi_color_flag / multi_color_str 各调用一次(性能优化)
df = df.withColumn("_matched_colors_str", udf_matched_colors(F.col("color")))
df = df.withColumn(
"multi_color_flag",
F.when(
F.col("color").isNotNull() & F.col("color").rlike(KEYWORD_PATTERN), 1
).when(
F.col("color").isNotNull() & F.col("color").rlike(SEPARATOR_PATTERN), 1
).when(
F.col("color").isNotNull() & F.col("_matched_colors_str").isNotNull(), 1
).when(
F.lower(F.concat_ws(" ", F.col("title"), F.col("describe"))).rlike(FALLBACK_PATTERN), 2
).otherwise(0)
).withColumn(
"multi_color_str",
F.when(
F.col("color").isNotNull() & F.col("color").rlike(KEYWORD_PATTERN),
F.regexp_extract(F.col("color"), KEYWORD_PATTERN, 1)
).when(
F.col("color").isNotNull() & F.col("color").rlike(SEPARATOR_PATTERN),
F.col("color")
).when(
F.col("color").isNotNull() & F.col("_matched_colors_str").isNotNull(),
F.col("_matched_colors_str")
).when(
F.lower(F.concat_ws(" ", F.col("title"), F.col("describe"))).rlike(FALLBACK_PATTERN),
F.regexp_extract(
F.lower(F.concat_ws(" ", F.col("title"), F.col("describe"))),
FALLBACK_PATTERN, 1
)
).otherwise(F.lit(None))
).drop("_matched_colors_str")
return df
# 11. 通过ASIN页面信息处理(评分类型、电影标签)
def handle_asin_detail_all_type(self, df):
# 1. 评分类型
df = df.withColumn("rating_type", F.expr("""
CASE WHEN rating >= 4.5 THEN 1 WHEN rating >= 4 AND rating < 4.5 THEN 2 WHEN rating >= 3.5 AND rating < 4 THEN 3
WHEN rating >= 3 AND rating < 3.5 THEN 4 WHEN rating < 3 AND rating >= 0 THEN 5 ELSE 0 END"""))
# 2. 电影标签
movie_label_list = ['prime video', 'dvd', 'blu-ray', 'kindle', 'app', 'paperback', 'audible audiobook',
'kindle edition', 'kindle & comixology', 'hardcover', 'comic', 'multi-format', '4k',
'library binding', 'vinyl', 'audio cd', 'mp3 music', 'single issue magazine',
'print magazine', 'unknown binding']
df = df.join(self.df_asin_label_info, on=['asin'], how='left')
condition = reduce(
lambda acc, keyword: acc | F.expr(f"exists(asin_label_list, x -> x like '%{keyword}%')"),
movie_label_list,
F.lit(False)
)
df = df.withColumn("is_movie_label", condition.cast("int")).drop("asin_label_list")
return df
# 12. 处理变化率相关字段(环比_mom / 同比_yoy 统一处理)
def handle_asin_attribute_change(self, df):
df = df.join(self.df_previous_flow_asin, on=['asin'], how='left')
df = df.join(self.df_previous_flow_asin_lastyear, on=['asin'], how='left')
# (current_col, prev_col, lastyear_col, suffix, rise_round)
# rise_round: None=不计算_rise; 整数=round精度; 'int'=IntegerType
# 哨兵语义(全字段统一):
# both null → null (新ASIN,双边均无历史,无法比较)
# current null → -1000 (本期无数据,上期有数据,ASIN消失/下架)
# previous null/0 → 1000 (上期无数据或为0,本期新出现)
columns_to_change = [
("ao_val", "previous_asin_ao_val", "lastyear_asin_ao_val", "ao", 4),
("price", "previous_asin_price", "lastyear_asin_price", "price", 2),
("asin_bs_cate_1_rank","previous_first_category_rank","lastyear_first_category_rank", "rank", "int"),
("bsr_orders", "previous_asin_bsr_orders", "lastyear_asin_bsr_orders", "bsr_orders", "int"),
("rating", "previous_asin_rating", "lastyear_asin_rating", "rating", 1),
("total_comments", "previous_asin_total_comments", "lastyear_asin_total_comments", "comments", "int"),
("variat_num", "previous_asin_variation_num", "lastyear_asin_variation_num", "variation", "int"),
("bsr_orders_sale", "previous_sales", "lastyear_sales", "sales", 2),
("asin_bought_month", "previous_asin_bought_month", "lastyear_asin_bought_month", "bought_month", None),
]
def calculate_change(current_col, previous_col):
rise_col = F.col(current_col) - F.col(previous_col)
change_col = F.when(
F.col(current_col).isNull() & F.col(previous_col).isNull(), F.lit(None)
).when(
F.col(current_col).isNull(), F.lit(-1000.0)
).when(
F.col(previous_col).isNull() | (F.col(previous_col) == 0), F.lit(1000.0)
).otherwise(
F.round((F.col(current_col) - F.col(previous_col)) / F.col(previous_col), 4)
)
return rise_col, change_col
for current_col, prev_col, lastyear_col, suffix, rise_round in columns_to_change:
rise_col, mom_col = calculate_change(current_col, prev_col)
_, yoy_col = calculate_change(current_col, lastyear_col)
if rise_round is not None:
if rise_round == "int":
df = df.withColumn(f"{suffix}_rise", rise_col.cast(IntegerType()))
else:
df = df.withColumn(f"{suffix}_rise", F.round(rise_col, rise_round))
df = df.withColumn(f"{suffix}_mom", F.round(mom_col, 4)) \
.withColumn(f"{suffix}_yoy", F.round(yoy_col, 4))
df = df.drop('previous_asin_bought_month', 'lastyear_asin_bought_month')
return df
# 13. 字段标准化
def handle_column_name(self, df):
df = df.withColumnRenamed("asin_bs_cate_1_id", "category_first_id")\
.withColumnRenamed("asin_bs_cate_current_id", "category_id") \
.withColumnRenamed("asin_bs_cate_1_rank", "first_category_rank")\
.withColumnRenamed("asin_bs_cate_current_rank", "current_category_rank") \
.withColumnRenamed("variat_num", "variation_num")\
.withColumnRenamed("seller_id", "account_id").withColumnRenamed("seller_country_name", "site_name") \
.withColumnRenamed("asinUpdateTime", "asin_crawl_date")\
.withColumnRenamed("customer_reviews_json", "product_features")\
.withColumn("asin_crawl_date", F.coalesce(
F.to_timestamp(F.col("asin_crawl_date")),
F.current_timestamp()
)) \
.withColumn("launch_time", F.to_timestamp(F.col("launch_time"))) \
.withColumn("rating", F.round(F.col("rating"), 1)) \
.withColumn("collapse_asin", F.coalesce(F.col("parent_asin"), F.col("asin")))\
.withColumn("amazon_label", F.when(
F.coalesce(F.get_json_object(F.col("amazon_label"), "$[0].badge_type"),
F.get_json_object(F.col("amazon_label"), "$.badge_type")) != "unknown",
F.coalesce(F.get_json_object(F.col("amazon_label"), "$[0].badge_type"),
F.get_json_object(F.col("amazon_label"), "$.badge_type"))
))
df_save = df.select(
# ── 主键/标识 ──────────────────────────────────────────────────
"asin", # Kafka,ASIN 唯一标识
"parent_asin", # Kafka,父 ASIN
"collapse_asin", # 派生:coalesce(parent_asin, asin),折叠用 ASIN
"asin_crawl_date", # Kafka asinUpdateTime,本次爬取时间
# ── 标题/描述 ──────────────────────────────────────────────────
"title", # Kafka,商品标题(已转小写)
"title_len", # Kafka,标题字符数
"brand", # Kafka,品牌(已转小写)
F.col("describe").alias("asin_describe"), # Kafka describe,五点描述
"describe_len", # 派生:五点描述去分隔符后长度
"product_features", # Kafka customer_reviews_json,买家评测特性
"together_asin", # Kafka,组合购买推荐 ASIN
# ── 价格 ───────────────────────────────────────────────────────
F.round(F.col("price").cast("double"), 2).alias("price"), # Kafka,商品售价
F.round(F.col("fbm_delivery_price").cast("double"), 2).alias("fbm_price"), # Kafka,FBM 配送价
# ── 评分/评论 ──────────────────────────────────────────────────
"rating", # Kafka,综合评分
"total_comments", # Kafka,总评论数
"one_star", "two_star", "three_star", "four_star", "five_star", "low_star", # Kafka,各星级分布
# ── 销量/BSR ───────────────────────────────────────────────────
"bsr_orders", # df_bs_report,BSR 排名对应月销量估算
F.round(F.col("bsr_orders_sale").cast("double"), 2).alias("bsr_orders_sale"), # 派生:bsr_orders × price,BSR 销售额估算
"asin_bought_month", # df_asin_measure,页面月购买量
# ── 流量/占比 ──────────────────────────────────────────────────
"ao_val", # df_asin_measure,ASIN 自然流量占比值
"zr_counts", # df_asin_measure,自然排名位置数
"sp_counts", # df_asin_measure,SP 广告位置数
"sb_counts", # df_asin_measure,SB 广告位置数
"vi_counts", # df_asin_measure,视频广告位置数
"bs_counts", # df_asin_measure,BS 版位数
"ac_counts", # df_asin_measure,AC 标数
"tr_counts", # df_asin_measure,趋势标数
"er_counts", # df_asin_measure,编辑推荐位数
"zr_flow_proportion", # df_asin_measure,自然搜索流量占比
F.round(F.col("matrix_flow_proportion"), 4).alias("matrix_flow_proportion"), # get_asin_variant_attribute,母体自然流量占比
F.round(F.col("matrix_ao_val"), 4).alias("matrix_ao_val"), # get_asin_variant_attribute,母体 ao_val
F.round(self._safe_double("one_two_val"), 4).alias("one_two_val"), # Kafka,1-2 位流量占比
F.round(self._safe_double("three_four_val"), 4).alias("three_four_val"), # Kafka,3-4 位流量占比
F.round(self._safe_double("five_six_val"), 4).alias("five_six_val"), # Kafka,5-6 位流量占比
F.round(self._safe_double("eight_val"), 4).alias("eight_val"), # Kafka,8 位以上流量占比
# ── 分类 ───────────────────────────────────────────────────────
"category_first_id", # BSR 解析 + node_id 补充,一级分类 ID
"category_id", # BSR 解析 + node_id 补充,当前分类 ID
"desc_category_first_id", # category 文本截取后匹配得到的一级分类 ID
"first_category_rank", # BSR 解析,一级分类排名
"current_category_rank", # BSR 解析,当前分类排名
# ── 商品属性 ───────────────────────────────────────────────────
F.round(F.col("weight").cast("double"), 4).alias("weight"), # 派生:weight_str 解析(磅)
"volume", # Kafka,体积原始字符串
"asin_weight_ratio", # 派生:体积重 / 实重比率
"color", # 变体属性 + product_json Color 补充
"size", # 变体属性
"style", # 变体属性
"material", # Kafka,材质
"package_quantity", # 派生:包装数量(Number of Items / 属性 / 标题解析)
"is_package_quantity_abnormal", # 派生:包装数量异常(0正常/1异常/2默认)
"variation_num", # Kafka variat_num,变体总数
"page_inventory", # Kafka,页面展示库存量
"activity_type", # Kafka,促销活动类型
"launch_time", # Kafka,上架时间
# ── 图片 ───────────────────────────────────────────────────────
"img_url", # Kafka,主图 URL
"img_num", # Kafka,图片数量
"img_type", # Kafka → 处理为 array<int>,图片类型列表
"img_info", # 派生:图片详情 JSON(url/order/type)
# ── 卖家/店铺 ──────────────────────────────────────────────────
"account_id", # Kafka seller_id + df_asin_seller 补充,卖家 ID
"account_name", # UDF + df_asin_seller 补充,卖家名称
"buy_box_seller_type", # UDF,配送类型(1.自营/2.FBA/3.FBM/4.其他/0.未知)
"site_name", # df_seller_country,卖家所在地
F.col("follow_sellers").alias("follow_sellers_count"), # Kafka,关注该 ASIN 的卖家数
# ── LOB(Make-It-A-Bundle)────────────────────────────────────
"asin_lob_info", # 派生:捆绑销售关联 ASIN 列表
"is_contains_lob_info", # 派生:是否含捆绑销售信息(0/1)
# ── LQS 评分 ───────────────────────────────────────────────────
F.round(F.col("asin_lqs_rating"), 1).alias("asin_lqs_rating"), # 派生:LQS 综合评分
"asin_lqs_rating_detail", # 派生:LQS 各维度评分 JSON
# ── 标识/标签 ──────────────────────────────────────────────────
"amazon_label", # Kafka → 解析 badge_type(兼容新旧格式)
"is_movie_label", # 派生:是否电影/媒体类(0/1)
"is_brand_label", # 派生:是否有品牌(0/1)
"multi_color_flag", # 派生:多色标识(0.非多色/1.属性多色/2.标题降级)
"multi_color_str", # 派生:多色描述字符串
# ── 区间类型 ───────────────────────────────────────────────────
"rank_type", # 一级排名区间(0-8)
"ao_val_type", # ao_val 区间(0-7)
"price_type", # 价格区间(0-6)
"rating_type", # 评分区间(0-5)
"size_type", # 尺寸类型(0-6/7)
"weight_type", # 重量类型(0-6)
"site_name_type", # 卖家所在地类型(1.US/2.CN/3.其他/4.自营/5.HK/6.TW)
"quantity_variation_type", # 是否数量变体(0/1)
# ── 变化率(绝对值/环比/同比)────────────────────────────────
"rank_rise", "rank_mom", "rank_yoy",
"ao_rise", "ao_mom", "ao_yoy",
"price_rise", "price_mom", "price_yoy",
"rating_rise", "rating_mom", "rating_yoy",
"comments_rise", "comments_mom", "comments_yoy",
"bsr_orders_rise", "bsr_orders_mom", "bsr_orders_yoy",
"sales_rise", "sales_mom", "sales_yoy",
"variation_rise", "variation_mom", "variation_yoy",
"bought_month_mom", "bought_month_yoy", # 月销无绝对值,仅环比/同比
)
df_save = df_save.na.fill(
{"zr_counts": 0, "sp_counts": 0, "sb_counts": 0, "vi_counts": 0, "bs_counts": 0, "ac_counts": 0,
"tr_counts": 0, "er_counts": 0, "title_len": 0, "total_comments": 0, "variation_num": 0, "img_num": 0,
"one_two_val": 0.0, "three_four_val": 0.0, "five_six_val": 0.0, "eight_val": 0.0,
"one_star": 0, "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0,
"size_type": 0, "rating_type": 0, "site_name_type": 0, "weight_type": 0,
"ao_val_type": 0, "rank_type": 0, "price_type": 0, "quantity_variation_type": 0, "package_quantity": 1,
"is_movie_label": 0, "is_brand_label": 0, "asin_lqs_rating": 0.0, "follow_sellers_count": -1,
"describe_len": 0, "multi_color_flag": 0}
)
print("asin的标准信息:")
df_save.show(10, truncate=False)
return df_save
def read_data(self):
print("1a. 读取上个维度的flow_asin")
sql = f"""
select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price,
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales, asin_bought_month as previous_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '30day'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin.show(10, truncate=False)
print("1b. 读取同比去年的flow_asin")
sql = f"""
select asin, asin_ao_val as lastyear_asin_ao_val, asin_price as lastyear_asin_price,
variation_num as lastyear_asin_variation_num, asin_rating as lastyear_asin_rating,
asin_total_comments as lastyear_asin_total_comments, first_category_rank as lastyear_first_category_rank,
bsr_orders as lastyear_asin_bsr_orders, sales as lastyear_sales, asin_bought_month as lastyear_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{self.date_info_last_year}'
"""
print("sql=", sql)
self.df_previous_flow_asin_lastyear = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin_lastyear = self.df_previous_flow_asin_lastyear.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_previous_flow_asin_lastyear.show(10, truncate=False)
print("2. 获取店铺相关信息")
sql = f"""
select fd_unique as seller_id, fd_account_name as account_name, upper(fd_country_name) as seller_country_name, asin, updated_at
from dim_fd_asin_info_30day where site_name='{self.site_name}' and date_type = '30day' and fd_unique is not null"""
print("sql=", sql)
self.df_seller_info = self.spark.sql(sqlQuery=sql)
self.df_seller_info = self.df_seller_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_seller_info.show(10, truncate=False)
# df_seller_country: 按 seller_id 去重,保留 seller_country_name 非空的最新记录
window_seller = Window.partitionBy('seller_id').orderBy(F.col('updated_at').desc())
self.df_seller_country = self.df_seller_info \
.filter(F.col('seller_country_name').isNotNull()) \
.withColumn('rank', F.row_number().over(window_seller)) \
.filter(F.col('rank') == 1) \
.select('seller_id', 'seller_country_name') \
.persist(StorageLevel.DISK_ONLY)
# df_asin_seller: 按 asin 去重,保留最新记录
window_asin = Window.partitionBy('asin').orderBy(F.col('updated_at').desc())
self.df_asin_seller = self.df_seller_info \
.withColumn('rank', F.row_number().over(window_asin)) \
.filter(F.col('rank') == 1) \
.select('asin', F.col('seller_id').alias('fd_seller_id'), F.col('account_name').alias('fd_account_name')) \
.persist(StorageLevel.DISK_ONLY)
# df_seller_info 已完成派生,及时释放缓存
self.df_seller_info.unpersist()
print("3. 读取asin_label信息")
sql = f"""
select asin, label from
(select asin, lower(label) as label, created_time,row_number() over(partition by asin,label order by updated_time desc) as crank
from ods_other_search_term_data where site_name='{self.site_name}' and date_type='30day' and trim(label) not in ('null','') and label is not null) t where t.crank=1
"""
print("sql=", sql)
self.df_asin_label_info = self.spark.sql(sqlQuery=sql)
self.df_asin_label_info = self.df_asin_label_info.groupby(['asin']).agg(F.collect_set("label").alias("asin_label_list"))
self.df_asin_label_info = self.df_asin_label_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_label_info.show(10, truncate=False)
print("4. 读取dwd_asin_measure拿取ao及各类型数量")
sql = f"""
select asin, asin_sp_counts as sp_counts, (asin_sb1_counts + asin_sb2_counts) as sb_counts, asin_sb3_counts as vi_counts,
asin_bs_counts as bs_counts, asin_ac_counts as ac_counts, asin_tr_counts as tr_counts, asin_er_counts as er_counts,
asin_st_counts, asin_zr_counts, asin_adv_counts, round(asin_zr_flow_proportion, 4) as asin_zr_flow_proportion,
round(asin_ao_val, 4) as asin_ao_val, asin_amazon_orders
from dwd_asin_measure where site_name='{self.site_name}' and date_type='30day'
"""
print("sql=", sql)
self.df_asin_measure = self.spark.sql(sqlQuery=sql)
self.df_asin_measure = self.df_asin_measure.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_measure.show(10, truncate=False)
print("5. 读取one_category_report表")
sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as bsr_orders from ods_one_category_report " \
f"where site_name='{self.site_name}' and date_type='30day'"
print("sql=", sql)
self.df_bs_report = self.spark.sql(sqlQuery=sql)
self.df_bs_report = self.df_bs_report.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_bs_report.show(10, truncate=False)
print("6. 通过node_id获取一级分类进行分类补充")
df_asin_new_cate = get_node_first_id_df(self.site_name, self.spark)
self.df_asin_new_cate = F.broadcast(df_asin_new_cate)
self.df_asin_new_cate.show(10, truncate=False)
print("7. 获取分类ID与分类名称的对应关系")
df_asin_category = get_first_id_from_category_desc_df(self.site_name, self.spark)
self.df_asin_category = F.broadcast(df_asin_category.select(
F.lower(F.col('category_first_name')).alias('desc_category_first_name'),
F.col('category_first_id').alias('desc_category_first_id')
))
self.df_asin_category.show(10, truncate=False)
print("8. 读取颜色词表 dim_asin_color_info")
color_rows = self.spark.sql(
f"SELECT lower(en_name) as en_name FROM dim_asin_color_info WHERE site_name='{self.site_name}'"
).collect()
self.color_set = {row.en_name for row in color_rows}
print(f"颜色词表共 {len(self.color_set)} 条")
# 字段处理逻辑综合
def handle_all_field(self, df):
# 1. 处理asin分类及排名以及排名类型字段
df = self.handle_asin_bs_category_info(df)
# 2. 利用node_id进行分类补充
df = self.handle_asin_category_supplement(df)
# 3. 处理bsr销量及销售额信息以及价格类型字段
df = self.handle_asin_bsr_orders(df)
# 4. 解析Make-It-A-Bundle信息
df = self.handle_asin_lob_info(df)
# 5. 处理配送方式、卖家所在地以及卖家所在地类型
df = self.handle_asin_buy_box_seller_type(df)
# 6. 处理asin基础属性信息(长宽高重量等)
df = self.handle_asin_basic_attribute_info(df)
# 7. 处理asin图片信息
df = self.handle_asin_img_info(df)
# 8. 处理变体相关(ao及母体相关,自然占比及母体自然占比,各类型数量,月销信息等)
df = self.handle_asin_measure(df)
# 9. 提取打包数量字段
df = self.handle_asin_package_quantity(df)
# 9.5. 多色判断(依赖 color 字段已从变体属性+product_json 补全)
df = self.handle_multi_color_flag(df)
# 10. 处理品牌标签、是否告警品牌、处理asin_lqs_rating信息
df = self.handle_asin_lqs_and_brand(df)
# 11.通过ASIN页面信息处理(评分类型、上架时间类型、电影标签、ASIN类型、有效类型)
df = self.handle_asin_detail_all_type(df)
# 12. 处理变化率相关字段
df = self.handle_asin_attribute_change(df)
# 13. 写入 ASIN 最新详情表(在字段标准化 select 过滤前,趁 category/seller_json 等原始字段还在)
self.save_asin_latest_detail(df)
# 14. 字段标准化
df_save = self.handle_column_name(df)
return df_save
# 字符串 → double 安全转换:拦截 'nan'/'inf'/'infinity' 等字符串
# cast('double') 对这类字符串会得到 Double.NaN / Double.Infinity,
# Doris JSON 写入时无法解析为 DECIMAL,置 null 避免整行被过滤
@staticmethod
def _safe_double(col_name):
c = F.col(col_name).cast("double")
return F.when(F.isnan(c) | (F.abs(c) == F.lit(float('inf'))), F.lit(None)).otherwise(c)
# 写入 ASIN 最新详情表 Doris(仅 latest+normal 模式)
def save_asin_latest_detail(self, df):
if self.consumer_type != 'latest' or self.test_flag != 'normal':
return
print(f"导出ASIN最新详情信息到doris:")
start_time = time.time()
df_asin_latest_detail = df.select(
"asin",
F.col("ao_val").alias("asin_ao_val"),
F.col("title").alias("asin_title"),
F.col("title_len").alias("asin_title_len"),
F.col("category").alias("asin_category_desc"),
F.col("volume").alias("asin_volume"),
F.col("weight").alias("asin_weight"),
F.col("launch_time").alias("asin_launch_time"),
F.col("brand").alias("asin_brand_name"),
"one_star", "two_star", "three_star", "four_star", "five_star", "low_star",
"account_name", "account_id",
F.col("site_name").alias("seller_country_name"),
F.col("asin_bs_cate_1_id").alias("category_first_id"),
"parent_asin",
F.col("variat_num").alias("variation_num"),
"img_info",
F.col("asinUpdateTime").alias("asin_crawl_date"),
F.col("price").alias("asin_price"),
F.col("rating").alias("asin_rating"),
F.col("total_comments").alias("asin_total_comments"),
"matrix_ao_val", "zr_flow_proportion", "matrix_flow_proportion",
F.lit(self.date_info).alias("date_info"),
"img_url",
F.col("asin_bs_cate_current_id").alias("category_current_id"),
F.col("asin_bs_cate_1_rank").alias("category_first_rank"),
F.col("asin_bs_cate_current_rank").alias("category_current_rank"),
"bsr_orders", "bsr_orders_sale", "page_inventory", "asin_bought_month",
"seller_json", "buy_box_seller_type",
F.col("describe").alias("asin_describe"),
F.col("fbm_delivery_price").alias("asin_fbm_price"),
F.col("describe_len").alias("asin_describe_len"),
)
table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star,
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price, asin_describe_len"""
DorisHelper.spark_export_with_columns(
df_save=df_asin_latest_detail, db_name=self.doris_db,
table_name=self.asin_latest_detail_table, table_columns=table_columns
)
end_time = time.time()
print(f"Doris {self.asin_latest_detail_table} 写入完毕,耗时:{end_time - start_time:.1f}s")
# 写入doris逻辑
def save_to_doris(self, df, batch_num):
print(f"写入Doris,数据量:{batch_num}")
start_time = time.time()
# ---- 30day 主表写入 ----
# array 序列化为 JSON 字符串(Doris StreamLoad ARRAY<INT> 格式)
df = df.withColumn("img_type", F.to_json(F.col("img_type")))
doris_30day_columns = """asin, ao_val, zr_counts, sp_counts, sb_counts, vi_counts, bs_counts, ac_counts,
tr_counts, er_counts, bsr_orders, bsr_orders_sale, title, title_len, price, rating, total_comments,
buy_box_seller_type, page_inventory, volume, weight, color, size, style, material, launch_time,
img_num, parent_asin, img_type, img_url, activity_type, one_two_val, three_four_val, five_six_val,
eight_val, brand, variation_num, one_star, two_star, three_star, four_star, five_star, low_star,
together_asin, account_name, account_id,
rank_rise, rank_mom, rank_yoy, ao_rise, ao_mom, ao_yoy,
price_rise, price_mom, price_yoy, rating_rise, rating_mom, rating_yoy,
comments_rise, comments_mom, comments_yoy, bsr_orders_rise, bsr_orders_mom, bsr_orders_yoy,
sales_rise, sales_mom, sales_yoy, variation_rise, variation_mom, variation_yoy,
bought_month_mom, bought_month_yoy,
size_type, rating_type, site_name_type, weight_type, ao_val_type, rank_type,
price_type, quantity_variation_type, package_quantity,
is_movie_label, is_brand_label, asin_crawl_date,
category_first_id, category_id, first_category_rank, current_category_rank, desc_category_first_id, asin_weight_ratio,
site_name, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail, asin_lob_info,
is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion, matrix_flow_proportion,
matrix_ao_val, product_features, img_info, collapse_asin, follow_sellers_count, asin_describe,
fbm_price, describe_len, multi_color_flag, multi_color_str, amazon_label"""
print(f"写入Doris {self.doris_30day_table}")
DorisHelper.spark_export_with_columns(
df_save=df, db_name=self.doris_db,
table_name=self.doris_30day_table, table_columns=doris_30day_columns
)
end_time = time.time()
print(f"Doris {self.doris_30day_table} 写入完毕,耗时:{end_time - start_time:.1f}s")
# 实时消费中批次数据的处理逻辑(latest 模式)
def handle_kafka_stream(self, df, batch_id):
try:
batch_num = df.count()
if batch_num > 0:
start_time = time.time()
print(f"当前批次:{batch_id}; 该批次数据量为:{batch_num}")
df = df.repartition(self.repartition_num)
df_save = self.handle_all_field(df)
self.save_to_doris(df_save, batch_num)
df_save.unpersist()
end_time = time.time()
print(f"当前批次:{batch_id} 执行完毕, 执行时长为:{end_time - start_time:.1f}s")
else:
print("当前批次没有数据")
except Exception as e:
print(e, traceback.format_exc())
# 消费主题下的所有历史数据
def handle_kafka_history(self, kafka_df):
print("处理kafka历史数据")
batch_num = kafka_df.count()
if batch_num > 0:
start_time = time.time()
kafka_df = kafka_df.repartition(self.repartition_num)
kafka_df = self.handle_all_field(kafka_df)
self.save_to_doris(kafka_df, batch_num)
end_time = time.time()
print("该批次数据处理完毕, 执行时长为:" + str(end_time - start_time))
else:
raise ValueError("当前主题中没有数据,请注意检查!")
if __name__ == '__main__':
arguments = sys.argv[1:]
site_name = sys.argv[1] # 参数1:站点,如 us
date_type = sys.argv[2] # 参数2:类型,如 month
date_info = sys.argv[3] # 参数3:年-月,如 2026-03
consumer_type = sys.argv[4] # 参数4:history / latest
if len(arguments) == 5:
test_flag = sys.argv[5]
else:
test_flag = 'normal'
handle_obj = KafkaFlowAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info, consumer_type=consumer_type, test_flag=test_flag, batch_size=200000)
handle_obj.run_kafka()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment