"""One-off export job: copy one month of ``ods_asin_detail`` from Hive into a
StarRocks test table via the Spark-StarRocks connector.

Reads site_name='us', date_type='month', date_info='2024-03' from Hive and
appends the rows to ``test.ods_asin_detail_test``. Connection details come
from the project-local ``StarRocksHelper`` ('selection' cluster).
"""
import os
import sys

# Make the project root importable when the script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper

if __name__ == '__main__':
    spark = SparkUtil.get_spark_session("ods_asin_detail_to_sr_test")
    try:
        sql = """
        select * from ods_asin_detail where site_name = 'us' and date_type = 'month' and date_info = '2024-03'
        """
        # repartition(40): spread the write across 40 tasks so the StarRocks
        # stream-load requests are parallelized instead of funneled through
        # a few partitions.
        df_hive = spark.sql(sql).repartition(40)
        connection_info = StarRocksHelper.get_connection_info('selection')
        df_hive.write.format("starrocks") \
            .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
            .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
            .option("starrocks.table.identifier", "test.ods_asin_detail_test") \
            .option("starrocks.user", connection_info['user']) \
            .option("starrocks.password", connection_info['pwd']) \
            .option("starrocks.write.flush.interval.ms", "10000") \
            .option("starrocks.write.properties.column_separator", "~!@#$%^&*~!@#$%^&*") \
            .mode("append") \
            .save()
        print("导出完毕")
    finally:
        # Always release the Spark session, even if the read or the
        # StarRocks write raises (original code leaked it on failure).
        spark.stop()