import os
import sys
from enum import Enum

from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
from typing import Dict
from sqlalchemy.engine import Engine

# Add the parent directory of sys.path[0] to the module search path.
# Presumably this lets sibling packages be imported when the file is run as a
# script -- TODO confirm against the project layout.
sys.path.append(os.path.dirname(sys.path[0]))


class DbTypes(Enum):
    """
    Database types available for export.

    ``DBUtil.get_connection_info`` compares its ``db_type`` argument against
    the ``name`` of these members, so callers pass the member name as a string.
    """
    mysql = "mysql"
    postgresql_test = "postgresql_test"
    postgresql = "postgresql"
    postgresql_14 = "postgresql_14"
    postgresql_16 = "postgresql_16"
    postgresql_8 = "postgresql_8"
    postgresql_cluster = "postgresql_cluster"
    adv_us = "adv_us"
    adv_uk = "adv_uk"
    adv_other = "adv_other"
    srs = "srs"
    doris = "doris"


class DBUtil(object):
    """
    Static helpers that build connection settings, create SQLAlchemy engines
    and run PostgreSQL table / partition maintenance statements.
    """
    # Site codes known to the project. Note that the database-name maps in
    # get_connection_info only cover us, uk, de, es, fr and it.
    __SITE_SET__ = {'us', 'uk', 'de', 'fr', 'es', 'it', 'au', 'ca'}

    # NOTE(review): every credential below is stored in plain text in source
    # control; consider loading them from the environment or a secret store.

    # MySQL connection parameters
    __mysql_host__ = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
    __mysql_port__ = "3306"
    __mysql_username__ = "adv_yswg"
    __mysql_pwd__ = "HCL1zcUgQesaaXNLbL37O5KhpSAy0c"

    # PostgreSQL connection parameters, production database (h15)
    __pgsql_host__ = "192.168.10.224"
    __pgsql_port__ = "5433"
    # __pgsql_username__ = "postgres"
    # __pgsql_pwd__ = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
    __pgsql_username__ = "yswg_postgres"
    __pgsql_pwd__ = "yswg_postgres"

    # PostgreSQL connection parameters, production database (h14)
    __pgsql_h14_host__ = "192.168.10.223"
    __pgsql_h14_port__ = "5433"
    __pgsql_h14_username__ = "yswg_postgres"
    __pgsql_h14_pwd__ = "yswg_postgres"

    # PostgreSQL test database connection parameters
    __pgsql_test_host__ = "192.168.10.217"
    __pgsql_test_port__ = "5433"
    __pgsql_test_username__ = "postgres"
    __pgsql_test_pwd__ = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"

    # PostgreSQL cluster connection parameters
    # __pgsql_cluster_host__ = "192.168.10.221"
    __pgsql_cluster_host__ = "192.168.10.155"
    __pgsql_cluster_port__ = "6432"
    # __pgsql_cluster_username__ = "postgres"
    # __pgsql_cluster_pwd__ = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"
    __pgsql_cluster_username__ = "yswg_postgres"
    __pgsql_cluster_pwd__ = "yswg_postgres"

    # Advertising system, US site: Alibaba Cloud MySQL connection parameters
    __adv_us_host__ = "rm-wz93hkkmzm5f3868iho.mysql.rds.aliyuncs.com"
    __adv_us_port__ = "3306"
    __adv_us_username__ = "adv_yswg"
    __adv_us_pwd__ = "S4FeR09bFF441lTz"

    # Advertising system, UK site: Alibaba Cloud MySQL connection parameters
    __adv_uk_host__ = "rm-wz9rrjr9ub877ue8bko.mysql.rds.aliyuncs.com"
    __adv_uk_port__ = "3306"
    __adv_uk_username__ = "adv_yswg"
    __adv_uk_pwd__ = "S4FeR09bFF441lTz"

    # Advertising system, smaller sites: MySQL connection parameters
    __adv_other_host__ = "192.168.10.226"
    __adv_other_port__ = "3306"
    __adv_other_username__ = "root"
    __adv_other_pwd__ = "8cUrBPb0IMY1hfBy"

    # PostgreSQL connection parameters, production database (h8)
    __pgsql_h8_host__ = "192.168.10.210"
    __pgsql_h8_port__ = "5432"
    __pgsql_h8_username__ = "postgres"
    __pgsql_h8_pwd__ = "fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS"

    # pg16
    __pgsql_h16_host__ = "192.168.10.225"
    __pgsql_h16_port__ = "5433"
    __pgsql_h16_username__ = "yswg_postgres"
    __pgsql_h16_pwd__ = "yswg_postgres"

    # star_rocks
    __srs_cluster_host__ = "192.168.10.151"
    __srs_cluster_port__ = "19030"
    __srs_cluster_username__ = "spark"
    __srs_cluster_pwd__ = "yswg123"

    # doris (the original comment here also read "star_rocks"; the attribute
    # names suggest a Doris endpoint -- TODO confirm)
    __doris_cluster_host__ = "192.168.10.151"
    __doris_cluster_port__ = "49030"
    __doris_cluster_username__ = "fangxingjun"
    __doris_cluster_pwd__ = "fangxingjun12345"

    @staticmethod
    def get_connection_info(db_type: str, site_name: str):
        """
        Return the connection settings for a database type and site.

        :param db_type: database type, given as the name of a DbTypes member
        :param site_name: site code used to select the database name
        :return: dict with the keys db_name, db_type, url, username, pwd, host
                 and port, or None when db_type matches none of the known types
        """
        assert db_type is not None, "db_type 不能为空!!"
        assert site_name is not None, "站点不能为空!!"

        # Database name of the selection database for each site.
        selection_dbs = {
            "us": "selection",
            "uk": "selection_uk",
            "de": "selection_de",
            "es": "selection_es",
            "fr": "selection_fr",
            "it": "selection_it"
        }
        # All advertising sites share a single database name.
        adv_dbs = dict.fromkeys(("us", "uk", "de", "es", "fr", "it"), "advertising_manager")

        def describe(label, scheme, host, port, user, password, url_db, shown_db=None, pwd_last=False):
            # Assemble one settings dict. ``shown_db`` overrides the reported
            # db_name without touching the URL; ``pwd_last`` only controls the
            # position of the "pwd" key so the historical key order is kept.
            info = {
                "db_name": url_db if shown_db is None else shown_db,
                "db_type": label,
                "url": f"jdbc:{scheme}://{host}:{port}/{url_db}",
                "username": user,
            }
            if not pwd_last:
                info["pwd"] = password
            info["host"] = host
            info["port"] = port
            if pwd_last:
                info["pwd"] = password
            return info

        cfg = DBUtil

        if db_type == DbTypes.mysql.name:
            return describe("mysql", "mysql",
                            cfg.__mysql_host__, cfg.__mysql_port__,
                            cfg.__mysql_username__, cfg.__mysql_pwd__,
                            selection_dbs[site_name])

        # NOTE(review): for srs and doris the reported db_name is always
        # "selection" while the URL still carries the per-site database name;
        # confirm whether that mismatch is intended.
        if db_type == DbTypes.srs.name:
            return describe("mysql", "mysql",
                            cfg.__srs_cluster_host__, cfg.__srs_cluster_port__,
                            cfg.__srs_cluster_username__, cfg.__srs_cluster_pwd__,
                            selection_dbs[site_name], shown_db="selection")

        if db_type == DbTypes.doris.name:
            return describe("mysql", "mysql",
                            cfg.__doris_cluster_host__, cfg.__doris_cluster_port__,
                            cfg.__doris_cluster_username__, cfg.__doris_cluster_pwd__,
                            selection_dbs[site_name], shown_db="selection")

        if db_type == DbTypes.postgresql.name:
            return describe("postgresql", "postgresql",
                            cfg.__pgsql_host__, cfg.__pgsql_port__,
                            cfg.__pgsql_username__, cfg.__pgsql_pwd__,
                            selection_dbs[site_name], pwd_last=True)

        if db_type == DbTypes.postgresql_14.name:
            return describe("postgresql", "postgresql",
                            cfg.__pgsql_h14_host__, cfg.__pgsql_h14_port__,
                            cfg.__pgsql_h14_username__, cfg.__pgsql_h14_pwd__,
                            selection_dbs[site_name], pwd_last=True)

        if db_type == DbTypes.postgresql_8.name:
            return describe("postgresql", "postgresql",
                            cfg.__pgsql_h8_host__, cfg.__pgsql_h8_port__,
                            cfg.__pgsql_h8_username__, cfg.__pgsql_h8_pwd__,
                            selection_dbs[site_name], pwd_last=True)

        if db_type == DbTypes.postgresql_16.name:
            return describe("postgresql", "postgresql",
                            cfg.__pgsql_h16_host__, cfg.__pgsql_h16_port__,
                            cfg.__pgsql_h16_username__, cfg.__pgsql_h16_pwd__,
                            selection_dbs[site_name], pwd_last=True)

        if db_type == DbTypes.postgresql_test.name:
            return describe("postgresql", "postgresql",
                            cfg.__pgsql_test_host__, cfg.__pgsql_test_port__,
                            cfg.__pgsql_test_username__, cfg.__pgsql_test_pwd__,
                            selection_dbs[site_name])

        if db_type == DbTypes.postgresql_cluster.name:
            # The cluster is not split into per-site databases.
            return describe("postgresql", "postgresql",
                            cfg.__pgsql_cluster_host__, cfg.__pgsql_cluster_port__,
                            cfg.__pgsql_cluster_username__, cfg.__pgsql_cluster_pwd__,
                            "postgres")

        if db_type == DbTypes.adv_us.name:
            return describe("adv_us", "mysql",
                            cfg.__adv_us_host__, cfg.__adv_us_port__,
                            cfg.__adv_us_username__, cfg.__adv_us_pwd__,
                            adv_dbs[site_name])

        if db_type == DbTypes.adv_uk.name:
            return describe("adv_uk", "mysql",
                            cfg.__adv_uk_host__, cfg.__adv_uk_port__,
                            cfg.__adv_uk_username__, cfg.__adv_uk_pwd__,
                            adv_dbs[site_name])

        if db_type == DbTypes.adv_other.name:
            return describe("adv_other", "mysql",
                            cfg.__adv_other_host__, cfg.__adv_other_port__,
                            cfg.__adv_other_username__, cfg.__adv_other_pwd__,
                            adv_dbs[site_name])

        # Unknown db_type: callers receive None.
        return None

    @staticmethod
    def get_db_engine(db_type, site_name) -> Engine:
        """
        Create a SQLAlchemy engine for the given database type and site.

        The settings come from get_connection_info. Only settings whose
        db_type is 'postgresql' or 'mysql' are supported; the adv_* settings
        report a different db_type and are therefore rejected here.

        :param db_type: database type to connect to
        :param site_name: site code
        :return: engine: SQLAlchemy engine with pool_size=10 and max_overflow=20
        :raises AssertionError: when no settings exist for db_type
        :raises Exception: when the settings describe an unsupported database
        """
        from urllib.parse import quote

        conn = DBUtil.get_connection_info(db_type, site_name)
        assert conn is not None, "获取连接配置信息错误,请检查"

        # Map the reported database family to the SQLAlchemy dialect/driver.
        drivers = {'postgresql': 'postgresql', 'mysql': 'mysql+pymysql'}
        driver = drivers.get(conn['db_type'])
        if driver is None:
            raise Exception("数据库不支持")

        # Percent-encode the credentials so that reserved characters such as
        # '@', ':', '/' or '%' cannot corrupt the URL. Purely alphanumeric
        # credentials pass through unchanged.
        username = quote(conn['username'], safe='')
        password = quote(conn['pwd'], safe='')

        return create_engine(
            f"{driver}://{username}:{password}@{conn['host']}:{conn['port']}/{conn['db_name']}",
            pool_size=10,  # size of the connection pool
            max_overflow=20  # connections allowed beyond the pool size
        )

    @staticmethod
    def exec_sql(db_type, site_name, sql: str, dispose_flag=False):
        """
        Create an engine for the given database type and site and run SQL on it.

        The text is split on ';' and every non-empty piece is executed in
        order on one connection. The split is purely textual, so a semicolon
        inside a string literal also splits the statement.

        NOTE(review): statements are passed to execute() as plain strings and
        no explicit commit is issued; this appears to rely on SQLAlchemy 1.x
        behaviour -- confirm the installed version.

        :param db_type: database type to connect to
        :param site_name: site code
        :param sql: SQL to execute; multiple statements are separated by ';'
        :param dispose_flag: whether to dispose of the engine afterwards
        """
        engine = DBUtil.get_db_engine(db_type, site_name)
        try:
            with engine.connect() as connection:
                for statement in (part.strip() for part in sql.split(";")):
                    if not statement:
                        continue
                    print("==========================执行sql中=====================================")
                    print(statement)
                    connection.execute(statement)
            print("==========================sql执行完毕=====================================")
        finally:
            # Dispose even when a statement failed, so a requested clean-up
            # is never skipped and the pool does not leak.
            if dispose_flag:
                engine.dispose()

    @staticmethod
    def engine_exec_sql(engine, sql: str):
        """
        Run one SQL string on an already created engine.

        The statement is printed, stripped of surrounding whitespace, wrapped
        in text() and executed on a fresh connection.

        NOTE(review): the result object is handed back after the connection
        context has exited; whether rows can still be fetched from it may
        depend on the driver -- confirm.

        :param engine: database engine
        :param sql: SQL statement to execute
        :return: the result object returned by connection.execute
        """
        print("==========================执行sql中=====================================")
        print(sql)
        with engine.connect() as connection:
            statement = text(sql.strip())
            result = connection.execute(statement)
            print("==========================sql执行完毕=====================================")
        return result

    @classmethod
    def copy_pg_tb_index(cls, engine: Engine, source_tb_name: str, target_tb_name: str):
        """
        Build "create index" statements that replicate the indexes of
        source_tb_name on target_tb_name.

        Nothing is executed against target_tb_name; the statements are only
        generated. target_tb_name should not have indexes yet, otherwise
        duplicate indexes may be created when the statements are run.

        The index definitions are read from pg_indexes, filtered by table
        name only (not by schema). Every generated statement is lower-cased
        as a whole.

        :param engine: database engine
        :param source_tb_name: table whose index definitions are read
        :param target_tb_name: table the generated statements create indexes on
        :return: tuple (indexs, indexs_cp): indexs is a list of dicts with the
                 keys schemaname, indexname, tablename, suffix and fields;
                 indexs_cp is the list of generated statements
        """
        # The table name is passed as a bound parameter rather than spliced
        # into the SQL text, so quotes in the name cannot break the query.
        lookup = text(
            "select schemaname, tablename, indexname, indexdef "
            "from pg_indexes where tablename = :tb_name"
        )
        indexs = []
        indexs_cp = []
        with engine.connect() as connection:
            rows = connection.execute(lookup, {"tb_name": source_tb_name})
            # Rows are unpacked by position (the select list order above),
            # which does not depend on string-key access to result rows.
            for i, (schemaname, tablename, indexname, indexdef) in enumerate(rows, start=1):
                # TODO: detect duplicate indexes
                indexdef = str(indexdef)
                # Everything from "USING" onwards: access method, columns and
                # any trailing clauses.
                suffix = indexdef[indexdef.rfind("USING"):]
                # Everything before "INDEX", e.g. "CREATE " or "CREATE UNIQUE ".
                prefix = indexdef[0:indexdef.rfind("INDEX")]
                fields = suffix[suffix.rfind("(") + 1: suffix.rfind(")")].split(",")
                pk = "_pk" if str(indexname).endswith("pk") else ""
                # The new index name combines the source table name, a running
                # number and a month_day_hourminute timestamp.
                sql = f"{prefix} index {source_tb_name}_idx{i}_{datetime.now().strftime('%m_%d_%H%M')}{pk} on {schemaname}.{target_tb_name} {suffix};".lower()
                indexs_cp.append(sql)
                indexs.append({
                    "schemaname": schemaname,
                    "indexname": indexname,
                    "tablename": tablename,
                    "suffix": suffix,
                    "fields": fields
                })

        return indexs, indexs_cp

    @classmethod
    def exchange_tb(cls, engine: Engine, source_tb_name: str, target_tb_name: str, cp_index_flag: bool):
        """
        Swap the names of two tables. Both must be ordinary tables, not
        partitioned tables.

        When cp_index_flag is set, the indexes of target_tb_name are first
        recreated on source_tb_name. The swap is three renames that go
        through a temporary "<target>_back_<n>" name. Everything runs in one
        transaction, so a failure part-way through is rolled back instead of
        leaving the target table under its backup name.

        :param engine: database engine
        :param source_tb_name: source table
        :param target_tb_name: target table
        :param cp_index_flag: whether to copy the target's indexes to the source first
        :return: True
        """
        import random

        index_cp_sql = None
        if cp_index_flag:
            _, indexs_cp = cls.copy_pg_tb_index(engine, target_tb_name, source_tb_name)
            if indexs_cp:
                index_cp_sql = "\n".join(indexs_cp)

        # engine.begin() commits when the block succeeds and rolls back when
        # any statement raises.
        with engine.begin() as connection:
            # Rebuild the indexes first.
            if index_cp_sql is not None:
                print("================================重新构建索引中================================")
                print(index_cp_sql)
                connection.execute(index_cp_sql)

            # Random number for the temporary backup name used during the swap.
            suffix_int = random.randint(1, 200)
            backup_tb_name = f"{target_tb_name}_back_{suffix_int}"
            rename_steps = (
                f"alter table {target_tb_name} rename to {backup_tb_name};",
                f"alter table {source_tb_name} rename to {target_tb_name};",
                f"alter table {backup_tb_name} rename to {source_tb_name};",
            )
            print(f"================================交换表{source_tb_name}到{target_tb_name}中================================")
            for step in rename_steps:
                print(step)
                connection.execute(text(step))
        return True

    @classmethod
    def add_pg_part(cls, engine: Engine,
                    source_tb_name: str,
                    part_master_tb: str,
                    cp_index_flag: bool,
                    part_val: Dict,
                    print_flag: bool = False
                    ):
        """
        Attach the ordinary table source_tb_name to a partitioned table.

        :param engine: database engine
        :param source_tb_name: ordinary table to attach
        :param part_master_tb: partitioned parent table
        :param cp_index_flag: whether to copy the parent's indexes to the source table first
        :param part_val: partition bounds, a dict with the keys 'from' and 'to'
        :param print_flag: when True the SQL is only printed, nothing is executed
        :return: True after executing; None when print_flag is set
        """
        # Each bound value is stripped and single-quoted, then comma-joined.
        lower_bound = ",".join(f"'{str(item).strip()}'" for item in part_val['from'])
        upper_bound = ",".join(f"'{str(item).strip()}'" for item in part_val['to'])

        index_cp_sql = None
        if cp_index_flag:
            _, index_statements = cls.copy_pg_tb_index(engine, part_master_tb, source_tb_name)
            if index_statements:
                index_cp_sql = "\n".join(index_statements)

        add_sql = f"""alter table {part_master_tb} attach partition {source_tb_name} for values from ({lower_bound}) to ({upper_bound});"""

        if not print_flag:
            # Actually execute.
            with engine.connect() as connection:
                # Build the indexes first.
                if index_cp_sql is not None:
                    print("================================构建索引中================================")
                    print(index_cp_sql)
                    connection.execute(index_cp_sql)
                print(f"==========================加入普通表{source_tb_name}到分区表{part_master_tb}中===========================")
                print(add_sql)
                connection.execute(add_sql)
            return True

        # Print only.
        if index_cp_sql is not None:
            print("================================构建索引如下================================")
            print(index_cp_sql)

        print("================================普通表加入分区sql如下================================")
        print(add_sql)
        return

    @classmethod
    def exchange_pg_part_distributed_tb(cls, engine: Engine,
                                        source_tb_name: str,
                                        part_master_tb: str,
                                        part_target_tb: str,
                                        part_val: Dict,
                                        drop_old: bool = True
                                        ):
        """
        Put the ordinary table source_tb_name into a given partition of a
        distributed partitioned table. If the partition does not exist yet
        the table is simply attached; if it exists the two are exchanged.

        Steps: an existing partition is detached and renamed to
        "<part_target_tb>_back"; the source table is renamed to
        part_target_tb and attached with the given bounds; finally the backup
        is either dropped (drop_old) or renamed to source_tb_name.

        :param engine: database engine
        :param source_tb_name: ordinary table to add to the partitioned parent
        :param part_master_tb: partitioned parent table
        :param part_target_tb: name of the target partition (child table)
        :param part_val: partition bounds, a dict with the keys 'from' and 'to'
        :param drop_old: drop the replaced partition (True) or keep it under
                         the name source_tb_name (False)
        :return: None
        """
        # Build the bounds before any DDL runs, so a malformed part_val fails
        # before tables have been detached or renamed.
        val1 = ",".join(f"'{str(it).strip()}'" for it in part_val['from'])
        val2 = ",".join(f"'{str(it).strip()}'" for it in part_val['to'])

        # Check whether the target partition already exists.
        val = DBUtil.engine_exec_sql(engine, f"""
        SELECT * FROM pg_partition_tree('{part_master_tb}') where relid::varchar='{part_target_tb}'
""")
        exist_flag = len(list(val)) > 0
        if exist_flag:
            DBUtil.engine_exec_sql(engine, f"""alter table {part_master_tb} detach partition {part_target_tb}""")
            DBUtil.engine_exec_sql(engine, f"""alter table {part_target_tb} rename to {part_target_tb}_back""")

        if source_tb_name != part_target_tb:
            DBUtil.engine_exec_sql(engine, f"""alter table {source_tb_name} rename to {part_target_tb};""")

        # Attach the renamed table to the partitioned table.
        DBUtil.engine_exec_sql(engine, f"""
        alter table {part_master_tb} attach partition {part_target_tb} for values from ({val1}) to ({val2});
""")
        if drop_old:
            DBUtil.engine_exec_sql(engine, f"""drop table  if exists {part_target_tb}_back """)
        elif exist_flag:
            # Swap the backup into the source name. The backup table only
            # exists when a partition was detached above; without this guard
            # the rename failed whenever the partition was new.
            DBUtil.engine_exec_sql(engine, f"""alter table {part_target_tb}_back rename to {source_tb_name}""")

        print(f"==================表{source_tb_name}加入成功==================================")

    @classmethod
    def exchange_pg_part_tb(cls, engine: Engine,
                            source_tb_name: str,
                            part_master_tb: str,
                            part_target_tb: str,
                            cp_index_flag: bool,
                            part_val: Dict,
                            print_flag: bool = False
                            ):
        """
        Exchange the ordinary table source_tb_name with a partition of a
        partitioned table.

        :param engine: database engine
        :param source_tb_name: ordinary table
        :param part_master_tb: partitioned parent table
        :param part_target_tb: partition (child table) to be replaced
        :param cp_index_flag: whether to copy the partition's indexes to the source table first
        :param part_val: partition bounds, a dict with the keys 'from' and 'to'
        :param print_flag: when True the SQL is only printed, nothing is executed
        :return: True after executing; None when print_flag is set
        """
        # Each bound value is stripped and single-quoted, then comma-joined.
        lower_bound = ",".join(f"'{str(item).strip()}'" for item in part_val['from'])
        upper_bound = ",".join(f"'{str(item).strip()}'" for item in part_val['to'])

        index_cp_sql = None
        if cp_index_flag:
            _, index_statements = cls.copy_pg_tb_index(engine, part_target_tb, source_tb_name)
            if index_statements:
                index_cp_sql = "\n".join(index_statements)

        # 1. detach the old partition from the parent
        # 2. attach the new ordinary table to the parent
        # 3. swap the two table names
        exchange_sql = f"""
                        alter table {part_master_tb} detach partition {part_target_tb};
                        alter table {part_master_tb} attach partition {source_tb_name} for values from ({lower_bound}) to ({upper_bound});
                        alter table {part_target_tb} rename to {part_target_tb}_back;
                        alter table {source_tb_name} rename to {part_target_tb};
                        alter table {part_target_tb}_back rename to {source_tb_name};
                        """

        if not print_flag:
            # Actually execute.
            with engine.connect() as connection:
                # Rebuild the indexes first.
                if index_cp_sql is not None:
                    print("================================重新构建索引中================================")
                    print(index_cp_sql)
                    connection.execute(index_cp_sql)
                print(
                    f"================================交换普通表{source_tb_name}到分区表{part_master_tb}中================================")
                print(exchange_sql)
                connection.execute(exchange_sql)
            return True

        # Print only.
        if index_cp_sql is not None:
            print("================================构建索引如下================================")
            print(index_cp_sql)

        print("================================构建交换sql如下================================")
        print(exchange_sql)
        return

    @classmethod
    def list_partition(cls, engine: Engine, tb_name):
        """
        List the partitions of a PostgreSQL partitioned table.

        :param engine: database engine
        :param tb_name: name of the partitioned parent table
        :return: list of rows, one per partition, with the columns
                 partition_name and partition_expression
        """
        query = f"""
    select pt.relname                                 as partition_name,
   pg_get_expr(pt.relpartbound, pt.oid, true) as partition_expression
from pg_class base_tb
     join pg_inherits i on i.inhparent = base_tb.oid
     join pg_class pt on pt.oid = i.inhrelid
where base_tb.oid = '{tb_name}'::regclass;
        """
        with engine.connect() as connection:
            partitions = [row for row in connection.execute(query)]
        return partitions