Commit 7c4d38d7 by chenyuanjie

fix

parent 2f887935
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.es_util import EsUtils
from pyspark.sql import functions as F, Window
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
class DwtFlowKeepaAsin(object):
def __init__(self, site_name, date_info):
self.site_name = site_name
self.date_info = date_info
self.spark = SparkUtil.get_spark_session(f"{self.__class__.__name__}: {self.site_name} {self.date_info}")
self.df_flow_asin = self.spark.sql(f"select 1+1;")
self.df_category_id = self.spark.sql(f"select 1+1;")
self.df_keepa_asin = self.spark.sql(f"select 1+1;")
self.df_calc_asin = self.spark.sql(f"select 1+1;")
self.df_export_asin = self.spark.sql(f"select 1+1;")
self.df_save = self.spark.sql(f"select 1+1;")
def run(self):
self.read_data()
self.handle_data()
self.save_data()
def read_data(self):
# 读取流量选品月asin
sql = f"""
select asin, asin_price as price, category_first_id, date_info as source_month
from dwt_flow_asin
where site_name = '{self.site_name}'
and date_type = 'month'
and date_info >= '2025-05'
and asin_price is not null
and asin_price > 0
"""
df_flow_asin_month = self.spark.sql(sqlQuery=sql) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(10,2)'))
# 读取ES最近30天缺少利润率的asin
days_30_ago = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d 00:00:00")
es_read_options = {
"es.nodes": EsUtils.__es_ip__,
"es.port": EsUtils.__es_port__,
"es.net.http.auth.user": EsUtils.__es_user__,
"es.net.http.auth.pass": EsUtils.__es_passwd__,
"es.nodes.wan.only": "false",
"es.mapping.date.rich": "false",
"es.scroll.size": "2000",
"es.read.field.include": "asin,price,category_first_id,asin_crawl_date",
"es.query": f'{{"query":{{"bool":{{"must":[{{"range":{{"price":{{"gt":0}}}}}},{{"range":{{"asin_crawl_date":{{"gte":"{days_30_ago}"}}}}}}],"must_not":{{"exists":{{"field":"profit_rate_extra.ocean_profit"}}}}}}}}}}'
}
df_flow_asin_30day = self.spark.read.format("org.elasticsearch.spark.sql") \
.options(**es_read_options) \
.load(f"{self.site_name}_flow_asin_30day") \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(10,2)')) \
.withColumn('source_month', F.date_format(F.col('asin_crawl_date'), 'yyyy-MM')) \
.select('asin', 'price', 'category_first_id', 'source_month')
# 合并两部分,按(asin, price)去重保留最新source_month
self.df_flow_asin = df_flow_asin_month.union(df_flow_asin_30day).repartition(40, 'asin')
window = Window.partitionBy(['asin', 'price']).orderBy(F.col('source_month').desc_nulls_last())
self.df_flow_asin = self.df_flow_asin.withColumn(
'rank', F.row_number().over(window=window)
).filter('rank = 1').drop('rank').cache()
# 读取分类数据
sql = f"""
select category_first_id, en_name as category from dim_bsr_category_tree where site_name = '{self.site_name}' and nodes_num = 2
"""
self.df_category_id = self.spark.sql(sqlQuery=sql).cache()
# 读取keepa数据
sql = f"""
select asin, package_length, package_width, package_height, item_weight as weight from dim_keepa_asin_info where site_name = '{self.site_name}' and date_info = '{self.date_info}'
"""
self.df_keepa_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').filter(
(F.col("package_length") > 0) & (F.col("package_width") > 0) & (F.col("package_height") > 0) & (F.col("weight") > 0)
).cache()
# 读取已经计算过利润率的asin
sql = f"""
select asin, price, package_length, package_width, package_height, weight from dim_asin_profit_rate_info where site_name = '{self.site_name}' and date_info = '{self.date_info}'
"""
self.df_calc_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
# 读取已经导出过asin+price,避免重复计算
sql = f"""
select asin, price, package_length, package_width, package_height, weight from dwt_flow_keepa_asin where site_name = '{self.site_name}'
"""
self.df_export_asin = self.spark.sql(sqlQuery=sql).repartition(40, 'asin').cache()
def handle_data(self):
self.df_save = self.df_flow_asin.join(
self.df_category_id, on='category_first_id', how='left'
).join(
self.df_keepa_asin, on='asin', how='inner'
).join(
self.df_calc_asin, on=['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight'], how='left_anti'
).join(
self.df_export_asin, on=['asin', 'price', 'package_length', 'package_width', 'package_height', 'weight'], how='left_anti'
).cache()
self.df_flow_asin.unpersist()
self.df_category_id.unpersist()
self.df_keepa_asin.unpersist()
self.df_calc_asin.unpersist()
self.df_export_asin.unpersist()
start_key = 1
self.df_save = self.df_save.withColumn(
'part_key', F.ntile(50).over(Window.orderBy(F.rand())) + (start_key - 1)
).select(
F.col('asin'),
F.col('price'),
F.col('category'),
F.col('package_length'),
F.col('package_width'),
F.col('package_height'),
F.col('weight'),
F.col('part_key'),
F.col('source_month'),
F.lit(self.site_name).alias('site_name'),
F.lit(self.date_info).alias('date_info')
).repartition(10)
def save_data(self):
hive_tb = "dwt_flow_keepa_asin"
partition_dict = {
"site_name": self.site_name,
"date_info": self.date_info
}
partition_by = list(partition_dict.keys())
hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
HdfsUtils.delete_file_in_folder(hdfs_path)
print(f"正在进行数据存储,当前存储的表名为:{hive_tb},存储路径:{hdfs_path}")
self.df_save.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
print("success")
if __name__ == "__main__":
site_name = sys.argv[1]
date_info = sys.argv[2]
handle_obj = DwtFlowKeepaAsin(site_name, date_info)
handle_obj.run()
......@@ -6,12 +6,14 @@ description: 导出待计算利润率的 ASIN
2) Doris dwt.{site}_flow_asin_30day 读取所有相关 ASIN:
asin / price / category_first_id / asin_crawl_date
3) union 后按 (asin, price) 去重保留 asin_crawl_date 最新
4) LEFT JOIN 分类、INNER JOIN 当日 keepa 增量
4) LEFT JOIN 分类、INNER JOIN keepa 增量
keepa 表已整合为单分区快照,按 updated_time > last_date_info 筛增量
5) keepa 关联到的 ASIN 全部导出 PG {site}_asin_profit_rate_calc 重新计算利润率
执行示例: spark-submit export_need_profit_rate.py us 2026-05-15
"""
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0]))
......@@ -28,7 +30,9 @@ class ExportNeedProfitRate(object):
def __init__(self, site_name, date_info):
self.site_name = site_name
self.date_info = date_info # 计算时间 yyyy-MM-dd(用作当日 keepa 增量分区)
self.date_info = date_info # 计算时间 yyyy-MM-dd
# keepa 增量过滤下限:date_info - 1 天(如 date_info=2026-05-13 → last_date_info=2026-05-12)
self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
self.spark = SparkUtil.get_spark_session(
f"{self.__class__.__name__}: {self.site_name} {self.date_info}"
)
......@@ -56,13 +60,11 @@ class ExportNeedProfitRate(object):
.withColumn('asin_crawl_date', F.to_timestamp(F.col('asin_crawl_date')))
# 2. Doris dwt.{site}_flow_asin_30day 所有相关 ASIN
doris_sql = f"""
SELECT asin, price, category_first_id, asin_crawl_date
FROM dwt.{self.site_name}_flow_asin_30day
WHERE price > 0
"""
print(f"doris_sql =\n{doris_sql}")
df_doris = DorisHelper.spark_import_with_sql(self.spark, doris_sql) \
# spark_import_with_connector 不支持 WHERE 下推,price > 0 过滤改到 Spark 端
table_identifier = f"dwt.{self.site_name}_flow_asin_30day"
read_fields = "asin,price,category_first_id,asin_crawl_date"
df_doris = DorisHelper.spark_import_with_connector(self.spark, table_identifier, read_fields) \
.filter(F.col('price') > 0) \
.withColumn('price', F.round(F.col('price'), 2).cast('decimal(20,2)')) \
.withColumn('asin_crawl_date', F.col('asin_crawl_date').cast('timestamp')) \
.select('asin', 'price', 'category_first_id', 'asin_crawl_date')
......@@ -84,10 +86,11 @@ class ExportNeedProfitRate(object):
df_cate = self.spark.sql(sqlQuery=sql_cate)
# 5. keepa 当日增量 INNER JOIN
# keepa 表已整合为单分区快照,用 updated_time > last_date_info 筛"近一天更新"的增量
sql_keepa = f"""
SELECT asin, package_length, package_width, package_height, item_weight AS weight
FROM dim_keepa_asin_info
WHERE site_name = '{self.site_name}' AND date_info = '{self.date_info}'
WHERE site_name = '{self.site_name}' AND updated_time >= '{self.last_date_info}'
"""
df_keepa = self.spark.sql(sqlQuery=sql_keepa) \
.filter((F.col('package_length') > 0) &
......@@ -105,7 +108,7 @@ class ExportNeedProfitRate(object):
'asin', 'price', 'category',
'package_length', 'package_width', 'package_height', 'weight',
'part_key', 'source_month', 'asin_crawl_date',
)
).cache()
count = df_result.count()
print(f"待计算利润率数据量:{count:,}")
......
......@@ -33,9 +33,9 @@ if __name__ == '__main__':
db_type=db_type
)
# 增量区间:[last_date_info, 程序运行时刻)
# 增量区间:[last_date_info, 程序运行当天 00:00:00)
# 下限:last_date_info(前一日),间隔几天补跑时传更早 date_info 自动覆盖区间
# 上限:upper_bound(程序运行时刻),半开区间避免拉到 PG 正在写入的最新数据,且不固定为一天
# 上限:upper_bound(程序运行当天 00:00:00),半开区间避免拉到 PG 当日正在写入的数据
query = f"""
SELECT asin, last_detail::text as last_detail, update_at FROM {import_table}
WHERE update_at >= '{last_date_info}' AND update_at < '{upper_bound}' AND last_detail is not null AND \$CONDITIONS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment