Commit 5bf88f84 by 吴济苍

Merge remote-tracking branch 'origin/developer' into developer

parents 67c72130 5ead1c9f
......@@ -521,10 +521,27 @@ class DwtFlowAsin(Templates):
CASE WHEN hide_flag = 1 THEN 1 WHEN category_first_id = 'grocery' and category_id != '6492272011' THEN 1
WHEN category_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1
ELSE 0 END""")).drop("hide_flag")
self.df_asin_detail = self.df_asin_detail.withColumn("asin_is_need", F.expr("""
CASE WHEN category_first_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1
# 解析 asin_category_desc 取 › 分隔的第一个元素作为补充分类名称
self.df_asin_detail = self.df_asin_detail.withColumn(
"desc_category_first_name",
F.lower(F.trim(F.split(F.col("asin_category_desc"), "›").getItem(0)))
)
# 读取 Hive 分类维表,获取分类名称与ID的对应关系
sql_dim = f"""
select lower(trim(en_name)) as desc_category_first_name, category_first_id as desc_category_first_id
from dim_bsr_category_tree where site_name = '{self.site_name}' and category_parent_id = 0 and leaf_node = 2
"""
df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql_dim))
# join 补充分类ID
self.df_asin_detail = self.df_asin_detail.join(df_bsr_category, on=['desc_category_first_name'], how='left')
# 两个分类ID均在过滤列表中才标记为1
need_categories = "('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed')"
self.df_asin_detail = self.df_asin_detail.withColumn("asin_is_need", F.expr(f"""
CASE WHEN category_first_id in {need_categories}
AND desc_category_first_id in {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END"""))
self.df_asin_detail = self.df_asin_detail.drop("desc_category_first_name", "desc_category_first_id")
self.df_asin_detail = self.df_asin_detail.withColumn("asin_type", F.expr("""
CASE WHEN asin_is_self=1 THEN 1 WHEN asin_is_need=1 THEN 2 WHEN asin_is_hide=1 THEN 3 ELSE 0 END"""
)).drop("asin_is_self", "asin_is_need", "asin_is_hide")
......
......@@ -154,7 +154,7 @@ class DwtNsrAsinDetail(object):
select asin,
asin_ao_val as ao_val,
bsr_orders as bsr_orders,
asin_bsr_orders_change as bsr_orders_change,
asin_bsr_orders_mom as bsr_orders_change,
asin_air_freight_gross_margin as asin_air_freight_gross_margin,
asin_ocean_freight_gross_margin as asin_ocean_freight_gross_margin,
cast(asin_bought_month as int ) as asin_bought_month
......
......@@ -200,7 +200,32 @@ class EsAiAsinAdd(object):
'total_comments', 'uses', 'variation_flag', 'variation_num', 'weight'
)
def create_pg_table(self):
    """
    Create the monthly PG export table from a template table:
    1. LIKE ... INCLUDING ALL copies every column type, column default,
       constraint and index from the template table.
    2. Rebuild an independent sequence for the ``id`` column so the new
       table does not share (and advance) the template table's sequence.

    No-op when the target table already exists.
    """
    # NOTE(review): template table name is hard-coded to a specific month —
    # confirm this is intentional and kept up to date.
    template_tb = "us_ai_asin_detail_month_2026_01"
    engine = DBUtil.get_db_engine("postgresql", "us")
    # Skip creation when the table already exists
    # (to_regclass returns NULL for a missing relation).
    result = DBUtil.engine_exec_sql(engine, f"SELECT to_regclass('{self.export_pg_tb}')")
    if list(result)[0][0] is not None:
        print(f"PostgreSQL 表 {self.export_pg_tb} 已存在,跳过建表")
        return
    # Create the table, drop the inherited default, then attach a dedicated
    # sequence owned by the id column (dropped automatically with the table).
    sql = f"""
    CREATE TABLE {self.export_pg_tb} (LIKE {template_tb} INCLUDING ALL);
    ALTER TABLE {self.export_pg_tb} ALTER COLUMN id DROP DEFAULT;
    CREATE SEQUENCE {self.export_pg_tb}_id_seq OWNED BY {self.export_pg_tb}.id;
    ALTER TABLE {self.export_pg_tb} ALTER COLUMN id SET DEFAULT nextval('{self.export_pg_tb}_id_seq')
    """
    DBUtil.exec_sql("postgresql", "us", sql)
    print(f"PostgreSQL 表 {self.export_pg_tb} 创建完成(独立自增序列)")
def save_data(self):
# 创建月度 PG 表
self.create_pg_table()
# 将新增asin导出给济苍
try:
self.df_save_pg.write.format("jdbc") \
......
......@@ -38,6 +38,7 @@ from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
class EsAsinProfitRate(object):
def __init__(self, site_name, date_info):
......@@ -54,6 +55,7 @@ class EsAsinProfitRate(object):
self.es_profit_rate_options = self.get_es_profit_rate_options(self.es_profit_rate_index)
self.df_asin_profit_rate = self.spark.sql(f"select 1+1;")
self.df_keepa_add = self.spark.sql(f"select 1+1;")
@staticmethod
def get_es_profit_rate_body():
......@@ -110,16 +112,37 @@ class EsAsinProfitRate(object):
def run(self):
    """Entry point: read increments, then refresh every target ES index.

    NOTE(review): this diff view interleaves pre- and post-merge lines.
    The save_profit_rate_to_es()/update_index_profit_rate() calls below
    appear superseded by the update_index_combined() calls that follow —
    verify against the merged file that both paths are really intended.
    """
    self.read_profit_rate_add()
    self.read_keepa_add()
    # Profit-rate main index.
    self.save_profit_rate_to_es()
    # Refresh the profit-rate field of the latest 3 monthly detail indexes
    # and the info-base monthly indexes.
    for index in self.get_recent_indexes("st_detail_month"):
        self.update_index_profit_rate(index)
    for index in self.get_recent_indexes("ai_asin_analyze_detail"):
        self.update_index_profit_rate(index)
    # Refresh the yearly info-base index and the 30-day flow index.
    self.update_index_profit_rate(f"{self.site_name}_ai_asin_analyze_detail_last365_day")
    self.update_index_profit_rate(f"{self.site_name}_flow_asin_30day")
    # st_detail_month: last 3 months, anchored on each month's last day.
    for index_name in self.get_recent_indexes("st_detail_month"):
        date_info = self.get_date_info_from_index(index_name)
        self.update_index_combined(index_name, self.get_month_last_day(date_info))
    # Yearly info base: anchor on the latest date_info in dwt_ai_asin_year.
    year_max_date_info = self.spark.sql(f"""
        select max(date_info) from dwt_ai_asin_year where site_name='{self.site_name}'
    """).collect()[0][0]
    if year_max_date_info is None:
        print(f"dwt_ai_asin_year 无数据,跳过年度索引更新")
    else:
        self.update_index_combined(
            f"{self.site_name}_ai_asin_analyze_detail_last365_day",
            self.get_month_last_day(str(year_max_date_info))
        )
    # ai_asin_analyze_detail: last 3 months.
    for index_name in self.get_recent_indexes("ai_asin_analyze_detail"):
        date_info = self.get_date_info_from_index(index_name)
        self.update_index_combined(index_name, self.get_month_last_day(date_info))
    # 30-day flow index (base_date=None → anchor on each asin's asin_crawl_date).
    self.update_index_combined(f"{self.site_name}_flow_asin_30day")
    # Release the cached increment DataFrames.
    self.df_asin_profit_rate.unpersist()
    self.df_keepa_add.unpersist()
# ------------------------------------------------------------------ #
# 数据读取
# ------------------------------------------------------------------ #
def read_profit_rate_add(self):
# 读取利润率整合数据(增量数据)
sql = f"""
......@@ -128,6 +151,8 @@ class EsAsinProfitRate(object):
"""
self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
self.df_asin_profit_rate = self.df_asin_profit_rate.withColumn(
'price', F.round(F.col('price'), 2)
).withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn(
"update_time", F.substring(F.col("updated_time"), 1, 10)
......@@ -135,7 +160,7 @@ class EsAsinProfitRate(object):
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time'
)
# 从Doris获取asin_crawl_date
# 从Doris获取asin_crawl_date(用于利润率主索引写入)
df_crawl_date = DorisHelper.spark_import_with_connector(
session=self.spark,
table_identifier=f"selection.{self.site_name}_asin_latest_detail",
......@@ -152,6 +177,19 @@ class EsAsinProfitRate(object):
print(f"增量利润率数据如下:")
self.df_asin_profit_rate.show(10, False)
def read_keepa_add(self):
    """Load incremental keepa rows (updated since the last run) and cache them."""
    query = f"""
    select asin, tracking_since from dim_keepa_asin_info
    where site_name = '{self.site_name}' and updated_time >= '{self.last_date_info}'
    """
    keepa_df = self.spark.sql(sqlQuery=query)
    self.df_keepa_add = keepa_df.repartition(40, 'asin').cache()
    print(f"增量keepa数据如下:")
    self.df_keepa_add.show(10, False)
# ------------------------------------------------------------------ #
# 利润率主索引
# ------------------------------------------------------------------ #
def save_profit_rate_to_es(self):
print(f"\n{'='*60}")
print(f"开始更新利润率索引:{self.es_profit_rate_index}")
......@@ -167,6 +205,9 @@ class EsAsinProfitRate(object):
print("An error occurred while writing to Elasticsearch:", str(e))
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES数据更新失败', f'失败索引:{self.es_profit_rate_index}')
# ------------------------------------------------------------------ #
# 工具方法
# ------------------------------------------------------------------ #
def get_recent_indexes(self, prefix, max_count=3):
"""从date_info对应月份开始往前找,返回最近max_count个存在的索引名"""
indexes = []
......@@ -184,31 +225,62 @@ class EsAsinProfitRate(object):
print(f"[{prefix}] 找到 {len(indexes)} 个索引:{indexes}")
return indexes
def update_index_profit_rate(self, index_name):
"""
直接将利润率数据写入目标索引,asin存在则更新利润率字段,不存在则跳过
"""
if not EsUtils.exist_index(index_name, self.es_client):
print(f"索引 {index_name} 不存在,跳过更新")
return
def get_date_info_from_index(self, index_name):
    """Extract the date_info from the tail of an index name.

    e.g. ``us_st_detail_month_2026_02`` -> ``2026-02``
    """
    *_, year, month = index_name.rsplit('_', 2)
    return "-".join((year, month))
print(f"\n{'='*60}")
print(f"开始更新索引利润率字段:{index_name}")
print(f"{'='*60}")
def get_month_last_day(self, year_month_str):
    """Return the last day of a month as ``yyyy-MM-dd``.

    Uses ``calendar.monthrange`` instead of the hand-rolled
    "first day of next month minus one day" arithmetic, which also
    removes the special-casing of December.

    :param year_month_str: month string in ``yyyy-MM`` form, e.g. ``2026-02``
    :return: last calendar day of that month, e.g. ``2026-02-28``
    """
    import calendar
    year, month = map(int, year_month_str.split('-'))
    # monthrange handles leap years and month lengths for us.
    last_day = calendar.monthrange(year, month)[1]
    return f"{year:04d}-{month:02d}-{last_day:02d}"
df_update = self.df_asin_profit_rate.withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
F.lit(None)
).otherwise(
F.struct(
F.col("ocean_profit").alias("ocean_profit"),
F.col("air_profit").alias("air_profit")
)
)
).select("asin", "profit_key", "profit_rate_extra")
def get_tracking_interval_dict(self, base_date):
    """Map interval labels to cutoff dates that many days before *base_date*.

    Months are approximated as multiples of 30 days (12 months = 360 days,
    etc.), matching the bucketing in get_tracking_since_type_expr.

    :param base_date: anchor date string in ``yyyy-MM-dd`` form
    :return: dict of label -> ``yyyy-MM-dd`` cutoff string
    """
    anchor = datetime.strptime(base_date, '%Y-%m-%d')
    spans = (
        ("one_month", 30),
        ("three_month", 90),
        ("six_month", 180),
        ("twelve_month", 360),
        ("twenty_four_month", 720),
        ("thirty_six_month", 1080),
    )
    return {
        label: (anchor - timedelta(days=days)).strftime('%Y-%m-%d')
        for label, days in spans
    }
