# mysql_db.py 6.5 KB
import logging
import mysql.connector
from mysql.connector import pooling
from sqlalchemy import create_engine

# Module-level connection pool handle. It stays None until sql_connect()
# assigns a MySQLConnectionPool to it; every sql_* helper below calls
# cnx_pool.get_connection(), so sql_connect() has to succeed first.
cnx_pool = None

# NOTE(review): every dict below embeds real hosts, accounts and passwords in
# plain text. Anyone with read access to the repository can read them; they
# should be loaded from the environment or a secret store and rotated.

# Keyword arguments unpacked into MySQLConnectionPool by sql_connect().
# sql_connect() overwrites the 'database' entry on every call, depending on
# the requested site.
MYSQL_connector = {
    'pool_name': 'mysql_connect_pool',
    'pool_size': 2,
    'database': 'selection',
    'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com',
    'user': 'adv_yswg',
    'password': 'HCL1zcUgQesaaXNLbL37O5KhpSAy0c',
    'port': 3306,
    'charset': 'utf8mb4',
}

# Settings used by get_country_engine() to build the SQLAlchemy URL for the
# "selection" databases; that function also overwrites the 'database' entry.
# Keep the keys in this order (user, password, host, port, database, charset)
# -- NOTE(review): consumers may read the values positionally.
Mysql_arguments = {
    'user': 'adv_yswg',
    'password': 'HCL1zcUgQesaaXNLbL37O5KhpSAy0c',
    'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com',
    'port': 3306,
    'database': 'selection',
    'charset': 'utf8mb4',
}


# Settings pointing at the 'inventory_test' database. Not referenced by any
# function visible in this file -- presumably a test instance; confirm before
# removing.
keepa_mysql_text = {
    'user': 'root',
    'password': 'Yswg@erp120300',
    'host': '120.77.232.73',
    'port': 3306,
    'database': 'inventory_test',
    'charset': 'utf8mb4',
}

# Settings for the keepa 'inventory' database (host, account and password are
# the values in the dict below). Used by get_country_engine() when
# site_name == "keepa"; keep the same key order as Mysql_arguments.
keepa_mysql = {
    'user': 'chenjianyun',
    'password': 'Cjy8751_07',
    'host': 'rm-wz956fk600d89g2g7uo.mysql.rds.aliyuncs.com',
    'port': 3306,
    'database': 'inventory',
    'charset': 'utf8mb4',
}


def sql_connect(site="us"):
    """Create the module-level MySQL connection pool for ``site``.

    The sites ``us``, ``mx`` and ``ca`` share the ``selection`` schema; any
    other site uses ``selection_<site>``. The chosen name is written into
    ``MYSQL_connector["database"]`` and a ``MySQLConnectionPool`` (with
    autocommit enabled) is built from that dict and stored in the global
    ``cnx_pool``.

    MySQL errors raised while creating the pool are logged with a traceback
    and swallowed; in that case ``cnx_pool`` keeps its previous value.

    :param site: site code used to select the database name.
    :return: None
    """

    # MYSQL_connector is only mutated, never rebound, so it needs no `global`.
    global cnx_pool

    if site in ("us", "mx", "ca"):
        MYSQL_connector["database"] = 'selection'
    else:
        MYSQL_connector["database"] = f'selection_{site}'

    try:
        cnx_pool = mysql.connector.pooling.MySQLConnectionPool(
            **MYSQL_connector,
            autocommit=True,
        )
    except mysql.connector.Error as err:
        # Every MySQL error (access denied, unknown database, anything else)
        # is handled the same way: log it and carry on. If per-error handling
        # is ever needed, the errno constants live in
        # mysql.connector.errorcode -- they are not attributes of the `mysql`
        # package itself.
        logging.getLogger().exception(err)


