from pyspark.sql import DataFrame


class DorisHelper:
    """
    Import/export helpers for Apache Doris.
    See https://doris.apache.org/docs/2.0/ecosystem/spark-doris-connector/
    """

    __connection_info__ = {
        "adv": {
            "ip": "192.168.10.156",
            "http_port": 8030,
            "jdbc_port": 19030,
            "user": "selection",
            "pwd": "K8@wZp#3nX",
            "def_db": "advertising_manager"
        },
        "selection": {
            "ip": "192.168.10.151",
            "http_port": 48030,
            "jdbc_port": 49030,
            "user": "spark",
            "pwd": "yswg123"
        }
    }

    @staticmethod
    def get_connection_info(use_type='adv'):
        """
        Get the connection info for a cluster.

        :param use_type: "adv" for the advertising system, "selection" for the selection system
        :return: connection-info dict
        """
        return DorisHelper.__connection_info__[use_type]

    @classmethod
    def spark_export(cls, df_save: DataFrame, table_name, use_type='adv'):
        """
        Export a DataFrame to Doris.

        :param df_save: DataFrame to export
        :param table_name: target Doris table identifier ("db.table")
        :param use_type: which cluster to use
        """
        print(f"Exporting data to Doris. DataFrame: {df_save}, table: {table_name}")
        connection_info = DorisHelper.get_connection_info(use_type)
        options = {
            # doris.fenodes expects the FE host and HTTP port.
            "doris.fenodes": f"{connection_info['ip']}:{connection_info['http_port']}",
            "user": connection_info['user'],
            "password": connection_info['pwd'],
            "doris.table.identifier": table_name,
        }
        df_save.write.format("doris") \
            .options(**options) \
            .mode("append") \
            .save()
        print("DataFrame export finished")

    @classmethod
    def spark_import_with_sql(cls, session, query, use_type='selection'):
        """
        Read Doris data into a DataFrame via JDBC.

        :param session: SparkSession
        :param query: SQL query (must not end with a semicolon)
        :param use_type: which cluster to use
        :return: the resulting DataFrame
        """
        print(f"Reading data from Doris, SQL: {query}")
        query = query.strip()
        assert not query.endswith(";"), "The SQL must not end with a semicolon, please check!"
        connection_info = DorisHelper.get_connection_info(use_type)
        df = session.read.format("jdbc") \
            .option("url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
            .option("user", connection_info['user']) \
            .option("password", connection_info['pwd']) \
            .option("query", query) \
            .load()
        print("Read finished")
        return df

    @classmethod
    def spark_export_with_update(cls, df_save, db_name, table_name, table_columns, update_field,
                                 use_type='selection'):
        """
        Export a DataFrame to Doris with a conditional (sequence-column) update.

        :param df_save: DataFrame to export
        :param db_name: target Doris database
        :param table_name: target Doris table
        :param table_columns: columns to write, comma-separated
        :param update_field: sequence column that the insert/update is based on
        :param use_type: which cluster to use
        """
        print(f"Exporting data to Doris. DataFrame: {df_save}, database: {db_name}, "
              f"table: {table_name}, update field: {update_field}")
        connection_info = DorisHelper.get_connection_info(use_type)
        df_save.write.format("doris") \
            .option("doris.fenodes", f"{connection_info['ip']}:{connection_info['http_port']}") \
            .option("doris.table.identifier", f"{db_name}.{table_name}") \
            .option("user", connection_info['user']) \
            .option("password", connection_info['pwd']) \
            .option("doris.write.fields", table_columns) \
            .option("doris.sink.properties.type", "DELETE") \
            .option("doris.sink.properties.sequence_col", update_field) \
            .option("doris.sink.batch.interval.ms", "30000") \
            .option("doris.sink.properties.format", "json") \
            .mode("append") \
            .save()
        print("Sequence-column update export finished")

    @classmethod
    def spark_export_with_columns(cls, df_save, db_name, table_name, table_columns,
                                  use_type='selection'):
        """
        Export a DataFrame to Doris, writing only the specified columns.

        :param df_save: DataFrame to export
        :param db_name: target Doris database
        :param table_name: target Doris table
        :param table_columns: columns to write, comma-separated
        :param use_type: which cluster to use
        """
        print(f"Exporting data to Doris. DataFrame: {df_save}, database: {db_name}, "
              f"table: {table_name}, columns: {table_columns}")
        connection_info = DorisHelper.get_connection_info(use_type)
        df_save.write.format("doris") \
            .option("doris.fenodes", f"{connection_info['ip']}:{connection_info['http_port']}") \
.option("doris.table.identifier", f"{db_name}.{table_name}") \ .option("user", connection_info['user']) \ .option("password", connection_info['pwd']) \ .option("doris.write.fields", f"{table_columns}") \ .option("doris.sink.batch.interval.ms", "30000") \ .option("doris.sink.properties.column_separator", "|-&|-&|-&|-&|-&|-&|-&") \ .option("doris.sink.properties.format", "json") \ .mode("append") \ .save() print("指定字段的数据导出任务完毕") except Exception as e: raise e """ spark导出dataframe数据到doris(指定字段更新) :param df_save: dataframe对象 :param db_name: doris对应库名 :param table_name: doris对应表名 :update_field: 指定字段更新 "id, name" :update_mode: 更新模式 "row:适用于较多列且小批量的实时更新场景; column:适用于少数列并且大量行的批处理更新场景" """ @classmethod def spark_export_with_partial_update(cls, df_save, db_name, table_name, update_field, update_mode, use_type='selection'): print( f"执行导出数据到doris任务,导出数据: {df_save}, doris库名: {db_name}, doris表名: {table_name}, 更新字段为: {update_field}, 更新模式为: {update_mode}") try: connection_info = DorisHelper.get_connection_info(use_type) df_save.write.format("starrocks") \ .option("doris.fenodes", f"{connection_info['ip']}:{connection_info['http_port']}") \ .option("doris.table.identifier", f"{db_name}.{table_name}") \ .option("user", connection_info['user']) \ .option("password", connection_info['pwd']) \ .option("doris.sink.properties.partial_columns", "true") \ .option("doris.sink.batch.interval.ms", "30000") \ .option("doris.sink.properties.format", "json") \ .mode("append") \ .save() print("部分字段更新数据导出任务完毕") except Exception as e: raise e