import ast
import json
import os
import sys
import threading
import time
import traceback
import pandas as pd
import logging

sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
from utils.db_util import DbTypes, DBUtil

logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', level=logging.INFO)


class GetPicsToh7(object):
    """Export image blobs from ``<site_name>_self_asin_image`` to .jpg files on disk.

    Rows move through the ``state`` column as follows:
    1 (pending) -> 2 (claimed by ``read_data``) -> 3 (file written, set by ``save_data``).

    NOTE(review): every worker claims rows with the same UPDATE and then reads
    back *all* rows with ``state=2`` (up to ``limit``), so with ``thread_num > 1``
    several workers can process the same rows at once. Rows left at ``state=2``
    by an interrupted run are also picked up again by the next run.
    """

    def __init__(self, site_name='us', thread_num=10, limit=100,
                 img_root="/mnt/data/img_data/amazon_self"):
        """
        :param site_name: site prefix; selects the ``<site_name>_self_asin_image`` table,
            the DB engine, and the per-site output sub-directory.
        :param thread_num: number of worker threads started by ``run_thread``.
        :param limit: maximum number of rows claimed / read per batch.
        :param img_root: root directory under which images are written.
        """
        self.site_name = site_name
        self.thread_num = thread_num
        self.limit = limit
        self.img_root = img_root
        # Engine returned by the project helper DBUtil (presumably SQLAlchemy,
        # given the ``begin()`` / ``execute()`` usage below — TODO confirm).
        self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
        # Not read anywhere in this class; kept so external code relying on it still works.
        self.df_self_asin_image = pd.DataFrame()

    def read_data(self):
        """Claim up to ``self.limit`` pending rows and return the claimed batch.

        Both statements run inside one ``engine.begin()`` transaction: first
        ``state`` is flipped 1 -> 2 for at most ``limit`` rows, then up to
        ``limit`` rows with ``state=2`` are read back.

        :return: DataFrame with columns ``id``, ``asin``, ``image_file``;
            empty when no claimed rows remain.
        """
        # Both statements must target the site-specific table; hard-coding the
        # ``us_`` table left other sites' pending rows unclaimed.
        table = f"{self.site_name}_self_asin_image"
        with self.engine_srs.begin() as conn:
            sql_update = f"""
            UPDATE {table} SET state = 2 WHERE id in (
                SELECT id FROM {table} WHERE state = 1 LIMIT {self.limit}
            );
            """
            print(f"sql_update: {sql_update}")
            conn.execute(sql_update)
            sql_read = f"select id, asin, image_file from {table} where state=2 limit {self.limit};"
            rows = conn.execute(sql_read)
            df = pd.DataFrame(rows, columns=['id', 'asin', 'image_file'])
            id_tuple = tuple(df.id)
            print(f"sql_read: {sql_read}, {df.shape}", id_tuple[:10])
            return df

    def handle_data(self, df):
        """Decode each row's ``image_file`` and write it to disk.

        Output path: ``<img_root>/<site_name>/<asin[:1]>/<asin[:2]>/.../<asin[:6]>/<asin>.jpg``.

        :param df: DataFrame with at least ``asin`` and ``image_file`` columns.
        """
        for asin, img_str in zip(df.asin, df.image_file):
            # Presumably image_file holds a JSON string wrapping a Python bytes
            # literal (e.g. "b'\\xff\\xd8...'"): json.loads unwraps the string and
            # ast.literal_eval (literals only, no code execution) yields the bytes
            # that are written below — TODO confirm the stored format.
            input_bytes = ast.literal_eval(json.loads(img_str))
            # Nested directories named by the first 1..6 characters of the ASIN,
            # presumably to bound the number of files per directory.
            shard = "/".join(asin[:i] for i in range(1, 7))
            dir_name = f"{self.img_root}/{self.site_name}/{shard}"
            # Ensure the directory exists.
            os.makedirs(dir_name, exist_ok=True)
            file_name = f"{dir_name}/{asin}.jpg"
            with open(file_name, 'wb') as f:
                f.write(input_bytes)

    def save_data(self, df):
        """Mark the rows of *df* as done (``state=3``); no-op for an empty frame."""
        id_list = ", ".join(repr(i) for i in df.id)
        if not id_list:
            return
        # Joining the reprs yields "(5)" for one id and "(1, 2)" for several,
        # avoiding the invalid one-element tuple form "(5,)".
        sql_update = f"update {self.site_name}_self_asin_image set state=3 where id in ({id_list});"
        with self.engine_srs.begin() as conn:
            print(f"sql_update: {sql_update}")
            conn.execute(sql_update)

    def run(self):
        """Worker loop: claim, export and mark batches until no claimed rows remain.

        On any exception the traceback is printed, a fresh engine is created and
        the loop retries after 10 seconds.

        NOTE(review): there is no retry limit. A row whose image always fails to
        decode/write keeps its batch at ``state=2``; the next ``read_data`` can
        read it again, so the worker may retry it indefinitely.
        """
        while True:
            try:
                df = self.read_data()
                if df.empty:
                    break
                self.handle_data(df)
                self.save_data(df)
            except Exception as e:
                print(f"error: {e}", traceback.format_exc())
                self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
                time.sleep(10)

    def run_thread(self):
        """Start ``thread_num`` threads running ``run``, wait for all, then log completion."""
        thread_list = [threading.Thread(target=self.run) for _ in range(self.thread_num)]
        for thread in thread_list:
            thread.start()
        for thread in thread_list:
            thread.join()
        # Log message (Chinese): "all threads finished processing".
        logging.info("所有线程处理完成")


if __name__ == '__main__':
    handle_obj = GetPicsToh7(site_name='us', thread_num=1, limit=1000)
    handle_obj.run_thread()