Commit 42cac787 by chenyuanjie

Amazon页面存在跳转的asin处理

parent af235b1f
......@@ -160,7 +160,8 @@ class DimAsinDetail(object):
customer_reviews_json, parent_asin, img_list, created_at as created_time, updated_at as updated_time,
updated_at as dt, variat_num as variation_num, fbm_delivery_price as asin_fbm_price,
get_json_object(product_json, '$.Color') as product_json_color,
get_json_object(product_json, '$.Number of Items') as product_json_number_of_items
get_json_object(product_json, '$.Number of Items') as product_json_number_of_items,
current_asin
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
print(sql)
self.df_asin_detail = self.spark.sql(sqlQuery=sql)
......@@ -557,6 +558,7 @@ class DimAsinDetail(object):
"asin_bought_month", "asin_length", "asin_width", "asin_height", "asin_is_self",
"customer_reviews_json", "img_list", "variat_list",
F.round("asin_fbm_price", 2).alias("asin_fbm_price"),
"current_asin",
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_type).alias('date_type'),
F.lit(self.date_info).alias('date_info')).persist(StorageLevel.MEMORY_ONLY)
......
......@@ -177,7 +177,7 @@ class DwdStMeasure(Templates):
self.df_asin_bs = self.spark.sql(sql).cache()
self.df_asin_bs.show(10)
sql = f"select asin, asin_title, asin_price, parent_asin, asin_bought_month " \
sql = f"select asin, asin_title, asin_price, parent_asin, asin_bought_month, current_asin, updated_time " \
f"from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}';"
print("sql:", sql)
self.df_asin_detail = self.spark.sql(sql).cache()
......@@ -360,6 +360,29 @@ class DwdStMeasure(Templates):
self.df_st_asin = self.df_st_asin.join(
self.df_st_asin_flow, on=['page_rank'], how='left'
)
# === asin跳转处理:将df_st_asin和df_asin_detail的asin统一替换为current_asin ===
# 1. 保留原始缓存引用,从中构建跳转映射:原始asin → current_asin
df_asin_detail_cached = self.df_asin_detail
redirect_map = df_asin_detail_cached.filter(F.col('current_asin').isNotNull()) \
.select(F.col('asin').alias('redirect_src'), F.col('current_asin'))
# 2. df_st_asin中的原始asin替换为current_asin(无跳转则保留原值)
self.df_st_asin = self.df_st_asin \
.join(redirect_map, self.df_st_asin.asin == redirect_map.redirect_src, how='left') \
.withColumn('asin', F.coalesce(F.col('current_asin'), F.col('asin'))) \
.drop('redirect_src', 'current_asin')
# 3. df_asin_detail的asin键同步替换,asin重复时按updated_time降序保留最新一条;释放原始缓存
window_redirect = Window.partitionBy('asin').orderBy(F.col('updated_time').desc())
self.df_asin_detail = df_asin_detail_cached \
.withColumn('asin', F.coalesce(F.col('current_asin'), F.col('asin'))) \
.drop('current_asin') \
.withColumn('_rk', F.row_number().over(window_redirect)) \
.filter(F.col('_rk') == 1) \
.drop('_rk', 'updated_time') \
.cache()
df_asin_detail_cached.unpersist()
# ===================================================
# st -- dim_st_detail已经有
# asin
self.df_asin_bs = self.df_asin_bs.join(
......
......@@ -210,7 +210,7 @@ class DwtFlowAsin(Templates):
date_format(created_time, 'yyyy-MM-dd HH:mm:ss') as asin_crawl_date, asin_bought_month, asin_image_view,
case when product_description is not null then 1 else 0 end as is_with_product_description, asin_describe,
category_id as top_category_id, category_first_id as top_category_first_id, customer_reviews_json, img_list as img_info,
asin_follow_sellers as follow_sellers_count, asin_fbm_price
asin_follow_sellers as follow_sellers_count, asin_fbm_price, current_asin
from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"""
print("sql:" + sql)
self.df_asin_detail = self.spark.sql(sqlQuery=sql)
......@@ -833,7 +833,24 @@ class DwtFlowAsin(Templates):
print("不用导出旧数据到doris中")
pass
def handle_asin_redirect(self):
    """Resolve ASIN redirects in ``self.df_asin_detail``.

    Each row's ``asin`` is replaced by ``current_asin`` when that column is
    non-null (otherwise the original ``asin`` is kept), and ``current_asin``
    is then dropped. Rows that end up sharing the same ``asin`` are
    deduplicated, keeping the one ranked first by ``asin_crawl_date``
    descending with nulls last. The result is repartitioned into 60
    partitions, persisted with ``StorageLevel.DISK_ONLY`` and stored back on
    ``self.df_asin_detail``; its row count is printed.
    """
    source_df = self.df_asin_detail

    # Re-key every row to its redirect target; rows without one keep their asin.
    resolved_asin = F.coalesce(F.col('current_asin'), F.col('asin'))
    rekeyed_df = source_df.withColumn('asin', resolved_asin).drop('current_asin')

    # Rank rows per asin, newest crawl date first (null dates sort last),
    # and keep only the top-ranked row.
    newest_first = Window.partitionBy('asin').orderBy(F.col('asin_crawl_date').desc_nulls_last())
    ranked_df = rekeyed_df.withColumn('_rk', F.row_number().over(newest_first))
    deduped_df = ranked_df.filter(F.col('_rk') == 1).drop('_rk')

    self.df_asin_detail = deduped_df.repartition(60).persist(StorageLevel.DISK_ONLY)

    # count() runs the job, so the persisted result is computed here,
    # before the handle on the previous DataFrame is released below.
    redirect_count = self.df_asin_detail.count()
    print(f"asin跳转处理完成,去重后数据量: {redirect_count}")
    source_df.unpersist()
def handle_data(self):
self.handle_asin_redirect()
self.handle_asin_basic_attribute()
self.handle_asin_detail_all_type()
self.handle_asin_category_info()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment