import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.StarRocksHelper import StarRocksHelper
if __name__ == '__main__':
    # Export one month of the Hive table `ods_asin_detail` (US site, month
    # partition 2024-03) into the StarRocks table `test.ods_asin_detail_test`
    # using the StarRocks Spark connector.
    spark = SparkUtil.get_spark_session("ods_asin_detail_to_sr_test")
    try:
        sql = """
    select
    *
    from ods_asin_detail
    where site_name = 'us'
    and date_type = 'month'
    and date_info = '2024-03'
    """
        # Repartition to 40 tasks so the StarRocks stream-load requests are
        # parallelized rather than funneled through a few large partitions.
        df_hive = spark.sql(sql).repartition(40)

        # Connection parameters (host, ports, credentials) for the
        # 'selection' StarRocks cluster come from the project-level helper.
        connection_info = StarRocksHelper.get_connection_info('selection')

        df_hive.write.format("starrocks") \
            .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
            .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
            .option("starrocks.table.identifier", "test.ods_asin_detail_test") \
            .option("starrocks.user", connection_info['user']) \
            .option("starrocks.password", connection_info['pwd']) \
            .option("starrocks.write.flush.interval.ms", "10000") \
            .option("starrocks.write.properties.column_separator", "~!@#$%^&*~!@#$%^&*") \
            .mode("append") \
            .save()
        # Deliberately exotic column separator: the source rows may contain
        # commas/tabs, so an unlikely sentinel avoids field-splitting errors
        # during stream load. NOTE(review): assumed from the value — confirm.
        print("导出完毕")
    finally:
        # Always release the Spark session, even when the read or the
        # StarRocks write raises — previously a failure leaked the session.
        spark.stop()