import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    spark = SparkUtil.get_spark_session("ods_asin_detail_sr_to_hive")
    partition_dict = {
        "site_name": 'us',
        "date_type": 'month',
        "date_info": '2024-03'
    }
    # Build the target HDFS partition path and clear any existing data before re-importing
    hdfs_path = CommonUtil.build_hdfs_path('ods_asin_detail_test', partition_dict=partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)

    # Read the source table from StarRocks through the Spark connector
    connection_info = StarRocksHelper.get_connection_info('selection')
    df_sr = spark.read.format("starrocks") \
        .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
        .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
        .option("starrocks.table.identifier", "test.ods_asin_detail_test2") \
        .option("starrocks.user", connection_info['user']) \
        .option("starrocks.password", connection_info['pwd']) \
        .option("starrocks.request.tablet.size", "1") \
        .option("starrocks.batch.size", "40960") \
        .option("starrocks.exec.mem.limit", "21474836480") \
        .load()
    print("Finished reading from StarRocks")

    # repartition() returns a new DataFrame; assign the result so the shuffle actually takes effect
    df_sr = df_sr.repartition(50)
    partitions_by = ['site_name', 'date_type', 'date_info']
    df_sr.write.saveAsTable(name='ods_asin_detail_test', format='hive', mode='append', partitionBy=partitions_by)
    spark.stop()

    # Build the LZO index and repair the Hive metadata after the import
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb='ods_asin_detail_test')