from urllib.parse import quote

from sqlalchemy import create_engine
from elasticsearch import Elasticsearch


class Utils(object):
    """Hold connection settings for MySQL, PostgreSQL and Elasticsearch.

    Call :meth:`connection` to build the client for one backend; the result
    is stored on ``engine`` (MySQL), ``engine_pg`` (PostgreSQL) or
    ``engine_es`` (Elasticsearch).
    """

    def __init__(self, site_name='us'):
        """Store the connection settings used by :meth:`connection`.

        Args:
            site_name: Site code used as the key into ``site_name_db_dict``
                to select the database name. It is not validated here; an
                unknown code raises ``KeyError`` when :meth:`connection`
                builds a MySQL or PostgreSQL URL.
        """
        # 1.1 MySQL / PostgreSQL configuration
        self.site_name = site_name
        # Site code -> database name.
        self.site_name_db_dict = {
            "us": "selection",
            "uk": "selection_uk",
            "de": "selection_de",
            "es": "selection_es",
            "fr": "selection_fr",
            "it": "selection_it",
        }
        # NOTE(review): credentials are hard-coded in source. Consider loading
        # them from the environment or a secrets store and rotating them.
        self.DB_CONN_MYSQL = {
            "mysql_aliyun": {
                "mysql_port": "3306",
                "mysql_db": self.site_name_db_dict,
                "mysql_user": "adv_yswg",
                "mysql_pwd": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c",
                "mysql_host": "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com",
            },
            "mysql_h6": {
                "mysql_port": "3306",
                "mysql_db": self.site_name_db_dict,
                "mysql_user": "adv_yswg",
                "mysql_pwd": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c",
                "mysql_host": "192.168.10.216",
            },
            "mysql_h7": {
                "mysql_port": "3306",
                "mysql_db": self.site_name_db_dict,
                "mysql_user": "adv_yswg",
                "mysql_pwd": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c",
                "mysql_host": "192.168.10.217",
            },
        }
        self.DB_CONN_PG = {
            "pg_h6": {
                # Was "3306", which is the port the MySQL entry "mysql_h6"
                # already uses on this same host; 5432 is the PostgreSQL port.
                "pg_port": "5432",
                "pg_db": self.site_name_db_dict,
                "pg_user": "postgres",
                "pg_pwd": "HCL1zcUgQesaaXNLbL37O5KhpSAy0c",
                "pg_host": "192.168.10.216",
            }
        }
        # 1.2 Elasticsearch configuration
        self.DB_CONN_ES = {
            "es_h6": {
                "es_port": "9200",
                "es_host": "192.168.10.216",
                "es_user": "elastic",
                "es_pass": "selection2021.+",
            }
        }
        # Placeholders until connection() replaces them with real clients.
        self.engine = object()
        self.engine_pg = object()
        self.engine_es = object()

    def _sql_url(self, scheme, conf, prefix):
        """Build ``scheme://user:pwd@host:port/db`` from one settings entry.

        Args:
            scheme: SQLAlchemy dialect+driver, e.g. ``"mysql+pymysql"``.
            conf: One entry of ``DB_CONN_MYSQL`` or ``DB_CONN_PG``.
            prefix: Key prefix used inside ``conf`` (``"mysql"`` or ``"pg"``).

        Raises:
            KeyError: If ``self.site_name`` has no database mapping.
        """
        # Percent-encode credentials so characters such as "@", "/" or ":"
        # cannot break URL parsing.
        user = quote(conf[f"{prefix}_user"], safe="")
        pwd = quote(conf[f"{prefix}_pwd"], safe="")
        host = conf[f"{prefix}_host"]
        port = conf[f"{prefix}_port"]
        db_name = conf[f"{prefix}_db"][self.site_name]
        return f"{scheme}://{user}:{pwd}@{host}:{port}/{db_name}"

    def connection(self, db_type="mysql", db_conn="mysql_aliyun"):
        """Create the client for ``db_type`` and store it on the instance.

        Args:
            db_type: ``"mysql"`` (sets ``self.engine``), ``"pg"`` (sets
                ``self.engine_pg``) or ``"es"`` (sets ``self.engine_es``).
                Any other value only prints a notice.
            db_conn: Key of the settings entry in the matching ``DB_CONN_*``
                dict. The default is a MySQL key, so pass an explicit value
                for ``"pg"`` and ``"es"``.

        Raises:
            KeyError: If ``db_conn`` is not a key of the selected settings
                dict, or ``self.site_name`` has no database mapping.
        """
        print(f"当前选择的数据库类型: {db_type}, 数据库来源: {db_conn}")
        if db_type == "mysql":
            url = self._sql_url(
                "mysql+pymysql", self.DB_CONN_MYSQL[db_conn], "mysql"
            )
            self.engine = create_engine(f"{url}?charset=utf8mb4")
        elif db_type == "pg":
            # The previous template had a stray "/" before "@", which became
            # part of the password. The removed ``encoding='utf-8'`` argument
            # was the default and is rejected by SQLAlchemy 2.0.
            self.engine_pg = create_engine(
                self._sql_url(
                    "postgresql+psycopg2", self.DB_CONN_PG[db_conn], "pg"
                )
            )
        elif db_type == "es":
            es_conf = self.DB_CONN_ES[db_conn]
            self.engine_es = Elasticsearch(
                hosts=[f"http://{es_conf['es_host']}:{es_conf['es_port']}/"],
                http_auth=[es_conf['es_user'], es_conf['es_pass']],
                timeout=100
            )
        else:
            print("未选择数据库类型")


if __name__ == '__main__':
    handle_obj = Utils()