Commit 1e0dca4d by chenyuanjie

流量选品30天增加字段is_amazon_new,Amazon新品标记

parent e22e3efe
...@@ -618,7 +618,13 @@ class KafkaFlowAsinDetail(Templates): ...@@ -618,7 +618,13 @@ class KafkaFlowAsinDetail(Templates):
movie_label_list, movie_label_list,
F.lit(False) F.lit(False)
) )
df = df.withColumn("is_movie_label", condition.cast("int")).drop("asin_label_list") df = df.withColumn("is_movie_label", condition.cast("int"))
# 3. amazon 新品标签: badge_list 任意元素匹配 %new on amazon% → 1, 否则 0
# otherwise(0) 已覆盖 badge_list 为 NULL 的情况(exists(NULL,...)=NULL → 走 otherwise)
df = df.withColumn(
"is_amazon_new", F.when(F.expr("exists(badge_list, x -> x like '%new on amazon%')"), F.lit(1)).otherwise(F.lit(0))
)
df = df.drop("asin_label_list", "badge_list")
return df return df
# 12. 处理变化率相关字段(环比_mom / 同比_yoy 统一处理) # 12. 处理变化率相关字段(环比_mom / 同比_yoy 统一处理)
...@@ -773,6 +779,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -773,6 +779,7 @@ class KafkaFlowAsinDetail(Templates):
"asin_lqs_rating_detail", # 派生:LQS 各维度评分 JSON "asin_lqs_rating_detail", # 派生:LQS 各维度评分 JSON
# ── 标识/标签 ────────────────────────────────────────────────── # ── 标识/标签 ──────────────────────────────────────────────────
"amazon_label", # Kafka → 解析 badge_type(兼容新旧格式) "amazon_label", # Kafka → 解析 badge_type(兼容新旧格式)
"is_amazon_new", # 派生:badge_text 含 "new on amazon" 则 1
"is_movie_label", # 派生:是否电影/媒体类(0/1) "is_movie_label", # 派生:是否电影/媒体类(0/1)
"is_brand_label", # 派生:是否有品牌(0/1) "is_brand_label", # 派生:是否有品牌(0/1)
"multi_color_flag", # 派生:多色标识(0.非多色/1.属性多色/2.标题降级) "multi_color_flag", # 派生:多色标识(0.非多色/1.属性多色/2.标题降级)
...@@ -866,15 +873,21 @@ class KafkaFlowAsinDetail(Templates): ...@@ -866,15 +873,21 @@ class KafkaFlowAsinDetail(Templates):
.persist(StorageLevel.DISK_ONLY) .persist(StorageLevel.DISK_ONLY)
# df_seller_info 已完成派生,及时释放缓存 # df_seller_info 已完成派生,及时释放缓存
self.df_seller_info.unpersist() self.df_seller_info.unpersist()
print("3. 读取asin_label信息") print("3. 读取asin_label信息(含 badge_text)")
sql = f""" sql = f"""
select asin, label from select asin, lower(label) as label, lower(badge_text) as badge_text
(select asin, lower(label) as label, created_time,row_number() over(partition by asin,label order by updated_time desc) as crank from ods_other_search_term_data
from ods_other_search_term_data where site_name='{self.site_name}' and date_type='30day' and trim(label) not in ('null','') and label is not null) t where t.crank=1 where site_name = '{self.site_name}' and date_type = '30day'
""" """
print("sql=", sql) print("sql=", sql)
self.df_asin_label_info = self.spark.sql(sqlQuery=sql) df_label_raw = self.spark.sql(sqlQuery=sql) \
self.df_asin_label_info = self.df_asin_label_info.groupby(['asin']).agg(F.collect_set("label").alias("asin_label_list")) .withColumn("label", F.when(F.trim(F.col("label")) .isin('null', ''), F.lit(None)).otherwise(F.col("label"))) \
.withColumn("badge_text", F.when(F.trim(F.col("badge_text")) .isin('null', ''), F.lit(None)).otherwise(F.col("badge_text")))
# 按 asin 聚合: collect_set 自动去重 + 过滤 NULL
self.df_asin_label_info = df_label_raw.groupby(['asin']).agg(
F.collect_set("label").alias("asin_label_list"),
F.collect_set("badge_text").alias("badge_list"),
)
self.df_asin_label_info = self.df_asin_label_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY) self.df_asin_label_info = self.df_asin_label_info.repartition(self.repartition_num).persist(StorageLevel.DISK_ONLY)
self.df_asin_label_info.show(10, truncate=False) self.df_asin_label_info.show(10, truncate=False)
print("4. 读取dwd_asin_measure拿取ao及各类型数量") print("4. 读取dwd_asin_measure拿取ao及各类型数量")
...@@ -1061,7 +1074,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -1061,7 +1074,7 @@ class KafkaFlowAsinDetail(Templates):
site_name, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail, asin_lob_info, site_name, asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail, asin_lob_info,
is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion, matrix_flow_proportion, is_contains_lob_info, is_package_quantity_abnormal, zr_flow_proportion, matrix_flow_proportion,
matrix_ao_val, product_features, img_info, collapse_asin, follow_sellers_count, asin_describe, matrix_ao_val, product_features, img_info, collapse_asin, follow_sellers_count, asin_describe,
fbm_price, describe_len, multi_color_flag, multi_color_str, amazon_label""" fbm_price, describe_len, multi_color_flag, multi_color_str, amazon_label, is_amazon_new"""
print(f"写入Doris {self.doris_30day_table}") print(f"写入Doris {self.doris_30day_table}")
DorisHelper.spark_export_with_columns( DorisHelper.spark_export_with_columns(
df_save=df, db_name=self.doris_db_dwt, df_save=df, db_name=self.doris_db_dwt,
......
...@@ -28,7 +28,8 @@ if __name__ == '__main__': ...@@ -28,7 +28,8 @@ if __name__ == '__main__':
label, label,
created_time, created_time,
updated_time, updated_time,
asin_brand asin_brand,
badge_text
from {import_table} from {import_table}
where 1=1 where 1=1
and \$CONDITIONS and \$CONDITIONS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment