Commit 811c1967 by chenyuanjie

流量选品30天问题修复

parent dbc8e4f5
......@@ -31,8 +31,12 @@ class KafkaFlowAsinDetail(Templates):
self.test_flag = test_flag
# day 模式 date_info 形如 2026-05-12,month 模式形如 2026-03;统一替换 - 为 _ 拼 topic
self.year_month = str(self.date_info).replace("-", "_")
# date_info_last_year 取月份级别同比:day 模式截取前 7 位(2026-05-12 → 2026-05),month 模式直接用
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info[:7], -12)
# date_info_month 统一月份维度(day 模式 '2026-05-01' → '2026-05',month 模式 '2026-05' 保留)
# 用于父 ASIN 详情表 / ASIN 最新详情表的 date_info 字段写入,避免日级/月级混存
self.date_info_month = self.date_info[:7]
# date_info_last_month 取上个月(previous 基线);date_info_last_year 取去年同月(yoy 基线)
self.date_info_last_month = CommonUtil.get_month_offset(self.date_info_month, -1)
self.date_info_last_year = CommonUtil.get_month_offset(self.date_info_month, -12)
# spark相关参数
self.app_name = self.get_app_name()
self.spark = SparkUtil.get_stream_spark(app_name=self.app_name)
......@@ -44,11 +48,12 @@ class KafkaFlowAsinDetail(Templates):
self.batch_size_history = 20000
self.check_path = f"/home/big_data_selection/tmp/kafka_checkpoint/{self.topic_name}_{self.consumer_type}_test" if self.test_flag == 'test' else f"/home/big_data_selection/tmp/kafka_checkpoint/{self.topic_name}_{self.consumer_type}"
self.schema = self.init_schema()
# doris相关参数
self.doris_db = "test" if self.test_flag == "test" else "selection"
self.doris_30day_table = f"{self.site_name}_flow_asin_30day_test" if self.test_flag == "test" else f"{self.site_name}_flow_asin_30day"
# doris相关参数:主表落 dwt 库,最新详情/父 ASIN 详情表落 selection 库
self.doris_db_dwt = "test" if self.test_flag == "test" else "dwt"
self.doris_db_selection = "test" if self.test_flag == "test" else "selection"
self.doris_30day_table = f"{self.site_name}_flow_asin_30day_test" if self.test_flag == "test" else f"{self.site_name}_flow_asin_30day"
self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail_test" if self.test_flag == "test" else f"{self.site_name}_parent_asin_latest_detail"
self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail_test" if self.test_flag == "test" else f"{self.site_name}_asin_latest_detail"
self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail_test" if self.test_flag == "test" else f"{self.site_name}_asin_latest_detail"
# 分类解析模板
self.pattern1_dict = {
"us": "See Top 100 in ".lower(),
......@@ -66,6 +71,16 @@ class KafkaFlowAsinDetail(Templates):
"fr": "(\d+) en ",
"it": "(\d+) in ",
}
self.pattern_str = {
"us": r"(\d+ in [\w&' -]+)",
"uk": r"(\d+ in [\w&' -]+)",
# de: 分类名字符集 [\w&' -] 含字母和空格,会贪婪吃后续 "nr 数字" 段;
# 用非贪婪 +? 配合 lookahead 限定结尾:下一段 nr 数字 / 括号注释 / 字符串结束
"de": r"nr (\d+ in [\w&' -]+?)(?=\s*\(|\s+nr\s+\d|$)",
"es": r"nº(\d+ en [\w&' -]+)",
"fr": r"(\d+ en [\w&' -]+)",
"it": r"n (\d+ in [\w&' -]+)", # n. → n
} # 匹配排名和分类名称(与 best_sellers_rank lower + replace(".", "") 后匹配)
# DataFrame初始化
self.df_previous_flow_asin = self.spark.sql("select 1+1;")
self.df_previous_flow_asin_lastyear = self.spark.sql("select 1+1;")
......@@ -109,6 +124,7 @@ class KafkaFlowAsinDetail(Templates):
StructField("account_id", StringType(), True)
])
self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema)
self.u_rank_and_category = self.spark.udf.register('u_rank_and_category', self.udf_rank_and_category, StringType())
@staticmethod
def init_schema():
......@@ -194,11 +210,20 @@ class KafkaFlowAsinDetail(Templates):
df = df.withColumn("asin_bs_sellers_rank_lower", F.lower("best_sellers_rank"))
df = df.withColumn("asin_bs", self.u_parse_bs_category(
"asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", F.lit(cate_current_pattern), F.lit(cate_1_pattern), "node_id"))
# 新增:解析 best_sellers_rank 为 rank_and_category(&&&& 拼接),作为最终 Doris 字段值
df = df.withColumn("rank_cate", self.u_rank_and_category(
F.col("best_sellers_rank"),
F.lit(self.pattern_str[self.site_name]),
F.lit(self.pattern1_dict[self.site_name])
))
df = df.withColumn("asin_bs_cate_1_id", df.asin_bs.getField("asin_bs_cate_1_id")) \
.withColumn("asin_bs_cate_current_id", df.asin_bs.getField("asin_bs_cate_current_id")) \
.withColumn("asin_bs_cate_1_rank", df.asin_bs.getField("asin_bs_cate_1_rank")) \
.withColumn("asin_bs_cate_current_rank", df.asin_bs.getField("asin_bs_cate_current_rank")) \
.drop("asin_bs", "asin_bs_sellers_rank_lower", "best_sellers_herf", "all_best_sellers_herf", "best_sellers_rank")
.drop("asin_bs", "asin_bs_sellers_rank_lower", "all_best_sellers_herf")
# 用解析后的 rank_and_category 覆盖原 best_sellers_rank 原文
df = df.drop("best_sellers_rank") \
.withColumnRenamed("rank_cate", "best_sellers_rank")
df = df.withColumn("rank_type", F.expr("""
CASE WHEN asin_bs_cate_1_rank IS NOT NULL AND asin_bs_cate_1_rank BETWEEN 0 AND 1000 THEN 1
WHEN asin_bs_cate_1_rank BETWEEN 1000 AND 5000 THEN 2 WHEN asin_bs_cate_1_rank BETWEEN 5000 AND 10000 THEN 3
......@@ -272,7 +297,8 @@ class KafkaFlowAsinDetail(Templates):
# 6. 处理asin基础属性信息(长宽高重量等)
def handle_asin_basic_attribute_info(self, df):
# 1.解析ASIN重量相关信息
df = df.withColumn("weight_str", F.lower(F.col("weight_str"))).withColumn("asin_weight", self.u_parse_weight("weight_str", F.lit(self.site_name))).drop("weight_str")
df = df.withColumn("weight_str", F.lower(F.col("weight_str"))).withColumn("asin_weight", self.u_parse_weight("weight_str", F.lit(self.site_name)))
df = df.withColumnRenamed("weight_str", "asin_weight_str")
df = df.withColumn(
"weight", F.when(df.asin_weight.getField("weight_type") == 'pounds', df.asin_weight.getField("weight")).otherwise(F.lit(0))).drop("asin_weight")
# 2.处理重量类型
......@@ -372,10 +398,14 @@ class KafkaFlowAsinDetail(Templates):
)
print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select(
"parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info",
"parent_asin",
F.lit(self.date_info_month).alias("date_info"),
# Doris 新表 asin_crawl_date 是 DATETIME,需 string → timestamp
F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"),
"variation_info", "attr_info",
F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db_selection, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
return df
# 9. 提取打包数量字段
......@@ -770,29 +800,48 @@ class KafkaFlowAsinDetail(Templates):
return df_save
def read_data(self):
print("1a. 读取上个维度的flow_asin")
sql = f"""
select asin, asin_ao_val as previous_asin_ao_val, asin_price as previous_asin_price,
variation_num as previous_asin_variation_num, asin_rating as previous_asin_rating,
asin_total_comments as previous_asin_total_comments, first_category_rank as previous_first_category_rank,
bsr_orders as previous_asin_bsr_orders, sales as previous_sales, asin_bought_month as previous_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = '30day'
"""
print("sql=", sql)
self.df_previous_flow_asin = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin = self.df_previous_flow_asin.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
# 1a/1b 共用读取逻辑:单表读取 + PySpark DataFrame join,避免 Hive 服务端 3 表 JOIN 性能差
def _load_baseline(date_info_target, alias_prefix):
    """Build one month-level baseline DataFrame keyed by asin.

    Reads dwd_asin_measure, dim_asin_detail and dim_asin_bs_info for the
    current site and the requested month as three separate single-table
    queries, joins them in Spark (the measure rows drive both left joins),
    and exposes every metric under an ``<alias_prefix>_`` column name.

    :param date_info_target: value compared against ``date_info`` in each table
    :param alias_prefix: prefix put in front of every output metric column
    :return: DataFrame with ``asin`` plus the prefixed metric columns,
        persisted with ``StorageLevel.DISK_ONLY``
    """
    sql_measure = f"""
select asin, asin_ao_val, asin_bsr_orders
from dwd_asin_measure
where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{date_info_target}'
"""
    sql_detail = f"""
select asin, asin_price, variation_num, asin_rating, asin_total_comments, asin_bought_month
from dim_asin_detail
where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{date_info_target}'
"""
    sql_bs = f"""
select asin, asin_bs_cate_1_rank
from dim_asin_bs_info
where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{date_info_target}'
"""
    # Log all three statements before any of them is handed to Spark.
    print(f"sql_measure({alias_prefix})=", sql_measure)
    print(f"sql_detail ({alias_prefix})=", sql_detail)
    print(f"sql_bs ({alias_prefix})=", sql_bs)

    def _read_by_asin(query):
        # Every single-table result is repartitioned on the join key.
        return self.spark.sql(sqlQuery=query).repartition(self.repartition_num, 'asin')

    measure_df = _read_by_asin(sql_measure)
    detail_df = _read_by_asin(sql_detail)
    bs_df = _read_by_asin(sql_bs)

    # Left joins: an asin missing from detail / bs info keeps its measure row
    # and gets NULLs for the columns coming from the missing side.
    joined = measure_df.join(detail_df, on='asin', how='left')
    joined = joined.join(bs_df, on='asin', how='left')

    def _prefixed(column, suffix):
        # Output name is "<alias_prefix>_<suffix>".
        return column.alias(f'{alias_prefix}_{suffix}')

    projection = [
        F.col('asin'),
        _prefixed(F.round(F.col('asin_ao_val'), 3), 'asin_ao_val'),
        _prefixed(F.col('asin_price'), 'asin_price'),
        _prefixed(F.col('variation_num'), 'asin_variation_num'),
        _prefixed(F.col('asin_rating'), 'asin_rating'),
        _prefixed(F.col('asin_total_comments'), 'asin_total_comments'),
        _prefixed(F.col('asin_bs_cate_1_rank'), 'first_category_rank'),
        _prefixed(F.col('asin_bsr_orders'), 'asin_bsr_orders'),
        # sales is derived here: bsr orders * price, rounded to 2 decimals.
        _prefixed(F.round(F.col('asin_bsr_orders') * F.col('asin_price'), 2), 'sales'),
        _prefixed(F.col('asin_bought_month'), 'asin_bought_month'),
    ]
    return joined.select(*projection).persist(StorageLevel.DISK_ONLY)
print(f"1a. 读取上个月维度的flow_asin(date_type=month, date_info={self.date_info_last_month})")
self.df_previous_flow_asin = _load_baseline(self.date_info_last_month, 'previous')
self.df_previous_flow_asin.show(10, truncate=False)
print("1b. 读取同比去年的flow_asin")
sql = f"""
select asin, asin_ao_val as lastyear_asin_ao_val, asin_price as lastyear_asin_price,
variation_num as lastyear_asin_variation_num, asin_rating as lastyear_asin_rating,
asin_total_comments as lastyear_asin_total_comments, first_category_rank as lastyear_first_category_rank,
bsr_orders as lastyear_asin_bsr_orders, sales as lastyear_sales, asin_bought_month as lastyear_asin_bought_month
from dwt_flow_asin where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{self.date_info_last_year}'
"""
print("sql=", sql)
self.df_previous_flow_asin_lastyear = self.spark.sql(sqlQuery=sql)
self.df_previous_flow_asin_lastyear = self.df_previous_flow_asin_lastyear.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
print(f"1b. 读取同比去年的flow_asin(date_type=month, date_info={self.date_info_last_year})")
self.df_previous_flow_asin_lastyear = _load_baseline(self.date_info_last_year, 'lastyear')
self.df_previous_flow_asin_lastyear.show(10, truncate=False)
print("2. 获取店铺相关信息")
sql = f"""
......@@ -909,9 +958,27 @@ class KafkaFlowAsinDetail(Templates):
c = F.col(col_name).cast("double")
return F.when(F.isnan(c) | (F.abs(c) == F.lit(float('inf'))), F.lit(None)).otherwise(c)
# 写入 ASIN 最新详情表 Doris(仅 latest+normal 模式)
@staticmethod
def udf_rank_and_category(best_sellers_rank, pattern_str, top100_prefix):
    """Extract every "rank + category" segment from raw best-sellers-rank text.

    The text is normalised (``.`` and ``,`` removed, parenthesised notes
    removed, the "Top 100" prefix removed case-insensitively) and then
    scanned with ``pattern_str``. Matched segments keep the casing of the
    input and are joined with ``&&&&``.

    Matching is done case-insensitively on the cleaned text itself. The
    earlier approach matched on a lower-cased copy and reused those offsets
    to slice the original-case string; ``str.lower()`` can change the length
    of a string (e.g. ``"İ"`` becomes two code points), so the offsets could
    drift and the wrong slice was returned.

    :param best_sellers_rank: raw best-sellers-rank text, may be None
    :param pattern_str: lower-case regex locating one "rank + category" segment
    :param top100_prefix: lower-case "see top 100 in"-style prefix to strip
    :return: segments joined by ``&&&&``, or None when nothing matches
    """
    import re

    if best_sellers_rank is None:
        return None

    # Drop "." and "," so thousand separators and abbreviations ("Nr.") do not
    # break the digit / prefix parts of the pattern.
    text = str(best_sellers_rank).replace(".", "").replace(",", "")
    # Remove whole parenthesised notes such as "(Siehe Top 100 in ...)" so a
    # category name cannot run into the note.
    text = re.sub(r"\s*\([^)]*\)", "", text)
    # Remove any remaining Top-100 prefix regardless of its casing.
    if top100_prefix:
        text = re.sub(re.escape(top100_prefix), "", text, flags=re.IGNORECASE)

    # NOTE(review): the whole match (group 0) is emitted, so for patterns that
    # carry a literal prefix outside their capture group (e.g. "nr ", "nº")
    # that prefix is part of the output — confirm this is the intended format.
    segments = [m.group(0) for m in re.finditer(pattern_str, text, flags=re.IGNORECASE)]
    if not segments:
        return None
    return "&&&&".join(segments)
# 写入 ASIN 最新详情表 Doris(仅 normal 模式)
def save_asin_latest_detail(self, df):
if self.consumer_type != 'latest' or self.test_flag != 'normal':
if self.test_flag != 'normal':
return
print(f"导出ASIN最新详情信息到doris:")
start_time = time.time()
......@@ -923,21 +990,23 @@ class KafkaFlowAsinDetail(Templates):
F.col("category").alias("asin_category_desc"),
F.col("volume").alias("asin_volume"),
F.col("weight").alias("asin_weight"),
F.col("launch_time").alias("asin_launch_time"),
# Doris 端 DATETIME 严格类型检查,string → timestamp 需显式转换
F.to_timestamp(F.col("launch_time")).alias("asin_launch_time"),
F.col("brand").alias("asin_brand_name"),
"one_star", "two_star", "three_star", "four_star", "five_star", "low_star",
"account_name", "account_id",
F.col("site_name").alias("seller_country_name"),
"account_name",
F.col("seller_id").alias("account_id"), # handle_column_name 之前还叫 seller_id
"seller_country_name", # handle_column_name 之前还叫 seller_country_name(不是 site_name)
F.col("asin_bs_cate_1_id").alias("category_first_id"),
"parent_asin",
F.col("variat_num").alias("variation_num"),
"img_info",
F.col("asinUpdateTime").alias("asin_crawl_date"),
F.to_timestamp(F.col("asinUpdateTime")).alias("asin_crawl_date"),
F.col("price").alias("asin_price"),
F.col("rating").alias("asin_rating"),
F.col("total_comments").alias("asin_total_comments"),
"matrix_ao_val", "zr_flow_proportion", "matrix_flow_proportion",
F.lit(self.date_info).alias("date_info"),
F.lit(self.date_info_month).alias("date_info"),
"img_url",
F.col("asin_bs_cate_current_id").alias("category_current_id"),
F.col("asin_bs_cate_1_rank").alias("category_first_rank"),
......@@ -947,15 +1016,19 @@ class KafkaFlowAsinDetail(Templates):
F.col("describe").alias("asin_describe"),
F.col("fbm_delivery_price").alias("asin_fbm_price"),
F.col("describe_len").alias("asin_describe_len"),
"asin_weight_str",
"best_sellers_rank",
"best_sellers_herf",
)
table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
asin_weight, asin_launch_time, asin_brand_name, one_star, two_star, three_star, four_star, five_star, low_star,
account_name, account_id, seller_country_name, category_first_id, parent_asin, variation_num, img_info,
asin_crawl_date, asin_price, asin_rating, asin_total_comments, matrix_ao_val, zr_flow_proportion, matrix_flow_proportion,
date_info, img_url, category_current_id, category_first_rank, category_current_rank, bsr_orders, bsr_orders_sale,
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price, asin_describe_len"""
page_inventory, asin_bought_month, seller_json, buy_box_seller_type, asin_describe, asin_fbm_price, asin_describe_len,
asin_weight_str, best_sellers_rank, best_sellers_herf"""
DorisHelper.spark_export_with_columns(
df_save=df_asin_latest_detail, db_name=self.doris_db,
df_save=df_asin_latest_detail, db_name=self.doris_db_selection,
table_name=self.asin_latest_detail_table, table_columns=table_columns
)
end_time = time.time()
......@@ -991,7 +1064,7 @@ class KafkaFlowAsinDetail(Templates):
fbm_price, describe_len, multi_color_flag, multi_color_str, amazon_label"""
print(f"写入Doris {self.doris_30day_table}")
DorisHelper.spark_export_with_columns(
df_save=df, db_name=self.doris_db,
df_save=df, db_name=self.doris_db_dwt,
table_name=self.doris_30day_table, table_columns=doris_30day_columns
)
end_time = time.time()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment