Commit fdc90083 by chenyuanjie

实时消费-asin月销fix

parent b34a7b2c
......@@ -18,7 +18,7 @@ from pyspark.sql import Window
from pyspark.storagelevel import StorageLevel
from utils.DorisHelper import DorisHelper
from yswg_utils.common_df import get_node_first_id_df, get_first_id_from_category_desc_df
from yswg_utils.common_udf import udf_parse_bs_category, parse_weight_str, udf_extract_volume_dimensions, udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json
from yswg_utils.common_udf import udf_parse_bs_category, parse_weight_str, udf_extract_volume_dimensions, udf_get_package_quantity_with_flag as udf_get_package_quantity, udf_parse_seller_json, udf_parse_amazon_orders
class KafkaFlowAsinDetail(Templates):
......@@ -125,6 +125,7 @@ class KafkaFlowAsinDetail(Templates):
])
self.u_parse_seller_info = self.spark.udf.register('u_parse_seller_info', udf_parse_seller_json, seller_schema)
self.u_rank_and_category = self.spark.udf.register('u_rank_and_category', self.udf_rank_and_category, StringType())
self.u_parse_amazon_orders = self.spark.udf.register('u_parse_amazon_orders', udf_parse_amazon_orders, IntegerType())
@staticmethod
def init_schema():
......@@ -378,7 +379,17 @@ class KafkaFlowAsinDetail(Templates):
WHEN asin_ao_val >= 2 THEN 7 ELSE 0 END"""))
df = df.withColumnRenamed("asin_zr_counts", "zr_counts").withColumnRenamed("asin_ao_val", "ao_val") \
.withColumnRenamed("asin_zr_flow_proportion", "zr_flow_proportion") \
.withColumnRenamed("asin_amazon_orders", "asin_bought_month").drop("asin_st_counts", "asin_adv_counts")
.drop("asin_st_counts", "asin_adv_counts")
# asin_bought_month 取数规则:优先解析 buy_sales 文本("200+ bought in past month"),
# 解析为 NULL 或 0 时用 dwd_asin_measure 的 asin_amazon_orders 兜底
df = df.withColumn(
"asin_bought_month",
F.coalesce(
F.when(self.u_parse_amazon_orders(F.col("buy_sales")) > 0,
self.u_parse_amazon_orders(F.col("buy_sales"))),
F.col("asin_amazon_orders"),
)
).drop("asin_amazon_orders")
# 获取parent_asin下最新ASIN信息,导出到 doris 父ASIN最新详情表(仅 latest+normal 模式)
if self.consumer_type == 'latest' and self.test_flag == 'normal':
df_parent_asin_info = df.filter("parent_asin is not null").select("parent_asin", "asin_vartion_list", "asinUpdateTime")
......@@ -802,28 +813,14 @@ class KafkaFlowAsinDetail(Templates):
def read_data(self):
# 1a/1b 共用读取逻辑:单表读取 + PySpark DataFrame join,避免 Hive 服务端 3 表 JOIN 性能差
def _load_baseline(date_info_target, alias_prefix):
sql_measure = f"""
select asin, asin_ao_val, asin_bsr_orders
from dwd_asin_measure
where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{date_info_target}'
"""
sql_detail = f"""
select asin, asin_price, variation_num, asin_rating, asin_total_comments, asin_bought_month
from dim_asin_detail
where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{date_info_target}'
"""
sql_bs = f"""
select asin, asin_bs_cate_1_rank
from dim_asin_bs_info
sql_dwt = f"""
select asin, asin_ao_val, asin_price, variation_num, asin_rating, asin_total_comments,
first_category_rank, bsr_orders, asin_bought_month
from dwt_flow_asin
where site_name = '{self.site_name}' and date_type = 'month' and date_info = '{date_info_target}'
"""
print(f"sql_measure({alias_prefix})=", sql_measure)
print(f"sql_detail ({alias_prefix})=", sql_detail)
print(f"sql_bs ({alias_prefix})=", sql_bs)
df_m = self.spark.sql(sqlQuery=sql_measure).repartition(self.repartition_num, 'asin')
df_d = self.spark.sql(sqlQuery=sql_detail).repartition(self.repartition_num, 'asin')
df_b = self.spark.sql(sqlQuery=sql_bs).repartition(self.repartition_num, 'asin')
df = df_m.join(df_d, on='asin', how='left').join(df_b, on='asin', how='left')
print(f"sql_dwt({alias_prefix})=", sql_dwt)
df = self.spark.sql(sqlQuery=sql_dwt).repartition(self.repartition_num, 'asin')
return df.select(
F.col('asin'),
F.round(F.col('asin_ao_val'), 3).alias(f'{alias_prefix}_asin_ao_val'),
......@@ -831,9 +828,9 @@ class KafkaFlowAsinDetail(Templates):
F.col('variation_num').alias(f'{alias_prefix}_asin_variation_num'),
F.col('asin_rating').alias(f'{alias_prefix}_asin_rating'),
F.col('asin_total_comments').alias(f'{alias_prefix}_asin_total_comments'),
F.col('asin_bs_cate_1_rank').alias(f'{alias_prefix}_first_category_rank'),
F.col('asin_bsr_orders').alias(f'{alias_prefix}_asin_bsr_orders'),
F.round(F.col('asin_bsr_orders') * F.col('asin_price'), 2).alias(f'{alias_prefix}_sales'),
F.col('first_category_rank').alias(f'{alias_prefix}_first_category_rank'),
F.col('bsr_orders').alias(f'{alias_prefix}_asin_bsr_orders'),
F.round(F.col('bsr_orders') * F.col('asin_price'), 2).alias(f'{alias_prefix}_sales'),
F.col('asin_bought_month').alias(f'{alias_prefix}_asin_bought_month'),
).persist(StorageLevel.DISK_ONLY)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment