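"""
Export job sketch: read the StarRocks table test.ods_asin_detail_test2 via the
Spark connector and append it into the Hive table ods_asin_detail_test,
partitioned by site_name / date_type / date_info (as configured below).
"""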
import os
import sys
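# Add the parent directory of this script to sys.path so the utils package can be imported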
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
    spark = SparkUtil.get_spark_session("ods_asin_detail_sr_to_hive")

    # Target Hive partition to (re)load
    partition_dict = {
        "site_name": 'us',
        "date_type": 'month',
        "date_info": '2024-03'
    }

    # Remove any existing data under the target partition path before importing
    hdfs_path = CommonUtil.build_hdfs_path('ods_asin_detail_test', partition_dict=partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)

    # Read the source table from StarRocks via the Spark connector
    connection_info = StarRocksHelper.get_connection_info('selection')
    df_sr = spark.read.format("starrocks") \
        .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
        .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
        .option("starrocks.table.identifier", "test.ods_asin_detail_test2") \
        .option("starrocks.user", connection_info['user']) \
        .option("starrocks.password", connection_info['pwd']) \
        .option("starrocks.request.tablet.size", "1") \
        .option("starrocks.batch.size", "40960") \
        .option("starrocks.exec.mem.limit", "21474836480") \
        .load()
    print("Finished reading from StarRocks")

    # repartition() returns a new DataFrame; reassign it so the shuffle takes effect
    df_sr = df_sr.repartition(50)
    partitions_by = ['site_name', 'date_type', 'date_info']
    df_sr.write.saveAsTable(name='ods_asin_detail_test', format='hive', mode='append', partitionBy=partitions_by)
    spark.stop()

    # Build the LZO index and repair the Hive table metadata
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb='ods_asin_detail_test')