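"""Download self-ASIN images from the `{site_name}_self_asin_image` table to local disk.

Rows move through a simple state machine: 1 = pending, 2 = claimed, 3 = done.
Each worker claims a batch (state 1 -> 2), writes the image bytes under
/mnt/data/img_data/amazon_self/..., then marks the batch finished (2 -> 3).
"""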
import ast
import json
import logging
import os
import sys
import threading
import time
import traceback

import pandas as pd
# text() lets raw SQL strings run on SQLAlchemy 1.4/2.0 connections; this
# assumes DBUtil hands back a SQLAlchemy engine, which the engine.begin()
# usage below already relies on.
from sqlalchemy import text

sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")
from utils.db_util import DbTypes, DBUtil

logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', level=logging.INFO)


class GetPicsToh7:

    def __init__(self, site_name='us', thread_num=10, limit=100):
        self.site_name = site_name
        self.thread_num = thread_num
        self.limit = limit
        self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
        self.df_self_asin_image = pd.DataFrame()

    def read_data(self):
        with self.engine_srs.begin() as conn:
            # Claim a batch: flip up to `limit` pending rows (state=1) to "claimed"
            # (state=2). Note that with several workers the claim and the re-read
            # below can overlap, since the SELECT filters by state rather than by
            # the ids this worker just claimed.
            sql_update = text(f"""
                UPDATE {self.site_name}_self_asin_image
                SET state = 2
                WHERE id IN (
                    SELECT id
                    FROM {self.site_name}_self_asin_image
                    WHERE state = 1
                    LIMIT {self.limit}
                );
            """)
            logging.info(f"sql_update: {sql_update}")
            conn.execute(sql_update)
            sql_read = f"select id, asin, image_file from {self.site_name}_self_asin_image where state=2 limit {self.limit};"
            result = conn.execute(text(sql_read))
            df = pd.DataFrame(result, columns=['id', 'asin', 'image_file'])
            logging.info(f"sql_read: {sql_read}, {df.shape}, ids: {tuple(df.id)[:10]}")
            return df
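
    # The queries above assume a table shaped roughly like this (hypothetical DDL,
    # reconstructed from usage -- the real schema lives in the srs database):
    #
    #   CREATE TABLE us_self_asin_image (
    #       id         BIGINT PRIMARY KEY,
    #       asin       VARCHAR(16),
    #       image_file TEXT,      -- JSON-encoded repr of the raw image bytes
    #       state      SMALLINT   -- 1 = pending, 2 = claimed, 3 = done
    #   );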

    def handle_data(self, df):
        img_str_list = list(df.image_file)
        asin_list = list(df.asin)
        for asin, img_str in zip(asin_list, img_str_list):
            # image_file holds a JSON string whose payload is the repr of a
            # Python bytes literal; json.loads unwraps the JSON layer and
            # ast.literal_eval turns "b'...'" back into raw bytes.
            input_bytes = ast.literal_eval(json.loads(img_str))
            # Shard the output path by ASIN prefix to keep directories small.
            dir_name = (f"/mnt/data/img_data/amazon_self/{self.site_name}/"
                        f"{asin[:1]}/{asin[:2]}/{asin[:3]}/{asin[:4]}/{asin[:5]}/{asin[:6]}")
            # Make sure the directory exists.
            os.makedirs(dir_name, exist_ok=True)
            file_name = f"{dir_name}/{asin}.jpg"
            with open(file_name, 'wb') as f:
                f.write(input_bytes)
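
    # Round-trip sketch for the decoding above (the producer-side encoding is an
    # assumption inferred from this consumer code, not confirmed here):
    #   stored = json.dumps(str(b'\xff\xd8\xff'))      # bytes repr, JSON-wrapped
    #   raw = ast.literal_eval(json.loads(stored))     # back to raw bytes
    #   assert raw == b'\xff\xd8\xff'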

    def save_data(self, df):
        with self.engine_srs.begin() as conn:
            id_tuple = tuple(df.id)
            if id_tuple:
                # A one-element Python tuple renders as "(1,)", which is not
                # valid SQL, so single ids are formatted by hand.
                id_tuple_str = f"({id_tuple[0]})" if len(id_tuple) == 1 else f"{id_tuple}"
                sql_update = f"update {self.site_name}_self_asin_image set state=3 where id in {id_tuple_str};"
                logging.info(f"sql_update: {sql_update}")
                conn.execute(text(sql_update))
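
    # A bound-parameter alternative to the string-formatted IN list above
    # (sketch, using SQLAlchemy's documented "expanding" bindparam):
    #   from sqlalchemy import bindparam
    #   stmt = text(f"update {site_name}_self_asin_image set state=3 where id in :ids")
    #   stmt = stmt.bindparams(bindparam("ids", expanding=True))
    #   conn.execute(stmt, {"ids": [int(i) for i in df.id]})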

    def run(self):
        while True:
            try:
                df = self.read_data()
                if df.empty:
                    break
                self.handle_data(df)
                self.save_data(df)
            except Exception as e:
                logging.error(f"error: {e}\n{traceback.format_exc()}")
                # Rebuild the engine in case the connection died, then retry.
                self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
                time.sleep(10)
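
    # Note: a SQLAlchemy engine is thread-safe (it wraps a connection pool), so
    # sharing self.engine_srs across workers is fine; reassigning it in the
    # except-branch above is the one place where concurrent workers can race.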

    def run_thread(self):
        thread_list = []
        for thread_id in range(self.thread_num):
            thread = threading.Thread(target=self.run)
            thread_list.append(thread)
            thread.start()
        for thread in thread_list:
            thread.join()
        logging.info("all worker threads finished")

if __name__ == '__main__':
    handle_obj = GetPicsToh7(site_name='us', thread_num=1, limit=1000)
    handle_obj.run_thread()