Commit 7c8cdf59 by chenyuanjie

实时任务-隐藏分类逻辑补充

parent 0bcdbf7c
...@@ -99,6 +99,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -99,6 +99,7 @@ class KafkaFlowAsinDetail(Templates):
self.df_asin_keep_date = self.spark.sql("select 1+1;") self.df_asin_keep_date = self.spark.sql("select 1+1;")
self.df_asin_bsr_end = self.spark.sql("select 1+1;") self.df_asin_bsr_end = self.spark.sql("select 1+1;")
self.df_hide_category = self.spark.sql("select 1+1;") self.df_hide_category = self.spark.sql("select 1+1;")
self.df_bsr_category = self.spark.sql("select 1+1;")
self.df_asin_new_cate = self.spark.sql("select 1+ 1;") self.df_asin_new_cate = self.spark.sql("select 1+ 1;")
self.df_user_package_num = self.spark.sql("select 1+1;") self.df_user_package_num = self.spark.sql("select 1+1;")
self.df_asin_category = self.spark.sql("select 1+1;") self.df_asin_category = self.spark.sql("select 1+1;")
...@@ -567,11 +568,18 @@ class KafkaFlowAsinDetail(Templates): ...@@ -567,11 +568,18 @@ class KafkaFlowAsinDetail(Templates):
df = df.withColumn("bsr_type", F.expr(""" df = df.withColumn("bsr_type", F.expr("""
CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END""" CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END"""
)).drop("limit_rank") )).drop("limit_rank")
# 5. 是否必需ASIN # 5. 是否必需ASIN(双重确认:BSR分类 + 页面描述分类,两者均在排除列表才标记为1)
df = df.withColumn("is_need_asin", F.expr(""" df = df.withColumn(
CASE WHEN asin_bs_cate_1_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 "desc_category_first_name",
F.lower(F.trim(F.split(F.col("category"), "›").getItem(0)))
).join(self.df_bsr_category, on=['desc_category_first_name'], how='left')
need_categories = "('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed')"
df = df.withColumn("is_need_asin", F.expr(f"""
CASE WHEN asin_bs_cate_1_id in {need_categories}
AND desc_category_first_id in {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1 WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END""")) ELSE 0 END"""))
df = df.drop("desc_category_first_name", "desc_category_first_id")
# 6. asin_type # 6. asin_type
df = df.withColumn("asin_type", F.expr(""" df = df.withColumn("asin_type", F.expr("""
CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END""" CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END"""
...@@ -805,6 +813,13 @@ class KafkaFlowAsinDetail(Templates): ...@@ -805,6 +813,13 @@ class KafkaFlowAsinDetail(Templates):
username=us_mysql_con_info['username'], query=sql) username=us_mysql_con_info['username'], query=sql)
self.df_hide_category = F.broadcast(df_hide_category) self.df_hide_category = F.broadcast(df_hide_category)
self.df_hide_category.show(10, truncate=False) self.df_hide_category.show(10, truncate=False)
print("5.1 读取BSR分类树(用于双重确认is_need_asin)")
sql = f"""
select lower(trim(en_name)) as desc_category_first_name, category_first_id as desc_category_first_id
from dim_bsr_category_tree where site_name = '{self.site_name}' and category_parent_id = 0 and leaf_node = 2
"""
self.df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql))
self.df_bsr_category.show(10, truncate=False)
print("6. 读取asin_label信息") print("6. 读取asin_label信息")
sql = f""" sql = f"""
select asin, label from select asin, label from
......
...@@ -98,6 +98,7 @@ class KafkaRankAsinDetail(Templates): ...@@ -98,6 +98,7 @@ class KafkaRankAsinDetail(Templates):
self.df_asin_keep_date = self.spark.sql("select 1+1;") self.df_asin_keep_date = self.spark.sql("select 1+1;")
self.df_asin_bsr_end = self.spark.sql("select 1+1;") self.df_asin_bsr_end = self.spark.sql("select 1+1;")
self.df_hide_category = self.spark.sql("select 1+1;") self.df_hide_category = self.spark.sql("select 1+1;")
self.df_bsr_category = self.spark.sql("select 1+1;")
self.df_asin_new_cate = self.spark.sql("select 1+ 1;") self.df_asin_new_cate = self.spark.sql("select 1+ 1;")
self.df_user_package_num = self.spark.sql("select 1+1;") self.df_user_package_num = self.spark.sql("select 1+1;")
self.df_asin_category = self.spark.sql("select 1+1;") self.df_asin_category = self.spark.sql("select 1+1;")
...@@ -566,11 +567,18 @@ class KafkaRankAsinDetail(Templates): ...@@ -566,11 +567,18 @@ class KafkaRankAsinDetail(Templates):
df = df.withColumn("bsr_type", F.expr(""" df = df.withColumn("bsr_type", F.expr("""
CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END""" CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END"""
)).drop("limit_rank") )).drop("limit_rank")
# 5. 是否必需ASIN # 5. 是否必需ASIN(双重确认:BSR分类 + 页面描述分类,两者均在排除列表才标记为1)
df = df.withColumn("is_need_asin", F.expr(""" df = df.withColumn(
CASE WHEN asin_bs_cate_1_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 "desc_category_first_name",
F.lower(F.trim(F.split(F.col("category"), "›").getItem(0)))
).join(self.df_bsr_category, on=['desc_category_first_name'], how='left')
need_categories = "('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed')"
df = df.withColumn("is_need_asin", F.expr(f"""
CASE WHEN asin_bs_cate_1_id in {need_categories}
AND desc_category_first_id in {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1 WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END""")) ELSE 0 END"""))
df = df.drop("desc_category_first_name", "desc_category_first_id")
# 6. asin_type # 6. asin_type
df = df.withColumn("asin_type", F.expr(""" df = df.withColumn("asin_type", F.expr("""
CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END""" CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END"""
...@@ -804,6 +812,13 @@ class KafkaRankAsinDetail(Templates): ...@@ -804,6 +812,13 @@ class KafkaRankAsinDetail(Templates):
username=us_mysql_con_info['username'], query=sql) username=us_mysql_con_info['username'], query=sql)
self.df_hide_category = F.broadcast(df_hide_category) self.df_hide_category = F.broadcast(df_hide_category)
self.df_hide_category.show(10, truncate=False) self.df_hide_category.show(10, truncate=False)
print("5.1 读取BSR分类树(用于双重确认is_need_asin)")
sql = f"""
select lower(trim(en_name)) as desc_category_first_name, category_first_id as desc_category_first_id
from dim_bsr_category_tree where site_name = '{self.site_name}' and category_parent_id = 0 and leaf_node = 2
"""
self.df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql))
self.df_bsr_category.show(10, truncate=False)
print("6. 读取asin_label信息") print("6. 读取asin_label信息")
sql = f""" sql = f"""
select asin, label from select asin, label from
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment