1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import os
import sys
import time
sys.path.append(os.path.dirname(sys.path[0]))
from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
"""
ASIN image-parsing task: Redis queue producer.
(asin图片计算任务redis队列生产者)
"""
class AsinImgParseSender:
    """Redis queue producer for ASIN image-path rows.

    Each :meth:`run` call reads a cursor (the last enqueued ``id``) from Redis,
    selects the next batch of rows from ``dim_asin_img_path`` with Spark SQL,
    hands them to ``RedisUtils.save_list`` under ``LIST_KEY`` and then advances
    the cursor to the largest ``id`` in that batch.
    """

    # Redis key holding the last ``id`` that has been enqueued.
    OFFSET_KEY = "asin_img_path:offset"
    # Redis key the batch rows are saved under.
    LIST_KEY = "asin_img_path:list"
    # Default maximum number of rows enqueued per run.
    BATCH_SIZE = 200000

    def run(self, site_name, size=BATCH_SIZE):
        """Enqueue the next batch of image-path rows for ``site_name``.

        Args:
            site_name: Value matched against ``dim_asin_img_path.site_name``;
                also used in the Spark application name.
            size: Maximum number of rows to enqueue in this call.

        NOTE(review): OFFSET_KEY / LIST_KEY are not scoped by site, so the
        cursor is shared by every site this runs for. The only visible caller
        passes "us"; scope the keys before running other sites.
        """
        app_name = f"{type(self).__name__}:{site_name}"
        client = RedisUtils.getClient()
        offset = self._read_offset(client)
        spark = SparkUtil.get_spark_session(app_name)
        sql = self._build_sql(site_name, offset, size)
        print("======================查询sql如下======================")
        print(sql)
        # Collect once and derive max_id from the rows actually collected. A
        # separate F.max() action would re-run the whole query, and if the
        # table changed in between, the cursor could skip or repeat ids.
        row_list = [row.asDict() for row in spark.sql(sql).collect()]
        if row_list:
            max_id = max(row["id"] for row in row_list)
            # Rows are saved before the cursor moves. A failure between these
            # two calls re-sends this batch on the next run instead of losing it.
            RedisUtils.save_list(self.LIST_KEY, row_list)
            client.set(self.OFFSET_KEY, max_id)
            print(f"success,max_id is {max_id},size is {len(row_list)}")
        # Disabled: notify via WeChat once the list has been drained (the
        # message asks the operator to turn off the scheduled job).
        # if int(client.llen(self.LIST_KEY)) == 0:
        #     CommonUtil.send_wx_msg(['wujicang'], "提醒", "AsinImgParseSender全部执行成功!!请关闭定时任务")

    @classmethod
    def _read_offset(cls, client):
        """Return the stored cursor as an int, or 0 when none has been stored.

        Uses a single GET (which yields ``None`` for a missing key) instead of
        EXISTS followed by GET. That saves a round-trip and avoids a race
        between the two calls.
        """
        value = client.get(cls.OFFSET_KEY)
        return int(value) if value is not None else 0

    @staticmethod
    def _build_sql(site_name, offset, size):
        """Build the Spark SQL for the next batch strictly after ``offset``."""
        return f"""
        select id,
               asin,
               category_id,
               asin_img_path
        from dim_asin_img_path
        where site_name = '{site_name}'
          and id > {offset}
        order by id
        limit {size}
        """
if __name__ == '__main__':
    # Kill switch: the process exits with status 1 before doing any work, so
    # every statement after this call is unreachable. Presumably the job was
    # switched off once the queue had been fully produced (confirm before
    # removing). Delete this line to re-enable the sender.
    sys.exit(1)
    start = time.time()
    AsinImgParseSender().run("us")
    end = time.time()
    # Report wall-clock duration of the run.
    print(f"耗时:{end - start}")