def get_tracking_since_type_expr(self, iv):
    """Build a Spark SQL CASE expression bucketing ``tracking_since``.

    Buckets map to type codes 1 (newest, within one month) through 7
    (older than thirty-six months), using the cutoff dates in *iv* — the
    dict produced by ``get_tracking_interval_dict``. Falls through to 0
    when no branch matches (e.g. tracking_since is NULL).

    :param iv: dict of interval label -> ``yyyy-MM-dd`` cutoff string
    :return: SQL CASE expression as a string, for use with ``F.expr``
    """
    # String comparison is safe here because both sides are yyyy-MM-dd,
    # which orders lexicographically the same as chronologically.
    return f"""
    CASE WHEN tracking_since >= '{iv['one_month']}' THEN 1
    WHEN tracking_since >= '{iv['three_month']}' AND tracking_since < '{iv['one_month']}' THEN 2
    WHEN tracking_since >= '{iv['six_month']}' AND tracking_since < '{iv['three_month']}' THEN 3
    WHEN tracking_since >= '{iv['twelve_month']}' AND tracking_since < '{iv['six_month']}' THEN 4
    WHEN tracking_since >= '{iv['twenty_four_month']}' AND tracking_since < '{iv['twelve_month']}' THEN 5
    WHEN tracking_since >= '{iv['thirty_six_month']}' AND tracking_since < '{iv['twenty_four_month']}' THEN 6
    WHEN tracking_since < '{iv['thirty_six_month']}' THEN 7
    ELSE 0 END
    """
# ------------------------------------------------------------------ #
# ES 读写配置
# ------------------------------------------------------------------ #
def get_es_read_options(self, include_crawl_date=False):
    """Spark read options for an ES index.

    :param include_crawl_date: also read ``asin_crawl_date`` (needed only
        by the 30-day dynamic-anchor mode)
    :return: options dict for ``spark.read.format("org.elasticsearch.spark.sql")``
    """
    # es.mapping.date.rich=false: date fields come back as raw strings,
    # avoiding precision issues from timestamp conversion.
    field_list = [
        "asin", "price", "profit_key",
        "profit_rate_extra.ocean_profit", "profit_rate_extra.air_profit",
        "tracking_since", "tracking_since_type",
    ]
    if include_crawl_date:
        field_list.append("asin_crawl_date")
    options = {
        "es.nodes": EsUtils.__es_ip__,
        "es.port": EsUtils.__es_port__,
        "es.net.http.auth.user": EsUtils.__es_user__,
        "es.net.http.auth.pass": EsUtils.__es_passwd__,
        "es.nodes.wan.only": "true",
        "es.mapping.date.rich": "false",
        "es.read.field.include": ",".join(field_list),
    }
    return options
