import os
import sys
import time

# Make the project root importable so the `utils` package resolves when this
# file is executed directly as a script.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from utils.common_util import CommonUtil


class AsinImgParseSender(object):
    """Redis queue producer for the ASIN image computation task.

    Each call to :meth:`run` reads the next batch of rows from
    ``dim_asin_img_path`` (resuming after the id stored in Redis), pushes the
    rows onto a Redis list and advances the stored offset.
    """

    def __init__(self):
        pass

    def run(self, site_name):
        """Publish the next batch of image-path rows to Redis.

        The resume position is read from the Redis key
        ``asin_img_path:offset`` (treated as 0 when the key is absent). Up to
        200000 rows with ``id`` greater than that offset are selected in
        ascending ``id`` order, saved to the Redis list ``asin_img_path:list``
        via ``RedisUtils.save_list``, and the largest ``id`` of the batch is
        written back as the new offset. When no rows are returned nothing is
        written to Redis.

        Args:
            site_name: Used to build the Spark application name.
                NOTE(review): the query below always filters
                ``site_name = 'us'`` and the Redis keys are not per-site, so
                this argument does not currently select the data — confirm
                whether multi-site support is intended.
        """
        app_name = f"{self.__class__.__name__}:{site_name}"
        offset_key = "asin_img_path:offset"
        list_key = "asin_img_path:list"

        client = RedisUtils.getClient()
        # Single round trip instead of exists() + get(): avoids a race where
        # the key vanishes between the two calls and int(None) blows up.
        # Assumes a redis-py style client whose get() returns None for a
        # missing key.
        stored_offset = client.get(offset_key)
        offset = int(stored_offset) if stored_offset is not None else 0

        spark = SparkUtil.get_spark_session(app_name)
        size = 200000
        sql = f"""
            select id, asin, category_id, asin_img_path
            from dim_asin_img_path
            where site_name = 'us'
              and id > {offset}
            order by id
            limit {size}
        """
        print("======================查询sql如下======================")
        print(sql)

        df = spark.sql(sql)
        row_list = [row.asDict() for row in df.collect()]
        # Derive the new offset from the rows that were actually collected.
        # Running a separate max(id) aggregation would execute the query a
        # second time and could disagree with the collected batch if the
        # table changed in between.
        max_id = max((row["id"] for row in row_list), default=None)

        if row_list:
            RedisUtils.save_list(list_key, row_list)
            client.set(offset_key, max_id)

        print(f"success,max_id is {max_id},size is {size}")

        # Disabled: notify the owner once the queue has been fully drained so
        # the scheduled job can be switched off.
        # if int(client.llen(list_key)) == 0:
        #     CommonUtil.send_wx_msg(['wujicang'], "提醒", "AsinImgParseSender全部执行成功!!请关闭定时任务")


if __name__ == '__main__':
    # NOTE(review): the script exits immediately with status 1, so nothing
    # below ever runs. This looks like a deliberate kill switch for the
    # scheduled job — confirm before removing it.
    sys.exit(1)
    start = time.time()
    obj = AsinImgParseSender()
    obj.run("us")
    end = time.time()
    print(f"耗时:{end - start}")