def sql_fetch_one(stmt, data=None):
    """Run a query and return its first row as a dict.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created), executes ``stmt`` with ``data`` on
    a buffered dictionary cursor and fetches one row.

    Errors raised by the query are logged and swallowed. Errors raised while
    closing the cursor or the connection are logged as well.

    :param stmt: sql statement.
    :param data: parameters bound to the statement, or None.
    :return: the first row as a dict, or None when there is no row or the
        query failed.
    """

    results = None
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True, dictionary=True)
        try:
            cur.execute(stmt, data)
            results = cur.fetchone()
        except Exception as err:
            logging.getLogger().exception(err)
        finally:
            try:
                cur.close()
            except Exception as e:
                logging.getLogger().exception(e)
    finally:
        # Closed in its own step so a failing cur.close() (or a failing
        # cursor creation) can never keep the connection out of the pool.
        try:
            cnx.close()
        except Exception as e:
            logging.getLogger().exception(e)
    return results


def sql_fetch_rows(stmt, data=None):
    """Run a query and return all of its rows as dicts.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created), executes ``stmt`` with ``data`` on
    a buffered dictionary cursor and fetches every row.

    Errors raised by the query are logged and swallowed.

    :param stmt: sql statement.
    :param data: parameters bound to the statement, or None.
    :return: list of rows (each a dict), or None when the query failed.
    """

    results = None
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True, dictionary=True)
        try:
            cur.execute(stmt, data)
            results = cur.fetchall()
        except Exception as err:
            logging.getLogger().exception(err)
        finally:
            cur.close()
    finally:
        # Outer finally guarantees the connection goes back to the pool even
        # when creating or closing the cursor raises.
        cnx.close()
    return results


def sql_fetch_one_cell(stmt, data=None):
    """Run a query and return the first column of its first row.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created) and executes ``stmt`` with ``data``
    on a buffered cursor.

    Errors raised by the query are logged and swallowed.

    :param stmt: sql statement.
    :param data: parameters bound to the statement, or None.
    :return: value of the first cell, or None when there is no row or the
        query failed.
    """

    result = None
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True)
        try:
            cur.execute(stmt, data)
            # Take the first row through the cursor's iterator, which is how
            # rows were read before; only the first row is ever needed.
            first_row = next(iter(cur), None)
            if first_row is not None:
                result = first_row[0]
        except Exception as err:
            logging.getLogger().exception(err)
        finally:
            cur.close()
    finally:
        # Outer finally guarantees the connection goes back to the pool even
        # when creating or closing the cursor raises.
        cnx.close()
    return result


def sql_update(stmt, data=None):
    """Execute one data-modifying statement and commit it.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created). When execution or commit fails the
    error is logged and the transaction is rolled back.

    :param stmt: sql statement.
    :param data: parameters bound to the statement, or None.
    :return: number of affected rows, or -1 when the statement failed.
    """

    affected = -1
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True)
        try:
            cur.execute(stmt, data)
            cnx.commit()
            # Read the count while the cursor is still open: closing a cursor
            # may reset its result state, so reading afterwards is unreliable.
            affected = cur.rowcount
        except Exception as err:
            logging.getLogger().exception(err)
            cnx.rollback()
        finally:
            cur.close()
    finally:
        # Outer finally guarantees the connection goes back to the pool even
        # when rollback or cursor handling raises.
        cnx.close()
    return affected


def sql_update_many(stmt, data=None):
    """Execute one statement for every parameter set and commit.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created) and runs ``executemany``. When
    execution or commit fails the error is logged and the transaction is
    rolled back.

    :param stmt: sql statement.
    :param data: sequence of parameter sets, one per execution.
    :return: row count reported by the cursor, or -1 when the batch failed.
    """

    affected = -1
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True)
        try:
            cur.executemany(stmt, data)
            cnx.commit()
            # Read the count while the cursor is still open: closing a cursor
            # may reset its result state, so reading afterwards is unreliable.
            affected = cur.rowcount
        except Exception as err:
            logging.getLogger().exception(err)
            cnx.rollback()
        finally:
            cur.close()
    finally:
        # Outer finally guarantees the connection goes back to the pool even
        # when rollback or cursor handling raises.
        cnx.close()
    return affected


def sql_insert(stmt, data=None):
    """Execute one INSERT statement and commit it.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created). When execution or commit fails the
    error is logged and the transaction is rolled back.

    :param stmt: sql statement.
    :param data: parameters bound to the statement, or None.
    :return: the cursor's last row id, or None when the insert failed.
    """

    last_id = None
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True)
        try:
            cur.execute(stmt, data)
            cnx.commit()
            # Capture the id before the cursor is closed instead of relying
            # on a closed cursor still exposing it.
            last_id = cur.lastrowid
        except Exception as err:
            logging.getLogger().exception(err)
            cnx.rollback()
        finally:
            cur.close()
    finally:
        # Outer finally guarantees the connection goes back to the pool even
        # when rollback or cursor handling raises.
        cnx.close()
    return last_id


def sql_insert_many(stmt, data=None):
    """Execute one INSERT statement for every parameter set and commit.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created) and runs ``executemany``. When
    execution or commit fails the error is logged and the transaction is
    rolled back.

    :param stmt: sql statement.
    :param data: sequence of parameter sets, one per inserted row.
    :return: the cursor's last row id after the batch, or None when the
        batch failed.
    """

    last_id = None
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True)
        try:
            cur.executemany(stmt, data)
            cnx.commit()
            # Capture the id before the cursor is closed instead of relying
            # on a closed cursor still exposing it.
            last_id = cur.lastrowid
        except Exception as err:
            logging.getLogger().exception(err)
            cnx.rollback()
        finally:
            cur.close()
    finally:
        # Outer finally guarantees the connection goes back to the pool even
        # when rollback or cursor handling raises.
        cnx.close()
    return last_id


def sql_delete(stmt, data=None):
    """Execute one DELETE statement and commit it.

    Takes a connection from the module-level ``cnx_pool`` (which
    ``sql_connect()`` must have created). When execution or commit fails the
    error is logged and the transaction is rolled back.

    :param stmt: sql statement.
    :param data: parameters bound to the statement, or None.
    :return: number of affected rows, or -1 when the statement failed.
    """

    affected = -1
    cnx = cnx_pool.get_connection()
    try:
        cur = cnx.cursor(buffered=True)
        try:
            cur.execute(stmt, data)
            cnx.commit()
            # Read the count while the cursor is still open: closing a cursor
            # may reset its result state, so reading afterwards is unreliable.
            affected = cur.rowcount
        except Exception as err:
            logging.getLogger().exception(err)
            cnx.rollback()
        finally:
            cur.close()
    finally:
        # Outer finally guarantees the connection goes back to the pool even
        # when rollback or cursor handling raises.
        cnx.close()
    return affected


def get_country_engine(site_name="us"):
    """Build a SQLAlchemy engine (pymysql driver) for a site's database.

    ``"keepa"`` connects with the ``keepa_mysql`` settings. Every other value
    connects with ``Mysql_arguments``: ``us``, ``mx`` and ``ca`` use the
    ``selection`` schema, anything else uses ``selection_<site_name>``.

    Side effect: for non-keepa sites the chosen schema name is written back
    into ``Mysql_arguments["database"]``.

    A new engine is created on every call.

    :param site_name: site code, or ``"keepa"``.
    :return: the engine returned by ``create_engine``.
    """
    from urllib.parse import quote

    if site_name == "keepa":
        settings = keepa_mysql
    else:
        if site_name in ("us", "mx", "ca"):
            Mysql_arguments["database"] = "selection"
        else:
            Mysql_arguments["database"] = f"selection_{site_name}"
        settings = Mysql_arguments

    # Look every field up by key so the URL no longer depends on the order in
    # which the settings dict happens to be written. User and password are
    # percent-encoded because characters such as '@', ':' or '/' would
    # otherwise be parsed as URL delimiters.
    db_ = 'mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset={charset}'.format(
        user=quote(str(settings["user"]), safe=""),
        password=quote(str(settings["password"]), safe=""),
        host=settings["host"],
        port=settings["port"],
        database=settings["database"],
        charset=settings["charset"],
    )
    engine = create_engine(db_)  # , pool_recycle=3600
    return engine