Commit 1a27c704 by chenyuanjie

Doris变体表增加字段母体AO及母体流量占比

parent c3d08b7b
...@@ -29,7 +29,6 @@ from utils.hdfs_utils import HdfsUtils ...@@ -29,7 +29,6 @@ from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil from utils.db_util import DBUtil
from datetime import datetime from datetime import datetime
from pyspark.storagelevel import StorageLevel from pyspark.storagelevel import StorageLevel
from utils.DorisHelper import DorisHelper
from yswg_utils.common_udf import udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json, udf_parse_amazon_orders from yswg_utils.common_udf import udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json, udf_parse_amazon_orders
...@@ -58,9 +57,6 @@ class DimAsinDetail(object): ...@@ -58,9 +57,6 @@ class DimAsinDetail(object):
self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info) self.cal_date = CommonUtil.get_calDay_by_dateInfo(self.spark, self.date_type, self.date_info)
self.partitions_by = ['site_name', 'date_type', 'date_info'] self.partitions_by = ['site_name', 'date_type', 'date_info']
self.partition_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80) self.partition_num = CommonUtil.reset_partitions(self.site_name, partitions_num=80)
# Doris相关参数
self.doris_db = "selection"
self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail"
# 自定义df相关 # 自定义df相关
self.df_asin_keep_date = self.spark.sql(f"select 1+1;") self.df_asin_keep_date = self.spark.sql(f"select 1+1;")
self.df_asin_detail = self.spark.sql(f"select 1+1;") self.df_asin_detail = self.spark.sql(f"select 1+1;")
...@@ -486,40 +482,6 @@ class DimAsinDetail(object): ...@@ -486,40 +482,6 @@ class DimAsinDetail(object):
else: else:
pass pass
# 处理parent_asin下最新变体信息
def handle_latest_variation_info(self):
if self.date_type in ['month', 'month_week', 'month_aba_me'] and self.date_info >= '2024-06':
df_asin_variat = self.df_asin_detail.filter("parent_asin is not null").select("parent_asin", "variat_list_change", "created_time")
latest_asin_window = Window.partitionBy('parent_asin').orderBy(
F.desc_nulls_last("created_time")
)
df_asin_variat = df_asin_variat.withColumn("p_rank", F.row_number().over(window=latest_asin_window))
df_asin_variat = df_asin_variat.filter("p_rank = 1").drop("p_rank")
df_asin_variat = df_asin_variat.filter(F.size("variat_list_change") > 0). \
select("parent_asin", "created_time", F.explode("variat_list_change").alias("variant_attribute")). \
select("parent_asin", "created_time", F.col("variant_attribute")[0].alias("asin"),
F.col("variant_attribute")[1].alias("color"), F.col("variant_attribute")[3].alias("size"),
F.col("variant_attribute")[5].alias("style"))
df_asin_variat_agg = df_asin_variat.groupby(['parent_asin']).agg(
F.first("created_time").alias("asin_crawl_date"),
F.concat_ws(',', F.collect_list("asin")).alias("variation_info"),
F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info")
).repartition(100).persist(StorageLevel.DISK_ONLY)
print(f"父ASIN变体聚合数量:{df_asin_variat_agg.count()}")
print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select(
"parent_asin",
F.lit(self.date_info).alias("date_info"),
# Doris 新表 asin_crawl_date 是 DATETIME,需 string → timestamp 显式转
F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"),
"variation_info", "attr_info",
F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
df_asin_variat_agg.unpersist()
else:
pass
# 字段标准化及存储 # 字段标准化及存储
def df_save(self): def df_save(self):
df_save = self.df_asin_detail \ df_save = self.df_asin_detail \
...@@ -613,8 +575,6 @@ class DimAsinDetail(object): ...@@ -613,8 +575,6 @@ class DimAsinDetail(object):
self.handle_asin_label() self.handle_asin_label()
# 处理asin小图信息 # 处理asin小图信息
self.handle_asin_img_info() self.handle_asin_img_info()
# 处理parent_asin下最新变体信息
self.handle_latest_variation_info()
# 字段标准化及存储 # 字段标准化及存储
self.df_save() self.df_save()
......
...@@ -58,6 +58,7 @@ class DwtFlowAsin(Templates): ...@@ -58,6 +58,7 @@ class DwtFlowAsin(Templates):
# doris相关配置 # doris相关配置
self.doris_db = "test" if self.test_flag == "test" else "selection" self.doris_db = "test" if self.test_flag == "test" else "selection"
self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail" self.asin_latest_detail_table = f"{self.site_name}_asin_latest_detail"
self.parent_asin_latest_detail_table = f"{self.site_name}_parent_asin_latest_detail"
# 写入、分区初始化 # 写入、分区初始化
self.df_save = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;")
self.partitions_by = ['site_name', 'date_type', 'date_info'] self.partitions_by = ['site_name', 'date_type', 'date_info']
...@@ -78,6 +79,7 @@ class DwtFlowAsin(Templates): ...@@ -78,6 +79,7 @@ class DwtFlowAsin(Templates):
self.df_flow_asin_last_year = self.spark.sql(f"select 1+1;") self.df_flow_asin_last_year = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;") self.df_keepa_asin = self.spark.sql(f"select 1+1;")
self.df_asin_source_flag = self.spark.sql(f"select 1+1;") self.df_asin_source_flag = self.spark.sql(f"select 1+1;")
self.df_parent_asin_variat_agg = self.spark.sql(f"select 1+1;")
self.color_set = set() # 颜色词表,read_data 阶段填充 self.color_set = set() # 颜色词表,read_data 阶段填充
@staticmethod @staticmethod
...@@ -210,7 +212,7 @@ class DwtFlowAsin(Templates): ...@@ -210,7 +212,7 @@ class DwtFlowAsin(Templates):
date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date, asin_bought_month, asin_image_view, date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date, asin_bought_month, asin_image_view,
case when product_description is not null then 1 else 0 end as is_with_product_description, asin_describe, case when product_description is not null then 1 else 0 end as is_with_product_description, asin_describe,
category_id as top_category_id, category_first_id as top_category_first_id, customer_reviews_json, img_list as img_info, category_id as top_category_id, category_first_id as top_category_first_id, customer_reviews_json, img_list as img_info,
asin_follow_sellers as follow_sellers_count, asin_fbm_price, current_asin, amazon_label asin_follow_sellers as follow_sellers_count, asin_fbm_price, current_asin, amazon_label, variat_list
from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'""" from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
print("sql:" + sql) print("sql:" + sql)
self.df_asin_detail = self.spark.sql(sqlQuery=sql) self.df_asin_detail = self.spark.sql(sqlQuery=sql)
...@@ -425,6 +427,44 @@ class DwtFlowAsin(Templates): ...@@ -425,6 +427,44 @@ class DwtFlowAsin(Templates):
).drop("parent_matrix_ao_val", "parent_matrix_flow_proportion") ).drop("parent_matrix_ao_val", "parent_matrix_flow_proportion")
self.df_asin_measure.unpersist() self.df_asin_measure.unpersist()
def handle_parent_asin_variation(self):
"""处理父ASIN变体聚合数据,结果存入 self.df_parent_asin_variat_agg"""
if self.date_type not in ['month', 'month_week'] or self.date_info < '2024-06':
return
variat_schema = ArrayType(ArrayType(StringType()))
latest_window = Window.partitionBy('parent_asin').orderBy(F.desc_nulls_last("asin_crawl_date"))
df_asin_variat = (
self.df_asin_detail
.filter(F.col("parent_asin").isNotNull() & F.col("variat_list").isNotNull())
.withColumn("variat_list_parsed", F.from_json(F.col("variat_list"), variat_schema))
.filter(F.size("variat_list_parsed") > 0)
.withColumn("p_rank", F.row_number().over(latest_window))
.filter(F.col("p_rank") == 1).drop("p_rank")
.select("parent_asin", "asin_crawl_date", "matrix_ao_val", "matrix_flow_proportion",
F.explode("variat_list_parsed").alias("variant_attribute"))
.select("parent_asin", "asin_crawl_date", "matrix_ao_val", "matrix_flow_proportion",
F.col("variant_attribute")[0].alias("asin"),
F.col("variant_attribute")[1].alias("color"),
F.col("variant_attribute")[3].alias("size"),
F.col("variant_attribute")[5].alias("style"))
)
df_asin_variat_agg = df_asin_variat.groupby(['parent_asin']).agg(
F.first("asin_crawl_date").alias("asin_crawl_date"),
F.first("matrix_ao_val").alias("matrix_ao_val"),
F.first("matrix_flow_proportion").alias("matrix_flow_proportion"),
F.concat_ws(',', F.collect_list("asin")).alias("variation_info"),
F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info")
)
self.df_parent_asin_variat_agg = df_asin_variat_agg.select(
"parent_asin",
F.lit(self.date_info).alias("date_info"),
F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"),
"variation_info", "attr_info",
F.current_timestamp().alias("updated_at"),
F.round(F.col("matrix_flow_proportion"), 4).alias("matrix_flow_proportion"),
F.round(F.col("matrix_ao_val"), 4).alias("matrix_ao_val")
).persist(StorageLevel.DISK_ONLY)
# 处理配送方式、卖家所在地以及卖家所在地类型 # 处理配送方式、卖家所在地以及卖家所在地类型
def handle_seller_country(self): def handle_seller_country(self):
if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (self.date_type == '4_week' and self.date_info >= '2024-21'): if (self.date_type in ['month', 'month_week'] and self.date_info >= '2024-05') or (self.date_type == '4_week' and self.date_info >= '2024-21'):
...@@ -832,7 +872,8 @@ class DwtFlowAsin(Templates): ...@@ -832,7 +872,8 @@ class DwtFlowAsin(Templates):
F.col("asin_buy_box_seller_type").cast('tinyint').alias("buy_box_seller_type"), F.col("asin_buy_box_seller_type").cast('tinyint').alias("buy_box_seller_type"),
"asin_describe", "asin_fbm_price", "asin_describe", "asin_fbm_price",
F.col("describe_len").alias("asin_describe_len"), F.col("describe_len").alias("asin_describe_len"),
"asin_type" "asin_type",
F.lit(None).cast("int").alias("is_amazon_new")
) )
table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume, table_columns = """asin, asin_ao_val, asin_title, asin_title_len, asin_category_desc, asin_volume,
asin_weight, asin_weight_str, asin_launch_time, asin_brand_name, asin_weight, asin_weight_str, asin_launch_time, asin_brand_name,
...@@ -847,12 +888,18 @@ class DwtFlowAsin(Templates): ...@@ -847,12 +888,18 @@ class DwtFlowAsin(Templates):
bsr_orders, bsr_orders_sale, page_inventory, asin_bought_month, bsr_orders, bsr_orders_sale, page_inventory, asin_bought_month,
seller_json, buy_box_seller_type, seller_json, buy_box_seller_type,
asin_describe, asin_fbm_price, asin_describe_len, asin_describe, asin_fbm_price, asin_describe_len,
asin_type""" asin_type, is_amazon_new"""
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.asin_latest_detail_table, table_columns=table_columns)
print("save asin_latest_detail success") print("save asin_latest_detail success")
print("往doris存储父ASIN最新变体信息:")
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at, matrix_flow_proportion, matrix_ao_val"
DorisHelper.spark_export_with_columns(
df_save=self.df_parent_asin_variat_agg, db_name=self.doris_db,
table_name=self.parent_asin_latest_detail_table, table_columns=table_columns
)
print("save parent_asin_latest_detail success")
else: else:
print("不用导出旧数据到doris中") print("不用导出旧数据到doris中")
pass
def handle_asin_redirect(self): def handle_asin_redirect(self):
"""asin跳转处理:将跳转asin替换为current_asin,按asin去重保留最新抓取时间""" """asin跳转处理:将跳转asin替换为current_asin,按asin去重保留最新抓取时间"""
...@@ -876,6 +923,7 @@ class DwtFlowAsin(Templates): ...@@ -876,6 +923,7 @@ class DwtFlowAsin(Templates):
self.handle_asin_detail_all_type() self.handle_asin_detail_all_type()
self.handle_asin_category_info() self.handle_asin_category_info()
self.handle_asin_measure() self.handle_asin_measure()
self.handle_parent_asin_variation()
self.handle_seller_country() self.handle_seller_country()
self.handle_asin_lqs_rating() self.handle_asin_lqs_rating()
self.handle_asin_is_hide() self.handle_asin_is_hide()
......
...@@ -380,20 +380,11 @@ class KafkaFlowAsinDetail(Templates): ...@@ -380,20 +380,11 @@ class KafkaFlowAsinDetail(Templates):
WHEN asin_ao_val >= 2 THEN 7 ELSE 0 END""")) WHEN asin_ao_val >= 2 THEN 7 ELSE 0 END"""))
df = df.withColumnRenamed("asin_zr_counts", "zr_counts").withColumnRenamed("asin_ao_val", "ao_val") \ df = df.withColumnRenamed("asin_zr_counts", "zr_counts").withColumnRenamed("asin_ao_val", "ao_val") \
.withColumnRenamed("asin_zr_flow_proportion", "zr_flow_proportion") \ .withColumnRenamed("asin_zr_flow_proportion", "zr_flow_proportion") \
.withColumnRenamed("asin_amazon_orders", "asin_bought_month") \
.drop("asin_st_counts", "asin_adv_counts") .drop("asin_st_counts", "asin_adv_counts")
# asin_bought_month 取数规则:优先解析 buy_sales 文本("200+ bought in past month"),
# 解析为 NULL 或 0 时用 dwd_asin_measure 的 asin_amazon_orders 兜底
df = df.withColumn(
"asin_bought_month",
F.coalesce(
F.when(self.u_parse_amazon_orders(F.col("buy_sales")) > 0,
self.u_parse_amazon_orders(F.col("buy_sales"))),
F.col("asin_amazon_orders"),
)
).drop("asin_amazon_orders")
# 获取parent_asin下最新ASIN信息,导出到 doris 父ASIN最新详情表(仅 latest+normal 模式) # 获取parent_asin下最新ASIN信息,导出到 doris 父ASIN最新详情表(仅 latest+normal 模式)
if self.consumer_type == 'latest' and self.test_flag == 'normal': if self.test_flag == 'normal':
df_parent_asin_info = df.filter("parent_asin is not null").select("parent_asin", "asin_vartion_list", "asinUpdateTime") df_parent_asin_info = df.filter("parent_asin is not null").select("parent_asin", "asin_vartion_list", "asinUpdateTime", "matrix_flow_proportion", "matrix_ao_val")
parent_asin_window = Window.partitionBy(['parent_asin']).orderBy(F.desc_nulls_last("asinUpdateTime")) parent_asin_window = Window.partitionBy(['parent_asin']).orderBy(F.desc_nulls_last("asinUpdateTime"))
df_parent_asin_info = df_parent_asin_info.withColumn("u_rank", F.row_number().over(window=parent_asin_window)) df_parent_asin_info = df_parent_asin_info.withColumn("u_rank", F.row_number().over(window=parent_asin_window))
df_parent_asin_info = df_parent_asin_info.repartition(self.repartition_num) df_parent_asin_info = df_parent_asin_info.repartition(self.repartition_num)
...@@ -408,6 +399,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -408,6 +399,9 @@ class KafkaFlowAsinDetail(Templates):
F.concat_ws(',', F.collect_list("asin")).alias("variation_info"), F.concat_ws(',', F.collect_list("asin")).alias("variation_info"),
F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info") F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info")
) )
# 关联母体 AO 值和自然流量占比(取 parent_asin 维度最新记录的值)
df_parent_matrix = df_parent_asin_info.select("parent_asin", "matrix_flow_proportion", "matrix_ao_val")
df_asin_variat_agg = df_asin_variat_agg.join(df_parent_matrix, on="parent_asin", how="left")
print("导出父ASIN最新变体信息到doris:") print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select( df_doris = df_asin_variat_agg.select(
"parent_asin", "parent_asin",
...@@ -415,8 +409,10 @@ class KafkaFlowAsinDetail(Templates): ...@@ -415,8 +409,10 @@ class KafkaFlowAsinDetail(Templates):
# Doris 新表 asin_crawl_date 是 DATETIME,需 string → timestamp # Doris 新表 asin_crawl_date 是 DATETIME,需 string → timestamp
F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"), F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"),
"variation_info", "attr_info", "variation_info", "attr_info",
F.current_timestamp().alias("updated_at")) F.current_timestamp().alias("updated_at"),
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at" F.round(F.col("matrix_flow_proportion"), 4).alias("matrix_flow_proportion"),
F.round(F.col("matrix_ao_val"), 4).alias("matrix_ao_val"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at, matrix_flow_proportion, matrix_ao_val"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db_selection, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db_selection, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
return df return df
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment