1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import json
import time
import redis
from redis import Redis
class RedisUtils(object):
_host = "120.79.147.190"
_port = 6379
_db = 1
_pwd = 'Vm5vQH4ydFXh'
current_client = None
_mic = "wx.yswg.com.cn"
_selection_redis = {
"host": "120.79.147.190",
"port": 6379,
"db": 1,
"pwd": 'Vm5vQH4ydFXh'
}
_microservice_redis = {
"host": "wx.yswg.com.cn",
"port": 6379,
"db": 2,
"pwd": 'yswg@2019'
}
@classmethod
def getClient(cls) -> Redis:
if cls.current_client is None:
print("init redis client success")
cls.current_client = redis.Redis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd)
return cls.current_client
@classmethod
def get_redis_client(cls, decode_responses=False):
print("init redis client success")
return redis.StrictRedis(host=cls._host, port=cls._port, db=cls._db, password=cls._pwd, decode_responses=decode_responses)
@classmethod
def get_redis_client_by_type(cls, db_type='selection', dbNum=None, decode_responses=False):
if db_type == 'selection':
redis_config = cls._selection_redis
elif db_type == 'microservice':
redis_config = cls._microservice_redis
else:
redis_config = cls._selection_redis
_host = redis_config.get("host")
_port = redis_config.get("port")
_db = dbNum or redis_config.get("db")
_pwd = redis_config.get("pwd")
return redis.StrictRedis(host=_host, port=_port, db=_db, password=_pwd, decode_responses=decode_responses)
@classmethod
def save_list(cls, redis_key: str, arr: list):
"""
redis保存数组
"""
if len(arr) > 0:
client = cls.getClient()
rel = list(map(lambda x: json.dumps(x), arr))
client.rpush(redis_key, *rel)
@classmethod
def lpop_list_batch(cls, redis_key: str, count: int):
"""
redis保存数组
"""
client = cls.getClient()
with client.pipeline(transaction=True) as p:
client.lrange(redis_key, 0, count)
for i in range(0, count):
p.rpop(redis_key)
resultList: list = p.execute()
return list(map(lambda x: json.loads(x.decode('utf-8')), resultList))
@classmethod
def acquire_redis_lock(cls, lock_name: str, expire_time: int = 24 * 60 * 60, retry_flag: bool = True, retry_time: int = 10):
"""
向redis写入锁信息进行上锁
:param lock_name: 锁名称
:param expire_time: 锁失效时间,默认24*60*60s--相当于一天(目前暂无单进程任务需要执行一天以上)
:param retry_flag: 标记是否要自动重试加锁
:param retry_time: 重试加锁时间默认10s
:return: 返回创建锁状态
"""
# 使用 SETNX 命令尝试获取锁
with cls.get_redis_client() as client:
if retry_flag:
while retry_flag:
lock_acquired = client.set(lock_name, 'locked', nx=True, ex=expire_time)
if lock_acquired:
print(f"创建redis锁:{lock_name},成功!!!")
return True
else:
print(f"锁:{lock_name},已经存在,{retry_time}s后重试加锁")
time.sleep(retry_time)
else:
lock_acquired = client.set(lock_name, 'locked', nx=True, ex=expire_time)
if lock_acquired:
print(f"创建redis锁:{lock_name},成功!!!")
return True
else:
print(f"创建redis锁:{lock_name},失败,该锁已经存在")
return False
@classmethod
def check_lock_status(cls, lock_name: str, client: Redis = None):
"""
检查锁是否存在
:param lock_name: 锁的名称
:return:boolean类型,锁是否存在
"""
created_client = False
try:
# 如果 client 不存在,则手动创建
if client is None:
client = cls.get_redis_client()
created_client = True
lock_exists_flag = bool(client.exists(lock_name))
return lock_exists_flag
finally:
# 如果是手动创建的 client,在完成操作后关闭连接
if created_client:
client.close()
@classmethod
def release_redis_lock(cls, lock_name: str):
"""
释放redis锁,释放key
:param lock_name: 锁的名称
:return:
"""
with cls.get_redis_client() as client:
if cls.check_lock_status(lock_name, client):
client.delete(lock_name)
print(f"redis锁:{lock_name},释放成功!!!")
if __name__ == "__main__":
# 创建 Redis 连接
# 测试锁操作
max_attempts = 10
retry_interval = 1
lock_name = 'redis_big_data_test'
lock_flag = RedisUtils.acquire_redis_lock(lock_name, expire_time=5 * 60, retry_flag=True, retry_time=10)
if lock_flag:
try:
time.sleep(10)
print("代码正在执行中")
finally:
# 执行完成后释放锁
RedisUtils.release_redis_lock(lock_name)