# asin_img_parse_sender.py (1.85 KB)
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))

from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

from utils.common_util import CommonUtil

"""
asin图片计算任务redis队列生成者
"""


class AsinImgParseSender(object):
    """Producer that pushes batches of ASIN image rows onto a Redis list.

    Each call to :meth:`run` reads the next batch of rows from
    ``dim_asin_img_path`` (ordered by ``id``, starting after the offset stored
    in Redis), saves them under the ``asin_img_path:list`` key and then stores
    the largest queued ``id`` under ``asin_img_path:offset``.
    """

    def __init__(self):
        pass

    def run(self, site_name):
        """Queue the next batch of image rows and advance the Redis offset.

        Args:
            site_name: Site identifier; used to build the Spark application
                name.

        NOTE(review): the query filter is hard-coded to ``site_name = 'us'``
        and the Redis keys are not site-specific, so ``site_name`` does not
        influence which rows are selected -- confirm whether that is intended
        before calling this with another site.
        """
        app_name = f"{self.__class__.__name__}:{site_name}"
        offset_key = "asin_img_path:offset"
        list_key = "asin_img_path:list"
        client = RedisUtils.getClient()

        # One GET instead of EXISTS + GET: a single round trip, and no window
        # in which the key can vanish between the two calls.
        # Assumes a redis-py style client whose get() returns None for a
        # missing key -- TODO confirm against RedisUtils.getClient().
        stored_offset = client.get(offset_key)
        offset = int(stored_offset) if stored_offset is not None else 0

        spark = SparkUtil.get_spark_session(app_name)

        # Maximum number of rows fetched per run.
        size = 200000
        sql = f"""
                select id,
                asin,
                category_id,
                asin_img_path
                from dim_asin_img_path
                where site_name = 'us'
                and id > {offset}
                order by id
                limit {size}
    """
        print("======================查询sql如下======================")
        print(sql)

        # Materialise the batch once and take max_id from the collected rows,
        # so the stored offset always matches exactly what was queued. Running
        # a separate aggregate on the DataFrame would execute the query a
        # second time and could observe different data.
        row_list = [row.asDict() for row in spark.sql(sql).collect()]
        if row_list:
            max_id = max(row["id"] for row in row_list)
            RedisUtils.save_list(list_key, row_list)
            # Advance the offset only after the rows were saved successfully.
            client.set(offset_key, max_id)
            # "size" here is the batch limit, not the number of rows queued.
            print(f"success,max_id is {max_id},size is {size}")

        # Disabled: when the Redis list is empty, send a WeChat message saying
        # the job has finished and the scheduled task should be switched off.
        # if int(client.llen(list_key)) == 0:
        #     CommonUtil.send_wx_msg(['wujicang'], "提醒", "AsinImgParseSender全部执行成功!!请关闭定时任务")


if __name__ == '__main__':
    # NOTE(review): this unconditional exit (status 1) stops the script before
    # any work is done, so everything below is currently unreachable --
    # presumably a deliberate switch-off; confirm before removing it.
    sys.exit(1)
    started_at = time.time()
    AsinImgParseSender().run("us")
    elapsed = time.time() - started_at
    print(f"耗时:{elapsed}")