import logging import mysql.connector from mysql.connector import pooling from sqlalchemy import create_engine cnx_pool = None MYSQL_connector = { 'pool_name': 'mysql_connect_pool', 'pool_size': 2, 'database': 'selection', 'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com', 'user': 'adv_yswg', 'password': 'HCL1zcUgQesaaXNLbL37O5KhpSAy0c', 'port': 3306, 'charset': 'utf8mb4', } Mysql_arguments = { 'user': 'adv_yswg', 'password': 'HCL1zcUgQesaaXNLbL37O5KhpSAy0c', 'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com', 'port': 3306, 'database': 'selection', 'charset': 'utf8mb4', } keepa_mysql_text = { 'user': 'root', 'password': 'Yswg@erp120300', 'host': '120.77.232.73', 'port': 3306, 'database': 'inventory_test', 'charset': 'utf8mb4', } # 数据库:rm-wz956fk600d89g2g7uo.mysql.rds.aliyuncs.com # 账号 chenjianyun # 密码 Cjy8751_07 keepa_mysql = { 'user': 'chenjianyun', 'password': 'Cjy8751_07', 'host': 'rm-wz956fk600d89g2g7uo.mysql.rds.aliyuncs.com', 'port': 3306, 'database': 'inventory', 'charset': 'utf8mb4', } def sql_connect(site="us"): """ sql connect. :return: None """ global cnx_pool, MYSQL_connector if site == 'us' or site == 'mx' or site == "ca": MYSQL_connector["database"] = 'selection' else: MYSQL_connector["database"] = f'selection_{site}' try: cnx_pool = mysql.connector.pooling.MySQLConnectionPool( **MYSQL_connector, autocommit=True, ) except mysql.connector.Error as err: if err.errno == mysql.ER_ACCESS_DENIED_ERROR: logging.getLogger().exception(err) elif err.errno == mysql.ER_BAD_DB_ERROR: logging.getLogger().exception(err) else: logging.getLogger().exception(err) def sql_fetch_one(stmt, data=None): """fetch one. :param stmt: sql statement. :param data: data. :return: results. """ global cnx_pool results = None cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True, dictionary=True) try: cur.execute(stmt, data) results = cur.fetchone() except Exception as err: logging.getLogger().exception(err) finally: try: cur.close() cnx.close() except Exception as e: logging.getLogger().exception(e) return results def sql_fetch_rows(stmt, data=None): """fetch many :param stmt: sql statement. :param data: data. :return: result. """ global cnx_pool results = None cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True, dictionary=True) try: cur.execute(stmt, data) results = cur.fetchall() except Exception as err: logging.getLogger().exception(err) finally: if cur: cur.close() if cnx: cnx.close() return results def sql_fetch_one_cell(stmt, data=None): """fetch one cell :param stmt: sql statement. :param data: data. :return: result """ result = None cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True) try: cur.execute(stmt, data) for i in cur: result = i[0] cur.close() return result except Exception as err: logging.getLogger().exception(err) finally: if cur: cur.close() if cnx: cnx.close() return result def sql_update(stmt, data=None): """update one. :param stmt: sql statement. :param data: data. :return: row count. """ # data = [(), ()] cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True) try: cur.execute(stmt, data) cnx.commit() except Exception as err: logging.getLogger().exception(err) cnx.rollback() finally: if cur: cur.close() if cnx: cnx.close() return cur.rowcount def sql_update_many(stmt, data=None): """update many :param stmt: sql statement. :param data: data. :return: row count. """ cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True) try: cur.executemany(stmt, data) cnx.commit() except Exception as err: logging.getLogger().exception(err) cnx.rollback() finally: if cur: cur.close() if cnx: cnx.close() return cur.rowcount def sql_insert(stmt, data=None): """ insert one :param stmt: sql statement. :param data: data. :return: last row id. """ cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True) try: cur.execute(stmt, data) cnx.commit() except Exception as err: logging.getLogger().exception(err) cnx.rollback() finally: if cur: cur.close() if cnx: cnx.close() return cur.lastrowid def sql_insert_many(stmt, data=None): """ insert many :param stmt: sql statement. :param data: data. :return: last row id. """ cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True) try: cur.executemany(stmt, data) cnx.commit() except Exception as err: logging.getLogger().exception(err) cnx.rollback() finally: if cur: cur.close() if cnx: cnx.close() return cur.lastrowid def sql_delete(stmt, data=None): """ insert many :param stmt: sql statement. :param data: data. :return: last row id. """ cnx = cnx_pool.get_connection() cur = cnx.cursor(buffered=True) try: cur.execute(stmt, data) cnx.commit() except Exception as err: logging.getLogger().exception(err) cnx.rollback() finally: if cur: cur.close() if cnx: cnx.close() return cur.rowcount def get_country_engine(site_name="us"): if site_name == 'us' or site_name == 'mx' or site_name == "ca": Mysql_arguments["database"] = f"selection" db_ = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(*Mysql_arguments.values()) elif site_name == "keepa": db_ = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(*keepa_mysql.values()) else: Mysql_arguments["database"] = f"selection_{site_name}" db_ = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(*Mysql_arguments.values()) engine = create_engine(db_) # , pool_recycle=3600 return engine