Commit 30b05aea by chenyuanjie

流量选品-月数据导出Doris

parent f7096a8c
"""
author: CT
description: 同步 Hive dwt_flow_asin 月维度数据到 Doris dwt.{site}_flow_asin_month
流程:
1) 在 Doris 建当月物化表 selection.{site}_flow_asin_month_{yyyy_mm}(IF NOT EXISTS)
2) Spark 读 Hive dwt_flow_asin 月数据,规范化后写入 dwt.{site}_flow_asin_month
3) Doris 端执行 INSERT OVERWRITE 把 dwt 主表 + 关联 JOIN 物化到 selection 月表
支持 us / uk / de 三站点
执行示例:
spark-submit dwt_flow_asin_month.py us 2026-05
spark-submit dwt_flow_asin_month.py uk 2026-05
spark-submit dwt_flow_asin_month.py de 2026-05
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.DorisHelper import DorisHelper
DORIS_DB = "dwt"
SUPPORTED_SITES = ("us", "uk", "de")
def _exec_doris_sql(sql_list, use_type='selection'):
"""通过 pymysql 走 Doris jdbc_port 执行 DDL / INSERT 等
指定 database=selection 让 UDF(如 stem_en)可见"""
import pymysql
conn_info = DorisHelper.get_connection_info(use_type)
conn = pymysql.connect(
host=conn_info['ip'],
port=conn_info['jdbc_port'],
user=conn_info['user'],
password=conn_info['pwd'],
database='selection',
charset='utf8mb4',
autocommit=True,
)
try:
cur = conn.cursor()
for sql in sql_list:
print(f"[Doris SQL] {sql[:200]}{'...' if len(sql) > 200 else ''}")
cur.execute(sql)
cur.close()
finally:
conn.close()
def build_create_table_sql(site_name, date_info_underscore, date_info):
"""构建 selection.{site}_flow_asin_month_{yyyy_mm} 建表语句(与 DDL 一致)"""
table_name = f"{site_name}_flow_asin_month_{date_info_underscore}"
return f"""
CREATE TABLE IF NOT EXISTS `selection`.`{table_name}`
(
`asin` VARCHAR(20) NOT NULL COMMENT 'ASIN',
`parent_asin` STRING NULL COMMENT '父ASIN',
`collapse_asin` STRING NULL COMMENT '聚合ASIN',
`asin_crawl_date` DATETIME NULL COMMENT 'Kafka消息抓取时间',
`title` STRING NULL COMMENT '标题(lowercase)',
`title_stem` STRING NULL COMMENT '标题词干(stem_en派生)',
`title_len` INT NULL COMMENT '标题字符长度',
`brand` STRING NULL COMMENT '品牌(lowercase)',
`asin_describe` STRING NULL COMMENT '五点描述',
`describe_len` INT NULL COMMENT '五点描述字符长度',
`product_features` STRING NULL COMMENT '产品评论/特色JSON',
`together_asin` STRING NULL COMMENT '组合购买推荐ASIN',
`price` DECIMAL(20,2) NULL,
`fbm_price` DECIMAL(20,2) NULL,
`rating` DECIMAL(10,1) NULL,
`total_comments` INT NULL,
`one_star` INT NULL,
`two_star` INT NULL,
`three_star` INT NULL,
`four_star` INT NULL,
`five_star` INT NULL,
`low_star` INT NULL,
`bsr_orders` INT NULL,
`bsr_orders_sale` DECIMAL(20,2) NULL,
`asin_bought_month` INT NULL,
`ao_val` DECIMAL(20,4) NULL,
`zr_counts` INT NULL,
`sp_counts` INT NULL,
`sb_counts` INT NULL,
`vi_counts` INT NULL,
`bs_counts` INT NULL,
`ac_counts` INT NULL,
`tr_counts` INT NULL,
`er_counts` INT NULL,
`zr_flow_proportion` DECIMAL(20,4) NULL,
`matrix_flow_proportion` DECIMAL(20,4) NULL,
`matrix_ao_val` DECIMAL(20,4) NULL,
`one_two_val` DECIMAL(20,4) NULL,
`three_four_val` DECIMAL(20,4) NULL,
`five_six_val` DECIMAL(20,4) NULL,
`eight_val` DECIMAL(20,4) NULL,
`category_first_id` STRING NULL,
`category_id` STRING NULL,
`first_category_rank` INT NULL,
`current_category_rank` INT NULL,
`weight` DECIMAL(20,4) NULL,
`volume` STRING NULL,
`asin_weight_ratio` DECIMAL(20,4) NULL,
`color` STRING NULL,
`size` STRING NULL,
`style` STRING NULL,
`material` STRING NULL,
`package_quantity` INT NULL,
`is_package_quantity_abnormal` TINYINT NULL,
`variation_num` INT NULL,
`page_inventory` INT NULL,
`activity_type` STRING NULL,
`launch_time` DATETIME NULL,
`launch_time_type` INT NULL,
`img_url` STRING NULL,
`img_num` INT NULL,
`img_type_arr` ARRAY<INT> NULL,
`img_info` STRING NULL,
`account_id` STRING NULL,
`account_name` STRING NULL,
`buy_box_seller_type` TINYINT NULL,
`site_name` STRING NULL,
`follow_sellers_count` INT NULL,
`asin_lob_info` STRING NULL,
`is_contains_lob_info` TINYINT NULL,
`asin_lqs_rating` DECIMAL(20,1) NULL,
`asin_lqs_rating_detail` STRING NULL,
`amazon_label` STRING NULL,
`is_movie_label` TINYINT NULL,
`is_brand_label` TINYINT NULL,
`multi_color_flag` TINYINT NULL,
`multi_color_str` STRING NULL,
`rank_type` TINYINT NULL,
`ao_val_type` TINYINT NULL,
`price_type` TINYINT NULL,
`rating_type` TINYINT NULL,
`size_type` TINYINT NULL,
`weight_type` TINYINT NULL,
`site_name_type` TINYINT NULL,
`quantity_variation_type` TINYINT NULL,
`rank_rise` INT NULL,
`rank_change` DECIMAL(20,4) NULL,
`rank_yoy` DECIMAL(20,4) NULL,
`ao_rise` DECIMAL(20,4) NULL,
`ao_change` DECIMAL(20,4) NULL,
`ao_yoy` DECIMAL(20,4) NULL,
`price_rise` DECIMAL(20,2) NULL,
`price_change` DECIMAL(20,4) NULL,
`price_yoy` DECIMAL(20,4) NULL,
`rating_rise` DECIMAL(20,1) NULL,
`rating_change` DECIMAL(20,4) NULL,
`rating_yoy` DECIMAL(20,4) NULL,
`comments_rise` INT NULL,
`comments_change` DECIMAL(20,4) NULL,
`comments_yoy` DECIMAL(20,4) NULL,
`bsr_orders_rise` INT NULL,
`bsr_orders_change` DECIMAL(20,4) NULL,
`bsr_orders_yoy` DECIMAL(20,4) NULL,
`sales_rise` DECIMAL(20,2) NULL,
`sales_change` DECIMAL(20,4) NULL,
`sales_yoy` DECIMAL(20,4) NULL,
`variation_rise` INT NULL,
`variation_change` DECIMAL(20,4) NULL,
`variation_yoy` DECIMAL(20,4) NULL,
`bought_month_mom` DECIMAL(20,4) NULL,
`bought_month_yoy` DECIMAL(20,4) NULL,
`ocean_profit` DECIMAL(20,4) NULL,
`air_profit` DECIMAL(20,4) NULL,
`tracking_since` STRING NULL,
`tracking_since_type` INT NULL,
`package_length` INT NULL,
`package_width` INT NULL,
`package_height` INT NULL,
`item_weight` INT NULL,
`is_alarm_brand` INT NULL,
`bsr_best_orders_type` SMALLINT NULL,
`asin_source_flag` ARRAY<INT> NULL,
`bsr_last_seen_at` DATE NULL,
`bsr_seen_count_30d` INT NULL,
`nsr_last_seen_at` DATE NULL,
`nsr_seen_count_30d` INT NULL,
`asin_type` SMALLINT NULL,
`usr_mask_progress` STRING NULL,
`usr_mask_type` STRING NULL,
`auctions_num` INT NULL,
`auctions_num_all` INT NULL,
`skus_num_creat` INT NULL,
`skus_num_creat_all` INT NULL,
`title_matching_degree` DECIMAL(20,4) NULL,
INDEX idx_title (`title`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '标题倒排索引',
INDEX idx_title_stem (`title_stem`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '标题词干倒排索引'
) ENGINE=OLAP
UNIQUE KEY(`asin`)
COMMENT '流量选品月维度物化表 {date_info}'
DISTRIBUTED BY HASH(`asin`) BUCKETS 32
PROPERTIES (
"replication_num" = "3",
"enable_unique_key_merge_on_write" = "true"
)
"""
def build_insert_overwrite_sql(site_name, date_info_underscore, date_info):
"""构建 INSERT OVERWRITE 到 selection.{site}_flow_asin_month_{yyyy_mm} 的 SQL"""
table_name = f"{site_name}_flow_asin_month_{date_info_underscore}"
return f"""
INSERT OVERWRITE TABLE `selection`.`{table_name}`
SELECT
f.asin,
f.parent_asin,
f.collapse_asin,
f.asin_crawl_date,
f.title,
stem_en(f.title) AS title_stem,
f.title_len,
f.brand,
f.asin_describe,
f.describe_len,
f.product_features,
f.together_asin,
f.price,
f.fbm_price,
f.rating,
f.total_comments,
f.one_star, f.two_star, f.three_star, f.four_star, f.five_star, f.low_star,
f.bsr_orders,
f.bsr_orders_sale,
f.asin_bought_month,
f.ao_val,
f.zr_counts,
f.sp_counts, f.sb_counts, f.vi_counts, f.bs_counts,
f.ac_counts, f.tr_counts, f.er_counts,
f.zr_flow_proportion,
f.matrix_flow_proportion,
f.matrix_ao_val,
f.one_two_val, f.three_four_val, f.five_six_val, f.eight_val,
f.category_first_id,
f.category_id,
f.first_category_rank,
f.current_category_rank,
f.weight,
f.volume,
f.asin_weight_ratio,
f.color, f.size, f.style, f.material,
COALESCE(uma.package_quantity, f.package_quantity) AS package_quantity,
f.is_package_quantity_abnormal,
f.variation_num,
f.page_inventory,
f.activity_type,
COALESCE(f.launch_time, kp.keepa_launch_time) AS launch_time,
CASE
WHEN COALESCE(f.launch_time, kp.keepa_launch_time) IS NULL THEN 0
WHEN DATEDIFF(f.asin_crawl_date, COALESCE(f.launch_time, kp.keepa_launch_time)) <= 30 THEN 1
WHEN DATEDIFF(f.asin_crawl_date, COALESCE(f.launch_time, kp.keepa_launch_time)) <= 90 THEN 2
WHEN DATEDIFF(f.asin_crawl_date, COALESCE(f.launch_time, kp.keepa_launch_time)) <= 180 THEN 3
WHEN DATEDIFF(f.asin_crawl_date, COALESCE(f.launch_time, kp.keepa_launch_time)) <= 360 THEN 4
WHEN DATEDIFF(f.asin_crawl_date, COALESCE(f.launch_time, kp.keepa_launch_time)) <= 720 THEN 5
WHEN DATEDIFF(f.asin_crawl_date, COALESCE(f.launch_time, kp.keepa_launch_time)) <= 1080 THEN 6
ELSE 7
END AS launch_time_type,
f.img_url,
f.img_num,
f.img_type AS img_type_arr,
f.img_info,
f.account_id,
f.account_name,
f.buy_box_seller_type,
f.site_name,
f.follow_sellers_count,
f.asin_lob_info,
f.is_contains_lob_info,
f.asin_lqs_rating,
f.asin_lqs_rating_detail,
f.amazon_label,
f.is_movie_label,
f.is_brand_label,
f.multi_color_flag,
f.multi_color_str,
f.rank_type, f.ao_val_type, f.price_type, f.rating_type,
f.size_type, f.weight_type, f.site_name_type, f.quantity_variation_type,
f.rank_rise, f.rank_mom AS rank_change, f.rank_yoy,
f.ao_rise, f.ao_mom AS ao_change, f.ao_yoy,
f.price_rise, f.price_mom AS price_change, f.price_yoy,
f.rating_rise, f.rating_mom AS rating_change, f.rating_yoy,
f.comments_rise, f.comments_mom AS comments_change, f.comments_yoy,
f.bsr_orders_rise, f.bsr_orders_mom AS bsr_orders_change, f.bsr_orders_yoy,
f.sales_rise, f.sales_mom AS sales_change, f.sales_yoy,
f.variation_rise, f.variation_mom AS variation_change, f.variation_yoy,
f.bought_month_mom, f.bought_month_yoy,
pr.ocean_profit, pr.air_profit,
FROM_UNIXTIME((CAST(kp.tracking_since AS BIGINT) + 21564000) * 60) AS tracking_since,
CASE
WHEN kp.tracking_since IS NULL OR kp.tracking_since <= 0 THEN 0
WHEN DATEDIFF(f.asin_crawl_date, FROM_UNIXTIME((CAST(kp.tracking_since AS BIGINT) + 21564000) * 60)) <= 30 THEN 1
WHEN DATEDIFF(f.asin_crawl_date, FROM_UNIXTIME((CAST(kp.tracking_since AS BIGINT) + 21564000) * 60)) <= 90 THEN 2
WHEN DATEDIFF(f.asin_crawl_date, FROM_UNIXTIME((CAST(kp.tracking_since AS BIGINT) + 21564000) * 60)) <= 180 THEN 3
WHEN DATEDIFF(f.asin_crawl_date, FROM_UNIXTIME((CAST(kp.tracking_since AS BIGINT) + 21564000) * 60)) <= 360 THEN 4
WHEN DATEDIFF(f.asin_crawl_date, FROM_UNIXTIME((CAST(kp.tracking_since AS BIGINT) + 21564000) * 60)) <= 720 THEN 5
WHEN DATEDIFF(f.asin_crawl_date, FROM_UNIXTIME((CAST(kp.tracking_since AS BIGINT) + 21564000) * 60)) <= 1080 THEN 6
ELSE 7
END AS tracking_since_type,
kp.package_length,
kp.package_width,
kp.package_height,
CASE WHEN kp.item_weight > 0 THEN kp.item_weight ELSE kp.package_weight END AS item_weight,
CASE WHEN ba.brand_name_norm IS NOT NULL THEN 1 ELSE 0 END AS is_alarm_brand,
CAST(-1 AS SMALLINT) AS bsr_best_orders_type,
COALESCE(cf.asin_cate_flag, ARRAY(0)) AS asin_source_flag,
COALESCE(cf.bsr_latest_date, CAST('1970-01-01' AS DATE)) AS bsr_last_seen_at,
COALESCE(cf.bsr_30day_count, 0) AS bsr_seen_count_30d,
COALESCE(cf.nsr_latest_date, CAST('1970-01-01' AS DATE)) AS nsr_last_seen_at,
COALESCE(cf.nsr_30day_count, 0) AS nsr_seen_count_30d,
CAST(
CASE
WHEN sa.asin IS NOT NULL OR ab.brand_lower IS NOT NULL THEN 1
WHEN (
(COALESCE(chf.is_need_cat, 0) = 1 AND COALESCE(chd.is_need_cat, 0) = 1)
OR f.asin NOT LIKE 'B0%'
) THEN 2
WHEN (COALESCE(chf.is_hide_cat, 0) = 1 OR COALESCE(chc.is_hide_cat, 0) = 1)
AND NOT (COALESCE(chf.is_white_cat, 0) = 1 OR COALESCE(chc.is_white_cat, 0) = 1) THEN 3
ELSE 0
END
AS SMALLINT) AS asin_type,
uma.usr_mask_progress,
COALESCE(uma.usr_mask_type, umc.usr_mask_type) AS usr_mask_type,
COALESCE(aa.auctions_num, 0) AS auctions_num,
COALESCE(aa.auctions_num_all, 0) AS auctions_num_all,
COALESCE(aa.skus_num_creat, 0) AS skus_num_creat,
COALESCE(aa.skus_num_creat_all, 0) AS skus_num_creat_all,
f.title_matching_degree
FROM `dwt`.`{site_name}_flow_asin_month` f
LEFT JOIN `dwd`.`dwd_asin_profit_rate_latest` pr
ON f.asin = pr.asin AND f.price = pr.price AND pr.site_name = '{site_name}'
LEFT JOIN `dwd`.`dwd_keepa_asin_detail` kp
ON f.asin = kp.asin AND kp.site_name = '{site_name}'
LEFT JOIN (
SELECT DISTINCT LOWER(TRIM(brand_name)) AS brand_name_norm
FROM `selection`.`brand_alert_erp`
WHERE brand_name IS NOT NULL
) ba ON f.brand = ba.brand_name_norm
LEFT JOIN (
SELECT asin FROM `mysql_selection`.`selection`.`us_self_asin` GROUP BY asin
) sa ON f.asin = sa.asin
LEFT JOIN (
SELECT DISTINCT LOWER(TRIM(brand_name)) AS brand_lower
FROM `mysql_selection`.`selection`.`amazon_brand` WHERE brand_type = '1'
) ab ON LOWER(f.brand) = ab.brand_lower
LEFT JOIN (
SELECT category_id_base,
MAX(CASE WHEN hide_type = 'is_need' THEN 1 ELSE 0 END) AS is_need_cat,
MAX(CASE WHEN hide_type = 'is_hide' THEN 1 ELSE 0 END) AS is_hide_cat,
MAX(CASE WHEN hide_type = 'is_white' THEN 1 ELSE 0 END) AS is_white_cat
FROM `mysql_selection`.`selection`.`us_bs_category_hide` GROUP BY category_id_base
) chf ON f.category_first_id = chf.category_id_base
LEFT JOIN (
SELECT category_id_base,
MAX(CASE WHEN hide_type = 'is_need' THEN 1 ELSE 0 END) AS is_need_cat,
MAX(CASE WHEN hide_type = 'is_hide' THEN 1 ELSE 0 END) AS is_hide_cat,
MAX(CASE WHEN hide_type = 'is_white' THEN 1 ELSE 0 END) AS is_white_cat
FROM `mysql_selection`.`selection`.`us_bs_category_hide` GROUP BY category_id_base
) chc ON f.category_id = chc.category_id_base
LEFT JOIN (
SELECT category_id_base,
MAX(CASE WHEN hide_type = 'is_need' THEN 1 ELSE 0 END) AS is_need_cat,
MAX(CASE WHEN hide_type = 'is_hide' THEN 1 ELSE 0 END) AS is_hide_cat,
MAX(CASE WHEN hide_type = 'is_white' THEN 1 ELSE 0 END) AS is_white_cat
FROM `mysql_selection`.`selection`.`us_bs_category_hide` GROUP BY category_id_base
) chd ON f.desc_category_first_id = chd.category_id_base
LEFT JOIN `selection`.`user_mask_asin` uma ON f.asin = uma.asin
LEFT JOIN `selection`.`user_mask_category` umc ON f.category_id = umc.category_id
LEFT JOIN (
SELECT asin, asin_cate_flag, bsr_latest_date, bsr_30day_count, nsr_latest_date, nsr_30day_count
FROM `dwd`.`dwd_asin_source_flag`
WHERE date_info = (SELECT MAX(date_info) FROM `dwd`.`dwd_asin_source_flag` WHERE site_name = '{site_name}')
AND site_name = '{site_name}'
) cf ON f.asin = cf.asin
LEFT JOIN `dwd`.`dwd_asin_auction` aa ON f.asin = aa.asin
WHERE f.date_info = '{date_info}'
"""
def main(site_name, date_info):
assert site_name in SUPPORTED_SITES, f"不支持的站点:{site_name},仅支持 us/uk/de"
doris_table = f"{site_name}_flow_asin_month"
date_info_underscore = date_info.replace('-', '_')
selection_table = f"{site_name}_flow_asin_month_{date_info_underscore}"
spark = SparkUtil.get_spark_session(
f"DwtFlowAsinMonth: {site_name} {date_info}"
)
# ===== Step 0:Doris 端建 selection 月物化表(IF NOT EXISTS)=====
print(f"[Step 0] Doris 建表 selection.{selection_table}")
_exec_doris_sql([build_create_table_sql(site_name, date_info_underscore, date_info)])
# ===== Step 1:读 Hive dwt_flow_asin 月数据 =====
sql = f"""
SELECT
asin,
asin_ao_val,
asin_zr_counts, asin_sp_counts, asin_sb_counts, asin_vi_counts,
asin_bs_counts, asin_ac_counts, asin_tr_counts, asin_er_counts,
bsr_orders,
sales AS bsr_orders_sale,
asin_title, asin_title_len,
asin_price,
asin_rating, asin_total_comments,
asin_buy_box_seller_type,
asin_page_inventory,
asin_volume, asin_weight,
asin_color, asin_size, asin_style, asin_material,
asin_launch_time,
asin_img_num,
parent_asin,
asin_img_type,
asin_img_url,
asin_activity_type,
act_one_two_val, act_three_four_val, act_five_six_val, act_eight_val,
asin_brand_name,
variation_num,
one_star, two_star, three_star, four_star, five_star, low_star,
together_asin,
account_name, account_id,
asin_rank_rise, asin_rank_mom, asin_rank_yoy,
asin_ao_rise, asin_ao_mom, asin_ao_yoy,
asin_price_rise, asin_price_mom, asin_price_yoy,
asin_rating_rise, asin_rating_mom, asin_rating_yoy,
asin_comments_rise, asin_comments_mom, asin_comments_yoy,
asin_bsr_orders_rise, asin_bsr_orders_mom, asin_bsr_orders_yoy,
asin_sales_rise, asin_sales_mom, asin_sales_yoy,
asin_variation_rise, asin_variation_mom, asin_variation_yoy,
asin_bought_mom, asin_bought_yoy,
asin_size_type, asin_rating_type, asin_site_name_type, asin_weight_type,
asin_ao_val_type, asin_rank_type, asin_price_type,
asin_quantity_variation_type, package_quantity,
is_movie_label, is_brand_label,
asin_crawl_date,
category_first_id, category_id, desc_category_first_id,
first_category_rank, current_category_rank,
asin_weight_ratio,
seller_country_name,
asin_bought_month,
asin_lqs_rating, asin_lqs_rating_detail,
asin_lob_info, is_contains_lob_info,
is_package_quantity_abnormal,
zr_flow_proportion, matrix_flow_proportion, matrix_ao_val,
customer_reviews_json,
img_info,
coalesce(parent_asin, asin) AS collapse_asin,
follow_sellers_count,
asin_describe, asin_fbm_price, describe_len,
title_matching_degree,
multi_color_flag, multi_color_str,
amazon_label
FROM dwt_flow_asin
WHERE site_name = '{site_name}' AND date_type = 'month' AND date_info = '{date_info}'
"""
print(f"sql=\n{sql}")
df_raw = spark.sql(sqlQuery=sql).repartition(40, 'asin')
# ===== Step 2:字段规范化(与 dwt.us_flow_asin_30day DDL 对齐)=====
def _norm_dt(col_name):
"""text → DATETIME 容错:先 yyyy-MM-dd HH:mm:ss,失败退到 yyyy-MM-dd"""
c = F.col(col_name)
return F.coalesce(
F.to_timestamp(c, 'yyyy-MM-dd HH:mm:ss'),
F.to_timestamp(c, 'yyyy-MM-dd'),
)
df_save = df_raw.select(
F.lit(date_info).alias('date_info'),
F.col('asin'),
# ===== 核心选品指标 =====
F.round(F.col('asin_ao_val').cast('double'), 4).cast('decimal(20,4)').alias('ao_val'),
F.col('asin_zr_counts').cast('int').alias('zr_counts'),
F.col('asin_sp_counts').cast('int').alias('sp_counts'),
F.col('asin_sb_counts').cast('int').alias('sb_counts'),
F.col('asin_vi_counts').cast('int').alias('vi_counts'),
F.col('asin_bs_counts').cast('int').alias('bs_counts'),
F.col('asin_ac_counts').cast('int').alias('ac_counts'),
F.col('asin_tr_counts').cast('int').alias('tr_counts'),
F.col('asin_er_counts').cast('int').alias('er_counts'),
F.col('bsr_orders').cast('int').alias('bsr_orders'),
F.round(F.col('bsr_orders_sale').cast('double'), 2).cast('decimal(20,2)').alias('bsr_orders_sale'),
# ===== ASIN 基础属性 =====
F.col('asin_title').alias('title'),
F.col('asin_title_len').cast('int').alias('title_len'),
F.round(F.col('asin_price').cast('double'), 2).cast('decimal(20,2)').alias('price'),
F.round(F.col('asin_rating').cast('double'), 1).cast('decimal(10,1)').alias('rating'),
F.col('asin_total_comments').cast('int').alias('total_comments'),
F.col('asin_buy_box_seller_type').cast('tinyint').alias('buy_box_seller_type'),
F.col('asin_page_inventory').cast('int').alias('page_inventory'),
F.col('asin_volume').alias('volume'),
F.round(F.col('asin_weight').cast('double'), 4).cast('decimal(20,4)').alias('weight'),
F.col('asin_color').alias('color'),
F.col('asin_size').alias('size'),
F.col('asin_style').alias('style'),
F.col('asin_material').alias('material'),
_norm_dt('asin_launch_time').alias('launch_time'),
F.col('asin_img_num').cast('int').alias('img_num'),
F.col('parent_asin'),
# asin_img_type string "1,2,3" → ARRAY<INT> → 再 to_json 成 "[1,2,3]"(Doris StreamLoad ARRAY<INT> 要求)
F.to_json(F.expr("transform(split(asin_img_type, ','), x -> cast(x as int))")).alias('img_type'),
F.col('asin_img_url').alias('img_url'),
F.col('asin_activity_type').alias('activity_type'),
F.round(F.col('act_one_two_val').cast('double'), 4).cast('decimal(20,4)').alias('one_two_val'),
F.round(F.col('act_three_four_val').cast('double'), 4).cast('decimal(20,4)').alias('three_four_val'),
F.round(F.col('act_five_six_val').cast('double'), 4).cast('decimal(20,4)').alias('five_six_val'),
F.round(F.col('act_eight_val').cast('double'), 4).cast('decimal(20,4)').alias('eight_val'),
F.col('asin_brand_name').alias('brand'),
F.col('variation_num').cast('int').alias('variation_num'),
F.col('one_star').cast('int').alias('one_star'),
F.col('two_star').cast('int').alias('two_star'),
F.col('three_star').cast('int').alias('three_star'),
F.col('four_star').cast('int').alias('four_star'),
F.col('five_star').cast('int').alias('five_star'),
F.col('low_star').cast('int').alias('low_star'),
F.col('together_asin'),
F.col('account_name'),
F.col('account_id'),
# ===== 环比/同比变化率 =====
F.col('asin_rank_rise').cast('int').alias('rank_rise'),
F.round(F.col('asin_rank_mom').cast('double'), 4).cast('decimal(20,4)').alias('rank_mom'),
F.round(F.col('asin_rank_yoy').cast('double'), 4).cast('decimal(20,4)').alias('rank_yoy'),
F.round(F.col('asin_ao_rise').cast('double'), 4).cast('decimal(20,4)').alias('ao_rise'),
F.round(F.col('asin_ao_mom').cast('double'), 4).cast('decimal(20,4)').alias('ao_mom'),
F.round(F.col('asin_ao_yoy').cast('double'), 4).cast('decimal(20,4)').alias('ao_yoy'),
F.round(F.col('asin_price_rise').cast('double'), 2).cast('decimal(20,2)').alias('price_rise'),
F.round(F.col('asin_price_mom').cast('double'), 4).cast('decimal(20,4)').alias('price_mom'),
F.round(F.col('asin_price_yoy').cast('double'), 4).cast('decimal(20,4)').alias('price_yoy'),
F.round(F.col('asin_rating_rise').cast('double'), 1).cast('decimal(20,1)').alias('rating_rise'),
F.round(F.col('asin_rating_mom').cast('double'), 4).cast('decimal(20,4)').alias('rating_mom'),
F.round(F.col('asin_rating_yoy').cast('double'), 4).cast('decimal(20,4)').alias('rating_yoy'),
F.col('asin_comments_rise').cast('int').alias('comments_rise'),
F.round(F.col('asin_comments_mom').cast('double'), 4).cast('decimal(20,4)').alias('comments_mom'),
F.round(F.col('asin_comments_yoy').cast('double'), 4).cast('decimal(20,4)').alias('comments_yoy'),
F.col('asin_bsr_orders_rise').cast('int').alias('bsr_orders_rise'),
F.round(F.col('asin_bsr_orders_mom').cast('double'), 4).cast('decimal(20,4)').alias('bsr_orders_mom'),
F.round(F.col('asin_bsr_orders_yoy').cast('double'), 4).cast('decimal(20,4)').alias('bsr_orders_yoy'),
F.round(F.col('asin_sales_rise').cast('double'), 2).cast('decimal(20,2)').alias('sales_rise'),
F.round(F.col('asin_sales_mom').cast('double'), 4).cast('decimal(20,4)').alias('sales_mom'),
F.round(F.col('asin_sales_yoy').cast('double'), 4).cast('decimal(20,4)').alias('sales_yoy'),
F.col('asin_variation_rise').cast('int').alias('variation_rise'),
F.round(F.col('asin_variation_mom').cast('double'), 4).cast('decimal(20,4)').alias('variation_mom'),
F.round(F.col('asin_variation_yoy').cast('double'), 4).cast('decimal(20,4)').alias('variation_yoy'),
F.round(F.col('asin_bought_mom').cast('double'), 4).cast('decimal(20,4)').alias('bought_month_mom'),
F.round(F.col('asin_bought_yoy').cast('double'), 4).cast('decimal(20,4)').alias('bought_month_yoy'),
# ===== 类型枚举字段 =====
F.col('asin_size_type').cast('tinyint').alias('size_type'),
F.col('asin_rating_type').cast('tinyint').alias('rating_type'),
F.col('asin_site_name_type').cast('tinyint').alias('site_name_type'),
F.col('asin_weight_type').cast('tinyint').alias('weight_type'),
F.col('asin_ao_val_type').cast('tinyint').alias('ao_val_type'),
F.col('asin_rank_type').cast('tinyint').alias('rank_type'),
F.col('asin_price_type').cast('tinyint').alias('price_type'),
F.col('asin_quantity_variation_type').cast('tinyint').alias('quantity_variation_type'),
F.col('package_quantity').cast('int').alias('package_quantity'),
F.col('is_movie_label').cast('tinyint').alias('is_movie_label'),
F.col('is_brand_label').cast('tinyint').alias('is_brand_label'),
# ===== 分类信息 =====
_norm_dt('asin_crawl_date').alias('asin_crawl_date'),
F.col('category_first_id'),
F.col('category_id'),
F.col('desc_category_first_id'),
F.col('first_category_rank').cast('int').alias('first_category_rank'),
F.col('current_category_rank').cast('int').alias('current_category_rank'),
F.round(F.col('asin_weight_ratio').cast('double'), 4).cast('decimal(20,4)').alias('asin_weight_ratio'),
F.col('seller_country_name').alias('site_name'),
# ===== 月销及母体信息 =====
F.col('asin_bought_month').cast('int').alias('asin_bought_month'),
F.round(F.col('asin_lqs_rating').cast('double'), 1).cast('decimal(20,1)').alias('asin_lqs_rating'),
F.col('asin_lqs_rating_detail'),
F.col('asin_lob_info'),
F.col('is_contains_lob_info').cast('tinyint').alias('is_contains_lob_info'),
F.col('is_package_quantity_abnormal').cast('tinyint').alias('is_package_quantity_abnormal'),
F.round(F.col('zr_flow_proportion').cast('double'), 4).cast('decimal(20,4)').alias('zr_flow_proportion'),
F.round(F.col('matrix_flow_proportion').cast('double'), 4).cast('decimal(20,4)').alias('matrix_flow_proportion'),
F.round(F.col('matrix_ao_val').cast('double'), 4).cast('decimal(20,4)').alias('matrix_ao_val'),
F.col('customer_reviews_json').alias('product_features'),
F.col('img_info'),
F.col('collapse_asin'),
F.col('follow_sellers_count').cast('int').alias('follow_sellers_count'),
F.col('asin_describe'),
F.round(F.col('asin_fbm_price').cast('double'), 2).cast('decimal(20,2)').alias('fbm_price'),
F.col('describe_len').cast('int').alias('describe_len'),
F.round(F.col('title_matching_degree').cast('double'), 4).cast('decimal(20,4)').alias('title_matching_degree'),
# ===== 颜色组合识别 =====
F.col('multi_color_flag').cast('tinyint').alias('multi_color_flag'),
F.col('multi_color_str'),
# ===== Amazon标签 =====
F.col('amazon_label'),
).cache()
count = df_save.count()
print(f"写入数据量:{count:,}")
df_save.show(5, truncate=False)
# ===== Step 3:写入 Doris dwt 主表 =====
table_columns = (
"date_info, asin, ao_val, zr_counts, sp_counts, sb_counts, vi_counts, bs_counts, ac_counts, tr_counts, er_counts, "
"bsr_orders, bsr_orders_sale, title, title_len, price, rating, total_comments, buy_box_seller_type, page_inventory, "
"volume, weight, color, size, style, material, launch_time, img_num, parent_asin, img_type, img_url, activity_type, "
"one_two_val, three_four_val, five_six_val, eight_val, brand, variation_num, "
"one_star, two_star, three_star, four_star, five_star, low_star, together_asin, account_name, account_id, "
"rank_rise, rank_mom, rank_yoy, ao_rise, ao_mom, ao_yoy, price_rise, price_mom, price_yoy, "
"rating_rise, rating_mom, rating_yoy, comments_rise, comments_mom, comments_yoy, "
"bsr_orders_rise, bsr_orders_mom, bsr_orders_yoy, sales_rise, sales_mom, sales_yoy, "
"variation_rise, variation_mom, variation_yoy, bought_month_mom, bought_month_yoy, "
"size_type, rating_type, site_name_type, weight_type, ao_val_type, rank_type, price_type, "
"quantity_variation_type, package_quantity, is_movie_label, is_brand_label, "
"asin_crawl_date, category_first_id, category_id, desc_category_first_id, "
"first_category_rank, current_category_rank, asin_weight_ratio, site_name, "
"asin_bought_month, asin_lqs_rating, asin_lqs_rating_detail, asin_lob_info, is_contains_lob_info, "
"is_package_quantity_abnormal, zr_flow_proportion, matrix_flow_proportion, matrix_ao_val, "
"product_features, img_info, collapse_asin, follow_sellers_count, asin_describe, fbm_price, describe_len, "
"title_matching_degree, multi_color_flag, multi_color_str, amazon_label"
)
print(f"[Step 3] 写入 Doris {DORIS_DB}.{doris_table}")
DorisHelper.spark_export_with_columns(
df_save=df_save,
db_name=DORIS_DB,
table_name=doris_table,
table_columns=table_columns,
)
df_save.unpersist()
# ===== Step 4:Doris 端 INSERT OVERWRITE 到 selection 月物化表 =====
print(f"[Step 4] Doris INSERT OVERWRITE selection.{selection_table}")
_exec_doris_sql([build_insert_overwrite_sql(site_name, date_info_underscore, date_info)])
print("success!")
if __name__ == "__main__":
site_name = sys.argv[1]
date_info = sys.argv[2]
main(site_name, date_info)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment