import json import time import redis from redis import Redis class RedisUtils(object): _host = "120.79.147.190" _port = 6379 _db = 1 _pwd = 'Vm5vQH4ydFXh' current_client = None _mic = "wx.yswg.com.cn" _selection_redis = { "host": "120.79.147.190", "port": 6379, "db": 1, "pwd": 'Vm5vQH4ydFXh' } _microservice_redis = { "host": "wx.yswg.com.cn", "port": 6379, "db": 2, "pwd": 'yswg@2019' } @classmethod def getClient(cls) -> Redis: if cls.current_client is None: print("init redis client success") cls.current_client = redis.Redis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd) return cls.current_client @classmethod def get_redis_client(cls, decode_responses=False): print("init redis client success") return redis.StrictRedis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd, decode_responses=decode_responses) @classmethod def get_redis_client_by_type(cls, db_type='selection', dbNum=None, decode_responses=False): if db_type == 'selection': redis_config = cls._selection_redis elif db_type == 'microservice': redis_config = cls._microservice_redis else: redis_config = cls._selection_redis _host = redis_config.get("host") _port = redis_config.get("port") _db = dbNum or redis_config.get("db") _pwd = redis_config.get("pwd") return redis.StrictRedis(host=_host, port=_port, db=_db, password=_pwd, decode_responses=decode_responses) @classmethod def save_list(cls, redis_key: str, arr: list): """ redis保存数组 """ if len(arr) > 0: client = cls.getClient() rel = list(map(lambda x: json.dumps(x), arr)) client.rpush(redis_key, *rel) @classmethod def lpop_list_batch(cls, redis_key: str, count: int): """ redis保存数组 """ client = cls.getClient() with client.pipeline(transaction=True) as p: client.lrange(redis_key, 0, count) for i in range(0, count): p.rpop(redis_key) resultList: list = p.execute() return list(map(lambda x: json.loads(x.decode('utf-8')), resultList)) @classmethod def acquire_redis_lock(cls, lock_name: str, expire_time: int = 24 * 60 * 60, retry_flag: bool = True, retry_time: int = 10): """ 向redis写入锁信息进行上锁 :param lock_name: 锁名称 :param expire_time: 锁失效时间,默认24*60*60s--相当于一天(目前暂无单进程任务需要执行一天以上) :param retry_flag: 标记是否要自动重试加锁 :param retry_time: 重试加锁时间默认10s :return: 返回创建锁状态 """ # 使用 SETNX 命令尝试获取锁 with cls.get_redis_client() as client: if retry_flag: while retry_flag: lock_acquired = client.set(lock_name, 'locked', nx=True, ex=expire_time) if lock_acquired: print(f"创建redis锁:{lock_name},成功!!!") return True else: print(f"锁:{lock_name},已经存在,{retry_time}s后重试加锁") time.sleep(retry_time) else: lock_acquired = client.set(lock_name, 'locked', nx=True, ex=expire_time) if lock_acquired: print(f"创建redis锁:{lock_name},成功!!!") return True else: print(f"创建redis锁:{lock_name},失败,该锁已经存在") return False @classmethod def check_lock_status(cls, lock_name: str, client: Redis = None): """ 检查锁是否存在 :param lock_name: 锁的名称 :return:boolean类型,锁是否存在 """ created_client = False try: # 如果 client 不存在,则手动创建 if client is None: client = cls.get_redis_client() created_client = True lock_exists_flag = bool(client.exists(lock_name)) return lock_exists_flag finally: # 如果是手动创建的 client,在完成操作后关闭连接 if created_client: client.close() @classmethod def release_redis_lock(cls, lock_name: str): """ 释放redis锁,释放key :param lock_name: 锁的名称 :return: """ with cls.get_redis_client() as client: if cls.check_lock_status(lock_name, client): client.delete(lock_name) print(f"redis锁:{lock_name},释放成功!!!") if __name__ == "__main__": # 创建 Redis 连接 # 测试锁操作 max_attempts = 10 retry_interval = 1 lock_name = 'redis_big_data_test' lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=5 * 60, retry_flag=True, retry_time=10) if lock_flag: try: time.sleep(10) print("代码正在执行中") finally: # 执行完成后释放锁 RedisUtils.release_redis_lock(lock_name)