def write_combined_update(self, df, index_name):
write_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
......@@ -222,19 +294,130 @@ class EsAsinProfitRate(object):
"es.batch.write.refresh": "false",
"es.batch.size.entries": "2000",
"es.batch.write.concurrency": "5",
"es.batch.write.retry.count": "5",
"es.batch.write.retry.count": "3",
"es.batch.write.retry.wait": "60s",
"es.nodes.wan.only": "true"
"es.nodes.wan.only": "false"
}
print(f"\n{'='*60}")
print(f"开始写回索引:{index_name}")
print(f"{'='*60}")
try:
df_update.repartition(10).write.format("org.elasticsearch.spark.sql") \
df.repartition(10).write.format("org.elasticsearch.spark.sql") \
.options(**write_options) \
.mode("append") \
.save()
print(f"索引 {index_name} 利润率字段更新完毕!")
print(f"索引 {index_name} 利润率+追踪时间字段更新完毕!")
except Exception as e:
print(f"更新索引 {index_name} 失败: {str(e)}")
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES利润率更新失败', f'失败索引:{index_name}')
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES索引更新失败', f'失败索引:{index_name}')
# ------------------------------------------------------------------ #
# 核心更新逻辑:单次读 + 单次写
# ------------------------------------------------------------------ #
def update_index_combined(self, index_name, base_date=None):
    """Single read + single write update of one ES index.

    Left-joins the cached profit-rate and keepa increments onto the current
    index documents, merges field-by-field (new value wins, otherwise the
    existing ES value is kept), recomputes ``tracking_since_type``, and
    writes the touched rows back.

    :param index_name: target ES index name
    :param base_date: fixed anchor date (``yyyy-MM-dd``) used to compute
        ``tracking_since_type`` for monthly/yearly indexes; ``None`` means
        the 30-day index, where each asin's own ``asin_crawl_date`` is the
        anchor instead.
    """
    if not EsUtils.exist_index(index_name, self.es_client):
        print(f"索引 {index_name} 不存在,跳过更新")
        return
    print(f"\n{'='*60}")
    print(f"开始处理索引:{index_name},base_date={base_date or 'asin_crawl_date(动态)'}")
    print(f"{'='*60}")
    # 1. Read current ES data. Date fields come back as raw strings and
    #    profit_rate_extra as a StructType. asin_crawl_date is only needed
    #    in the 30-day dynamic mode, so it is read on demand.
    df_es = self.spark.read.format("org.elasticsearch.spark.sql") \
        .options(**self.get_es_read_options(include_crawl_date=(base_date is None))) \
        .load(index_name) \
        .withColumn('price', F.round(F.col('price'), 2)) \
        .withColumn('tracking_since_type', F.col('tracking_since_type').cast('int')) \
        .repartition(40, 'asin')
    # 2. Prepare the profit-rate increment (columns renamed to avoid clashes).
    df_profit = self.df_asin_profit_rate.select(
        'asin', 'price',
        F.col('profit_key').alias('new_profit_key'),
        F.col('ocean_profit').cast('float').alias('new_ocean_profit'),
        F.col('air_profit').cast('float').alias('new_air_profit')
    )
    # 3. Prepare the keepa increment (column renamed to avoid clashes).
    df_keepa = self.df_keepa_add.select(
        'asin',
        F.col('tracking_since').alias('kp_tracking_since')
    )
    # 4. Left-join both increments onto the ES snapshot.
    df = df_es \
        .join(df_profit, on=['asin', 'price'], how='left') \
        .join(df_keepa, on='asin', how='left')
    # 5. Keep only rows touched by at least one increment, to cut useless writes.
    df = df.filter(F.col('new_profit_key').isNotNull() | F.col('kp_tracking_since').isNotNull())
    # 6. Merge profit fields: use the new value when present, otherwise keep
    #    the ES value. Sub-fields are accessed directly to build a struct with
    #    a fixed schema, so indexes whose profit_rate_extra has a different
    #    number of sub-fields do not cause schema conflicts.
    df = df.withColumn('profit_key',
        F.when(F.col('new_profit_key').isNotNull(), F.col('new_profit_key'))
        .otherwise(F.col('profit_key'))
    ).withColumn('profit_rate_extra',
        F.struct(
            F.when(F.col('new_ocean_profit').isNotNull(), F.col('new_ocean_profit'))
            .otherwise(F.col('profit_rate_extra.ocean_profit').cast('float'))
            .alias('ocean_profit'),
            F.when(F.col('new_air_profit').isNotNull(), F.col('new_air_profit'))
            .otherwise(F.col('profit_rate_extra.air_profit').cast('float'))
            .alias('air_profit')
        )
    )
    # 7. Merge keepa field: use the new value when present, otherwise keep the
    #    ES value. kp_tracking_since is keepa's raw minute value, converted
    #    here to a yyyy-MM-dd string.
    #    NOTE(review): 21564000 is presumably the keepa epoch offset in
    #    minutes — confirm against the keepa API documentation.
    df = df.withColumn('tracking_since',
        F.when(
            F.col('kp_tracking_since').isNotNull(),
            F.date_format(
                F.from_unixtime((F.col('kp_tracking_since') + F.lit(21564000)) * 60),
                'yyyy-MM-dd'
            )
        ).otherwise(F.col('tracking_since'))
    )
    # Compute tracking_since_type.
    if base_date is not None:
        # Fixed anchor: monthly/yearly indexes.
        iv = self.get_tracking_interval_dict(base_date)
        df = df.withColumn('tracking_since_type',
            F.when(F.col('kp_tracking_since').isNotNull(), F.expr(self.get_tracking_since_type_expr(iv)))
            .otherwise(F.col('tracking_since_type'))
        )
    else:
        # Dynamic anchor: 30-day index, anchored on each asin's asin_crawl_date.
        # asin_crawl_date is "yyyy-MM-dd HH:mm:ss"; keep the first 10 chars.
        # tracking_since is cast explicitly to DateType so the comparison
        # against date_sub's return value is type-consistent.
        df = df.withColumn('asin_crawl_date', F.substring(F.col('asin_crawl_date'), 1, 10))
        ts_date = F.to_date(F.col('tracking_since'), 'yyyy-MM-dd')
        df = df.withColumn('tracking_since_type',
            F.when(F.col('kp_tracking_since').isNotNull(),
                F.when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 30), F.lit(1))
                .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 90), F.lit(2))
                .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 180), F.lit(3))
                .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 360), F.lit(4))
                .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 720), F.lit(5))
                .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 1080), F.lit(6))
                .when(ts_date < F.date_sub(F.col('asin_crawl_date'), 1080), F.lit(7))
                .otherwise(F.lit(0))
            ).otherwise(F.col('tracking_since_type'))
        )
    # 8. Write back (tracking_since_type is a short in ES, so cast explicitly).
    df_update = df.select(
        'asin', 'profit_key', 'profit_rate_extra', 'tracking_since',
        F.col('tracking_since_type').cast('short')
    )
    self.write_combined_update(df_update, index_name)
