# Commit 728c54df by chenyuanjie (parent ba46b2f2)
# Consolidated export of ASINs that still need keepa data backfill.
"""
@Author : CT
@Description : 导出没有keepa数据的流量选品ASIN
筛选条件(两种模式通用):
- asin_type in (0, 1, 3)
- asin_price > 0
- asin_bought_month >= 50 OR first_category_rank <= 600000
- 排除数字类目
- 排除 dim_keepa_asin_info 中已有 keepa 数据的 ASIN
- 排除 {site_name}_asin_profit_keepa_add 中已导出的 ASIN
date_type=month : 旧逻辑,直接从 dwt_flow_asin 读取
(asin_type/category_first_id/first_category_rank 已计算好)
date_type=month_week: 新逻辑,从 dim_asin_detail 读取
补充计算(对齐dwt逻辑):
- first_category_rank : 来自 dim_asin_bs_info
- category_first_id : BSR来源优先,dim兜底
- category_id : BSR来源优先,dim兜底
- asin_bought_month : dim值 + dwd_asin_measure 兜底
- asin_type : 参考 dwt.handle_asin_is_hide() 计算
@CreateTime : 2026-03-17
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from pyspark.sql import functions as F
# Digital/virtual categories excluded by the selection filter
# (rendered into SQL in _read_data_month, used via isin() in handle_data).
EXCLUDE_CATEGORIES = (
    "audible", "books", "digital-text", "dmusic",
    "mobile-apps", "movies-tv", "music", "software", "videogames"
)
# Categories filtered out by the asin_is_need check
# (mirrors dwt.handle_asin_is_hide).
# NOTE: this tuple is interpolated into a Spark SQL "IN ( ... )" clause via its
# Python repr (see the F.expr in _read_data_month_week), so it must stay a
# non-empty tuple of plain strings with two or more elements.
NEED_FILTER_CATEGORIES = (
    'mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text',
    'magazines', 'movies-tv', 'software', 'videogames',
    'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed'
)
# category_id-level hidden nodes for the asin_is_hide check.
# NOTE: interpolated into SQL via tuple repr as well — same constraints apply.
HIDE_CATEGORY_IDS = (
    '21393128011', '21377129011', '21377127011',
    '21377130011', '21388218011', '21377132011'
)
class ExportAsinWithoutKeepa(object):
    """Export flow-selection ASINs that have no keepa data into PostgreSQL.

    Two modes, selected by ``date_type``:

    - ``month``      : legacy path; all fields (asin_type, category_first_id,
      first_category_rank, ...) are already materialized in ``dwt_flow_asin``,
      so the filtering happens entirely in SQL.
    - ``month_week`` : new path; reads ``dim_asin_detail`` and re-derives the
      missing fields from ``dim_asin_bs_info`` / ``dwd_asin_measure`` /
      ``us_bs_category_hide``, mirroring the dwt logic, then filters in Python.

    In both modes, ASINs already present in ``dim_keepa_asin_info`` (with
    ``package_length``) or already exported to
    ``{site_name}_asin_profit_keepa_add`` are excluded, and the survivors are
    appended to that PostgreSQL table.
    """

    def __init__(self, site_name: str, date_info: str, date_type: str):
        # site_name : target site, e.g. us / uk / de
        # date_info : period identifier, e.g. "2026-02" or "2026-02_1"
        # date_type : "month" (legacy path) or "month_week" (new path)
        self.site_name = site_name
        self.date_info = date_info
        self.date_type = date_type
        # Destination PostgreSQL table; also consulted to skip ASINs that were
        # already exported in a previous run.
        self.pg_table = f"{site_name}_asin_profit_keepa_add"
        self.spark = SparkUtil.get_spark_session(
            f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        )
        # Placeholder DataFrames; replaced by read_data() / handle_data().
        self.df_flow_asin = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    # ------------------------------------------------------------------ #
    # Step 1: read data (branch on date_type)                            #
    # ------------------------------------------------------------------ #
    def read_data(self):
        """Dispatch to the reader for the configured ``date_type``.

        Raises:
            ValueError: if ``date_type`` is neither ``month`` nor
                ``month_week``.
        """
        if self.date_type == 'month':
            self._read_data_month()
        elif self.date_type == 'month_week':
            self._read_data_month_week()
        else:
            raise ValueError(f"不支持的 date_type: {self.date_type},仅支持 month / month_week")

    def _read_data_month(self):
        """Legacy path: read from dwt_flow_asin; all filters applied in SQL.

        Populates ``self.df_flow_asin`` (single ``asin`` column), cached.
        """
        print("1. [month] 从 dwt_flow_asin 读取并筛选")
        # Render the exclusion tuple as a quoted SQL IN-list.
        exclude_str = ", ".join(f"'{c}'" for c in EXCLUDE_CATEGORIES)
        sql = f"""
            select asin
            from dwt_flow_asin
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
              and asin_type in (0, 1, 3)
              and asin_price is not null
              and asin_price > 0
              and (asin_bought_month >= 50 or first_category_rank <= 600000)
              and (category_first_id is null
                   or category_first_id not in ({exclude_str}))
        """
        self.df_flow_asin = self.spark.sql(sql).repartition(40, 'asin').cache()
        print(f"dwt_flow_asin 筛选后数据量: {self.df_flow_asin.count()}")

    def _read_data_month_week(self):
        """New path: read dim_asin_detail and re-derive the missing fields.

        Re-computes ``asin_type`` / ``first_category_rank`` /
        ``category_first_id`` / ``category_id`` / ``asin_bought_month`` to
        match the dwt logic; filtering is done later in ``handle_data``.
        Populates ``self.df_flow_asin``, cached.
        """
        # (1) dim_asin_detail — base table.
        print("1. [month_week] 读取 dim_asin_detail")
        sql = f"""
            select asin, asin_price, asin_bought_month,
                   asin_is_self,
                   category_id as top_category_id,
                   category_first_id as top_category_first_id
            from dim_asin_detail
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        df_dim = self.spark.sql(sql).repartition(40, 'asin').cache()
        print(f"dim_asin_detail 数据量: {df_dim.count()}")
        # (2) dim_asin_bs_info — BSR rank and BSR-sourced category ids.
        print("2. 读取 dim_asin_bs_info (BSR分类与排名)")
        sql = f"""
            select asin,
                   asin_bs_cate_1_rank as first_category_rank,
                   asin_bs_cate_1_id as bsr_category_first_id,
                   asin_bs_cate_current_id as bsr_category_id
            from dim_asin_bs_info
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        df_bsr = self.spark.sql(sql).repartition(40, 'asin')
        # (3) dwd_asin_measure — asin_amazon_orders, fallback for
        #     asin_bought_month.
        print("3. 读取 dwd_asin_measure (asin_amazon_orders 兜底)")
        sql = f"""
            select asin, asin_amazon_orders
            from dwd_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        df_measure = self.spark.sql(sql).repartition(40, 'asin')
        # (4) us_bs_category_hide (MySQL) — hidden categories feeding the
        #     asin_type computation.
        print("4. 读取 us_bs_category_hide (隐藏分类)")
        mysql_con = DBUtil.get_connection_info("mysql", self.site_name)
        sql = "select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base"
        df_hide = SparkUtil.read_jdbc_query(
            session=self.spark, url=mysql_con['url'],
            pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql
        )
        # (5) Assemble the working DataFrame.
        print("5. 组装主DataFrame")
        df = df_dim \
            .join(df_bsr, on='asin', how='left') \
            .join(df_measure, on='asin', how='left')
        # category_first_id / category_id: prefer the BSR source, fall back
        # to the dim values.
        df = df.withColumn(
            "category_first_id",
            F.coalesce(F.col("bsr_category_first_id"), F.col("top_category_first_id"))
        ).withColumn(
            "category_id",
            F.coalesce(F.col("bsr_category_id"), F.col("top_category_id"))
        ).drop("bsr_category_first_id", "bsr_category_id",
               "top_category_first_id", "top_category_id")
        # asin_bought_month: prefer the dim value, fall back to measure.
        df = df.withColumn(
            "asin_bought_month",
            F.coalesce(F.col("asin_bought_month"), F.col("asin_amazon_orders"))
        ).drop("asin_amazon_orders")
        # asin_type computation (mirrors dwt.handle_asin_is_hide).
        # df_hide is small (one row per hidden category), hence broadcast.
        df = df.join(F.broadcast(df_hide), on='category_id', how='left')
        # NOTE: HIDE_CATEGORY_IDS / NEED_FILTER_CATEGORIES are injected into
        # the SQL expressions below via their Python tuple repr.
        df = df.withColumn(
            "asin_is_hide",
            F.expr(f"""
                CASE WHEN hide_flag = 1 THEN 1
                     WHEN category_first_id = 'grocery' AND category_id != '6492272011' THEN 1
                     WHEN category_id IN {HIDE_CATEGORY_IDS} THEN 1
                     ELSE 0 END
            """)
        ).withColumn(
            "asin_is_need",
            F.expr(f"""
                CASE WHEN category_first_id IN {NEED_FILTER_CATEGORIES} THEN 1
                     WHEN asin NOT LIKE 'B0%' THEN 1
                     ELSE 0 END
            """)
        ).withColumn(
            "asin_type",
            F.expr("""
                CASE WHEN asin_is_self = 1 THEN 1
                     WHEN asin_is_need = 1 THEN 2
                     WHEN asin_is_hide = 1 THEN 3
                     ELSE 0 END
            """)
        ).drop("hide_flag", "asin_is_self", "asin_is_need", "asin_is_hide")
        self.df_flow_asin = df.repartition(40, 'asin').cache()
        print(f"组装完成,总数据量: {self.df_flow_asin.count()}")

    # ------------------------------------------------------------------ #
    # Step 2: filter (month_week) + exclude keepa / already-exported ASINs #
    # ------------------------------------------------------------------ #
    def handle_data(self):
        """Apply the selection filters and build ``self.df_save``.

        For ``month_week`` the fields were computed in Python, so the
        selection conditions are applied here; for ``month`` they were
        already applied in SQL and this step only does the exclusions.
        """
        df = self.df_flow_asin
        # month_week: fields computed in Python, so filter here.
        # month: already filtered in SQL, skip this step.
        if self.date_type == 'month_week':
            print("6. [month_week] 筛选目标ASIN")
            df = df.filter(
                F.col("asin_type").isin(0, 1, 3)
            ).filter(
                F.col("asin_price").isNotNull() & (F.col("asin_price") > 0)
            ).filter(
                (F.col("asin_bought_month") >= 50) | (F.col("first_category_rank") <= 600000)
            ).filter(
                F.col("category_first_id").isNull() |
                ~F.col("category_first_id").isin(*EXCLUDE_CATEGORIES)
            )
            df = df.cache()
            print(f"筛选后数据量: {df.count()}")
        # Exclude ASINs that already have keepa data (package_length present
        # in dim_keepa_asin_info).
        print("7. 排除已有keepa数据的ASIN (dim_keepa_asin_info)")
        df_keepa = self.spark.sql(
            "select asin from dim_keepa_asin_info where package_length is not null"
        ).repartition(40, 'asin')
        df = df.join(df_keepa, on='asin', how='left_anti').cache()
        print(f"排除keepa后数据量: {df.count()}")
        # Exclude ASINs already exported to {pg_table}.
        print(f"8. 排除已导出的ASIN ({self.pg_table})")
        pg_con_info = DBUtil.get_connection_info("postgresql_cluster", self.site_name)
        df_exported = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=pg_con_info['url'],
            username=pg_con_info['username'],
            pwd=pg_con_info['pwd'],
            query=f"select asin from {self.pg_table}"
        ).repartition(40, 'asin')
        df = df.join(df_exported, on='asin', how='left_anti').cache()
        print(f"排除已导出后数据量: {df.count()}")
        # Final export shape: (asin, month) — "month" carries date_info as-is.
        self.df_save = df.select(
            F.col("asin"),
            F.lit(self.date_info).alias("month")
        ).cache()

    # ------------------------------------------------------------------ #
    # Step 3: write to PostgreSQL                                        #
    # ------------------------------------------------------------------ #
    def save_data(self):
        """Append ``self.df_save`` to the PostgreSQL table ``self.pg_table``."""
        total = self.df_save.count()
        print(f"9. 写入 PostgreSQL 表 {self.pg_table},共 {total} 条")
        con_info = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
        self.df_save.write.format("jdbc") \
            .option("url", con_info["url"]) \
            .option("dbtable", self.pg_table) \
            .option("user", con_info["username"]) \
            .option("password", con_info["pwd"]) \
            .mode("append") \
            .save()
        print(f"写入完毕")

    def run(self):
        """Full pipeline: read → filter/exclude → write."""
        self.read_data()
        self.handle_data()
        self.save_data()
if __name__ == "__main__":
    # CLI entry point.
    # Validate argument count up front so a missing argument produces a clear
    # usage message instead of a bare IndexError from sys.argv.
    if len(sys.argv) < 4:
        sys.exit(
            f"Usage: {sys.argv[0]} <site_name> <date_type> <date_info>\n"
            "  site_name : e.g. us / uk / de\n"
            "  date_type : month or month_week\n"
            "  date_info : e.g. 2026-02 (month) or 2026-02_1 (month_week)"
        )
    site_name = sys.argv[1]  # arg 1: site, e.g. us / uk / de
    date_type = sys.argv[2]  # arg 2: month or month_week
    date_info = sys.argv[3]  # arg 3: year-month / year-month_week, e.g. 2026-02
    handle_obj = ExportAsinWithoutKeepa(
        site_name=site_name, date_info=date_info, date_type=date_type
    )
    handle_obj.run()