# redis_utils.py (committed by chenyuanjie)
import json
import time

import redis
from redis import Redis


class RedisUtils(object):
    """Class-level helpers around redis-py: client factories, a JSON list
    queue, and a simple key-based lock.

    All methods are classmethods; the class is never instantiated here.
    """

    # NOTE(security): hosts and passwords are hard-coded in source. They are
    # kept as-is so existing behavior is unchanged, but they should be moved
    # to configuration / environment and the exposed passwords rotated.
    _host = "120.79.147.190"
    _port = 6379
    _db = 1
    _pwd = 'Vm5vQH4ydFXh'
    # Lazily created client shared by getClient() callers.
    current_client = None

    _mic = "wx.yswg.com.cn"

    # Connection settings used for db_type == 'selection' (and as fallback).
    _selection_redis = {
        "host": "120.79.147.190",
        "port": 6379,
        "db": 1,
        "pwd": 'Vm5vQH4ydFXh'
    }

    # Connection settings used for db_type == 'microservice'.
    _microservice_redis = {
        "host": "wx.yswg.com.cn",
        "port": 6379,
        "db": 2,
        "pwd": 'yswg@2019'
    }

    @classmethod
    def getClient(cls) -> Redis:
        """Return the shared client, creating it on first use.

        The client is cached on ``cls.current_client`` and reused by every
        later call.
        """
        if cls.current_client is None:
            # Build the client first so the message is only printed once the
            # object actually exists.
            cls.current_client = redis.Redis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd)
            print("init redis client success")
        return cls.current_client

    @classmethod
    def get_redis_client(cls, decode_responses=False):
        """Create and return a new (non-cached) client for the default server.

        :param decode_responses: forwarded to ``redis.StrictRedis``
        :return: a new ``redis.StrictRedis`` instance; the caller owns it
        """
        print("init redis client success")
        return redis.StrictRedis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd, decode_responses=decode_responses)

    @classmethod
    def get_redis_client_by_type(cls, db_type='selection', dbNum=None, decode_responses=False):
        """Create a new client for one of the configured servers.

        :param db_type: ``'microservice'`` selects ``_microservice_redis``;
            any other value (including ``'selection'``) selects
            ``_selection_redis``
        :param dbNum: database index overriding the configured one; ``None``
            means "use the configured db"
        :param decode_responses: forwarded to ``redis.StrictRedis``
        :return: a new ``redis.StrictRedis`` instance
        """
        if db_type == 'microservice':
            redis_config = cls._microservice_redis
        else:
            redis_config = cls._selection_redis

        # Compare against None explicitly: db index 0 is a valid choice and
        # must not fall back to the configured default.
        db_index = redis_config.get("db") if dbNum is None else dbNum
        return redis.StrictRedis(
            host=redis_config.get("host"),
            port=redis_config.get("port"),
            db=db_index,
            password=redis_config.get("pwd"),
            decode_responses=decode_responses,
        )

    @classmethod
    def save_list(cls, redis_key: str, arr: list):
        """Append every element of ``arr`` to the Redis list ``redis_key``.

        Each element is serialized with ``json.dumps`` and pushed with a
        single ``RPUSH``. An empty ``arr`` is a no-op.

        :param redis_key: key of the Redis list
        :param arr: JSON-serializable items to append
        """
        if not arr:
            return
        payload = [json.dumps(item) for item in arr]
        cls.getClient().rpush(redis_key, *payload)

    @classmethod
    def lpop_list_batch(cls, redis_key: str, count: int):
        """Pop up to ``count`` items from the Redis list and JSON-decode them.

        The pops are queued on a transactional pipeline and executed
        together. If the list holds fewer than ``count`` items, only the
        items that were actually popped are returned.

        NOTE(review): despite the method name, items are removed with
        ``RPOP`` (from the tail), i.e. newest-first relative to
        ``save_list``. This is kept unchanged because callers may rely on
        it -- confirm the intended order.

        :param redis_key: key of the Redis list
        :param count: maximum number of items to pop
        :return: list of decoded items, possibly empty
        """
        if count <= 0:
            return []
        client = cls.getClient()
        with client.pipeline(transaction=True) as pipe:
            for _ in range(count):
                pipe.rpop(redis_key)
            popped = pipe.execute()
        # RPOP yields None once the list is exhausted; skip those slots
        # instead of failing on None.decode.
        return [json.loads(raw.decode('utf-8')) for raw in popped if raw is not None]

    @classmethod
    def acquire_redis_lock(cls, lock_name: str, expire_time: int = 24 * 60 * 60, retry_flag: bool = True, retry_time: int = 10):
        """Acquire a lock by writing ``lock_name`` to Redis.

        :param lock_name: name (key) of the lock
        :param expire_time: lock expiry in seconds; defaults to one day
        :param retry_flag: when True, keep retrying until the lock is
            acquired (there is no upper bound on attempts); when False, try
            exactly once
        :param retry_time: seconds to sleep between attempts
        :return: True once the lock is acquired; False only when
            ``retry_flag`` is False and the key already exists
        """
        with cls.get_redis_client() as client:
            while True:
                # SET with nx=True only writes when the key is absent; ex sets
                # the expiry in the same command.
                if client.set(lock_name, 'locked', nx=True, ex=expire_time):
                    print(f"创建redis锁:{lock_name},成功!!!")
                    return True
                if not retry_flag:
                    print(f"创建redis锁:{lock_name},失败,该锁已经存在")
                    return False
                print(f"锁:{lock_name},已经存在,{retry_time}s后重试加锁")
                time.sleep(retry_time)

    @classmethod
    def check_lock_status(cls, lock_name: str, client: Redis = None):
        """Report whether the lock key currently exists.

        :param lock_name: name (key) of the lock
        :param client: client to use; when None a temporary client is
            created and closed before returning
        :return: True if the key exists, otherwise False
        """
        owns_client = client is None
        if owns_client:
            client = cls.get_redis_client()
        try:
            return bool(client.exists(lock_name))
        finally:
            # Only close connections this call opened; a caller-supplied
            # client stays open.
            if owns_client:
                client.close()

    @classmethod
    def release_redis_lock(cls, lock_name: str):
        """Release the lock by deleting its key.

        Prints a confirmation only when a key was actually removed.

        :param lock_name: name (key) of the lock
        :return: None
        """
        with cls.get_redis_client() as client:
            # delete() returns the number of keys removed, so a separate
            # existence check (and the gap between check and delete) is
            # unnecessary.
            if client.delete(lock_name):
                print(f"redis锁:{lock_name},释放成功!!!")


if __name__ == "__main__":
    # Manual smoke test of the lock helpers: acquire the lock, simulate some
    # work, and always release the lock afterwards.
    lock_name = 'redis_big_data_test'
    lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=5 * 60, retry_flag=True, retry_time=10)
    if lock_flag:
        try:
            time.sleep(10)
            print("代码正在执行中")
        finally:
            # Release the lock even if the work above raised.
            RedisUtils.release_redis_lock(lock_name)