# get_pics_to_h7.py (3.79 KB)
import ast
import json
import os
import sys
import threading
import time
import traceback

import pandas as pd
import logging
sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
from utils.db_util import DbTypes, DBUtil
logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', level=logging.INFO)


class GetPicsToh7(object):
    """Claim batches of self-ASIN image rows from the srs database, write
    each image blob to disk under /mnt/data/img_data, and mark rows done.

    Row state machine (per the SQL below): 1 = pending, 2 = claimed,
    3 = written to disk.
    """

    def __init__(self, site_name='us', thread_num=10, limit=100):
        self.site_name = site_name      # site/table prefix, e.g. 'us'
        self.thread_num = thread_num    # number of worker threads
        self.limit = limit              # rows claimed per batch
        self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
        self.df_self_asin_image = pd.DataFrame()

    def read_data(self):
        """Claim up to ``limit`` pending rows (state 1 -> 2) and return them.

        Returns a DataFrame with columns [id, asin, image_file].

        NOTE(review): the claiming UPDATE and the following SELECT on
        state=2 are not keyed to each other, so with thread_num > 1 a
        worker can fetch rows another worker claimed; confirm whether a
        RETURNING-based claim is needed for concurrent runs.
        """
        with self.engine_srs.begin() as conn:
            # Fix: the table name was hard-coded to `us_self_asin_image`,
            # which broke every site_name other than 'us'.
            sql_update = f"""
                UPDATE {self.site_name}_self_asin_image
                SET state = 2
                WHERE id in (
                    SELECT id
                    FROM {self.site_name}_self_asin_image
                    WHERE state = 1
                    LIMIT {self.limit}
                );
            """
            print(f"sql_update: {sql_update}")
            conn.execute(sql_update)
            sql_read = f"select id, asin, image_file from {self.site_name}_self_asin_image where state=2 limit {self.limit};"
            result = conn.execute(sql_read)
            df = pd.DataFrame(result, columns=['id', 'asin', 'image_file'])
            print(f"sql_read: {sql_read}, {df.shape}", tuple(df.id)[:10])
        return df

    def handle_data(self, df):
        """Decode each row's image blob and write it as <asin>.jpg.

        The target directory fans out by asin prefixes of length 1..6 to
        keep individual directory sizes bounded.
        """
        for asin, img_str in zip(list(df.asin), list(df.image_file)):
            # image_file is a JSON-encoded string whose payload is a Python
            # bytes literal, so two decode layers are needed — TODO confirm
            # against the producer that writes this column.
            input_bytes = ast.literal_eval(json.loads(img_str))
            dir_name = rf"/mnt/data/img_data/amazon_self/{self.site_name}/{asin[:1]}/{asin[:2]}/{asin[:3]}/{asin[:4]}/{asin[:5]}/{asin[:6]}"
            # Ensure the target directory exists.
            os.makedirs(dir_name, exist_ok=True)
            file_name = rf"{dir_name}/{asin}.jpg"
            with open(file_name, 'wb') as f:
                f.write(input_bytes)

    def save_data(self, df):
        """Mark the processed rows as done (state -> 3)."""
        with self.engine_srs.begin() as conn:
            id_tuple = tuple(df.id)
            if id_tuple:
                # A 1-tuple renders as "(1,)", which is invalid SQL —
                # special-case it.
                id_tuple_str = f"({id_tuple[0]})" if len(id_tuple) == 1 else f"{id_tuple}"
                sql_update = f"update {self.site_name}_self_asin_image set state=3 where id in {id_tuple_str};"
                print(f"sql_update: {sql_update}")
                conn.execute(sql_update)

    def run(self):
        """Worker loop: claim, write, mark done; stop when no rows remain.

        On any error the DB engine is recreated (handles dropped
        connections) and the loop retries after a short sleep.
        """
        while True:
            try:
                df = self.read_data()
                if df.shape[0]:
                    self.handle_data(df)
                    self.save_data(df)
                else:
                    break
            except Exception as e:
                print(f"error: {e}", traceback.format_exc())
                self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
                time.sleep(10)

    def run_thread(self):
        """Start ``thread_num`` worker threads and wait for all to finish."""
        thread_list = []
        for thread_id in range(self.thread_num):
            thread = threading.Thread(target=self.run)
            thread_list.append(thread)
            thread.start()
        for thread in thread_list:
            thread.join()
        logging.info("所有线程处理完成")


if __name__ == '__main__':
    # Entry point: single worker thread, batches of 1000 US image rows.
    GetPicsToh7(site_name='us', thread_num=1, limit=1000).run_thread()