# ods_asin_detail_to_sr.py (1.22 KB)
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper


if __name__ == '__main__':
    # One-off export script: selects a single partition of ods_asin_detail
    # (site_name 'us', date_type 'month', date_info '2024-03') and appends it
    # to the StarRocks table test.ods_asin_detail_test.
    spark = SparkUtil.get_spark_session("ods_asin_detail_to_sr_test")

    # Partition filter is hard-coded; edit the literals below to export a
    # different site or period.
    sql = """
        select 
            *
        from ods_asin_detail 
        where site_name = 'us'
        and date_type = 'month'
        and date_info = '2024-03'
    """

    # try/finally guarantees the Spark session is stopped even when the query,
    # the connection lookup, or the StarRocks write raises; previously a
    # failure skipped spark.stop() entirely.
    try:
        # Redistribute the result into 40 partitions before writing
        # (presumably to control write parallelism -- confirm against
        # cluster sizing).
        df_hive = spark.sql(sql).repartition(40)

        # Connection settings for the 'selection' StarRocks entry; the dict
        # is read for the keys ip, http_port, jdbc_port, user and pwd.
        connection_info = StarRocksHelper.get_connection_info('selection')
        fe_http_url = f"{connection_info['ip']}:{connection_info['http_port']}"
        fe_jdbc_url = f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}"

        writer = (
            df_hive.write.format("starrocks")
            .option("starrocks.fe.http.url", fe_http_url)
            .option("starrocks.fe.jdbc.url", fe_jdbc_url)
            .option("starrocks.table.identifier", "test.ods_asin_detail_test")
            .option("starrocks.user", connection_info['user'])
            .option("starrocks.password", connection_info['pwd'])
            .option("starrocks.write.flush.interval.ms", "10000")
            # Long multi-character separator; presumably chosen so it cannot
            # collide with text inside the exported columns -- TODO confirm.
            .option("starrocks.write.properties.column_separator", "~!@#$%^&*~!@#$%^&*")
            .mode("append")
        )
        writer.save()

        # Message reads "export finished"; only printed when save() succeeded.
        print("导出完毕")
    finally:
        spark.stop()