redis_utils.py 5.36 KB
import json
import time

import redis
from redis import Redis


class RedisUtils(object):
    _host = "120.79.147.190"
    _port = 6379
    _db = 1
    _pwd = 'Vm5vQH4ydFXh'
    current_client = None

    _mic = "wx.yswg.com.cn"

    _selection_redis = {
        "host": "120.79.147.190",
        "port": 6379,
        "db": 1,
        "pwd": 'Vm5vQH4ydFXh'
    }

    _microservice_redis = {
        "host": "wx.yswg.com.cn",
        "port": 6379,
        "db": 2,
        "pwd": 'yswg@2019'
    }

    @classmethod
    def getClient(cls) -> Redis:
        if cls.current_client is None:
            print("init redis client success")
            cls.current_client = redis.Redis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd)
        return cls.current_client

    @classmethod
    def get_redis_client(cls, decode_responses=False):
        print("init redis client success")
        return redis.StrictRedis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd, decode_responses=decode_responses)

    @classmethod
    def get_redis_client_by_type(cls, db_type='selection', dbNum=None, decode_responses=False):
        if db_type == 'selection':
            redis_config = cls._selection_redis
        elif db_type == 'microservice':
            redis_config = cls._microservice_redis
        else:
            redis_config = cls._selection_redis

        _host = redis_config.get("host")
        _port = redis_config.get("port")
        _db = dbNum or redis_config.get("db")
        _pwd = redis_config.get("pwd")
        return redis.StrictRedis(host=_host, port=_port, db=_db, password=_pwd, decode_responses=decode_responses)

    @classmethod
    def save_list(cls, redis_key: str, arr: list):
        """
        redis保存数组
        """
        if len(arr) > 0:
            client = cls.getClient()
            rel = list(map(lambda x: json.dumps(x), arr))
            client.rpush(redis_key, *rel)

    @classmethod
    def lpop_list_batch(cls, redis_key: str, count: int):
        """
        redis保存数组
        """
        client = cls.getClient()
        with client.pipeline(transaction=True) as p:
            client.lrange(redis_key, 0, count)
            for i in range(0, count):
                p.rpop(redis_key)
            resultList: list = p.execute()
            return list(map(lambda x: json.loads(x.decode('utf-8')), resultList))

    @classmethod
    def acquire_redis_lock(cls, lock_name: str, expire_time: int = 24 * 60 * 60, retry_flag: bool = True, retry_time: int = 10):
        """
        向redis写入锁信息进行上锁
        :param lock_name: 锁名称
        :param expire_time: 锁失效时间,默认24*60*60s--相当于一天(目前暂无单进程任务需要执行一天以上)
        :param retry_flag: 标记是否要自动重试加锁
        :param retry_time: 重试加锁时间默认10s
        :return: 返回创建锁状态
        """
        # 使用 SETNX 命令尝试获取锁
        with cls.get_redis_client() as client:
            if retry_flag:
                while retry_flag:
                    lock_acquired = client.set(lock_name, 'locked', nx=True, ex=expire_time)
                    if lock_acquired:
                        print(f"创建redis锁:{lock_name},成功!!!")
                        return True
                    else:
                        print(f"锁:{lock_name},已经存在,{retry_time}s后重试加锁")
                        time.sleep(retry_time)
            else:
                lock_acquired = client.set(lock_name, 'locked', nx=True, ex=expire_time)
                if lock_acquired:
                    print(f"创建redis锁:{lock_name},成功!!!")
                    return True
                else:
                    print(f"创建redis锁:{lock_name},失败,该锁已经存在")
                    return False

    @classmethod
    def check_lock_status(cls, lock_name: str, client: Redis = None):
        """
        检查锁是否存在
        :param lock_name: 锁的名称
        :return:boolean类型,锁是否存在
        """
        created_client = False
        try:
            # 如果 client 不存在,则手动创建
            if client is None:
                client = cls.get_redis_client()
                created_client = True

            lock_exists_flag = bool(client.exists(lock_name))
            return lock_exists_flag
        finally:
            # 如果是手动创建的 client,在完成操作后关闭连接
            if created_client:
                client.close()

    @classmethod
    def release_redis_lock(cls, lock_name: str):
        """
        释放redis锁,释放key
        :param lock_name: 锁的名称
        :return:
        """
        with cls.get_redis_client() as client:
            if cls.check_lock_status(lock_name, client):
                client.delete(lock_name)
                print(f"redis锁:{lock_name},释放成功!!!")


if __name__ == "__main__":
    # 创建 Redis 连接
    # 测试锁操作
    max_attempts = 10
    retry_interval = 1
    lock_name = 'redis_big_data_test'
    lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=5 * 60, retry_flag=True, retry_time=10)
    if lock_flag:
        try:
            time.sleep(10)
            print("代码正在执行中")
        finally:
            # 执行完成后释放锁
            RedisUtils.release_redis_lock(lock_name)