# StarRocksHelper.py
# author : wangrui
# date : 2024/4/19 14:57

import os
import sys
from sqlalchemy import create_engine, text
import traceback


class StarRocksHelper(object):
    """
        starrocks导入导出参考文档地址:
            https://docs.starrocks.io/zh/docs/loading/loading_introduction/Loading_intro/
    """
    __connection_info__ = {
        "selection": {
            "ip": "192.168.10.151",
            "http_port": "18030",
            "jdbc_port": "19030",
            "user": "spark",
            "pwd": "yswg123"
        },
        "adv": {
            "ip": "192.168.10.156",
            "http_port": "8030",
            "jdbc_port": "19030",
            "user": "selection",
            "pwd": "1w&@Js12a3"
        }
    }

    """
    获取链接信息
    :param use_type: adv        广告系统
                     selection  选品系统
    """
    @staticmethod
    def get_connection_info(use_type='selection'):
        return StarRocksHelper.__connection_info__[use_type]

    """
     创建starrocks引擎
     :param db_name: starrocks对应库名
     """
    @classmethod
    def create_starrocks_engine(cls, db_name='selection', use_type='selection'):
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            engine = create_engine(
                f"mysql+pymysql://{connection_info['user']}:{connection_info['pwd']}@{connection_info['ip']}:{connection_info['jdbc_port']}/{db_name}",
                pool_size=20,  # 连接池的大小
                max_overflow=30  # 超出连接池大小之外可以创建的连接数
            )
            return engine
        except Exception as e:
            print("创建失败!")
            print(e, traceback.format_exc())
            raise e
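
    # Usage sketch (hypothetical database name; the returned object is a plain
    # SQLAlchemy engine, so the usual connect()/dispose() pattern applies):
    #
    #   engine = StarRocksHelper.create_starrocks_engine(db_name='selection')
    #   with engine.connect() as conn:
    #       rows = conn.execute(text("SELECT 1")).fetchall()
    #   engine.dispose()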

    """
    执行sql语句
    :param db_name: starrocks对应库名
    :param sql: 待执行的sql语句
    """
    @staticmethod
    def execute_sql(sql, db_name='selection', use_type='selection'):
        engine = None
        try:
            engine = StarRocksHelper.create_starrocks_engine(db_name, use_type)
            print("Executing SQL statement: ", sql)
            with engine.connect() as connection:
                val = connection.execute(text(sql))
                print("==========================SQL execution finished=====================================")
                # For row-returning statements the result should be consumed before
                # the connection is closed and the engine is disposed.
                return val
        except Exception as e:
            print("Execution failed!")
            print(e, traceback.format_exc())
            raise e
        finally:
            # Only dispose the engine if it was actually created.
            if engine is not None:
                engine.dispose()
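
    # Usage sketch (hypothetical table name; the method manages its own engine,
    # so it is best suited to DDL/DML statements whose result is not consumed later):
    #
    #   StarRocksHelper.execute_sql("TRUNCATE TABLE selection.tmp_table", db_name='selection')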

    """
    spark导出dataframe数据到starrocks
    :param df_save: dataframe对象
    :param db_name: starrocks对应库名
    :param table_name: 调starrocks对应表名
    """
    @classmethod
    def spark_export(cls, df_save, db_name, table_name, use_type='selection'):
        print(f"执行导出数据到starrock任务,导出数据: {df_save}, starrocks库名: {db_name}, starrocks表名: {table_name}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-&|-&|-&|-&|-&|-&|-&") \
                .mode("append") \
                .save()
            print("DataFrame数据导出任务完毕")
        except Exception as e:
            raise e
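
    # Usage sketch (hypothetical SparkSession and table name; requires the
    # starrocks-spark-connector to be available on the Spark classpath):
    #
    #   df = spark.createDataFrame([(1, "a")], ["id", "name"])
    #   StarRocksHelper.spark_export(df, db_name='selection', table_name='demo_table')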

    """
    通过spark读取starrocks生成dataframe
    :param session: spark对象
    :param db_name: starrocks对应库名
    :param table_name: 调starrocks对应表名
    """
    @classmethod
    def spark_import(cls, session, db_name, table_name, use_type='selection'):
        print(f"执行读取starrock数据任务, starrocks库名: {db_name}, starrocks表名: {table_name}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            return session.read.format("starrocks") \
                    .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                    .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                    .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                    .option("starrocks.user", connection_info['user']) \
                    .option("starrocks.password", connection_info['pwd']) \
                    .option("starrocks.request.tablet.size", "3") \
                    .option("starrocks.batch.size", "40960") \
                    .option("starrocks.exec.mem.limit", "21474836480") \
                    .load()
            print("数据读取任务完毕")
        except Exception as e:
            raise e
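
    # Usage sketch (hypothetical SparkSession and table name):
    #
    #   df = StarRocksHelper.spark_import(spark, db_name='selection', table_name='demo_table')
    #   df.show(10)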

    """
        通过spark读取starrocks分区数据生成dataframe
        :param session: SparkSession对象
        :param db_name: starrocks对应库名
        :param table_name: starrocks对应表名
        :param filter_query: 对应的过滤条件
    """
    @classmethod
    def spark_import_with_filter(cls, session, db_name, table_name, filter_query, use_type='selection'):
        print(f"执行读取starrock数据任务, starrocks库名: {db_name}, starrocks表名: {table_name}, 过滤条件为: {filter_query}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            return session.read.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.filter.query", f"{filter_query}") \
                .option("starrocks.request.tablet.size", "3") \
                .option("starrocks.batch.size", "40960") \
                .option("starrocks.exec.mem.limit", "21474836480") \
                .load()
            print("通过SQL过滤的数据读取任务完毕")
        except Exception as e:
            raise e
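
    # Usage sketch (hypothetical table and partition column; filter_query is a
    # SQL predicate string pushed down to StarRocks):
    #
    #   df = StarRocksHelper.spark_import_with_filter(
    #       spark, 'selection', 'demo_table', "dt = '2024-04-19'")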

    """
    通过spark通过jdbc方式读取starrocks生成dataframe
   :param session: spark对象
   :param query: 查询sql
    """
    @classmethod
    def spark_import_with_sql(cls, session, query, use_type='selection'):
        print(f"执行读取starrock数据任务, sql语句为: {query}")
        query = query.strip()
        assert not query.endswith(";"), "sql 末尾不能带有分号,请检查!!!"
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            return session.read.format("jdbc") \
                .option("url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("user", connection_info['user']) \
                .option("password", connection_info['pwd']) \
                .option("query", query) \
                .load()
            print("数据读取任务完毕")
        except Exception as e:
            raise e
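
    # Usage sketch (hypothetical query; the statement must not end with a semicolon):
    #
    #   df = StarRocksHelper.spark_import_with_sql(
    #       spark, "SELECT id, name FROM selection.demo_table WHERE dt = '2024-04-19'")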

    """
     spark导出dataframe数据到starrocks(条件更新)
     :param df_save: dataframe对象
     :param db_name: starrocks对应库名
     :param table_name: 调starrocks对应表名
     :update_field: 插入更新依据的字段
     :table_columns: 插入的具体字段(,连接)
     """
    @classmethod
    def spark_export_with_update(cls, df_save, db_name, table_name, table_columns, update_field, use_type='selection'):
        print(f"执行导出数据到starrock任务,导出数据: {df_save}, starrocks库名: {db_name}, starrocks表名: {table_name}, 更新字段为: {update_field}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.columns", f"{table_columns}") \
                .option("starrocks.write.properties.merge_condition", f"{update_field}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-|-|-|-|-|") \
                .mode("append") \
                .save()
            print("指定字段更新的数据导出任务完毕")
        except Exception as e:
            raise e
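
    # Usage sketch (hypothetical columns; table_columns lists the inserted columns
    # and update_field names the merge condition column):
    #
    #   StarRocksHelper.spark_export_with_update(
    #       df, 'selection', 'demo_table', table_columns='id,name,price', update_field='price')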

    """
         spark导出dataframe数据到starrocks(指定列)
         :param df_save: dataframe对象
         :param db_name: starrocks对应库名
         :param table_name: 调starrocks对应表名
         :table_columns: 插入的具体字段(,连接)
         """
    @classmethod
    def spark_export_with_columns(cls, df_save, db_name, table_name, table_columns, use_type='selection'):
        print(f"执行导出数据到starrock任务,导出数据: {df_save}, starrocks库名: {db_name}, starrocks表名: {table_name}, 导出的字段为: {table_columns}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.columns", f"{table_columns}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-&|-&|-&|-&|-&|-&|-&") \
                .option("starrocks.write.properties.format", "json") \
                .mode("append") \
                .save()
            print("指定字段的数据导出任务完毕")
        except Exception as e:
            raise e
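
    # Usage sketch (hypothetical columns; only the listed columns are written):
    #
    #   StarRocksHelper.spark_export_with_columns(
    #       df, 'selection', 'demo_table', table_columns='id,name')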

    """
            spark导出dataframe数据到starrocks(导出字段包括array)
            :param df_save: dataframe对象
            :param db_name: starrocks对应库名
            :param table_name: 调starrocks对应表名
            :array_column_list: array类型字段   "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
            """
    @classmethod
    def spark_export_with_array(cls, df_save, db_name, table_name, array_column_list, use_type='selection'):
        print(f"执行导出数据到starrock任务,导出数据: {df_save}, starrocks库名: {db_name}, starrocks表名: {table_name}, ARRAY类型字段为: {array_column_list}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.column.types", f"{array_column_list}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-|-|-|-|-|") \
                .mode("append") \
                .save()
            print("包含数组字段的数据导出任务完毕")
        except Exception as e:
            raise e
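
    # Usage sketch (hypothetical schema; array_column_list declares the StarRocks
    # column types of the ARRAY columns so the connector can serialize them):
    #
    #   StarRocksHelper.spark_export_with_array(
    #       df, 'selection', 'demo_table', array_column_list='tags ARRAY<STRING>')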

    """
            spark导出dataframe数据到starrocks(指定字段更新)
            :param df_save: dataframe对象
            :param db_name: starrocks对应库名
            :param table_name: 调starrocks对应表名
            :update_field: 指定字段更新   "id, name"
            :update_mode: 更新模式     "row:适用于较多列且小批量的实时更新场景; column:适用于少数列并且大量行的批处理更新场景"
            """

    @classmethod
    def spark_export_with_partial_update(cls, df_save, db_name, table_name, update_field, update_mode, use_type='selection'):
        print(f"执行导出数据到starrock任务,导出数据: {df_save}, starrocks库名: {db_name}, starrocks表名: {table_name}, 更新字段为: {update_field}, 更新模式为: {update_mode}")
        try:
            connection_info = StarRocksHelper.get_connection_info(use_type)
            df_save.write.format("starrocks") \
                .option("starrocks.fe.http.url", f"{connection_info['ip']}:{connection_info['http_port']}") \
                .option("starrocks.fe.jdbc.url", f"jdbc:mysql://{connection_info['ip']}:{connection_info['jdbc_port']}") \
                .option("starrocks.table.identifier", f"{db_name}.{table_name}") \
                .option("starrocks.user", connection_info['user']) \
                .option("starrocks.password", connection_info['pwd']) \
                .option("starrocks.write.properties.partial_update", "true") \
                .option("starrocks.write.properties.partial_update_mode", f"{update_mode}") \
                .option("starrocks.columns", f"{update_field}") \
                .option("starrocks.write.flush.interval.ms", "30000") \
                .option("starrocks.write.properties.column_separator", "|-|-|-|-|-|") \
                .mode("append") \
                .save()
            print("部分字段更新数据导出任务完毕")
        except Exception as e:
            raise e
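

if __name__ == "__main__":
    # Minimal end-to-end sketch (hypothetical database/table names; assumes a
    # SparkSession with the starrocks-spark-connector available). It reads a few
    # rows via JDBC and writes them back with a partial update on selected columns.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("starrocks_helper_demo").getOrCreate()

    demo_df = StarRocksHelper.spark_import_with_sql(
        spark, "SELECT id, name FROM selection.demo_table LIMIT 100")
    demo_df.show(10)

    StarRocksHelper.spark_export_with_partial_update(
        demo_df, 'selection', 'demo_table', update_field='id,name', update_mode='column')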