if __name__ == "__main__":
......
......@@ -21,7 +21,7 @@ class ImportStToPg14(object):
self.df_st_month = pd.DataFrame()
self.df_st_month_state = pd.DataFrame()
self.df_save = pd.DataFrame()
self.fetch_year_month_by_week() # 如果传的date_type='week', 将date_info转换成月的值
# self.fetch_year_month_by_week() # 如果传的date_type='week', 将date_info转换成月的值
self.year, self.month = self.date_info.split("-")[0], int(self.date_info.split("-")[1])
def fetch_year_month_by_week(self):
......@@ -31,6 +31,7 @@ class ImportStToPg14(object):
self.date_info = list(df.year_month)[0]
def read_data(self):
self.fetch_year_month_by_week() # 如果传的date_type='week', 将date_info转换成月的值
# 1. 读取date_20_to_30表获取月份对应的周
sql_get_week = f"select year_week, year, week from selection.date_20_to_30 WHERE `year_month`='{self.date_info}' and week_day=1"
df_week = pd.read_sql(sql_get_week, con=self.engine_mysql)
......
......@@ -221,11 +221,17 @@ class ExportAsinWithoutKeepa(object):
df = df.cache()
print(f"筛选后数据量: {df.count()}")
# 排除 dim_keepa_asin_info 中已有 package_length 的ASIN
# 排除 dim_keepa_asin_info 中已有有效keepa数据的ASIN
# 若 package_length/width/height/weight 任意一个 < 0,视为数据异常,不排除(需重新抓取)
print("7. 排除已有keepa数据的ASIN (dim_keepa_asin_info)")
df_keepa = self.spark.sql(
"select asin from dim_keepa_asin_info where package_length is not null"
).repartition(40, 'asin')
df_keepa = self.spark.sql(f"""
select asin from dim_keepa_asin_info
where site_name = '{self.site_name}'
and package_length >= 0
and package_width >= 0
and package_height >= 0
and weight >= 0
""").repartition(40, 'asin')
df = df.join(df_keepa, on='asin', how='left_anti').cache()
print(f"排除keepa后数据量: {df.count()}")
......
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine

if __name__ == '__main__':
    # Export script: push dwt_ai_asin_add partitions into the monthly PG table.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    print(f"执行参数为{sys.argv}")
    # Acquire the remote database engine.
    db_type = "postgresql_15"
    engine = get_remote_engine(site_name='us', db_type=db_type)
    # us tables carry no site prefix; every other site is prefixed.
    table_prefix = "" if site_name == 'us' else f"{site_name}_"
    export_tb = f"{table_prefix}ai_asin_detail_month_{date_info.replace('-', '_')}"
    # Export the partitioned Hive data.
    engine.sqoop_raw_export(
        hive_table="dwt_ai_asin_add",
        import_table=export_tb,
        partitions={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info,
        },
        m=30,
        cols="site_name,asin,weight,bought_month,category,img,title,brand,account_name,account_addr,buy_box_seller_type,"
        "launch_time,img_num,variation_flag,variation_num,ao_val,category_id,category_current_id,parent_asin,bsr_rank,"
        "price,rating,total_comments,seller_id,fb_country_name,review_json_list,launch_time_type,describe,product_json,"
        "product_detail_json,bought_month_mom,bought_month_yoy,is_new_flag,is_ascending_flag"
    )
    print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment