Commit 6b7f0ae7 by chenyuanjie

Doris相关更新任务

parent 58111aef
"""
author: CT
description: 同步 Hive tmp_jm_info 竞卖/SKU 数据到 Doris dwd.dwd_asin_auction
按 asin UPSERT,sequence_col=updated_at 控制旧数据不覆盖新数据
执行示例: spark-submit doris_update_auction.py
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.DorisHelper import DorisHelper
DORIS_DB = 'dwd'
DORIS_TABLE = 'dwd_asin_auction'
if __name__ == '__main__':
spark = SparkUtil.get_spark_session('doris_update_auction')
sql = """
SELECT asin,
CAST(auctions_num AS INT) AS auctions_num,
CAST(auctions_num_all AS INT) AS auctions_num_all,
CAST(skus_num_creat AS INT) AS skus_num_creat,
CAST(skus_num_creat_all AS INT) AS skus_num_creat_all
FROM big_data_selection.tmp_jm_info
"""
print(f"读取 Hive: {sql}")
df = spark.sql(sqlQuery=sql).repartition(40)
df = df.na.fill({
"auctions_num": 0,
"auctions_num_all": 0,
"skus_num_creat": 0,
"skus_num_creat_all": 0,
})
df = df.withColumn("updated_at", F.current_timestamp()).cache()
count = df.count()
print(f"写入数据量:{count:,}")
df.show(10, truncate=False)
table_columns = "asin, auctions_num, auctions_num_all, skus_num_creat, skus_num_creat_all, updated_at"
DorisHelper.spark_export_with_columns(
df_save=df,
db_name=DORIS_DB,
table_name=DORIS_TABLE,
table_columns=table_columns,
)
df.unpersist()
print('success!')
"""
@Author : CT
@Description : 将 PostgreSQL sys_edit_log 表全量迁移到 Doris selection.sys_edit_log
@Usage : spark-submit x2doris.py
@CreateTime : 2026-05-08
"""
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
SITE_NAME = 'us'
DORIS_DB = 'selection'
DORIS_TABLE = 'sys_edit_log'
TABLE_COLUMNS = 'site_name, module, edit_key_id, table_name, filed, val_before, val_after, val_related_info, user_id, user_name, create_time, success_flag'
if __name__ == '__main__':
spark = SparkUtil.get_spark_session(f'pg_to_doris_{DORIS_TABLE}')
# ===== Step 1:从 PostgreSQL 读取 sys_edit_log =====
print(f"[1/2] 读取 PostgreSQL 表:sys_edit_log")
pg_con_info = DBUtil.get_connection_info(db_type='postgresql', site_name=SITE_NAME)
assert pg_con_info is not None, "PG 连接信息为空,请检查配置"
df = SparkUtil.read_jdbc_query(
session=spark,
url=pg_con_info['url'],
username=pg_con_info['username'],
pwd=pg_con_info['pwd'],
query="""SELECT site_name, module, edit_key_id, table_name, filed,
val_before, val_after, val_related_info,
user_id, user_name, create_time, success_flag
FROM sys_edit_log""",
).cache()
# NOT NULL 过滤 + 文本截断(Doris VARCHAR 按字节计,中文每字 3 字节)
# df = df.filter("site_name IS NOT NULL AND edit_key_id IS NOT NULL") \
# .withColumn('module', F.substring(F.col('module'), 1, 13)) \
# .withColumn('edit_key_id', F.substring(F.col('edit_key_id'), 1, 166)) \
# .withColumn('filed', F.substring(F.col('filed'), 1, 33)) \
# .withColumn('val_before', F.substring(F.col('val_before'), 1, 49)) \
# .withColumn('val_after', F.substring(F.col('val_after'), 1, 49)) \
# .withColumn('val_related_info', F.substring(F.col('val_related_info'), 1, 333)) \
# .withColumn('user_name', F.substring(F.col('user_name'), 1, 20)) \
# .withColumn('create_time', F.col('create_time').cast('timestamp')) \
# .withColumn('success_flag', F.col('success_flag').cast('short')) \
# .repartition(20)
count = df.count()
print(f"读取完成,数据量:{count:,}")
df.show(10, truncate=False)
# ===== Step 2:写入 Doris selection.sys_edit_log =====
print(f"[2/2] 写入 Doris {DORIS_DB}.{DORIS_TABLE}")
DorisHelper.spark_export_with_columns(
df_save=df,
db_name=DORIS_DB,
table_name=DORIS_TABLE,
table_columns=TABLE_COLUMNS,
use_type='selection',
)
print("success")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment