Commit aa8471e3 by chenyuanjie

导出keepa_asin补充分类过滤逻辑

parent d9aa0ce9
...@@ -101,7 +101,7 @@ class ExportAsinWithoutKeepa(object): ...@@ -101,7 +101,7 @@ class ExportAsinWithoutKeepa(object):
print("1. [month_week] 读取 dim_asin_detail") print("1. [month_week] 读取 dim_asin_detail")
sql = f""" sql = f"""
select asin, asin_price, asin_bought_month, select asin, asin_price, asin_bought_month,
asin_is_self, asin_is_self, asin_category_desc,
category_id as top_category_id, category_id as top_category_id,
category_first_id as top_category_first_id category_first_id as top_category_first_id
from dim_asin_detail from dim_asin_detail
...@@ -137,8 +137,20 @@ class ExportAsinWithoutKeepa(object): ...@@ -137,8 +137,20 @@ class ExportAsinWithoutKeepa(object):
""" """
df_measure = self.spark.sql(sql).repartition(40, 'asin') df_measure = self.spark.sql(sql).repartition(40, 'asin')
# ④ us_bs_category_hide → 隐藏分类(用于 asin_type 计算) # ④ dim_bsr_category_tree → desc_category_first_id(用于 asin_is_need 双重校验)
print("4. 读取 us_bs_category_hide (隐藏分类)") print("4. 读取 dim_bsr_category_tree (分类名称→ID 映射)")
sql = f"""
select lower(trim(en_name)) as desc_category_first_name,
category_first_id as desc_category_first_id
from dim_bsr_category_tree
where site_name = '{self.site_name}'
and category_parent_id = 0
and leaf_node = 2
"""
df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql))
# ⑤ us_bs_category_hide → 隐藏分类(用于 asin_type 计算)
print("5. 读取 us_bs_category_hide (隐藏分类)")
mysql_con = DBUtil.get_connection_info("mysql", self.site_name) mysql_con = DBUtil.get_connection_info("mysql", self.site_name)
sql = "select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base" sql = "select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base"
df_hide = SparkUtil.read_jdbc_query( df_hide = SparkUtil.read_jdbc_query(
...@@ -146,8 +158,8 @@ class ExportAsinWithoutKeepa(object): ...@@ -146,8 +158,8 @@ class ExportAsinWithoutKeepa(object):
pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql
) )
# 组装 # 组装
print("5. 组装主DataFrame") print("6. 组装主DataFrame")
df = df_dim \ df = df_dim \
.join(df_bsr, on='asin', how='left') \ .join(df_bsr, on='asin', how='left') \
.join(df_measure, on='asin', how='left') .join(df_measure, on='asin', how='left')
...@@ -169,7 +181,15 @@ class ExportAsinWithoutKeepa(object): ...@@ -169,7 +181,15 @@ class ExportAsinWithoutKeepa(object):
).drop("asin_amazon_orders") ).drop("asin_amazon_orders")
# asin_type 计算(对齐 dwt.handle_asin_is_hide) # asin_type 计算(对齐 dwt.handle_asin_is_hide)
# desc_category_first_name:解析 asin_category_desc,取 › 分隔的第一段
df = df.withColumn(
"desc_category_first_name",
F.lower(F.trim(F.split(F.col("asin_category_desc"), "›").getItem(0)))
).join(df_bsr_category, on='desc_category_first_name', how='left') \
.drop("desc_category_first_name", "asin_category_desc")
df = df.join(F.broadcast(df_hide), on='category_id', how='left') df = df.join(F.broadcast(df_hide), on='category_id', how='left')
need_categories = NEED_FILTER_CATEGORIES
df = df.withColumn( df = df.withColumn(
"asin_is_hide", "asin_is_hide",
F.expr(f""" F.expr(f"""
...@@ -181,11 +201,13 @@ class ExportAsinWithoutKeepa(object): ...@@ -181,11 +201,13 @@ class ExportAsinWithoutKeepa(object):
).withColumn( ).withColumn(
"asin_is_need", "asin_is_need",
F.expr(f""" F.expr(f"""
CASE WHEN category_first_id IN {NEED_FILTER_CATEGORIES} THEN 1 CASE WHEN category_first_id IN {need_categories}
AND desc_category_first_id IN {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1 WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END ELSE 0 END
""") """)
).withColumn( ).drop("desc_category_first_id") \
.withColumn(
"asin_type", "asin_type",
F.expr(""" F.expr("""
CASE WHEN asin_is_self = 1 THEN 1 CASE WHEN asin_is_self = 1 THEN 1
...@@ -207,7 +229,7 @@ class ExportAsinWithoutKeepa(object): ...@@ -207,7 +229,7 @@ class ExportAsinWithoutKeepa(object):
# month_week:字段在 Python 中计算,需在此处做条件过滤 # month_week:字段在 Python 中计算,需在此处做条件过滤
# month:SQL 中已完成过滤,直接跳过此步 # month:SQL 中已完成过滤,直接跳过此步
if self.date_type == 'month_week': if self.date_type == 'month_week':
print("6. [month_week] 筛选目标ASIN") print("7. [month_week] 筛选目标ASIN")
df = df.filter( df = df.filter(
F.col("asin_type").isin(0, 1, 3) F.col("asin_type").isin(0, 1, 3)
).filter( ).filter(
...@@ -223,7 +245,7 @@ class ExportAsinWithoutKeepa(object): ...@@ -223,7 +245,7 @@ class ExportAsinWithoutKeepa(object):
# 排除 dim_keepa_asin_info 中已有有效keepa数据的ASIN # 排除 dim_keepa_asin_info 中已有有效keepa数据的ASIN
# 若 package_length/width/height/weight 任意一个 < 0,视为数据异常,不排除(需重新抓取) # 若 package_length/width/height/weight 任意一个 < 0,视为数据异常,不排除(需重新抓取)
print("7. 排除已有keepa数据的ASIN (dim_keepa_asin_info)") print("8. 排除已有keepa数据的ASIN (dim_keepa_asin_info)")
df_keepa = self.spark.sql(f""" df_keepa = self.spark.sql(f"""
select asin from dim_keepa_asin_info select asin from dim_keepa_asin_info
where site_name = '{self.site_name}' where site_name = '{self.site_name}'
...@@ -236,7 +258,7 @@ class ExportAsinWithoutKeepa(object): ...@@ -236,7 +258,7 @@ class ExportAsinWithoutKeepa(object):
print(f"排除keepa后数据量: {df.count()}") print(f"排除keepa后数据量: {df.count()}")
# 排除 {pg_table} 中已导出的ASIN # 排除 {pg_table} 中已导出的ASIN
print(f"8. 排除已导出的ASIN ({self.pg_table})") print(f"9. 排除已导出的ASIN ({self.pg_table})")
pg_con_info = DBUtil.get_connection_info("postgresql_cluster", self.site_name) pg_con_info = DBUtil.get_connection_info("postgresql_cluster", self.site_name)
df_exported = SparkUtil.read_jdbc_query( df_exported = SparkUtil.read_jdbc_query(
session=self.spark, session=self.spark,
...@@ -258,7 +280,7 @@ class ExportAsinWithoutKeepa(object): ...@@ -258,7 +280,7 @@ class ExportAsinWithoutKeepa(object):
# ------------------------------------------------------------------ # # ------------------------------------------------------------------ #
def save_data(self): def save_data(self):
total = self.df_save.count() total = self.df_save.count()
print(f"9. 写入 PostgreSQL 表 {self.pg_table},共 {total} 条") print(f"10. 写入 PostgreSQL 表 {self.pg_table},共 {total} 条")
con_info = DBUtil.get_connection_info('postgresql_cluster', self.site_name) con_info = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
self.df_save.write.format("jdbc") \ self.df_save.write.format("jdbc") \
.option("url", con_info["url"]) \ .option("url", con_info["url"]) \
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment