"""Ad-hoc export of one monthly partition of ``dwt_aba_last365`` to Doris.

Reads the ``site_name='us'`` / ``date_type='month'`` partition of the Spark
table ``dwt_aba_last365``. It drops those two filter columns, rewrites
``date_info`` from ``YYYY-MM`` to ``YYYYMM01``, and hands the rows to
``DorisHelper.spark_export_with_columns`` with database ``'test'`` and table
``'dwt_aba_last365'``.

Usage::

    python <this script> [YYYY-MM]

The optional argument selects the month to export (default ``2024-10``).
"""
import os
import re
import sys

# Put the parent of this script's directory on sys.path so the project-local
# ``utils`` package below can be imported.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

# Fixed partition filters. site_name / date_type are dropped before export, so
# they are deliberately not configurable: changing them would mix partitions
# in the same target table.
SITE_NAME = 'us'
DATE_TYPE = 'month'
DEFAULT_DATE_INFO = '2024-10'

if __name__ == '__main__':
    date_info = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_DATE_INFO
    # Validate before interpolating into SQL. The YYYYMM01 rewrite below also
    # only makes sense for a YYYY-MM month value.
    if not re.fullmatch(r'\d{4}-\d{2}', date_info):
        sys.exit(f"date_info must look like YYYY-MM, got {date_info!r}")

    spark = SparkUtil.get_spark_session('aba_to_doris_test')
    try:
        # No trailing ';': older Spark SQL grammars reject it and it adds nothing.
        sql = f"""
            select *
            from dwt_aba_last365
            where site_name = '{SITE_NAME}'
              and date_type = '{DATE_TYPE}'
              and date_info = '{date_info}'
        """
        # Cached because the data is consumed twice: once by show() and once by
        # the Doris export.
        df_aba = spark.sql(sql).drop('site_name', 'date_type').cache()
        # '2024-10' -> '20241001': strip the dash and append day '01'.
        df_aba = df_aba.withColumn(
            'date_info',
            F.concat(F.regexp_replace('date_info', '-', ''), F.lit('01'))
        )
        df_aba.show(10, True)

        # Comma-joined column list passed to the helper. Presumably it is used
        # as the Doris column mapping; TODO confirm in DorisHelper.
        columns_str = ",".join(df_aba.columns)
        DorisHelper.spark_export_with_columns(df_aba, 'test', 'dwt_aba_last365', columns_str)
        print('导出完成')
    finally:
        # Release the session (and with it the cached DataFrame) even when the
        # query or export fails.
        spark.stop()