# author : wangrui
# date : 2024/4/19 14:57
from sqlalchemy import create_engine, text
import traceback


class StarRocksHelper(object):
    """
    StarRocks import/export reference documentation:
    https://docs.starrocks.io/zh/docs/loading/loading_introduction/Loading_intro/
    """

    __connection_info__ = {
        "selection": {
            "ip": "192.168.10.151",
            "http_port": "18030",
            "jdbc_port": "19030",
            "user": "spark",
            "pwd": "yswg123"
        },
        "adv": {
            "ip": "192.168.10.156",
            "http_port": "8030",
            "jdbc_port": "19030",
            "user": "selection",
            "pwd": "1w&@Js12a3"
        }
    }

    @staticmethod
    def get_connection_info(use_type='selection'):
        """
        Get connection info.
        :param use_type: "adv" = advertising system, "selection" = product-selection system
        """
        return StarRocksHelper.__connection_info__[use_type]

    @classmethod
    def create_starrocks_engine(cls, db_name='selection', use_type='selection'):
        """
        Create a SQLAlchemy engine for StarRocks.
        :param db_name: StarRocks database name
        """
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            engine = create_engine(
                f"mysql+pymysql://{connection_info['user']}:{connection_info['pwd']}"
                f"@{connection_info['ip']}:{connection_info['jdbc_port']}/{db_name}",
                pool_size=20,     # size of the connection pool
                max_overflow=30   # extra connections allowed beyond the pool size
            )
            return engine
        except Exception as e:
            print("Engine creation failed!")
            print(e, traceback.format_exc())
            raise e

    @staticmethod
    def execute_sql(sql, db_name='selection', use_type='selection'):
        """
        Execute a SQL statement.
        :param sql: SQL statement to execute
        :param db_name: StarRocks database name
        """
        engine = None
        try:
            engine = StarRocksHelper.create_starrocks_engine(db_name, use_type)
            print("Executing SQL: ", sql)
            with engine.connect() as connection:
                val = connection.execute(text(sql))
            print("========================== SQL execution finished ==========================")
            return val
        except Exception as e:
            print("Execution failed!")
            print(e, traceback.format_exc())
            raise e
        finally:
            # Only dispose if the engine was actually created.
            if engine is not None:
                engine.dispose()

    @classmethod
    def spark_export(cls, df_save, db_name, table_name, use_type='selection'):
        """
        Export a Spark DataFrame to StarRocks.
        :param df_save: DataFrame to write
        :param db_name: StarRocks database name
        :param table_name: StarRocks table name
        """
        print(f"Exporting data to StarRocks, DataFrame: {df_save}, database: {db_name}, table: {table_name}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-&|-&|-&|-&|-&|-&|-&") \
                .mode("append") \
                .save()
            print("DataFrame export task finished")
        except Exception as e:
            raise e

    @classmethod
    def spark_import(cls, session, db_name, table_name, use_type='selection'):
        """
        Read a StarRocks table into a Spark DataFrame.
        :param session: SparkSession object
        :param db_name: StarRocks database name
        :param table_name: StarRocks table name
        """
        print(f"Reading data from StarRocks, database: {db_name}, table: {table_name}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df = session.read.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.request.tablet.size", "3") \
                .option("starrocks.batch.size", "40960") \
                .option("starrocks.exec.mem.limit", "21474836480") \
                .load()
            print("Data read task finished")
            return df
        except Exception as e:
            raise e

    @classmethod
    def spark_import_with_filter(cls, session, db_name, table_name, filter_query, use_type='selection'):
        """
        Read filtered (e.g. partitioned) StarRocks data into a Spark DataFrame.
        :param session: SparkSession object
        :param db_name: StarRocks database name
        :param table_name: StarRocks table name
        :param filter_query: filter condition pushed down to StarRocks
        """
        print(f"Reading data from StarRocks, database: {db_name}, table: {table_name}, filter: {filter_query}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df = session.read.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.filter.query", f"{filter_query}") \
                .option("starrocks.request.tablet.size", "3") \
                .option("starrocks.batch.size", "40960") \
                .option("starrocks.exec.mem.limit", "21474836480") \
                .load()
            print("Filtered data read task finished")
            return df
        except Exception as e:
            raise e

    @classmethod
    def spark_import_with_sql(cls, session, query, use_type='selection'):
        """
        Read from StarRocks via JDBC with an arbitrary SQL query and return a DataFrame.
        :param session: SparkSession object
        :param query: SQL query to run
        """
        print(f"Reading data from StarRocks, SQL: {query}")
        query = query.strip()
        assert not query.endswith(";"), "The SQL must not end with a semicolon, please check!"
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df = session.read.format("jdbc") \
                .option("url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("user", connection_info['user']) \
                .option("password", connection_info['pwd']) \
                .option("query", query) \
                .load()
            print("Data read task finished")
            return df
        except Exception as e:
            raise e

    @classmethod
    def spark_export_with_update(cls, df_save, db_name, table_name, table_columns, update_field, use_type='selection'):
        """
        Export a Spark DataFrame to StarRocks with a conditional (merge-condition) update.
        :param df_save: DataFrame to write
        :param db_name: StarRocks database name
        :param table_name: StarRocks table name
        :param table_columns: columns to write, comma-separated
        :param update_field: column used as the merge condition for the upsert
        """
        print(f"Exporting data to StarRocks, DataFrame: {df_save}, database: {db_name}, "
              f"table: {table_name}, merge-condition column: {update_field}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.columns", f"{table_columns}") \
                .option("starrocks.write.properties.merge_condition", f"{update_field}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-|-|-|-|-|") \
                .mode("append") \
                .save()
            print("Merge-condition export task finished")
        except Exception as e:
            raise e

    @classmethod
    def spark_export_with_columns(cls, df_save, db_name, table_name, table_columns, use_type='selection'):
        """
        Export a Spark DataFrame to StarRocks, writing only the specified columns.
        :param df_save: DataFrame to write
        :param db_name: StarRocks database name
        :param table_name: StarRocks table name
        :param table_columns: columns to write, comma-separated
        """
        print(f"Exporting data to StarRocks, DataFrame: {df_save}, database: {db_name}, "
              f"table: {table_name}, columns: {table_columns}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.columns", f"{table_columns}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-&|-&|-&|-&|-&|-&|-&") \
                .option("starrocks.write.properties.format", "json") \
                .mode("append") \
                .save()
            print("Column-restricted export task finished")
        except Exception as e:
            raise e

    @classmethod
    def spark_export_with_array(cls, df_save, db_name, table_name, array_column_list, use_type='selection'):
        """
        Export a Spark DataFrame to StarRocks when the exported columns include ARRAY types.
        :param df_save: DataFrame to write
        :param db_name: StarRocks database name
        :param table_name: StarRocks table name
        :param array_column_list: ARRAY-typed columns, e.g. "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>"
        """
        print(f"Exporting data to StarRocks, DataFrame: {df_save}, database: {db_name}, "
              f"table: {table_name}, ARRAY columns: {array_column_list}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.column.types", f"{array_column_list}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-|-|-|-|-|") \
                .mode("append") \
                .save()
            print("Array-column export task finished")
        except Exception as e:
            raise e

    @classmethod
    def spark_export_with_partial_update(cls, df_save, db_name, table_name, update_field, update_mode, use_type='selection'):
        """
        Export a Spark DataFrame to StarRocks with partial (per-column) updates.
        :param df_save: DataFrame to write
        :param db_name: StarRocks database name
        :param table_name: StarRocks table name
        :param update_field: columns to update, e.g. "id, name"
        :param update_mode: "row" suits real-time updates that touch many columns in small batches;
                            "column" suits batch updates that touch few columns across many rows
        """
        print(f"Exporting data to StarRocks, DataFrame: {df_save}, database: {db_name}, "
              f"table: {table_name}, update columns: {update_field}, update mode: {update_mode}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.write.properties.partial_update", "true") \
                .option("starrocks.write.properties.partial_update_mode", f"{update_mode}") \
                .option("starrocks.columns", f"{update_field}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-|-|-|-|-|") \
                .mode("append") \
                .save()
            print("Partial-update export task finished")
        except Exception as e:
            raise e