import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.DorisHelper import DorisHelper
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
def build_aba_query(site_name='us', date_type='month', date_info='2024-10'):
    """Return the Spark SQL that selects one partition of ``dwt_aba_last365``.

    Args:
        site_name: value for the ``site_name`` filter (default ``'us'``).
        date_type: value for the ``date_type`` filter (default ``'month'``).
        date_info: value for the ``date_info`` filter, e.g. ``'2024-10'``.

    Returns:
        The SQL text, without a trailing ``;``. Older Spark parsers reject a
        terminating semicolon in ``spark.sql``, so omitting it is portable.

    NOTE: the arguments are interpolated directly into the SQL text. Only pass
    trusted, script-controlled constants here — never external input.
    """
    return f"""
    select *
    from dwt_aba_last365
    where site_name = '{site_name}'
      and date_type = '{date_type}'
      and date_info = '{date_info}'
    """


if __name__ == '__main__':
    spark = SparkUtil.get_spark_session('aba_to_doris_test')
    try:
        # Partition-key columns that are constant after filtering are not exported.
        df_aba = spark.sql(build_aba_query()).drop('site_name', 'date_type')
        # Reformat date_info from 'YYYY-MM' to 'YYYYMM01' (first day of the month),
        # e.g. '2024-10' -> '20241001'. Presumably this matches the Doris column
        # format — confirm against the target table schema.
        df_aba = df_aba.withColumn(
            'date_info', F.concat(F.regexp_replace('date_info', '-', ''), F.lit('01'))
        ).cache()  # cache the final frame: it is consumed by show() and the export
        df_aba.show(10, True)
        # Export columns explicitly, in DataFrame order, as a comma-separated list.
        columns_str = ",".join(df_aba.columns)
        DorisHelper.spark_export_with_columns(df_aba, 'test', 'dwt_aba_last365', columns_str)
        df_aba.unpersist()
        print('导出完成')  # "export finished"
    finally:
        # Release cluster resources even if the query or export fails.
        spark.stop()