Commit 7ad606c5 by wangjing

Merge branch 'developer' of http://47.106.101.75/abel_cjy/Amazon-Selection-Data into developer

parents 14af7a9b 0f05969c
...@@ -159,7 +159,8 @@ class DimAsinDetail(object): ...@@ -159,7 +159,8 @@ class DimAsinDetail(object):
REGEXP_REPLACE(seller_json, chr(10), '') as seller_json, buy_box_seller_type as asin_buy_box_seller_type, REGEXP_REPLACE(seller_json, chr(10), '') as seller_json, buy_box_seller_type as asin_buy_box_seller_type,
customer_reviews_json, parent_asin, img_list, created_at as created_time, updated_at as updated_time, customer_reviews_json, parent_asin, img_list, created_at as created_time, updated_at as updated_time,
updated_at as dt, variat_num as variation_num, fbm_delivery_price as asin_fbm_price, updated_at as dt, variat_num as variation_num, fbm_delivery_price as asin_fbm_price,
get_json_object(product_json, '$.Color') as product_json_color get_json_object(product_json, '$.Color') as product_json_color,
get_json_object(product_json, '$.Number of Items') as product_json_number_of_items
from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}""" from ods_asin_detail where site_name='{self.site_name}' {self.date_sql}"""
print(sql) print(sql)
self.df_asin_detail = self.spark.sql(sqlQuery=sql) self.df_asin_detail = self.spark.sql(sqlQuery=sql)
...@@ -348,15 +349,23 @@ class DimAsinDetail(object): ...@@ -348,15 +349,23 @@ class DimAsinDetail(object):
).withColumn( ).withColumn(
"variat_package_quantity_is_abnormal", self.df_asin_detail.variat_parse.getField("is_package_quantity_abnormal") "variat_package_quantity_is_abnormal", self.df_asin_detail.variat_parse.getField("is_package_quantity_abnormal")
).drop("title_parse", "variat_parse", "variat_attribute") ).drop("title_parse", "variat_parse", "variat_attribute")
# Number of Items:直接从 product_json 提取,cast 失败(脏数据)自动为 null
self.df_asin_detail = self.df_asin_detail.withColumn( self.df_asin_detail = self.df_asin_detail.withColumn(
"package_quantity", F.expr(""" CASE "number_of_items", F.col("product_json_number_of_items").cast("int")
WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity ).drop("product_json_number_of_items")
WHEN title_package_quantity is not null THEN title_package_quantity # 优先级:Number of Items > 属性字段 > 标题解析 > 默认1
self.df_asin_detail = self.df_asin_detail.withColumn(
"package_quantity", F.expr("""CASE
WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN number_of_items
WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity
WHEN title_package_quantity IS NOT NULL THEN title_package_quantity
ELSE 1 END""")).withColumn( ELSE 1 END""")).withColumn(
"is_package_quantity_abnormal", F.expr("""CASE "is_package_quantity_abnormal", F.expr("""CASE
WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN 0
WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity_is_abnormal
ELSE 2 END""")).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal") WHEN title_package_quantity IS NOT NULL THEN title_package_quantity_is_abnormal
ELSE 2 END""")).drop("number_of_items", "title_package_quantity", "variat_package_quantity",
"title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
self.df_asin_detail = self.df_asin_detail.join(self.df_user_package_num, on=['asin', 'asin_title'], how='left') self.df_asin_detail = self.df_asin_detail.join(self.df_user_package_num, on=['asin', 'asin_title'], how='left')
self.df_asin_detail = self.df_asin_detail.withColumn( self.df_asin_detail = self.df_asin_detail.withColumn(
"package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))).withColumn( "package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))).withColumn(
......
...@@ -99,7 +99,8 @@ class DwtFbBaseReport(object): ...@@ -99,7 +99,8 @@ class DwtFbBaseReport(object):
cur_fd.fb_crawl_date, cur_fd.fb_crawl_date,
round((count_30_day_num - last_30_day_num) / last_30_day_num, 4) as count_30_day_rate, round((count_30_day_num - last_30_day_num) / last_30_day_num, 4) as count_30_day_rate,
round((count_1_year_num - last_1_year_num) / last_1_year_num, 4) as count_1_year_rate, round((count_1_year_num - last_1_year_num) / last_1_year_num, 4) as count_1_year_rate,
round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate round((count_lifetime_num - last_lifetime_num) / last_lifetime_num, 4) as count_life_time_rate,
seller_rating
from from
( (
select select
......
...@@ -521,10 +521,27 @@ class DwtFlowAsin(Templates): ...@@ -521,10 +521,27 @@ class DwtFlowAsin(Templates):
CASE WHEN hide_flag = 1 THEN 1 WHEN category_first_id = 'grocery' and category_id != '6492272011' THEN 1 CASE WHEN hide_flag = 1 THEN 1 WHEN category_first_id = 'grocery' and category_id != '6492272011' THEN 1
WHEN category_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1 WHEN category_id in ('21393128011', '21377129011', '21377127011', '21377130011', '21388218011', '21377132011') THEN 1
ELSE 0 END""")).drop("hide_flag") ELSE 0 END""")).drop("hide_flag")
self.df_asin_detail = self.df_asin_detail.withColumn("asin_is_need", F.expr(""" # 解析 asin_category_desc 取 › 分隔的第一个元素作为补充分类名称
CASE WHEN category_first_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 self.df_asin_detail = self.df_asin_detail.withColumn(
WHEN asin NOT LIKE 'B0%' THEN 1 "desc_category_first_name",
F.lower(F.trim(F.split(F.col("asin_category_desc"), "›").getItem(0)))
)
# 读取 Hive 分类维表,获取分类名称与ID的对应关系
sql_dim = f"""
select lower(trim(en_name)) as desc_category_first_name, category_first_id as desc_category_first_id
from dim_bsr_category_tree where site_name = '{self.site_name}' and category_parent_id = 0 and leaf_node = 2
"""
df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql_dim))
# join 补充分类ID
self.df_asin_detail = self.df_asin_detail.join(df_bsr_category, on=['desc_category_first_name'], how='left')
# 两个分类ID均在过滤列表中才标记为1
need_categories = "('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed')"
self.df_asin_detail = self.df_asin_detail.withColumn("asin_is_need", F.expr(f"""
CASE WHEN category_first_id in {need_categories}
AND desc_category_first_id in {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END""")) ELSE 0 END"""))
self.df_asin_detail = self.df_asin_detail.drop("desc_category_first_name", "desc_category_first_id")
self.df_asin_detail = self.df_asin_detail.withColumn("asin_type", F.expr(""" self.df_asin_detail = self.df_asin_detail.withColumn("asin_type", F.expr("""
CASE WHEN asin_is_self=1 THEN 1 WHEN asin_is_need=1 THEN 2 WHEN asin_is_hide=1 THEN 3 ELSE 0 END""" CASE WHEN asin_is_self=1 THEN 1 WHEN asin_is_need=1 THEN 2 WHEN asin_is_hide=1 THEN 3 ELSE 0 END"""
)).drop("asin_is_self", "asin_is_need", "asin_is_hide") )).drop("asin_is_self", "asin_is_need", "asin_is_hide")
......
...@@ -154,7 +154,7 @@ class DwtNsrAsinDetail(object): ...@@ -154,7 +154,7 @@ class DwtNsrAsinDetail(object):
select asin, select asin,
asin_ao_val as ao_val, asin_ao_val as ao_val,
bsr_orders as bsr_orders, bsr_orders as bsr_orders,
asin_bsr_orders_change as bsr_orders_change, asin_bsr_orders_mom as bsr_orders_change,
asin_air_freight_gross_margin as asin_air_freight_gross_margin, asin_air_freight_gross_margin as asin_air_freight_gross_margin,
asin_ocean_freight_gross_margin as asin_ocean_freight_gross_margin, asin_ocean_freight_gross_margin as asin_ocean_freight_gross_margin,
cast(asin_bought_month as int ) as asin_bought_month cast(asin_bought_month as int ) as asin_bought_month
......
...@@ -29,6 +29,7 @@ class EsAiAsinAdd(object): ...@@ -29,6 +29,7 @@ class EsAiAsinAdd(object):
self.df_ai_asin_detail = self.spark.sql(f"select 1+1;") self.df_ai_asin_detail = self.spark.sql(f"select 1+1;")
self.df_ai_asin_analyze = self.spark.sql(f"select 1+1;") self.df_ai_asin_analyze = self.spark.sql(f"select 1+1;")
self.df_profit_rate = self.spark.sql(f"select 1+1;")
self.df_save_pg = self.spark.sql(f"select 1+1;") self.df_save_pg = self.spark.sql(f"select 1+1;")
self.df_save_es = self.spark.sql(f"select 1+1;") self.df_save_es = self.spark.sql(f"select 1+1;")
...@@ -126,7 +127,10 @@ class EsAiAsinAdd(object): ...@@ -126,7 +127,10 @@ class EsAiAsinAdd(object):
title_word_content, title_word_content,
array_to_string(package_quantity_arr, ',') as package_quantity_arr, array_to_string(package_quantity_arr, ',') as package_quantity_arr,
package_quantity_flag, package_quantity_flag,
label_content label_content,
festival,
multi_color_flag,
multi_color_content
from {self.site_name}_ai_asin_analyze_detail from {self.site_name}_ai_asin_analyze_detail
""" """
self.df_ai_asin_analyze = SparkUtil.read_jdbc_query( self.df_ai_asin_analyze = SparkUtil.read_jdbc_query(
...@@ -143,6 +147,16 @@ class EsAiAsinAdd(object): ...@@ -143,6 +147,16 @@ class EsAiAsinAdd(object):
print("AI分析数据如下:") print("AI分析数据如下:")
self.df_ai_asin_analyze.show(10, True) self.df_ai_asin_analyze.show(10, True)
# 读取利润率数据
sql3 = f"""
select asin, price, ocean_profit, air_profit
from dim_asin_profit_rate_info
where site_name = '{self.site_name}'
"""
self.df_profit_rate = self.spark.sql(sql3).repartition(40, 'asin').cache()
print("利润率数据如下:")
self.df_profit_rate.show(10, True)
def handle_data(self): def handle_data(self):
self.df_save_pg = self.df_ai_asin_detail.join( self.df_save_pg = self.df_ai_asin_detail.join(
self.df_ai_asin_analyze, 'asin', 'left_anti' self.df_ai_asin_analyze, 'asin', 'left_anti'
...@@ -155,20 +169,63 @@ class EsAiAsinAdd(object): ...@@ -155,20 +169,63 @@ class EsAiAsinAdd(object):
'is_ascending_flag' 'is_ascending_flag'
) )
df_profit = self.df_profit_rate.withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
F.lit(None)
).otherwise(
F.struct(
F.col("ocean_profit").alias("ocean_profit"),
F.col("air_profit").alias("air_profit")
)
)
).drop("ocean_profit", "air_profit")
self.df_save_es = self.df_ai_asin_detail.join( self.df_save_es = self.df_ai_asin_detail.join(
self.df_ai_asin_analyze, 'asin', 'inner' self.df_ai_asin_analyze, 'asin', 'inner'
).withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).join(
df_profit, on=["asin", "price"], how="left"
).select( ).select(
'account_addr', 'account_name', 'analyze_id', 'ao_val', 'appearance', 'asin', 'bought_month', 'account_addr', 'account_name', 'analyze_id', 'ao_val', 'appearance', 'asin', 'bought_month',
'bought_month_mom', 'bought_month_yoy', 'brand', 'bsr_rank', 'buy_box_seller_type', 'category', 'bought_month_mom', 'bought_month_yoy', 'brand', 'bsr_rank', 'buy_box_seller_type', 'category',
'category_current_id', 'category_id', 'color', 'crowd', 'fb_country_name', 'function', 'img', 'category_current_id', 'category_id', 'color', 'crowd', 'fb_country_name', 'festival', 'function',
'img_num', 'is_ascending_flag', 'is_new_flag', 'label_content', 'launch_time', 'launch_time_type', 'img', 'img_num', 'is_ascending_flag', 'is_new_flag', 'label_content', 'launch_time', 'launch_time_type',
'material', 'package_quantity', 'package_quantity_arr', 'package_quantity_flag', 'parent_asin', 'material', 'multi_color_content', 'multi_color_flag', 'package_quantity', 'package_quantity_arr',
'price', 'rating', 'scene_comment', 'scene_title', 'seller_id', 'shape', 'short_desc', 'site_name', 'package_quantity_flag', 'parent_asin', 'price', 'profit_key', 'profit_rate_extra', 'rating',
'size', 'theme', 'title', 'title_pic_content', 'title_pic_flag', 'title_word_content', 'scene_comment', 'scene_title', 'seller_id', 'shape', 'short_desc', 'site_name', 'size', 'theme',
'title_word_flag', 'total_comments', 'uses', 'variation_flag', 'variation_num', 'weight' 'title', 'title_pic_content', 'title_pic_flag', 'title_word_content', 'title_word_flag',
'total_comments', 'uses', 'variation_flag', 'variation_num', 'weight'
) )
def create_pg_table(self):
"""
根据模板表创建月度 PG 表:
1. LIKE INCLUDING ALL:复制所有字段类型、其他列默认值、约束、索引
2. 重建 id 列独立序列,避免与模板表共享同一序列
"""
template_tb = "us_ai_asin_detail_month_2026_01"
engine = DBUtil.get_db_engine("postgresql", "us")
# 表已存在则跳过
result = DBUtil.engine_exec_sql(engine, f"SELECT to_regclass('{self.export_pg_tb}')")
if list(result)[0][0] is not None:
print(f"PostgreSQL 表 {self.export_pg_tb} 已存在,跳过建表")
return
# 建表 + 为 id 列创建独立序列
sql = f"""
CREATE TABLE {self.export_pg_tb} (LIKE {template_tb} INCLUDING ALL);
ALTER TABLE {self.export_pg_tb} ALTER COLUMN id DROP DEFAULT;
CREATE SEQUENCE {self.export_pg_tb}_id_seq OWNED BY {self.export_pg_tb}.id;
ALTER TABLE {self.export_pg_tb} ALTER COLUMN id SET DEFAULT nextval('{self.export_pg_tb}_id_seq')
"""
DBUtil.exec_sql("postgresql", "us", sql)
print(f"PostgreSQL 表 {self.export_pg_tb} 创建完成(独立自增序列)")
def save_data(self): def save_data(self):
# 创建月度 PG 表
self.create_pg_table()
# 将新增asin导出给济苍 # 将新增asin导出给济苍
try: try:
self.df_save_pg.write.format("jdbc") \ self.df_save_pg.write.format("jdbc") \
...@@ -180,11 +237,12 @@ class EsAiAsinAdd(object): ...@@ -180,11 +237,12 @@ class EsAiAsinAdd(object):
.save() .save()
CommonUtil.send_wx_msg(['wujicang', 'chenyuanjie'], 'ASIN信息库增量数据导出', f'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}') CommonUtil.send_wx_msg(['wujicang', 'chenyuanjie'], 'ASIN信息库增量数据导出', f'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}')
except Exception as e: except Exception as e:
print("An error occurred while writing to Elasticsearch:", str(e)) print("An error occurred while writing to PostgreSQL:", str(e))
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ASIN信息库增量数据导出失败', f'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}') CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ASIN信息库增量数据导出失败', f'详情:{self.export_pg_tb} {self.site_name} {self.date_type} {self.date_info}')
# 将增量asin导出到es # 将增量asin导出到es
try: try:
EsUtils.create_index(self.es_index, self.es_client, EsUtils.get_es_ai_body())
self.df_save_es.write.format("org.elasticsearch.spark.sql") \ self.df_save_es.write.format("org.elasticsearch.spark.sql") \
.options(**self.es_options) \ .options(**self.es_options) \
.mode("append") \ .mode("append") \
......
...@@ -33,10 +33,12 @@ sys.path.append(os.path.dirname(sys.path[0])) ...@@ -33,10 +33,12 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil from utils.spark_util import SparkUtil
from utils.es_util import EsUtils from utils.es_util import EsUtils
from utils.DorisHelper import DorisHelper
from pyspark.sql import functions as F from pyspark.sql import functions as F
from utils.common_util import CommonUtil from utils.common_util import CommonUtil
from datetime import datetime, timedelta from datetime import datetime, timedelta
class EsAsinProfitRate(object): class EsAsinProfitRate(object):
def __init__(self, site_name, date_info): def __init__(self, site_name, date_info):
...@@ -53,6 +55,8 @@ class EsAsinProfitRate(object): ...@@ -53,6 +55,8 @@ class EsAsinProfitRate(object):
self.es_profit_rate_options = self.get_es_profit_rate_options(self.es_profit_rate_index) self.es_profit_rate_options = self.get_es_profit_rate_options(self.es_profit_rate_index)
self.df_asin_profit_rate = self.spark.sql(f"select 1+1;") self.df_asin_profit_rate = self.spark.sql(f"select 1+1;")
self.df_keepa_add = self.spark.sql(f"select 1+1;")
self.df_cate_flag = self.spark.sql(f"select 1+1;")
@staticmethod @staticmethod
def get_es_profit_rate_body(): def get_es_profit_rate_body():
...@@ -109,12 +113,39 @@ class EsAsinProfitRate(object): ...@@ -109,12 +113,39 @@ class EsAsinProfitRate(object):
def run(self): def run(self):
self.read_profit_rate_add() self.read_profit_rate_add()
self.update_history_index() self.read_keepa_add()
self.save_profit_rate_to_es() # 遍历完成后统一更新利润率索引 # 利润率主索引
# 更新信息库年表索引和流量选品30天索引的利润率字段 self.save_profit_rate_to_es()
self.update_index_profit_rate(f"{self.site_name}_ai_asin_analyze_detail_last365_day") # st_detail_month 近3个月
self.update_index_profit_rate(f"{self.site_name}_flow_asin_30day") for index_name in self.get_recent_indexes("st_detail_month"):
date_info = self.get_date_info_from_index(index_name)
self.update_index_combined(index_name, self.get_month_last_day(date_info))
# 年度信息库
year_max_date_info = self.spark.sql(f"""
select max(date_info) from dwt_ai_asin_year where site_name='{self.site_name}'
""").collect()[0][0]
if year_max_date_info is None:
print(f"dwt_ai_asin_year 无数据,跳过年度索引更新")
else:
self.update_index_combined(
f"{self.site_name}_ai_asin_analyze_detail_last365_day",
self.get_month_last_day(str(year_max_date_info))
)
# ai_asin_analyze_detail 近3个月
for index_name in self.get_recent_indexes("ai_asin_analyze_detail"):
date_info = self.get_date_info_from_index(index_name)
self.update_index_combined(index_name, self.get_month_last_day(date_info))
# 30天流量选品(base_date=None → 以每个 asin 的 asin_crawl_date 为基准)
self.read_cate_flag()
self.update_index_combined(f"{self.site_name}_flow_asin_30day")
# 释放缓存
self.df_asin_profit_rate.unpersist()
self.df_keepa_add.unpersist()
self.df_cate_flag.unpersist()
# ------------------------------------------------------------------ #
# 数据读取
# ------------------------------------------------------------------ #
def read_profit_rate_add(self): def read_profit_rate_add(self):
# 读取利润率整合数据(增量数据) # 读取利润率整合数据(增量数据)
sql = f""" sql = f"""
...@@ -123,29 +154,65 @@ class EsAsinProfitRate(object): ...@@ -123,29 +154,65 @@ class EsAsinProfitRate(object):
""" """
self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin') self.df_asin_profit_rate = self.spark.sql(sqlQuery=sql).repartition(40, 'asin')
self.df_asin_profit_rate = self.df_asin_profit_rate.withColumn( self.df_asin_profit_rate = self.df_asin_profit_rate.withColumn(
'price', F.round(F.col('price'), 2)
).withColumn(
'profit_key', F.concat_ws("_", F.col("asin"), F.col("price")) 'profit_key', F.concat_ws("_", F.col("asin"), F.col("price"))
).withColumn( ).withColumn(
"update_time", "update_time", F.substring(F.col("updated_time"), 1, 10)
F.when( ).select(
F.col("updated_time").isNotNull(), 'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time'
F.substring(F.col("updated_time"), 1, 10) )
).otherwise(F.lit("1970-01-01"))
# 从Doris获取asin_crawl_date(用于利润率主索引写入)
df_crawl_date = DorisHelper.spark_import_with_connector(
session=self.spark,
table_identifier=f"selection.{self.site_name}_asin_latest_detail",
read_fields="asin,asin_crawl_date"
).withColumn( ).withColumn(
"asin_crawl_date", F.lit(None).cast("string") # 初始化为 null "asin_crawl_date", F.substring(F.col("asin_crawl_date"), 1, 10)
).repartition(40, 'asin')
self.df_asin_profit_rate = self.df_asin_profit_rate.join(
df_crawl_date, on='asin', how='left'
).select( ).select(
'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time', 'asin_crawl_date' 'profit_key', 'asin', 'price', 'ocean_profit', 'air_profit', 'update_time', 'asin_crawl_date'
).cache() ).cache()
print(f"增量利润率数据如下:") print(f"增量利润率数据如下:")
self.df_asin_profit_rate.show(10, False) self.df_asin_profit_rate.show(10, False)
def read_keepa_add(self):
# 读取keepa增量数据
sql = f"""
select asin, tracking_since from dim_keepa_asin_info
where site_name = '{self.site_name}' and updated_time >= '{self.last_date_info}'
"""
self.df_keepa_add = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
print(f"增量keepa数据如下:")
self.df_keepa_add.show(10, False)
def read_cate_flag(self):
# 读取30day流量选品分类标记数据(用于更新30day索引额外字段)
sql = f"""
select asin,
asin_cate_flag as asin_source_flag,
bsr_latest_date as bsr_last_seen_at,
bsr_30day_count as bsr_seen_count_30d,
nsr_latest_date as nsr_last_seen_at,
nsr_30day_count as nsr_seen_count_30d
from dwd_asin_cate_flag
where site_name = '{self.site_name}' and date_type = '30day'
"""
self.df_cate_flag = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
print(f"cate_flag数据量:{self.df_cate_flag.count()}")
self.df_cate_flag.show(10, False)
# ------------------------------------------------------------------ #
# 利润率主索引
# ------------------------------------------------------------------ #
def save_profit_rate_to_es(self): def save_profit_rate_to_es(self):
"""遍历完成后,统一更新利润率索引"""
print(f"\n{'='*60}") print(f"\n{'='*60}")
print(f"开始更新利润率索引:{self.es_profit_rate_index}") print(f"开始更新利润率索引:{self.es_profit_rate_index}")
print(f"{'='*60}") print(f"{'='*60}")
print(f"最终利润率数据如下:")
self.df_asin_profit_rate.show(10, False)
EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body) EsUtils.create_index(self.es_profit_rate_index, self.es_client, self.es_profit_rate_body)
try: try:
self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \ self.df_asin_profit_rate.write.format("org.elasticsearch.spark.sql") \
...@@ -157,146 +224,86 @@ class EsAsinProfitRate(object): ...@@ -157,146 +224,86 @@ class EsAsinProfitRate(object):
print("An error occurred while writing to Elasticsearch:", str(e)) print("An error occurred while writing to Elasticsearch:", str(e))
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES数据更新失败', f'失败索引:{self.es_profit_rate_index}') CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES数据更新失败', f'失败索引:{self.es_profit_rate_index}')
def update_history_index(self): # ------------------------------------------------------------------ #
"""更新历史月度索引的利润率数据""" # 工具方法
# 从 2025-05 开始,遍历到最新索引 # ------------------------------------------------------------------ #
start_date = datetime(2025, 5, 1) def get_recent_indexes(self, prefix, max_count=3):
current_date = start_date """从date_info对应月份开始往前找,返回最近max_count个存在的索引名"""
indexes = []
while True: current = datetime.strptime(self.date_info, "%Y-%m-%d").replace(day=1)
year = current_date.year checked = 0
month = current_date.month while len(indexes) < max_count and checked < 24:
month_str = f"{year}-{month:02d}" index_name = f"{self.site_name}_{prefix}_{current.year}_{current.month:02d}"
index_name = f"{self.site_name}_st_detail_month_{year}_{month:02d}" if EsUtils.exist_index(index_name, self.es_client):
indexes.append(index_name)
# 检查索引是否存在 if current.month == 1:
if not self.es_client.indices.exists(index=index_name): current = current.replace(year=current.year - 1, month=12)
print(f"索引 {index_name} 不存在,停止遍历")
break
print(f"\n{'='*60}")
print(f"开始处理索引: {index_name}")
print(f"{'='*60}")
try:
self.update_single_history_index(index_name, month_str)
except Exception as e:
print(f"更新索引 {index_name} 失败: {str(e)}")
# 移动到下个月
if month == 12:
current_date = datetime(year + 1, 1, 1)
else: else:
current_date = datetime(year, month + 1, 1) current = current.replace(month=current.month - 1)
checked += 1
def update_single_history_index(self, index_name, month_str): print(f"[{prefix}] 找到 {len(indexes)} 个索引:{indexes}")
"""更新单个历史索引,同时更新 asin_crawl_date""" return indexes
hive_sql = f"""
SELECT asin, asin_price as price, asin_crawl_date FROM dwt_flow_asin def get_date_info_from_index(self, index_name):
WHERE site_name = '{self.site_name}' AND date_type = 'month' AND date_info = '{month_str}' AND asin_price IS NOT NULL # 从索引名末尾提取 date_info,如 us_st_detail_month_2026_02 → 2026-02
""" parts = index_name.rsplit('_', 2)
df_hive = self.spark.sql(hive_sql) return f"{parts[-2]}-{parts[-1]}"
df_update = self.df_asin_profit_rate.join( def get_month_last_day(self, year_month_str):
df_hive, on=['asin', 'price'], how='inner' # year_month_str: "yyyy-MM",返回该月最后一天 "yyyy-MM-dd"
).withColumn( year, month = map(int, year_month_str.split('-'))
"profit_rate_extra", if month == 12:
F.struct( last_day = datetime(year + 1, 1, 1) - timedelta(days=1)
F.col("ocean_profit").alias("ocean_profit"), else:
F.col("air_profit").alias("air_profit") last_day = datetime(year, month + 1, 1) - timedelta(days=1)
) return last_day.strftime('%Y-%m-%d')
).select("asin", "profit_rate_extra")
def get_tracking_interval_dict(self, base_date):
# 更新 df_asin_profit_rate 的 asin_crawl_date 字段(用当前分区的值覆盖) base = datetime.strptime(base_date, '%Y-%m-%d')
df_crawl_date = df_hive.select( return {
'asin', 'price', F.substring(F.col('asin_crawl_date'), 1, 10).alias('new_crawl_date') "one_month": (base - timedelta(days=30)).strftime('%Y-%m-%d'),
) "three_month": (base - timedelta(days=90)).strftime('%Y-%m-%d'),
self.df_asin_profit_rate = self.df_asin_profit_rate.join( "six_month": (base - timedelta(days=180)).strftime('%Y-%m-%d'),
df_crawl_date, on=['asin', 'price'], how='left' "twelve_month": (base - timedelta(days=360)).strftime('%Y-%m-%d'),
).withColumn( "twenty_four_month": (base - timedelta(days=720)).strftime('%Y-%m-%d'),
"asin_crawl_date", F.coalesce(F.col("new_crawl_date"), F.col("asin_crawl_date")) "thirty_six_month": (base - timedelta(days=1080)).strftime('%Y-%m-%d'),
).drop("new_crawl_date").cache()
es_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.mapping.id": "asin",
"es.resource": f"{index_name}/_doc",
"es.batch.write.refresh": "false",
"es.batch.size.entries": "5000", # 批次数据量
"es.write.operation": "update",
"es.batch.write.concurrency": "5", # 降低并发数,默认是自动(较高)
"es.batch.write.retry.count": "3", # 重试次数
"es.batch.write.retry.wait": "30s", # 重试等待
"es.http.timeout": "5m", # 增加超时时间
"es.internal.es.version.ignore": "true" # 忽略版本检查
} }
print(f"索引 {index_name} 待更新数据量: {df_update.count()}") def get_tracking_since_type_expr(self, iv):
df_update.show(5, False) return f"""
df_update.repartition(10).write.format("org.elasticsearch.spark.sql") \ CASE WHEN tracking_since >= '{iv['one_month']}' THEN 1
.options(**es_options) \ WHEN tracking_since >= '{iv['three_month']}' AND tracking_since < '{iv['one_month']}' THEN 2
.mode("append") \ WHEN tracking_since >= '{iv['six_month']}' AND tracking_since < '{iv['three_month']}' THEN 3
.save() WHEN tracking_since >= '{iv['twelve_month']}' AND tracking_since < '{iv['six_month']}' THEN 4
WHEN tracking_since >= '{iv['twenty_four_month']}' AND tracking_since < '{iv['twelve_month']}' THEN 5
print(f"索引 {index_name} 更新完毕!") WHEN tracking_since >= '{iv['thirty_six_month']}' AND tracking_since < '{iv['twenty_four_month']}' THEN 6
WHEN tracking_since < '{iv['thirty_six_month']}' THEN 7
ELSE 0 END
def update_index_profit_rate(self, index_name):
"""
从指定索引读取 asin + profit_key,
与新增利润率数据 inner join 后,只更新利润率相关字段回索引
""" """
if not EsUtils.exist_index(index_name, self.es_client):
print(f"索引 {index_name} 不存在,跳过更新")
return
print(f"\n{'='*60}") # ------------------------------------------------------------------ #
print(f"开始更新索引利润率字段:{index_name}") # ES 读写配置
print(f"{'='*60}") # ------------------------------------------------------------------ #
def get_es_read_options(self, include_crawl_date=False):
# 从索引中读取 asin + profit_key # es.mapping.date.rich=false:date字段以原始字符串返回,避免转为timestamp带来的精度问题
read_options = { # profit_rate_extra 不从ES读取:月度索引所有文档初始值为{},ES-Hadoop推断为空struct,
# 写回时F.when/otherwise两分支类型不一致导致struct字段被丢弃,序列化为{}
# 改为直接从Hive新值构建struct写入,彻底绕开空struct的schema推断问题
fields = "asin,profit_key,tracking_since,tracking_since_type"
if include_crawl_date:
fields += ",asin_crawl_date"
return {
"es.nodes": EsUtils.__es_ip__, "es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__, "es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__, "es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__, "es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.nodes.wan.only": "true", "es.nodes.wan.only": "false",
"es.read.field.include": "asin,profit_key" "es.mapping.date.rich": "false",
"es.read.field.include": fields,
"es.scroll.size": "5000"
} }
df_index = self.spark.read.format("org.elasticsearch.spark.sql") \
.options(**read_options) \
.load(index_name) \
.select("asin", "profit_key") \
.dropna(subset=["profit_key"]) \
.repartition(40, "profit_key")
# 与新增利润率数据 inner join
df_update = df_index.join(
self.df_asin_profit_rate.select("profit_key", "ocean_profit", "air_profit"),
on="profit_key",
how="inner"
).withColumn(
"profit_rate_extra",
F.when(
F.col("ocean_profit").isNull() & F.col("air_profit").isNull(),
F.lit(None)
).otherwise(
F.struct(
F.col("ocean_profit").alias("ocean_profit"),
F.col("air_profit").alias("air_profit")
)
)
).select("asin", "profit_rate_extra").cache()
count = df_update.count()
print(f"索引 {index_name} 待更新利润率数据量: {count}")
if count == 0:
print("无待更新数据,跳过")
return
def write_combined_update(self, df, index_name):
write_options = { write_options = {
"es.nodes": EsUtils.__es_ip__, "es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__, "es.port": EsUtils.__es_port__,
...@@ -305,23 +312,164 @@ class EsAsinProfitRate(object): ...@@ -305,23 +312,164 @@ class EsAsinProfitRate(object):
"es.mapping.id": "asin", "es.mapping.id": "asin",
"es.resource": f"{index_name}/_doc", "es.resource": f"{index_name}/_doc",
"es.write.operation": "update", "es.write.operation": "update",
"es.batch.write.abort.on.failure": "false",
"es.update.retry.on.conflict": "3", "es.update.retry.on.conflict": "3",
"es.batch.write.refresh": "false", "es.batch.write.refresh": "false",
"es.batch.size.entries": "2000", "es.batch.size.entries": "5000",
"es.batch.write.concurrency": "5", "es.batch.write.concurrency": "20",
"es.batch.write.retry.count": "5", "es.batch.write.retry.count": "3",
"es.batch.write.retry.wait": "60s", "es.batch.write.retry.wait": "60s",
"es.nodes.wan.only": "true" "es.nodes.wan.only": "false"
} }
print(f"\n{'='*60}")
print(f"开始写回索引:{index_name}")
print(f"{'='*60}")
try: try:
df_update.repartition(10).write.format("org.elasticsearch.spark.sql") \ df.repartition(10).write.format("org.elasticsearch.spark.sql") \
.options(**write_options) \ .options(**write_options) \
.mode("append") \ .mode("append") \
.save() .save()
print(f"索引 {index_name} 利润率字段更新完毕!") print(f"索引 {index_name} 利润率+追踪时间字段更新完毕!")
except Exception as e: except Exception as e:
print(f"更新索引 {index_name} 失败: {str(e)}") print(f"更新索引 {index_name} 失败: {str(e)}")
CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES利润率更新失败', f'失败索引:{index_name}') CommonUtil.send_wx_msg(['chenyuanjie'], '\u26A0 ES索引更新失败', f'失败索引:{index_name}')
# ------------------------------------------------------------------ #
# 核心更新逻辑:单次读 + 分类写入
# ------------------------------------------------------------------ #
    def update_index_combined(self, index_name, base_date=None):
        """
        Read the ES index once, then write updates back in classified passes.

        Left-joins two increments onto the ES snapshot — profit rates (keyed
        by profit_key) and keepa tracking data (keyed by asin) — and writes
        back in two passes so that keepa-only rows never overwrite an
        existing profit_rate_extra struct with nulls.

        :param index_name: target Elasticsearch index name
        :param base_date: fixed reference date (yyyy-MM-dd) used for the
            tracking_since_type bucketing of monthly/yearly indexes; None
            means the 30-day index, which uses each asin's own
            asin_crawl_date as a dynamic reference.
        """
        if not EsUtils.exist_index(index_name, self.es_client):
            print(f"索引 {index_name} 不存在,跳过更新")
            return
        print(f"\n{'='*60}")
        print(f"开始处理索引:{index_name},base_date={base_date or 'asin_crawl_date(动态)'}")
        print(f"{'='*60}")
        # 1. Read current ES data; date fields come back as raw strings.
        #    profit_rate_extra is deliberately NOT read (see get_es_read_options notes).
        #    asin_crawl_date is only needed by the dynamic 30-day mode, so it is read on demand.
        #    cache: step 8 of the 30-day index reuses df_es.select('asin'),
        #    avoiding a second full read of the index.
        df_es = self.spark.read.format("org.elasticsearch.spark.sql") \
            .options(**self.get_es_read_options(include_crawl_date=(base_date is None))) \
            .load(index_name) \
            .withColumn('tracking_since_type', F.col('tracking_since_type').cast('int')) \
            .repartition(40, 'asin') \
            .cache()
        # 2. Profit-rate increment (joined directly on profit_key, so the ES price
        #    field is not needed).
        df_profit = self.df_asin_profit_rate.select(
            'profit_key',
            F.col('ocean_profit').cast('float').alias('new_ocean_profit'),
            F.col('air_profit').cast('float').alias('new_air_profit')
        )
        # 3. Keepa increment (renamed to avoid a column-name clash).
        df_keepa = self.df_keepa_add.select(
            'asin',
            F.col('tracking_since').alias('kp_tracking_since')
        )
        # 4. Left-join both increment datasets.
        #    profit_key has a uniform format (asin_price, already rounded), so the
        #    ES and Hive sides align directly without needing the price column.
        df = df_es \
            .join(df_profit, on='profit_key', how='left') \
            .join(df_keepa, on='asin', how='left')
        # 5. Keep only rows that carry at least one increment, cutting useless writes.
        df = df.filter(F.col('new_ocean_profit').isNotNull() | F.col('new_air_profit').isNotNull() | F.col('kp_tracking_since').isNotNull())
        # 6. Merge keepa fields: take new data when present, otherwise keep the old ES value.
        #    kp_tracking_since (keepa's raw minute value) is converted to a date string;
        #    21564000 appears to be keepa's epoch offset in minutes — TODO confirm.
        df = df.withColumn('tracking_since',
            F.when(
                F.col('kp_tracking_since').isNotNull(),
                F.date_format(
                    F.from_unixtime((F.col('kp_tracking_since') + F.lit(21564000)) * 60),
                    'yyyy-MM-dd'
                )
            ).otherwise(F.col('tracking_since'))
        )
        # Compute tracking_since_type.
        if base_date is not None:
            # Fixed reference: monthly / yearly indexes.
            iv = self.get_tracking_interval_dict(base_date)
            df = df.withColumn('tracking_since_type',
                F.when(F.col('kp_tracking_since').isNotNull(), F.expr(self.get_tracking_since_type_expr(iv)))
                .otherwise(F.col('tracking_since_type'))
            )
        else:
            # Dynamic reference: 30-day index, each asin's own asin_crawl_date.
            # asin_crawl_date is "yyyy-MM-dd HH:mm:ss"; keep the first 10 chars.
            # tracking_since is cast explicitly to DateType so comparisons with
            # date_sub results are type-consistent.
            df = df.withColumn('asin_crawl_date', F.substring(F.col('asin_crawl_date'), 1, 10))
            ts_date = F.to_date(F.col('tracking_since'), 'yyyy-MM-dd')
            df = df.withColumn('tracking_since_type',
                F.when(F.col('kp_tracking_since').isNotNull(),
                    F.when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 30), F.lit(1))
                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 90), F.lit(2))
                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 180), F.lit(3))
                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 360), F.lit(4))
                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 720), F.lit(5))
                    .when(ts_date >= F.date_sub(F.col('asin_crawl_date'), 1080), F.lit(6))
                    .when(ts_date < F.date_sub(F.col('asin_crawl_date'), 1080), F.lit(7))
                    .otherwise(F.lit(0))
                ).otherwise(F.col('tracking_since_type'))
            )
        # 7. Write back (tracking_since_type is a short in ES, cast explicitly).
        #    Split into two writes:
        #    7a. rows WITH profit data -> build the profit_rate_extra struct directly
        #        from the new values, sidestepping empty-struct schema inference.
        #    7b. keepa-only rows -> omit profit_rate_extra so nulls never overwrite
        #        existing profit values.
        df = df.cache()
        df_profit_update = df.filter(
            F.col('new_ocean_profit').isNotNull() | F.col('new_air_profit').isNotNull()
        ).select(
            'asin', 'profit_key',
            F.struct(
                F.col('new_ocean_profit').cast('float').alias('ocean_profit'),
                F.col('new_air_profit').cast('float').alias('air_profit')
            ).alias('profit_rate_extra'),
            'tracking_since',
            F.col('tracking_since_type').cast('short')
        )
        self.write_combined_update(df_profit_update, index_name)
        df_keepa_update = df.filter(
            F.col('new_ocean_profit').isNull() & F.col('new_air_profit').isNull()
        ).select(
            'asin', 'profit_key', 'tracking_since',
            F.col('tracking_since_type').cast('short')
        )
        self.write_combined_update(df_keepa_update, index_name)
        df.unpersist()
        # 8. 30-day index only: partially update cate_flag fields (other fields
        #    untouched). Inner join on df_es restricts the update to asins already
        #    present in the index, avoiding "doc missing" errors.
        if base_date is None:
            print(f"[30day] 开始更新 cate_flag 字段:asin_source_flag / bsr / nsr")
            df_cate_update = self.df_cate_flag.join(
                df_es.select('asin'), on='asin', how='inner'
            ).select(
                'asin', 'asin_source_flag',
                'bsr_last_seen_at', 'bsr_seen_count_30d',
                'nsr_last_seen_at', 'nsr_seen_count_30d'
            ).na.fill({
                'asin_source_flag': '0',
                'bsr_last_seen_at': '1970-01-01',
                'bsr_seen_count_30d': 0,
                'nsr_last_seen_at': '1970-01-01',
                'nsr_seen_count_30d': 0
            })
            self.write_combined_update(df_cate_update, index_name)
        df_es.unpersist()
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -21,7 +21,7 @@ class ImportStToPg14(object): ...@@ -21,7 +21,7 @@ class ImportStToPg14(object):
self.df_st_month = pd.DataFrame() self.df_st_month = pd.DataFrame()
self.df_st_month_state = pd.DataFrame() self.df_st_month_state = pd.DataFrame()
self.df_save = pd.DataFrame() self.df_save = pd.DataFrame()
self.fetch_year_month_by_week() # 如果传的date_type='week', 将date_info转换成月的值 # self.fetch_year_month_by_week() # 如果传的date_type='week', 将date_info转换成月的值
self.year, self.month = self.date_info.split("-")[0], int(self.date_info.split("-")[1]) self.year, self.month = self.date_info.split("-")[0], int(self.date_info.split("-")[1])
def fetch_year_month_by_week(self): def fetch_year_month_by_week(self):
...@@ -31,6 +31,7 @@ class ImportStToPg14(object): ...@@ -31,6 +31,7 @@ class ImportStToPg14(object):
self.date_info = list(df.year_month)[0] self.date_info = list(df.year_month)[0]
def read_data(self): def read_data(self):
self.fetch_year_month_by_week() # 如果传的date_type='week', 将date_info转换成月的值
# 1. 读取date_20_to_30表获取月份对应的周 # 1. 读取date_20_to_30表获取月份对应的周
sql_get_week = f"select year_week, year, week from selection.date_20_to_30 WHERE `year_month`='{self.date_info}' and week_day=1" sql_get_week = f"select year_week, year, week from selection.date_20_to_30 WHERE `year_month`='{self.date_info}' and week_day=1"
df_week = pd.read_sql(sql_get_week, con=self.engine_mysql) df_week = pd.read_sql(sql_get_week, con=self.engine_mysql)
......
...@@ -99,6 +99,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -99,6 +99,7 @@ class KafkaFlowAsinDetail(Templates):
self.df_asin_keep_date = self.spark.sql("select 1+1;") self.df_asin_keep_date = self.spark.sql("select 1+1;")
self.df_asin_bsr_end = self.spark.sql("select 1+1;") self.df_asin_bsr_end = self.spark.sql("select 1+1;")
self.df_hide_category = self.spark.sql("select 1+1;") self.df_hide_category = self.spark.sql("select 1+1;")
self.df_bsr_category = self.spark.sql("select 1+1;")
self.df_asin_new_cate = self.spark.sql("select 1+ 1;") self.df_asin_new_cate = self.spark.sql("select 1+ 1;")
self.df_user_package_num = self.spark.sql("select 1+1;") self.df_user_package_num = self.spark.sql("select 1+1;")
self.df_asin_category = self.spark.sql("select 1+1;") self.df_asin_category = self.spark.sql("select 1+1;")
...@@ -190,7 +191,8 @@ class KafkaFlowAsinDetail(Templates): ...@@ -190,7 +191,8 @@ class KafkaFlowAsinDetail(Templates):
StructField("customer_reviews_json", StringType(), True), StructField("customer_reviews_json", StringType(), True),
StructField("img_list", StringType(), True), StructField("img_list", StringType(), True),
StructField("follow_sellers", IntegerType(), True), StructField("follow_sellers", IntegerType(), True),
StructField("fbm_delivery_price", FloatType(), True) StructField("fbm_delivery_price", FloatType(), True),
StructField("product_json", StringType(), True)
]) ])
return schema return schema
...@@ -448,15 +450,26 @@ class KafkaFlowAsinDetail(Templates): ...@@ -448,15 +450,26 @@ class KafkaFlowAsinDetail(Templates):
withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \ withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \
withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \ withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \
drop("title_parse", "variat_parse", "variat_attribute") drop("title_parse", "variat_parse", "variat_attribute")
# Number of Items:从 product_json 提取,cast 失败(脏数据)自动为 null,提取后立即 drop
df = df.withColumn(
"number_of_items",
F.get_json_object(F.col("product_json"), "$.Number of Items").cast("int")
).drop("product_json")
# 优先级:Number of Items > 属性字段 > 标题解析 > 默认1
df = df.withColumn( df = df.withColumn(
"package_quantity", F.expr(""" "package_quantity", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN number_of_items
WHEN title_package_quantity is not null THEN title_package_quantity ELSE 1 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity
WHEN title_package_quantity IS NOT NULL THEN title_package_quantity
ELSE 1 END""")
).withColumn( ).withColumn(
"is_package_quantity_abnormal", F.expr(""" "is_package_quantity_abnormal", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN 0
WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal ELSE 2 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity_is_abnormal
).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal") WHEN title_package_quantity IS NOT NULL THEN title_package_quantity_is_abnormal
ELSE 2 END""")
).drop("number_of_items", "title_package_quantity", "variat_package_quantity",
"title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
df = df.withColumn("title", F.lower(F.col("title"))) df = df.withColumn("title", F.lower(F.col("title")))
df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left') df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left')
df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \ df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \
...@@ -567,11 +580,18 @@ class KafkaFlowAsinDetail(Templates): ...@@ -567,11 +580,18 @@ class KafkaFlowAsinDetail(Templates):
df = df.withColumn("bsr_type", F.expr(""" df = df.withColumn("bsr_type", F.expr("""
CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END""" CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END"""
)).drop("limit_rank") )).drop("limit_rank")
# 5. 是否必需ASIN # 5. 是否必需ASIN(双重确认:BSR分类 + 页面描述分类,两者均在排除列表才标记为1)
df = df.withColumn("is_need_asin", F.expr(""" df = df.withColumn(
CASE WHEN asin_bs_cate_1_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 "desc_category_first_name",
WHEN asin NOT LIKE 'B0%' THEN 1 F.lower(F.trim(F.split(F.col("category"), "›").getItem(0)))
).join(self.df_bsr_category, on=['desc_category_first_name'], how='left')
need_categories = "('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed')"
df = df.withColumn("is_need_asin", F.expr(f"""
CASE WHEN asin_bs_cate_1_id in {need_categories}
AND desc_category_first_id in {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END""")) ELSE 0 END"""))
df = df.drop("desc_category_first_name", "desc_category_first_id")
# 6. asin_type # 6. asin_type
df = df.withColumn("asin_type", F.expr(""" df = df.withColumn("asin_type", F.expr("""
CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END""" CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END"""
...@@ -805,6 +825,13 @@ class KafkaFlowAsinDetail(Templates): ...@@ -805,6 +825,13 @@ class KafkaFlowAsinDetail(Templates):
username=us_mysql_con_info['username'], query=sql) username=us_mysql_con_info['username'], query=sql)
self.df_hide_category = F.broadcast(df_hide_category) self.df_hide_category = F.broadcast(df_hide_category)
self.df_hide_category.show(10, truncate=False) self.df_hide_category.show(10, truncate=False)
print("5.1 读取BSR分类树(用于双重确认is_need_asin)")
sql = f"""
select lower(trim(en_name)) as desc_category_first_name, category_first_id as desc_category_first_id
from dim_bsr_category_tree where site_name = '{self.site_name}' and category_parent_id = 0 and leaf_node = 2
"""
self.df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql))
self.df_bsr_category.show(10, truncate=False)
print("6. 读取asin_label信息") print("6. 读取asin_label信息")
sql = f""" sql = f"""
select asin, label from select asin, label from
...@@ -986,6 +1013,7 @@ class KafkaFlowAsinDetail(Templates): ...@@ -986,6 +1013,7 @@ class KafkaFlowAsinDetail(Templates):
F.col("describe_len").alias("asin_describe_len") F.col("describe_len").alias("asin_describe_len")
) )
df = df.drop("category", "seller_json") df = df.drop("category", "seller_json")
df = df.withColumn("date_info_del", F.lit(self.date_info))
df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save() df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save()
end_time = time.time() end_time = time.time()
elapsed_time = end_time - start_time elapsed_time = end_time - start_time
......
...@@ -98,6 +98,7 @@ class KafkaRankAsinDetail(Templates): ...@@ -98,6 +98,7 @@ class KafkaRankAsinDetail(Templates):
self.df_asin_keep_date = self.spark.sql("select 1+1;") self.df_asin_keep_date = self.spark.sql("select 1+1;")
self.df_asin_bsr_end = self.spark.sql("select 1+1;") self.df_asin_bsr_end = self.spark.sql("select 1+1;")
self.df_hide_category = self.spark.sql("select 1+1;") self.df_hide_category = self.spark.sql("select 1+1;")
self.df_bsr_category = self.spark.sql("select 1+1;")
self.df_asin_new_cate = self.spark.sql("select 1+ 1;") self.df_asin_new_cate = self.spark.sql("select 1+ 1;")
self.df_user_package_num = self.spark.sql("select 1+1;") self.df_user_package_num = self.spark.sql("select 1+1;")
self.df_asin_category = self.spark.sql("select 1+1;") self.df_asin_category = self.spark.sql("select 1+1;")
...@@ -189,7 +190,8 @@ class KafkaRankAsinDetail(Templates): ...@@ -189,7 +190,8 @@ class KafkaRankAsinDetail(Templates):
StructField("customer_reviews_json", StringType(), True), StructField("customer_reviews_json", StringType(), True),
StructField("img_list", StringType(), True), StructField("img_list", StringType(), True),
StructField("follow_sellers", IntegerType(), True), StructField("follow_sellers", IntegerType(), True),
StructField("fbm_delivery_price", FloatType(), True) StructField("fbm_delivery_price", FloatType(), True),
StructField("product_json", StringType(), True)
]) ])
return schema return schema
...@@ -447,15 +449,26 @@ class KafkaRankAsinDetail(Templates): ...@@ -447,15 +449,26 @@ class KafkaRankAsinDetail(Templates):
withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \ withColumn("title_package_quantity_is_abnormal", df.title_parse.getField("is_package_quantity_abnormal")). \
withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \ withColumn("variat_package_quantity_is_abnormal", df.variat_parse.getField("is_package_quantity_abnormal")). \
drop("title_parse", "variat_parse", "variat_attribute") drop("title_parse", "variat_parse", "variat_attribute")
# Number of Items:从 product_json 提取,cast 失败(脏数据)自动为 null,提取后立即 drop
df = df.withColumn(
"number_of_items",
F.get_json_object(F.col("product_json"), "$.Number of Items").cast("int")
).drop("product_json")
# 优先级:Number of Items > 属性字段 > 标题解析 > 默认1
df = df.withColumn( df = df.withColumn(
"package_quantity", F.expr(""" "package_quantity", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN number_of_items
WHEN title_package_quantity is not null THEN title_package_quantity ELSE 1 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity
WHEN title_package_quantity IS NOT NULL THEN title_package_quantity
ELSE 1 END""")
).withColumn( ).withColumn(
"is_package_quantity_abnormal", F.expr(""" "is_package_quantity_abnormal", F.expr("""
CASE WHEN title_package_quantity is null and variat_package_quantity is not null THEN variat_package_quantity_is_abnormal CASE WHEN number_of_items IS NOT NULL AND number_of_items > 0 THEN 0
WHEN title_package_quantity is not null THEN title_package_quantity_is_abnormal ELSE 2 END""") WHEN variat_package_quantity IS NOT NULL THEN variat_package_quantity_is_abnormal
).drop("title_package_quantity", "variat_package_quantity", "title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal") WHEN title_package_quantity IS NOT NULL THEN title_package_quantity_is_abnormal
ELSE 2 END""")
).drop("number_of_items", "title_package_quantity", "variat_package_quantity",
"title_package_quantity_is_abnormal", "variat_package_quantity_is_abnormal")
df = df.withColumn("title", F.lower(F.col("title"))) df = df.withColumn("title", F.lower(F.col("title")))
df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left') df = df.join(self.df_user_package_num, on=['asin', 'title'], how='left')
df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \ df = df.withColumn("package_quantity", F.coalesce(F.col("user_package_num"), F.col("package_quantity"))). \
...@@ -566,11 +579,18 @@ class KafkaRankAsinDetail(Templates): ...@@ -566,11 +579,18 @@ class KafkaRankAsinDetail(Templates):
df = df.withColumn("bsr_type", F.expr(""" df = df.withColumn("bsr_type", F.expr("""
CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END""" CASE WHEN limit_rank is null and asin_bs_cate_1_rank <= 500000 THEN 1 WHEN limit_rank is not null and asin_bs_cate_1_rank <= limit_rank THEN 1 ELSE 0 END"""
)).drop("limit_rank") )).drop("limit_rank")
# 5. 是否必需ASIN # 5. 是否必需ASIN(双重确认:BSR分类 + 页面描述分类,两者均在排除列表才标记为1)
df = df.withColumn("is_need_asin", F.expr(""" df = df.withColumn(
CASE WHEN asin_bs_cate_1_id in ('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed') THEN 1 "desc_category_first_name",
WHEN asin NOT LIKE 'B0%' THEN 1 F.lower(F.trim(F.split(F.col("category"), "›").getItem(0)))
).join(self.df_bsr_category, on=['desc_category_first_name'], how='left')
need_categories = "('mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text', 'magazines', 'movies-tv', 'software', 'videogames', 'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed')"
df = df.withColumn("is_need_asin", F.expr(f"""
CASE WHEN asin_bs_cate_1_id in {need_categories}
AND desc_category_first_id in {need_categories} THEN 1
WHEN asin NOT LIKE 'B0%' THEN 1
ELSE 0 END""")) ELSE 0 END"""))
df = df.drop("desc_category_first_name", "desc_category_first_id")
# 6. asin_type # 6. asin_type
df = df.withColumn("asin_type", F.expr(""" df = df.withColumn("asin_type", F.expr("""
CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END""" CASE WHEN is_self_asin=1 THEN 1 WHEN is_need_asin=1 THEN 2 WHEN is_hide_asin=1 THEN 3 ELSE 0 END"""
...@@ -804,6 +824,13 @@ class KafkaRankAsinDetail(Templates): ...@@ -804,6 +824,13 @@ class KafkaRankAsinDetail(Templates):
username=us_mysql_con_info['username'], query=sql) username=us_mysql_con_info['username'], query=sql)
self.df_hide_category = F.broadcast(df_hide_category) self.df_hide_category = F.broadcast(df_hide_category)
self.df_hide_category.show(10, truncate=False) self.df_hide_category.show(10, truncate=False)
print("5.1 读取BSR分类树(用于双重确认is_need_asin)")
sql = f"""
select lower(trim(en_name)) as desc_category_first_name, category_first_id as desc_category_first_id
from dim_bsr_category_tree where site_name = '{self.site_name}' and category_parent_id = 0 and leaf_node = 2
"""
self.df_bsr_category = F.broadcast(self.spark.sql(sqlQuery=sql))
self.df_bsr_category.show(10, truncate=False)
print("6. 读取asin_label信息") print("6. 读取asin_label信息")
sql = f""" sql = f"""
select asin, label from select asin, label from
...@@ -982,6 +1009,7 @@ class KafkaRankAsinDetail(Templates): ...@@ -982,6 +1009,7 @@ class KafkaRankAsinDetail(Templates):
F.col("describe_len").alias("asin_describe_len") F.col("describe_len").alias("asin_describe_len")
) )
df = df.drop("category", "seller_json") df = df.drop("category", "seller_json")
df = df.withColumn("date_info_del", F.lit("1970-01"))
df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save() df.write.format("org.elasticsearch.spark.sql").options(**self.es_options).mode("append").save()
end_time = time.time() end_time = time.time()
elapsed_time = end_time - start_time elapsed_time = end_time - start_time
......
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.redis_utils import RedisUtils
from utils.common_util import CommonUtil
if __name__ == '__main__':
    # Hoisted the previously scattered mid-block imports (json, datetime) to the
    # top of the entry point so the script's dependencies are visible in one place.
    import json
    from datetime import datetime

    # Watch for Dolphinscheduler tasks that failed since the last recorded check time.
    client = RedisUtils.getClient()
    redis_key = "dolphinscheduler_task_monitor:lateTime"
    lateTime = client.get(redis_key)
    # Fallback start time when Redis has no previous watermark.
    lateTime = lateTime or "2026-03-20 00:00:00"
    print(lateTime)
    req_params = {
        "pageNo": 1,
        "pageSize": 100,
        "stateType": "FAILURE",
        "startDate": lateTime
    }
    project_name = "big_data_selection"
    errList = DolphinschedulerHelper.list_projects_task(project_name=project_name, req_params=req_params)
    errMsg = []
    for it in errList:
        # Instance names look like "<task>-<run>-<seq>"; strip the trailing two segments.
        task_name = "-".join(it['name'].split("-")[:-2])
        paramMap = DolphinschedulerHelper.view_process_instance_variables(project_name, it["id"])
        errMsg.append(f"""任务[{task_name}]执行失败,启动参数为:{json.dumps(paramMap)}""")
    now = datetime.now()
    formatted_time = now.strftime("%Y-%m-%d %H:%M:%S")
    if errMsg:
        all_msg = "\n".join(errMsg)
        msg = f"""截止到日期{lateTime}到{formatted_time},海豚任务报错详情如下\n{all_msg}"""
        CommonUtil.send_msg_robot(msg)
    # Advance the watermark whether or not failures were found.
    client.set(redis_key, formatted_time)
"""
@Author : CT
@Description : 导出没有keepa数据的流量选品ASIN
筛选条件(两种模式通用):
- asin_type in (0, 1, 3)
- asin_price > 0
- asin_bought_month >= 50 OR first_category_rank <= 600000
- 排除数字类目
- 排除 dim_keepa_asin_info 中已有 keepa 数据的 ASIN
- 排除 {site_name}_asin_profit_keepa_add 中已导出的 ASIN
date_type=month : 旧逻辑,直接从 dwt_flow_asin 读取
(asin_type/category_first_id/first_category_rank 已计算好)
date_type=month_week: 新逻辑,从 dim_asin_detail 读取
补充计算(对齐dwt逻辑):
- first_category_rank : 来自 dim_asin_bs_info
- category_first_id : BSR来源优先,dim兜底
- category_id : BSR来源优先,dim兜底
- asin_bought_month : dim值 + dwd_asin_measure 兜底
- asin_type : 参考 dwt.handle_asin_is_hide() 计算
@CreateTime : 2026-03-17
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from pyspark.sql import functions as F
# Digital/virtual category exclusion list — used by the filter conditions.
EXCLUDE_CATEGORIES = (
    "audible", "books", "digital-text", "dmusic",
    "mobile-apps", "movies-tv", "music", "software", "videogames"
)
# Categories filtered in the asin_is_need check (mirrors dwt.handle_asin_is_hide).
NEED_FILTER_CATEGORIES = (
    'mobile-apps', 'audible', 'books', 'music', 'dmusic', 'digital-text',
    'magazines', 'movies-tv', 'software', 'videogames',
    'amazon-devices', 'boost', 'us-live-explorations', 'amazon-renewed'
)
# category_id-level hidden nodes used by the asin_is_hide check.
HIDE_CATEGORY_IDS = (
    '21393128011', '21377129011', '21377127011',
    '21377130011', '21388218011', '21377132011'
)
class ExportAsinWithoutKeepa(object):
    """
    Export flow-selection ASINs that have no keepa data yet.

    Two read modes (see module docstring): date_type='month' reads the
    pre-computed dwt_flow_asin; date_type='month_week' reads dim_asin_detail
    and derives asin_type / first_category_rank itself. Both modes exclude
    ASINs already covered by keepa or already exported, then append the
    remainder to PostgreSQL.
    """

    def __init__(self, site_name, date_info, date_type):
        self.site_name = site_name
        self.date_info = date_info
        self.date_type = date_type
        # PG table that records already-exported ASINs (also the write target).
        self.pg_table = f"{site_name}_asin_profit_keepa_add"
        self.spark = SparkUtil.get_spark_session(
            f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        )
        # Placeholder DataFrames; real values are assigned in read_data/handle_data.
        self.df_flow_asin = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")

    # ------------------------------------------------------------------ #
    # Step 1: read data (branch on date_type)                            #
    # ------------------------------------------------------------------ #
    def read_data(self):
        """Dispatch to the month (dwt) or month_week (dim) reader."""
        if self.date_type == 'month':
            self._read_data_month()
        elif self.date_type == 'month_week':
            self._read_data_month_week()
        else:
            raise ValueError(f"不支持的 date_type: {self.date_type},仅支持 month / month_week")

    def _read_data_month(self):
        """Legacy path: read straight from dwt_flow_asin; all filters done in SQL."""
        print("1. [month] 从 dwt_flow_asin 读取并筛选")
        exclude_str = ", ".join(f"'{c}'" for c in EXCLUDE_CATEGORIES)
        sql = f"""
        select asin
        from dwt_flow_asin
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}'
        and asin_type in (0, 1, 3)
        and asin_price is not null
        and asin_price > 0
        and (asin_bought_month >= 50 or first_category_rank <= 600000)
        and (category_first_id is null
        or category_first_id not in ({exclude_str}))
        """
        self.df_flow_asin = self.spark.sql(sql).repartition(40, 'asin').cache()
        print(f"dwt_flow_asin 筛选后数据量: {self.df_flow_asin.count()}")

    def _read_data_month_week(self):
        """New path: read dim_asin_detail and derive asin_type / first_category_rank etc."""
        # 1) dim_asin_detail main table
        print("1. [month_week] 读取 dim_asin_detail")
        sql = f"""
        select asin, asin_price, asin_bought_month,
        asin_is_self,
        category_id as top_category_id,
        category_first_id as top_category_first_id
        from dim_asin_detail
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}'
        """
        df_dim = self.spark.sql(sql).repartition(40, 'asin').cache()
        print(f"dim_asin_detail 数据量: {df_dim.count()}")
        # 2) dim_asin_bs_info -> first_category_rank / BSR category ids
        print("2. 读取 dim_asin_bs_info (BSR分类与排名)")
        sql = f"""
        select asin,
        asin_bs_cate_1_rank as first_category_rank,
        asin_bs_cate_1_id as bsr_category_first_id,
        asin_bs_cate_current_id as bsr_category_id
        from dim_asin_bs_info
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}'
        """
        df_bsr = self.spark.sql(sql).repartition(40, 'asin')
        # 3) dwd_asin_measure -> asin_amazon_orders (fallback for asin_bought_month)
        print("3. 读取 dwd_asin_measure (asin_amazon_orders 兜底)")
        sql = f"""
        select asin, asin_amazon_orders
        from dwd_asin_measure
        where site_name = '{self.site_name}'
        and date_type = '{self.date_type}'
        and date_info = '{self.date_info}'
        """
        df_measure = self.spark.sql(sql).repartition(40, 'asin')
        # 4) us_bs_category_hide -> hidden categories (used for asin_type)
        print("4. 读取 us_bs_category_hide (隐藏分类)")
        mysql_con = DBUtil.get_connection_info("mysql", self.site_name)
        sql = "select category_id_base as category_id, 1 as hide_flag from us_bs_category_hide group by category_id_base"
        df_hide = SparkUtil.read_jdbc_query(
            session=self.spark, url=mysql_con['url'],
            pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql
        )
        # 5) assemble the main DataFrame
        print("5. 组装主DataFrame")
        df = df_dim \
            .join(df_bsr, on='asin', how='left') \
            .join(df_measure, on='asin', how='left')
        # category_first_id / category_id: BSR source wins, dim value as fallback.
        df = df.withColumn(
            "category_first_id",
            F.coalesce(F.col("bsr_category_first_id"), F.col("top_category_first_id"))
        ).withColumn(
            "category_id",
            F.coalesce(F.col("bsr_category_id"), F.col("top_category_id"))
        ).drop("bsr_category_first_id", "bsr_category_id",
               "top_category_first_id", "top_category_id")
        # asin_bought_month: dim value wins, measure as fallback.
        df = df.withColumn(
            "asin_bought_month",
            F.coalesce(F.col("asin_bought_month"), F.col("asin_amazon_orders"))
        ).drop("asin_amazon_orders")
        # asin_type derivation (mirrors dwt.handle_asin_is_hide).
        # Note: {HIDE_CATEGORY_IDS} / {NEED_FILTER_CATEGORIES} rely on the Python
        # tuple repr producing valid SQL IN-lists of quoted strings.
        df = df.join(F.broadcast(df_hide), on='category_id', how='left')
        df = df.withColumn(
            "asin_is_hide",
            F.expr(f"""
                CASE WHEN hide_flag = 1 THEN 1
                WHEN category_first_id = 'grocery' AND category_id != '6492272011' THEN 1
                WHEN category_id IN {HIDE_CATEGORY_IDS} THEN 1
                ELSE 0 END
            """)
        ).withColumn(
            "asin_is_need",
            F.expr(f"""
                CASE WHEN category_first_id IN {NEED_FILTER_CATEGORIES} THEN 1
                WHEN asin NOT LIKE 'B0%' THEN 1
                ELSE 0 END
            """)
        ).withColumn(
            "asin_type",
            F.expr("""
                CASE WHEN asin_is_self = 1 THEN 1
                WHEN asin_is_need = 1 THEN 2
                WHEN asin_is_hide = 1 THEN 3
                ELSE 0 END
            """)
        ).drop("hide_flag", "asin_is_self", "asin_is_need", "asin_is_hide")
        self.df_flow_asin = df.repartition(40, 'asin').cache()
        print(f"组装完成,总数据量: {self.df_flow_asin.count()}")

    # ------------------------------------------------------------------ #
    # Step 2: filter (month_week) + exclude keepa / already-exported     #
    # ------------------------------------------------------------------ #
    def handle_data(self):
        df = self.df_flow_asin
        # month_week: fields are derived in Python, so filtering happens here.
        # month: filtering already done in SQL, skip this step.
        if self.date_type == 'month_week':
            print("6. [month_week] 筛选目标ASIN")
            df = df.filter(
                F.col("asin_type").isin(0, 1, 3)
            ).filter(
                F.col("asin_price").isNotNull() & (F.col("asin_price") > 0)
            ).filter(
                (F.col("asin_bought_month") >= 50) | (F.col("first_category_rank") <= 600000)
            ).filter(
                F.col("category_first_id").isNull() |
                ~F.col("category_first_id").isin(*EXCLUDE_CATEGORIES)
            )
            df = df.cache()
            print(f"筛选后数据量: {df.count()}")
        # Exclude ASINs with valid keepa data (dim_keepa_asin_info).
        # Rows where any of package_length/width/height/weight is < 0 are treated
        # as bad data and NOT excluded (they need re-crawling).
        print("7. 排除已有keepa数据的ASIN (dim_keepa_asin_info)")
        df_keepa = self.spark.sql(f"""
        select asin from dim_keepa_asin_info
        where site_name = '{self.site_name}'
        and package_length >= 0
        and package_width >= 0
        and package_height >= 0
        and weight >= 0
        """).repartition(40, 'asin')
        df = df.join(df_keepa, on='asin', how='left_anti').cache()
        print(f"排除keepa后数据量: {df.count()}")
        # Exclude ASINs already exported to {pg_table}.
        print(f"8. 排除已导出的ASIN ({self.pg_table})")
        pg_con_info = DBUtil.get_connection_info("postgresql_cluster", self.site_name)
        df_exported = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=pg_con_info['url'],
            username=pg_con_info['username'],
            pwd=pg_con_info['pwd'],
            query=f"select asin from {self.pg_table}"
        ).repartition(40, 'asin')
        df = df.join(df_exported, on='asin', how='left_anti').cache()
        print(f"排除已导出后数据量: {df.count()}")
        self.df_save = df.select(
            F.col("asin"),
            F.lit(self.date_info).alias("month")
        ).cache()

    # ------------------------------------------------------------------ #
    # Step 3: write to PostgreSQL                                        #
    # ------------------------------------------------------------------ #
    def save_data(self):
        """Append the surviving (asin, month) rows to the PG export table."""
        total = self.df_save.count()
        print(f"9. 写入 PostgreSQL 表 {self.pg_table},共 {total} 条")
        con_info = DBUtil.get_connection_info('postgresql_cluster', self.site_name)
        self.df_save.write.format("jdbc") \
            .option("url", con_info["url"]) \
            .option("dbtable", self.pg_table) \
            .option("user", con_info["username"]) \
            .option("password", con_info["pwd"]) \
            .mode("append") \
            .save()
        print(f"写入完毕")

    def run(self):
        # Pipeline: read -> filter/exclude -> persist.
        self.read_data()
        self.handle_data()
        self.save_data()
if __name__ == "__main__":
    # CLI args: <site: us/uk/de...> <date_type: month|month_week> <date_info: yyyy-MM or yyyy-MM_week>
    arg_site, arg_dtype, arg_dinfo = sys.argv[1], sys.argv[2], sys.argv[3]
    ExportAsinWithoutKeepa(
        site_name=arg_site, date_info=arg_dinfo, date_type=arg_dtype
    ).run()
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    print(f"执行参数为{sys.argv}")
    # Database engine. NOTE(review): always opened against site 'us'
    # regardless of the site_name argument — presumably a shared cluster
    # entry point; confirm this is intentional.
    db_type = "postgresql_15"
    engine = get_remote_engine(
        site_name='us',
        db_type=db_type
    )
    # Target table name: the us site carries no prefix, every other site does.
    base_tb = f"ai_asin_detail_month_{date_info.replace('-', '_')}"
    export_tb = base_tb if site_name == 'us' else f"{site_name}_{base_tb}"
    # Export the hive partition to PostgreSQL via sqoop (30 mappers).
    engine.sqoop_raw_export(
        hive_table="dwt_ai_asin_add",
        import_table=export_tb,
        partitions={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        },
        m=30,
        cols=(
            "site_name,asin,weight,bought_month,category,img,title,brand,account_name,account_addr,buy_box_seller_type,"
            "launch_time,img_num,variation_flag,variation_num,ao_val,category_id,category_current_id,parent_asin,bsr_rank,"
            "price,rating,total_comments,seller_id,fb_country_name,review_json_list,launch_time_type,describe,product_json,"
            "product_detail_json,bought_month_mom,bought_month_yoy,is_new_flag,is_ascending_flag"
        )
    )
    print("success")
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Sqoop import: pull one day of MerchantWords brand-analytics rows from
    # PostgreSQL into the ods_merchantwords_brand_analytics hive partition.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # All three CLI args are mandatory.
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"
    hive_table = f"ods_merchantwords_brand_analytics"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Resolve the HDFS landing path for this hive partition.
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    # date_info is expected as YYYY-MM-DD; the source table name embeds it.
    year, month, day = date_info.split("-")
    db_type = 'postgresql_16'
    import_table = f"{site_name}_merchantwords_brand_analytics_{year}_{month}_{day}"
    # \$CONDITIONS is sqoop's split placeholder; the backslash keeps the
    # literal "$CONDITIONS" intact inside the generated shell command.
    sql_query = f"""
    select
    id,
    search_term,
    quantity_being_sold,
    created_time,
    updated_time,
    quantity_being_sold_str,
    result_count,
    departments
    from {import_table}
    where 1=1
    and \$CONDITIONS
    """
    # Validate source schema against the hive table before importing.
    CommonUtil.check_schema_before_import(db_type=db_type,
                                          site_name=site_name,
                                          query=sql_query,
                                          hive_tb_name=hive_table,
                                          msg_usr=['chenyuanjie'],
                                          partition_dict=partition_dict)
    # Build the sqoop import shell script (single mapper, split key 'id').
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=1,
                                           key='id'
                                           )
    # Clear the landing path first so the import starts from a clean state,
    # then run the script synchronously over SSH (failures abort).
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    client.close()
    # Post-import row-count reconciliation between source and hive.
    CommonUtil.check_import_sync_num(db_type=db_type,
                                     partition_dict=partition_dict,
                                     import_query=sql_query,
                                     hive_tb_name=hive_table,
                                     msg_usr=['chenyuanjie'])
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Sqoop import: pull one day of MerchantWords "other search term" rows
    # from PostgreSQL into the ods_merchantwords_other_search_term_data
    # hive partition.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # All three CLI args are mandatory.
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"
    hive_table = f"ods_merchantwords_other_search_term_data"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Resolve the HDFS landing path for this hive partition.
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    # date_info is expected as YYYY-MM-DD; the source table name embeds it.
    year, month, day = date_info.split("-")
    db_type = 'postgresql_16'
    import_table = f"{site_name}_merchantwords_other_search_term_{year}_{month}_{day}"
    # \$CONDITIONS is sqoop's split placeholder; the backslash keeps the
    # literal "$CONDITIONS" intact inside the generated shell command.
    sql_query = f"""
    select
    id,
    search_term,
    asin,
    page,
    buy_data,
    label,
    created_time,
    updated_time
    from {import_table}
    where 1=1
    and \$CONDITIONS
    """
    # Validate source schema against the hive table before importing.
    # NOTE(review): the returned (empty, check) flags are ignored here,
    # while sibling import scripts assert on them — confirm intentional.
    CommonUtil.check_schema_before_import(db_type=db_type,
                                          site_name=site_name,
                                          query=sql_query,
                                          hive_tb_name=hive_table,
                                          msg_usr=['chenyuanjie'],
                                          partition_dict=partition_dict)
    # Build the sqoop import shell script (35 mappers, split key 'id').
    import_sh = CommonUtil.build_import_sh(site_name=site_name,
                                           db_type=db_type,
                                           query=sql_query,
                                           hdfs_path=hdfs_path,
                                           map_num=35,
                                           key='id'
                                           )
    # Clear the landing path first so the import starts from a clean state,
    # then run the script synchronously over SSH (failures abort).
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, import_sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_table)
    client.close()
\ No newline at end of file
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    # Sqoop import: pull one day of MerchantWords search-term rank data
    # (per rank type st_type, e.g. zr/sp/sb/bs/hr) from PostgreSQL into the
    # matching ods_merchantwords_search_term_{st_type} hive partition.
    site_name = CommonUtil.get_sys_arg(1, None)
    st_type = CommonUtil.get_sys_arg(2, None)
    date_type = CommonUtil.get_sys_arg(3, None)
    date_info = CommonUtil.get_sys_arg(4, None)
    # All four CLI args are mandatory.
    assert site_name is not None, "site_name 不能为空!"
    assert st_type is not None, "st_type 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"
    # Hard-coded skip list: these specific us-site date/type combinations
    # are excluded (presumably known-bad source days — TODO confirm why).
    if site_name == 'us':
        if date_info == '2024-05-05':
            if st_type == "bs":
                quit()
        elif date_info in ['2024-06-06', '2024-06-07', '2024-06-08']:
            if st_type == "hr":
                quit()
    hive_tb = f"ods_merchantwords_search_term_{st_type}"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Column sets differ per rank type: zr/sp carry page_row, sb carries
    # data_type, the rest have neither (and no id column).
    if st_type in ["zr", "sp"]:
        cols = "search_term,asin,page,page_row,id,created_time,updated_time"
    elif st_type in ["sb"]:
        cols = "search_term,asin,page,data_type,id,created_time,updated_time"
    else:
        cols = "search_term,asin,page,created_time,updated_time"
    db_type = 'postgresql_16'
    # date_info is expected as YYYY-MM-DD; the source table name embeds it.
    year, month, day = date_info.split("-")
    import_tb = f"{site_name}_merchantwords_search_term_rank_{st_type}_{year}_{month}_{day}"
    # \$CONDITIONS is sqoop's split placeholder; the backslash keeps the
    # literal "$CONDITIONS" intact inside the generated shell command.
    query = f"""
    select
    {cols}
    from
    {import_tb}
    where 1 = 1
    and \$CONDITIONS
    """
    print(f"当前同步的表为:{import_tb}")
    # Resolve the HDFS landing path for this hive partition.
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    # Schema check returns (source-is-empty flag, schema-matches flag).
    empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                   site_name=site_name,
                                                                   query=query,
                                                                   hive_tb_name=hive_tb,
                                                                   msg_usr=['chenyuanjie'],
                                                                   partition_dict=partition_dict
                                                                   )
    assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
    # Only import when the source has data; mapper count scales with the
    # expected volume per rank type (zr largest). Types with an id column
    # can be split across mappers; the rest run single-mapper.
    if not empty_flag:
        if st_type in ["zr", "sp", "sb"]:
            if st_type == "zr":
                map_num = 50
            elif st_type == "sp":
                map_num = 25
            else:
                map_num = 15
            sh = CommonUtil.build_import_sh(site_name=site_name,
                                            db_type=db_type,
                                            query=query,
                                            hdfs_path=hdfs_path,
                                            map_num=map_num,
                                            key="id")
        else:
            sh = CommonUtil.build_import_sh(site_name=site_name,
                                            db_type=db_type,
                                            query=query,
                                            hdfs_path=hdfs_path)
        # Clear the landing path, run the import over SSH, then register
        # the partition in hive.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        client = SSHUtil.get_ssh_client()
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
        CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        client.close()
    pass
\ No newline at end of file
...@@ -498,3 +498,26 @@ class DolphinschedulerHelper(object): ...@@ -498,3 +498,26 @@ class DolphinschedulerHelper(object):
return resp_json['msg'] return resp_json['msg']
else: else:
raise Exception(f"任务停止失败") raise Exception(f"任务停止失败")
@classmethod
def list_projects_task(cls, project_name: str = _def_project_name, req_params=None):
"""
根据当前运行脚本判断是否是海豚上正在运行的任务并获取任务参数
:param project_name: 默认是 big_data_selection
"""
if req_params is None:
req_params = {}
project_map = cls.get_project_map()
project_code = project_map.get(project_name)
url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances"
resp = requests.get(
url,
headers=cls.get_http_header(),
params=req_params
)
resp_json = json.loads(resp.content.decode("utf-8"))
if bool(resp_json['success']):
return resp_json['data']['totalList']
else:
return None
...@@ -1884,17 +1884,24 @@ outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' ...@@ -1884,17 +1884,24 @@ outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
df_latest_asin_detail_with_parent = df_latest_asin_detail_with_parent.withColumnRenamed(f"new_{column}", f"{column}") df_latest_asin_detail_with_parent = df_latest_asin_detail_with_parent.withColumnRenamed(f"new_{column}", f"{column}")
return df_asin_detail, df_latest_asin_detail_with_parent return df_asin_detail, df_latest_asin_detail_with_parent
@staticmethod
def send_msg_robot(msg: str):
webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c519c702-6164-45c8-98b5-a87e52150f19"
headers = {
"Content-Type": "application/json"
}
data = {
"msgtype": "text",
"text": {
"content": msg
}
}
resp = requests.post(webhook_url, json=data, headers=headers)
if resp.status_code == 200:
result = resp.json()
if result.get("errcode") == 0:
print("发送成功")
else:
print("发送失败:", result)
else:
print("HTTP错误:", resp.status_code, resp.text)
...@@ -1044,6 +1044,141 @@ class EsUtils(object): ...@@ -1044,6 +1044,141 @@ class EsUtils(object):
}, },
"img_type_arr": { "img_type_arr": {
"type": "integer" "type": "integer"
},
"date_info_del": {
"type": "keyword"
}
}
}
}
@staticmethod
def get_es_ai_body():
return {
"settings": {
"number_of_shards": "3",
"number_of_replicas": "1",
"analysis": {
"filter": {
"en_snowball": {
"type": "snowball",
"language": "English"
},
"en_synonym": {
"type": "synonym_graph",
"synonyms_path": "analysis/synonyms_en.txt",
"updateable": "true"
}
},
"analyzer": {
"en_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"en_snowball"
]
},
"en_search_analyzer": {
"tokenizer": "standard",
"filter": [
"lowercase",
"en_synonym",
"en_snowball"
]
}
},
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"asin": {"type": "keyword"},
"parent_asin": {"type": "keyword"},
"site_name": {"type": "keyword"},
"analyze_id": {"type": "integer"},
"seller_id": {"type": "keyword"},
"title": {
"type": "text",
"analyzer": "en_analyzer",
"search_analyzer": "en_search_analyzer"
},
"img": {"type": "keyword"},
"img_num": {"type": "integer"},
"launch_time": {"type": "keyword"},
"launch_time_type": {"type": "integer"},
"price": {"type": "scaled_float", "scaling_factor": 100},
"rating": {"type": "scaled_float", "scaling_factor": 100},
"total_comments": {"type": "integer"},
"bought_month": {"type": "integer"},
"bought_month_mom": {"type": "scaled_float", "scaling_factor": 100},
"bought_month_yoy": {"type": "scaled_float", "scaling_factor": 100},
"bsr_rank": {"type": "integer"},
"bsr_rank_str": {"type": "keyword"},
"ao_val": {"type": "scaled_float", "scaling_factor": 100},
"variation_flag": {"type": "integer"},
"variation_num": {"type": "integer"},
"is_ascending_flag": {"type": "integer"},
"is_new_flag": {"type": "integer"},
"buy_box_seller_type": {"type": "keyword"},
"category": {"type": "keyword"},
"category_id": {"type": "keyword"},
"category_current_id": {"type": "keyword"},
"festival": {"type": "keyword"},
"fb_country_name": {"type": "keyword"},
"profit_key": {"type": "keyword"},
"multi_color_flag": {"type": "keyword"},
"package_quantity_flag": {"type": "keyword"},
"title_pic_flag": {"type": "keyword"},
"title_word_flag": {"type": "keyword"},
"account_addr": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"account_name": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"appearance": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"brand": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"color": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"crowd": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"function": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"material": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"multi_color_content": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"package_quantity": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"package_quantity_arr": {"type": "integer"},
"scene_comment": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"scene_title": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"shape": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"short_desc": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"size": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"theme": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"title_pic_content": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"title_word_content": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"uses": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"weight": {"type": "keyword", "normalizer": "lowercase_normalizer"},
"label_content": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"profit_rate_extra": {
"type": "object",
"properties": {
"ocean_profit": {
"type": "float"
},
"air_profit": {
"type": "float"
}
}
} }
} }
} }
......
...@@ -93,6 +93,8 @@ class Templates(object): ...@@ -93,6 +93,8 @@ class Templates(object):
# 测试标识 # 测试标识
self.test_flag = 'normal' self.test_flag = 'normal'
self.beginning_offsets_dict = {} # history消费时, 初始的偏移量 self.beginning_offsets_dict = {} # history消费时, 初始的偏移量
# 记录最后一次收到非空批次的时间,用于无新数据超时检测
self.last_data_time = time.time()
# redis连接对象--用来锁定--解决并发 # redis连接对象--用来锁定--解决并发
self.client = get_redis_h14() self.client = get_redis_h14()
...@@ -453,11 +455,25 @@ class Templates(object): ...@@ -453,11 +455,25 @@ class Templates(object):
self.query.awaitTermination() self.query.awaitTermination()
def handle_kafka_stream_templates(self, kafka_df, epoch_id): def handle_kafka_stream_templates(self, kafka_df, epoch_id):
if self.spider_type == 'asin详情' and kafka_df.count() > 0: has_data = self.spider_type == 'asin详情' and kafka_df.count() > 0
if has_data:
kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime") kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime")
self.handle_kafka_stream(kafka_df, epoch_id) self.handle_kafka_stream(kafka_df, epoch_id)
if has_data:
# 处理完成后更新时间戳,避免长批次处理耗时误触发超时
self.last_data_time = time.time()
if self.test_flag == 'normal': if self.test_flag == 'normal':
self.kafka_consumption_is_finished() self.kafka_consumption_is_finished()
# 仅当前批次无新数据时才做超时检测
# 若距上次有效数据已超过 30 分钟,说明爬虫可能已完成但状态表尚未更新
# 进入轮询,每 2 分钟重新检查一次,直到状态更新后 kafka_consumption_is_finished() 内部 exit(0)
if not has_data:
elapsed = time.time() - self.last_data_time
if elapsed > 30 * 60:
print(f"[超时检测] 已 {elapsed / 60:.1f} 分钟无新数据,进入状态轮询(每2分钟检查一次),等待爬虫状态更新")
while True:
time.sleep(120)
self.kafka_consumption_is_finished()
def handle_kafka_stream(self, kafka_df, epoch_id): def handle_kafka_stream(self, kafka_df, epoch_id):
pass pass
...@@ -657,14 +673,19 @@ class Templates(object): ...@@ -657,14 +673,19 @@ class Templates(object):
wx_msg = f"站点: {self.site_name}, {self.date_type}, {self.date_info} asin详情实时消费数据到redis准备工作已完成,可以开启详情爬取!" wx_msg = f"站点: {self.site_name}, {self.date_type}, {self.date_info} asin详情实时消费数据到redis准备工作已完成,可以开启详情爬取!"
else: else:
pass pass
try: sql = f"UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'"
sql = f"UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'" for retry in range(5):
DBUtil.exec_sql('mysql', 'us', sql) try:
CommonUtil.send_wx_msg(wx_users, f"asin详情kafka消费", wx_msg) DBUtil.exec_sql('mysql', 'us', sql)
except Exception as e: CommonUtil.send_wx_msg(wx_users, f"asin详情kafka消费", wx_msg)
print(e, traceback.format_exc()) break
CommonUtil.send_wx_msg(wx_users, f"\u26A0asin详情kafka实时消费\u26A0", except Exception as e:
f"站点: {self.site_name} asin详情实时消费准备失败,请等待处理!") print(f"UPDATE workflow_progress 失败(第{retry + 1}次),等待10s重试", e, traceback.format_exc())
if retry == 4:
CommonUtil.send_wx_msg(wx_users, f"\u26A0asin详情kafka实时消费\u26A0",
f"站点: {self.site_name} asin详情实时消费准备失败,请等待处理!")
else:
time.sleep(10)
else: else:
pass pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment