Commit 6d70e970 by chenyuanjie

利润率异常修复

parent 7c4d38d7
""" """
author: CT author: CT
description: 利润率数据增量同步 + 去重整合 — 一站式 PG → Hive → Doris description: 利润率数据增量整合 — ODS → Hive → Doris
前置:ods_asin_profit_rate.py 已将当日 PG 增量 sqoop 到 ods_asin_profit_rate
步骤: 步骤:
1) sqoop 从 PG {site_name}_asin_profit_rate_calc 增量拉取 1) 读 ods_asin_profit_rate 当日分区(原始增量)+ dim_asin_profit_rate_info 历史分区
时间窗 [last_date_info, 当天 00:00:00),先写入 Hive 2) 按 (asin, price) 去重,updated_time desc 保留最新一行
dim_asin_profit_rate_info (site_name, date_info=今日) 分区作为"原始增量" 3) 覆盖 dim_asin_profit_rate_info 当日分区为整合后全量快照;校验后删除历史分区
2) Spark 读当日 sqoop 增量 + Hive 历史所有分区,按 (asin, price) 去重 4) 当日 ODS 增量写入 Doris dwd.dwd_asin_profit_rate_latest
排序键:updated_time desc 保留最新一行
3) 覆盖当日分区为整合后全量快照;写入校验通过后删除所有 < 今日 的历史分区
4) 当日 sqoop 增量(不含历史回灌)写入 Doris dwd.dwd_asin_profit_rate_latest
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=update_time 自动取最新 Doris UNIQUE KEY(site_name, asin, price) + sequence_col=update_time 自动取最新
执行示例: spark-submit dim_asin_profit_rate_info.py us 2026-05-15 执行示例: spark-submit dim_asin_profit_rate_info.py us 2026-05-15
""" """
import os import os
import sys import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) sys.path.append(os.path.dirname(sys.path[0]))
...@@ -22,7 +19,6 @@ from pyspark.sql import functions as F, Window ...@@ -22,7 +19,6 @@ from pyspark.sql import functions as F, Window
from utils.spark_util import SparkUtil from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
from utils.hdfs_utils import HdfsUtils from utils.hdfs_utils import HdfsUtils
from utils.DorisHelper import DorisHelper from utils.DorisHelper import DorisHelper
...@@ -32,12 +28,10 @@ class DimAsinProfitRateInfo(object): ...@@ -32,12 +28,10 @@ class DimAsinProfitRateInfo(object):
def __init__(self, site_name, date_info): def __init__(self, site_name, date_info):
self.site_name = site_name self.site_name = site_name
self.date_info = date_info self.date_info = date_info
self.last_date_info = (datetime.strptime(date_info, "%Y-%m-%d").date() - timedelta(days=1)).strftime("%Y-%m-%d")
# 上限:程序运行当天 00:00:00(拉昨天及之前完整数据,今天还在写不拉)
self.upper_bound = datetime.now().strftime("%Y-%m-%d") + " 00:00:00"
self.spark = SparkUtil.get_spark_session( self.spark = SparkUtil.get_spark_session(
f"{self.__class__.__name__}: {self.site_name} {self.date_info}" f"{self.__class__.__name__}: {self.site_name} {self.date_info}"
) )
self.ods_table = "ods_asin_profit_rate"
self.hive_table = "dim_asin_profit_rate_info" self.hive_table = "dim_asin_profit_rate_info"
self.doris_db = "dwd" self.doris_db = "dwd"
self.doris_table = "dwd_asin_profit_rate_latest" self.doris_table = "dwd_asin_profit_rate_latest"
...@@ -46,49 +40,25 @@ class DimAsinProfitRateInfo(object): ...@@ -46,49 +40,25 @@ class DimAsinProfitRateInfo(object):
self.df_save = None self.df_save = None
def run(self): def run(self):
self.sqoop_to_hive()
self.read_data() self.read_data()
self.handle_data() self.handle_data()
self.save_data() self.save_data()
self.write_to_doris() self.write_to_doris()
def sqoop_to_hive(self):
"""sqoop 从 PG 增量拉到 Hive dim_asin_profit_rate_info 当日分区(原始增量)"""
partition_dict = {"site_name": self.site_name, "date_info": self.date_info}
hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict=partition_dict)
engine = get_remote_engine(site_name=self.site_name, db_type='postgresql_cluster')
import_table = f"{self.site_name}_asin_profit_rate_calc"
# 增量区间:[last_date_info, 程序运行当天 00:00:00)
# 上限用 self.upper_bound(当天 00:00:00),避免拉到 PG 当日正在写入的数据
query = f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
updated_time, asin_crawl_date
FROM {import_table}
WHERE updated_time >= '{self.last_date_info}' AND updated_time < '{self.upper_bound}' AND \\$CONDITIONS
"""
print(f"sqoop query:\n{query}")
engine.sqoop_raw_import(
query=query,
hive_table=self.hive_table,
hdfs_path=hdfs_path,
partitions=partition_dict,
check_count=True,
)
def read_data(self): def read_data(self):
"""读当日 sqoop 增量 + 历史所有分区""" """读当日 ODS 增量(ods_asin_profit_rate)+ dim 历史所有分区"""
sql_today = f""" sql_today = f"""
SELECT asin, price, category, ocean_profit, air_profit, SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight, package_length, package_width, package_height, weight,
asin_crawl_date, updated_time asin_crawl_date, updated_time
FROM {self.hive_table} FROM {self.ods_table}
WHERE site_name = '{self.site_name}' AND date_info = '{self.date_info}' WHERE site_name = '{self.site_name}' AND date_info = '{self.date_info}'
""" """
print(f"sql_today =\n{sql_today}") print(f"sql_today =\n{sql_today}")
# cache:save_data 会 DROP PARTITION 当日分区,write_to_doris 还要复用此 df,必须先物化 # cache:save_data 会 DROP PARTITION 当日分区,write_to_doris 还要复用此 df,必须先物化
# count() 强制立即物化,确保 cache 在 DROP/saveAsTable 之前已填充到 BlockManager
self.df_today = self.spark.sql(sqlQuery=sql_today).repartition(40, 'asin', 'price').cache() self.df_today = self.spark.sql(sqlQuery=sql_today).repartition(40, 'asin', 'price').cache()
self.df_today.count()
sql_history = f""" sql_history = f"""
SELECT asin, price, category, ocean_profit, air_profit, SELECT asin, price, category, ocean_profit, air_profit,
......
"""
author: CT
description: sqoop 从 PG {site_name}_asin_profit_rate_calc 增量拉取利润率数据
写入 Hive ods_asin_profit_rate 分区 (site_name, date_info)
增量时间窗:[date_info-1天 00:00:00, 当天运行时刻的 00:00:00)
执行示例: python ods_asin_profit_rate.py us 2026-05-15
"""
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.secure_db_client import get_remote_engine
def resolve_time_window(date_info, now=None):
    """Return the ``(lower_bound, upper_bound)`` strings of the incremental window.

    The window is ``[date_info - 1 day, run-day 00:00:00)``.

    Args:
        date_info: Partition date in ``YYYY-MM-DD`` form.
        now: Clock used to derive the upper bound; defaults to
            ``datetime.now()``. Exposed only so the window can be computed
            deterministically.

    Returns:
        A ``(lower_bound, upper_bound)`` tuple: ``lower_bound`` is a
        ``YYYY-MM-DD`` date string, ``upper_bound`` is
        ``YYYY-MM-DD 00:00:00``.

    Raises:
        ValueError: If ``date_info`` does not match ``%Y-%m-%d``.
    """
    partition_day = datetime.strptime(date_info, "%Y-%m-%d").date()
    lower_bound = (partition_day - timedelta(days=1)).strftime("%Y-%m-%d")
    # Upper bound is midnight of the day the job actually runs (not of
    # date_info), so rows PG is still writing today are not pulled.
    current = datetime.now() if now is None else now
    upper_bound = current.strftime("%Y-%m-%d") + " 00:00:00"
    return lower_bound, upper_bound


def build_import_query(import_table, lower_bound, upper_bound):
    """Build the free-form sqoop query selecting rows updated inside the window.

    Args:
        import_table: Source PG table name.
        lower_bound: Inclusive lower limit compared against ``updated_time``.
        upper_bound: Exclusive upper limit compared against ``updated_time``.

    Returns:
        The query text, ending its WHERE clause with the ``\\$CONDITIONS``
        placeholder.
    """
    return f"""
SELECT asin, price, category, ocean_profit, air_profit,
package_length, package_width, package_height, weight,
updated_time, asin_crawl_date
FROM {import_table}
WHERE updated_time >= '{lower_bound}' AND updated_time < '{upper_bound}' AND \\$CONDITIONS
"""


def main():
    """Import one day's incremental profit-rate rows from PG into the ODS Hive partition.

    Reads ``site_name`` and ``date_info`` from the command line, builds the
    incremental query, and hands it to the remote engine's
    ``sqoop_raw_import`` targeting the ``ods_asin_profit_rate`` partition
    ``(site_name, date_info)``.

    Raises:
        ValueError: If either command-line argument is missing, or
            ``date_info`` is not in ``YYYY-MM-DD`` form.
    """
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    # Explicit raises instead of `assert`: assertions are removed under
    # `python -O`, which would let a missing argument reach the table name.
    if site_name is None:
        raise ValueError("site_name 不能为空!")
    if date_info is None:
        raise ValueError("date_info 不能为空!")

    last_date_info, upper_bound = resolve_time_window(date_info)

    hive_table = "ods_asin_profit_rate"
    import_table = f"{site_name}_asin_profit_rate_calc"
    partition_dict = {"site_name": site_name, "date_info": date_info}
    hdfs_path = CommonUtil.build_hdfs_path(hive_table, partition_dict=partition_dict)

    engine = get_remote_engine(site_name=site_name, db_type='postgresql_cluster')

    query = build_import_query(import_table, last_date_info, upper_bound)
    print(f"sqoop query:\n{query}")

    engine.sqoop_raw_import(
        query=query,
        hive_table=hive_table,
        hdfs_path=hdfs_path,
        partitions=partition_dict,
        check_count=True,
    )
    print("success!")


if __name__ == '__main__':
    main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment