asin_img_parse_sender.py 1.85 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))

from utils.redis_utils import RedisUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

from utils.common_util import CommonUtil

"""
asin图片计算任务redis队列生成者
"""


class AsinImgParseSender(object):
    """Producer that pushes batches of ASIN image-path rows into a Redis list.

    Each ``run`` call:

    1. Reads the largest ``id`` already pushed from Redis (0 if none).
    2. Queries the next batch of rows with ``id`` greater than that offset
       from ``dim_asin_img_path``, ordered by ``id``.
    3. Appends the rows (as dicts) to a Redis list.
    4. Advances the stored offset to the batch's maximum ``id``.
    """

    # Redis key holding the largest ``id`` already pushed to the queue.
    # NOTE(review): the keys are not namespaced by site, so running this for
    # more than one site would share one offset/queue — confirm before doing so.
    OFFSET_KEY = "asin_img_path:offset"
    # Redis list the fetched rows are appended to via RedisUtils.save_list.
    LIST_KEY = "asin_img_path:list"
    # Default number of rows fetched per run (the SQL LIMIT).
    DEFAULT_BATCH_SIZE = 200000

    @staticmethod
    def _parse_offset(raw):
        """Convert the raw value stored under the offset key to an int.

        :param raw: value returned by the Redis client's ``get``; ``None`` when
            the key does not exist, otherwise presumably ``bytes``/``str``
            holding an integer (TODO confirm the client's decode settings).
        :return: the stored offset, or 0 when nothing has been stored yet.
        """
        return 0 if raw is None else int(raw)

    @staticmethod
    def _build_sql(site_name, offset, size):
        """Build the query for the next ``size`` rows with ``id > offset``.

        ``offset`` and ``size`` are coerced with ``int`` so that only numbers
        are interpolated into the statement.
        """
        return f"""
                select id,
                asin,
                category_id,
                asin_img_path
                from dim_asin_img_path
                where site_name = '{site_name}'
                and id > {int(offset)}
                order by id
                limit {int(size)}
    """

    def run(self, site_name, size=DEFAULT_BATCH_SIZE):
        """Push the next batch of rows for ``site_name`` into the Redis queue.

        :param site_name: site partition to read (e.g. ``"us"``); also used in
            the Spark application name.
        :param size: maximum number of rows to fetch in this run.
        """
        app_name = f"{self.__class__.__name__}:{site_name}"
        client = RedisUtils.getClient()

        # A single GET (instead of EXISTS + GET) saves a round trip and avoids
        # the key changing between the two calls.
        offset = self._parse_offset(client.get(self.OFFSET_KEY))

        spark = SparkUtil.get_spark_session(app_name)

        sql = self._build_sql(site_name, offset, size)
        print("======================查询sql如下======================")
        print(sql)

        # Collect once and derive max_id from exactly the rows being pushed.
        # A separate Spark aggregation would re-run the query and could
        # disagree with the collected batch.
        row_list = [row.asDict() for row in spark.sql(sql).collect()]
        if row_list:
            max_id = max(row["id"] for row in row_list)
            RedisUtils.save_list(self.LIST_KEY, row_list)
            # The offset is advanced only after the rows were saved. A failure
            # in between means the batch is re-sent next run, not skipped.
            client.set(self.OFFSET_KEY, max_id)
            print(f"success,max_id is {max_id},size is {len(row_list)}")
        else:
            print(f"no rows with id > {offset}, nothing to send")

        # Disabled: send a WeChat reminder once the queue is drained so the
        # scheduled job can be switched off.
        # if int(client.llen(list_key)) == 0:
        #     CommonUtil.send_wx_msg(['wujicang'], "提醒", "AsinImgParseSender全部执行成功!!请关闭定时任务")


if __name__ == '__main__':
    # The sender is currently disabled: the process exits immediately with
    # status 1, so the batch below never runs. Delete this line to re-enable it.
    sys.exit(1)
    t0 = time.time()
    AsinImgParseSender().run("us")
    elapsed = time.time() - t0
    print(f"耗时:{elapsed}")