# ods_asin_detail_sr_to_hive.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    spark = SparkUtil.get_spark_session("ods_asin_detail_sr_to_hive")
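    # Target partition for this import: US site, monthly granularity, 2024-03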
    partition_dict = {
        "site_name": 'us',
        "date_type": 'month',
        "date_info": '2024-03'
    }
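    # Build the partition's HDFS directory and delete any existing files so a re-run does not leave stale or duplicate data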
    hdfs_path = CommonUtil.build_hdfs_path('ods_asin_detail_test', partition_dict=partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)
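    # Connection details (FE host, HTTP/JDBC ports, credentials) for the 'selection' StarRocks cluster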
    connection_info = StarRocksHelper.get_connection_info('selection')
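    # Read the source table via the StarRocks Spark connector:
    #   - starrocks.request.tablet.size=1 maps one tablet per Spark partition, maximizing read parallelism
    #   - starrocks.batch.size caps the number of rows fetched per request
    #   - starrocks.exec.mem.limit sets the per-query memory limit in bytes (here 20 GB)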
    df_sr = spark.read.format("starrocks") \
        .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
        .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
        .option("starrocks.table.identifier", "test.ods_asin_detail_test2") \
        .option("starrocks.user", connection_info['user']) \
        .option("starrocks.password", connection_info['pwd']) \
        .option("starrocks.request.tablet.size", "1") \
        .option("starrocks.batch.size", "40960") \
        .option("starrocks.exec.mem.limit", "21474836480") \
        .load()
    print("读取完毕")
    df_sr = df_sr.repartition(50)
    partitions_by = ['site_name', 'date_type', 'date_info']
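    # Append into the Hive table, writing into partitions derived from the three partition columns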
    df_sr.write.saveAsTable(name='ods_asin_detail_test', format='hive', mode='append', partitionBy=partitions_by)
    spark.stop()
    # Create the LZO index and repair the Hive table metadata
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb='ods_asin_detail_test')