# test.py (914 bytes)
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录

from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F


def build_aba_query(site_name='us', date_type='month', date_info='2024-10'):
    """Return the Spark SQL that selects one partition of ``dwt_aba_last365``.

    Args:
        site_name: value matched against the ``site_name`` column.
        date_type: value matched against the ``date_type`` column.
        date_info: value matched against the ``date_info`` column
            (the original script used the 'yyyy-MM' form, e.g. '2024-10').

    Returns:
        A single SQL statement. There is deliberately no trailing ``;``:
        ``SparkSession.sql`` runs one statement, and older Spark parsers
        reject the terminator.

    NOTE(review): the values are interpolated unescaped. That is acceptable
    only because they are trusted script parameters. Do not pass user input.
    """
    return f"""
        select *
        from dwt_aba_last365
        where site_name = '{site_name}'
          and date_type = '{date_type}'
          and date_info = '{date_info}'
    """


def main(site_name='us', date_type='month', date_info='2024-10'):
    """Read one partition of ``dwt_aba_last365`` and export it to Doris.

    Drops the ``site_name``/``date_type`` columns and rewrites ``date_info``
    from 'yyyy-MM' to 'yyyyMM01' (e.g. '2024-10' -> '20241001'). It then
    prints a 10-row preview and hands the frame to
    ``DorisHelper.spark_export_with_columns`` with an explicit column list.
    """
    spark = SparkUtil.get_spark_session('aba_to_doris_test')
    df_aba = (
        spark.sql(build_aba_query(site_name, date_type, date_info))
        .drop('site_name', 'date_type')
        .withColumn(
            'date_info',
            # strip '-' and append day '01': '2024-10' -> '20241001'
            F.concat(F.regexp_replace('date_info', '-', ''), F.lit('01')),
        )
        # Cache the *final* frame: it is scanned twice (show + export).
        .cache()
    )
    try:
        df_aba.show(10, True)
        columns_str = ",".join(df_aba.columns)
        # Arguments passed exactly as before; presumably ('test', 'dwt_aba_last365')
        # are the target Doris database and table -- confirm against DorisHelper.
        DorisHelper.spark_export_with_columns(df_aba, 'test', 'dwt_aba_last365', columns_str)
    finally:
        # Release the cached blocks even if the export fails.
        df_aba.unpersist()
    print('导出完成')


if __name__ == '__main__':
    main()