# DorisHelper.py 7.33 KB
from pyspark.sql import functions as F, DataFrame


class DorisHelper:
    """
    Import/export helpers for Apache Doris.

    Writes go through the Spark Doris connector (``format("doris")``); reads go
    through Spark's generic JDBC source using a ``jdbc:mysql://`` URL.

    Connector reference:
    https://doris.apache.org/docs/2.0/ecosystem/spark-doris-connector/
    """

    # SECURITY NOTE(review): credentials are hard-coded here and therefore live
    # in version control. Consider loading them from configuration/secrets.
    #
    # One entry per connection profile:
    #   ip        - host used for both the connector and the JDBC URL
    #   http_port - port used to build ``doris.fenodes`` for connector writes
    #   jdbc_port - port used in the ``jdbc:mysql://`` URL for reads
    #   user/pwd  - credentials
    #   def_db    - only present on "adv"; not read by any method of this class
    __connection_info__ = {
        "adv": {
            "ip": "192.168.10.156",
            "http_port": 8030,
            "jdbc_port": 19030,
            "user": "selection",
            "pwd": "1w&@Js12a3",
            "def_db": "advertising_manager"
        },
        "selection": {
            "ip": "192.168.10.151",
            "http_port": 48030,
            "jdbc_port": 49030,
            "user": "spark",
            "pwd": "yswg123"
        }
    }

    @staticmethod
    def get_connection_info(use_type='adv'):
        """
        Return the connection profile registered under ``use_type``.

        :param use_type: profile name, a key of ``__connection_info__``
            ("adv" is the advertising system profile)
        :return: the stored profile dict (the object itself, not a copy)
        :raises KeyError: if ``use_type`` is not a registered profile
        """
        try:
            return DorisHelper.__connection_info__[use_type]
        except KeyError:
            known = ", ".join(sorted(DorisHelper.__connection_info__))
            raise KeyError(
                f"unknown Doris use_type {use_type!r}; expected one of: {known}"
            ) from None

    @classmethod
    def _doris_writer(cls, df_save, table_identifier, use_type):
        """
        Return ``df_save.write`` configured with the options every export shares:
        connector format, FE address, target table and credentials.
        """
        connection_info = cls.get_connection_info(use_type)
        return (
            df_save.write.format("doris")
            # doris.fenodes is built from the profile's http_port, not jdbc_port.
            .option("doris.fenodes", f"{connection_info['ip']}:{connection_info['http_port']}")
            .option("doris.table.identifier", table_identifier)
            .option("user", connection_info['user'])
            .option("password", connection_info['pwd'])
        )

    @classmethod
    def spark_export(cls, df_save: "DataFrame", table_name, use_type='adv'):
        """
        Append a DataFrame to a Doris table.

        :param df_save: DataFrame to write
        :param table_name: value passed unchanged as ``doris.table.identifier``
            (no database name is prepended by this method)
        :param use_type: connection profile name
        :return: None
        :raises KeyError: if ``use_type`` is not a registered profile
        """
        print(f"执行导出数据到Doris任务,导出数据: {df_save}, Doris表名: {table_name}")
        cls._doris_writer(df_save, table_name, use_type).mode("append").save()
        print("DataFrame数据导出任务完毕")

    @classmethod
    def spark_import_with_sql(cls, session, query, use_type='selection'):
        """
        Read from Doris over JDBC and return the result as a DataFrame.

        :param session: Spark session object (its ``read`` attribute is used)
        :param query: SQL text; surrounding whitespace is stripped and it must
            not end with ``;`` (Spark's JDBC ``query`` option embeds the text as
            a subquery)
        :param use_type: connection profile name
        :return: the DataFrame produced by ``load()``
        :raises AssertionError: if the stripped query ends with a semicolon
        :raises KeyError: if ``use_type`` is not a registered profile
        """
        print(f"执行读取doris数据任务, sql语句为: {query}")
        query = query.strip()
        # Raised explicitly (rather than via ``assert``) so the check survives
        # ``python -O``; the exception type is the same as before.
        if query.endswith(";"):
            raise AssertionError("sql 末尾不能带有分号,请检查!!!")
        connection_info = cls.get_connection_info(use_type)
        result = (
            session.read.format("jdbc")
            .option("url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}")
            .option("user", connection_info['user'])
            .option("password", connection_info['pwd'])
            .option("query", query)
            .load()
        )
        print("数据读取任务完毕")
        return result

    @classmethod
    def spark_export_with_update(cls, df_save, db_name, table_name, table_columns, update_field, use_type='selection'):
        """
        Append a DataFrame to Doris as a conditional update.

        :param df_save: DataFrame to write
        :param db_name: Doris database name
        :param table_name: Doris table name
        :param table_columns: comma-separated columns, sent as ``doris.write.fields``
        :param update_field: field the update is based on, sent as
            ``doris.sink.properties.sequence_col``
        :param use_type: connection profile name
        :return: None
        :raises KeyError: if ``use_type`` is not a registered profile
        """
        print(
            f"执行导出数据到doris任务,导出数据: {df_save}, doris库名: {db_name}, doris表名: {table_name}, 更新字段为: {update_field}")
        writer = (
            cls._doris_writer(df_save, f"{db_name}.{table_name}", use_type)
            .option("doris.write.fields", f"{table_columns}")
            # NOTE(review): the next two property names are forwarded exactly as
            # the original code spelled them. Confirm against the Stream Load
            # documentation that "type"/"sequence_col" have the intended effect.
            .option("doris.sink.properties.type", "DELETE")
            .option("doris.sink.properties.sequence_col", f"{update_field}")
            .option("doris.sink.batch.interval.ms", "30000")
            .option("doris.sink.properties.format", "json")
        )
        writer.mode("append").save()
        print("指定字段更新的数据导出任务完毕")

    @classmethod
    def spark_export_with_columns(cls, df_save, db_name, table_name, table_columns, use_type='selection'):
        """
        Append the given columns of a DataFrame to a Doris table.

        :param df_save: DataFrame to write
        :param db_name: Doris database name
        :param table_name: Doris table name
        :param table_columns: comma-separated columns, sent as ``doris.write.fields``
        :param use_type: connection profile name
        :return: None
        :raises KeyError: if ``use_type`` is not a registered profile
        """
        print(
            f"执行导出数据到doris任务,导出数据: {df_save}, doris库名: {db_name}, doris表名: {table_name}, 导出的字段为: {table_columns}")
        writer = (
            cls._doris_writer(df_save, f"{db_name}.{table_name}", use_type)
            .option("doris.write.fields", f"{table_columns}")
            .option("doris.sink.batch.interval.ms", "30000")
            # NOTE(review): a column separator is configured although the format
            # below is json; presumably a leftover - confirm before removing.
            .option("doris.sink.properties.column_separator", "|-&|-&|-&|-&|-&|-&|-&")
            .option("doris.sink.properties.format", "json")
        )
        writer.mode("append").save()
        print("指定字段的数据导出任务完毕")

    @classmethod
    def spark_export_with_partial_update(cls, df_save, db_name, table_name, update_field, update_mode,
                                         use_type='selection'):
        """
        Append a DataFrame to Doris with partial-column update enabled.

        :param df_save: DataFrame to write
        :param db_name: Doris database name
        :param table_name: Doris table name
        :param update_field: comma-separated columns to update, e.g. "id, name";
            when non-empty it is sent (whitespace around names removed) as
            ``doris.write.fields``
        :param update_mode: update mode ("row" / "column"); only logged, it is
            not forwarded to the connector by this method
        :param use_type: connection profile name
        :return: None
        :raises KeyError: if ``use_type`` is not a registered profile
        """
        print(
            f"执行导出数据到doris任务,导出数据: {df_save}, doris库名: {db_name}, doris表名: {table_name}, 更新字段为: {update_field}, 更新模式为: {update_mode}")
        writer = (
            cls._doris_writer(df_save, f"{db_name}.{table_name}", use_type)
            .option("doris.sink.properties.partial_columns", "true")
            .option("doris.sink.batch.interval.ms", "30000")
            .option("doris.sink.properties.format", "json")
        )
        if update_field:
            # Tell the connector which columns the partial update carries.
            columns = ",".join(name.strip() for name in str(update_field).split(","))
            writer = writer.option("doris.write.fields", columns)
        writer.mode("append").save()
        print("部分字段更新数据导出任务完毕")