# ===== Flask API service (file name not shown in the diff) =====
import json
import time
from datetime import datetime

from flask import Flask, request, jsonify
from loguru import logger

# Business-logic imports
from inv_img_double_search import AmazonImageSearch, SITE_CONFIG_MAPPER
from amazon_configs import site_name_secret_dict
from py_spider.utils.secure_db_client import get_remote_engine

app = Flask(__name__)
# Return Chinese in responses without escaping it
app.config['JSON_AS_ASCII'] = False
@app.route('/', methods=['GET'])
def index():
    # ... (body elided in the diff) ...


# The decorator itself is elided in the diff; the route path is taken from the test client below.
@app.route('/api/search_image', methods=['POST'])
def search_image_api():
    # ... (request parsing elided in the diff: `data`, `image_url` and `site_name`
    # are read from the JSON body before this point) ...
    search_mode = data.get("search_mode", "default")
    if site_name not in SITE_CONFIG_MAPPER:
        return jsonify({"code": 400, "msg": "Unsupported site"}), 400
    # Connect to the database
    engine = get_remote_engine(site_name=site_name, db_type="mysql")
    if not engine:
        logger.error(f"Failed to get a database engine for site {site_name}")
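    # Task states in us_inv_img_result, as used throughout this file and the VPS
    # worker below: 1 = pending, 2 = being crawled, 3 = finished (result_data then
    # holds the crawler's JSON, whose own "success" field is 1 only on success).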
    task_id = None
    # ======================================================
    # 2: Write to the database (publish the task)
    # ======================================================
    try:
        now_time = datetime.now()  # current time; MySQL's clock runs 20-odd seconds behind
        now_time_str = now_time.strftime("%Y-%m-%d %H:%M:%S")
        sql = "INSERT INTO us_inv_img_result (img_url, state, search_mode, site_name, created_at) VALUES (%s, 1, %s, %s, %s);"
        with engine.begin() as conn:  # write the current URL into SQL with state 1
            conn.execute(sql, [(image_url, search_mode, site_name, now_time_str)])
        # Look up the id for this URL. Note: if several identical links are inserted
        # at the same moment, this returns the newest id.
        sql_find = "SELECT id, created_at FROM us_inv_img_result WHERE img_url = '%s' AND state = 1 ORDER BY id DESC LIMIT 1;" % image_url
        df_data = engine.read_sql(sql_find).to_dict(orient='records')  # list of dicts: [{...}]
        if df_data:
            db_id = df_data[0].get('id')
            db_created_at = df_data[0].get('created_at')
            # Compare the row's timestamp with the insert we just made:
            # within tolerance -> treat it as our row;
            # far apart -> the insert failed or we hit stale history, treat as an error.
            if db_created_at:
                db_created_at = datetime.strptime(db_created_at, "%Y-%m-%dT%H:%M:%S")  # parse for arithmetic
                time_diff = abs((now_time - db_created_at).total_seconds())
                if time_diff < 3:
                    # Timestamps match: this is the row we just inserted
                    task_id = db_id
                    logger.info(f"Task created, ID: {task_id}; waiting for a VPS to process it...")
                else:
                    logger.warning(f"Found row ID {db_id}, but its timestamp is {time_diff}s off; probably stale history, treating the insert as failed")
            else:
                # Rare: the row exists but has no timestamp; let it through to be safe
                task_id = db_id
        else:
            logger.error("Failed to read the task row back")
    except Exception as e:
        logger.error(f"Database operation failed, falling back to local execution: {e}")
        # If the database is down, don't error out; run the local fallback instead
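    # Note: because the id is recovered by querying img_url + state rather than
    # from the INSERT itself, two simultaneous requests for the same image can
    # both pick up the same (newest) row; the 3-second tolerance above narrows
    # but does not eliminate that race.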
    # ======================================================
    # 3. Poll and wait for the result
    # ======================================================
    if task_id:
        while True:
            row_dict = None
            try:
                # Every pass must re-read state, result_data and created_at
                select_sql = "SELECT state, result_data, created_at FROM us_inv_img_result WHERE id = %s;" % task_id
                select_df_data = engine.read_sql(select_sql).to_dict(orient='records')
                if select_df_data:
                    row_dict = select_df_data[0]  # the row as a dict
            except Exception as e:
                logger.error(f"Polling error: {e}")
            # If the task row vanished, drop out and run locally
            if not row_dict:
                break
            state = row_dict.get('state')
            created_at = row_dict.get('created_at')  # comes back as a string
            # Elapsed time = now minus the row's creation time
            elapsed_seconds = 0
            if created_at:
                created_at = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%S")
                elapsed_seconds = (datetime.now() - created_at).total_seconds()
            # Case 1: state 3 (finished) -> return the result
            if state == 3:
                try:
                    res_data = json.loads(row_dict['result_data'])
                    if res_data.get("success") == 1:
                        logger.success(f"Task {task_id} finished on the VPS (state 3, {elapsed_seconds:.1f}s)")
                        return jsonify({"code": 200, "msg": "success", "data": res_data})
                    else:
                        # The VPS marked the task failed (success=0)
                        logger.warning(f"Task {task_id} state 3 but the VPS reported failure (success=0); total elapsed: {elapsed_seconds:.1f}s")
                        # Has more than a minute passed?
                        if elapsed_seconds > 60:
                            logger.error("VPS failure at state 3 and over 60s elapsed; giving up without a local retry")
                            return jsonify({"code": 500, "msg": "Processing failed", "data": res_data}), 500
                        else:
                            logger.info("Under 60s elapsed; retrying with the local fallback...")
                            break  # leave the loop and run the local logic
                except Exception:
                    # Rare: JSON parsing failed; treat as failure and fall back
                    break
            # Case 2: state 1 (pending) and more than 60s elapsed -> time out to the fallback
            if state == 1 and elapsed_seconds > 60:
                logger.warning(f"Task {task_id} [pending, state 1] timed out ({elapsed_seconds:.1f}s > 60s) -> going local")
                break
            # Case 3: state 2 (crawling) and more than 60s elapsed -> time out to the fallback
            if state == 2 and elapsed_seconds > 60:
                logger.warning(f"Task {task_id} [in progress, state 2] timed out ({elapsed_seconds:.1f}s > 60s) -> going local")
                break
            # Neither finished nor timed out: sleep 0.3s and poll again
            time.sleep(0.3)
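        # Fallback decision summary for the loop above:
        #   row missing                   -> break and run locally
        #   state 3, success=1            -> return the VPS result (200)
        #   state 3, success=0, <= 60s    -> break and retry locally
        #   state 3, success=0, >  60s    -> return 500
        #   state 3, result unparseable   -> break and run locally
        #   state 1 (pending)   >  60s    -> break and run locally
        #   state 2 (crawling)  >  60s    -> break and run locally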
    # ... (elided in the diff) ...
    try:
        logger.info("Starting the local crawler fallback:")
        logger.info(f"API request received: Site={site_name}, Mode={search_mode}, URL={image_url}")
        client = AmazonImageSearch(site_name=site_name)
        result = client.search(image_url, search_mode=search_mode)
        # Quick check of whether the local run succeeded
        if result.get("error") or result.get("success") == 0:
            return jsonify({"code": 500, "msg": "Local processing failed", "data": result}), 500
        if task_id:
            if result.get("success") == 1:
                try:
                    json_result = json.dumps(result, ensure_ascii=False)
                    finish_time = datetime.now()
                    finish_time_str = finish_time.strftime("%Y-%m-%d %H:%M:%S")
                    # Set state 3 (finished), store the result and bump the timestamp
                    up_sql = "UPDATE us_inv_img_result SET state = 3, result_data = %s, updated_at = %s WHERE id = %s"
                    with engine.begin() as conn:
                        conn.execute(up_sql, [(json_result, finish_time_str, task_id)])
                    logger.info(f"Local fallback result saved to the database, ID: {task_id}")
                except Exception as save_e:
                    # A failed save is only logged; it must not block the response
                    logger.error(f"Failed to write the local result back to the database: {save_e}")
            else:
                logger.info(f"Local fallback did not produce a parseable result, ID: {task_id}")
        return jsonify({"code": 200, "msg": "success", "data": result})
    except Exception as e:
        logger.error(f"Local execution error: {e}")
# ... (remainder of the file elided in the diff) ...

# ===== inv_img_double_search.py =====
# ... (file header elided in the diff) ...
import uuid
import random
import re
import json
import io
from io import BytesIO
from typing import Optional, Dict, Any, List
# Thread-pool imports
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import requests
from PIL import Image, UnidentifiedImageError
import pillow_avif
from scrapy import Selector
from requests.exceptions import RequestException, JSONDecodeError, Timeout, ConnectionError, HTTPError
from loguru import logger
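# Note: pillow_avif (the pillow-avif-plugin package) registers an AVIF decoder
# with Pillow as an import side effect, so Image.open() below can also decode
# AVIF bytes; the import is required even though the name is never referenced.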

# ... (elided in the diff; the following method sits inside class AmazonImageSearch) ...
    # === Image download method ===
    def _download_image(self, img_url: str) -> bytes:
        """Download an image and return its binary data."""
        # TODO: could route image downloads through _retry_request
        logger.info(f"Downloading image: {img_url}")
        target_size_bytes = 1 * 1024 * 1024  # cap at 1 MB
        headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'no-cache',
            'ect': '3g',
            'pragma': 'no-cache',
            'priority': 'u=0, i',
            'sec-ch-dpr': '1',
            'sec-ch-ua': '"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"',
            'sec-ch-ua-form-factors': '"Desktop"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-ch-viewport-height': '376',
            'sec-ch-viewport-width': '1920',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'none',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
        }
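        # (The headers above mimic a Chrome 142 page navigation so image hosts
        # treat the request as an ordinary browser visit.)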
        for i in range(3):
            try:
                resp = requests.get(img_url, headers=headers, timeout=15)
                resp.raise_for_status()
                img_bytes = resp.content
                # If the payload is already <= 1 MB, return it without wasting CPU.
                # Pillow cannot handle SVG, and SVGs are usually tiny, so return them as-is too.
                if len(img_bytes) <= target_size_bytes or img_url.lower().endswith('.svg'):
                    return img_bytes
                # Compress in memory: wrap the bytes in BytesIO so Pillow can read them
                f_obj = BytesIO(img_bytes)
                try:
                    with Image.open(f_obj) as img:
                        logger.info(f"Image size ({len(img_bytes) / 1024 / 1024:.2f}MB) exceeds the limit, compressing...")
                        # Compatibility: PNG/RGBA images must be converted to RGB, or they cannot be saved as JPEG
                        if img.mode != "RGB":
                            img = img.convert("RGB")
                        # Compression parameters
                        buffer = BytesIO()
                        quality = 90      # starting quality
                        min_quality = 20  # quality floor
                        step = 10         # decrement per round
                        while True:
                            # Reset the buffer before writing
                            buffer.seek(0)
                            buffer.truncate()
                            # Try a compressed save; forcing JPEG is the most compatible option
                            img.save(buffer, format="JPEG", quality=quality, optimize=True)
                            current_size = buffer.tell()
                            # Small enough yet?
                            if current_size <= target_size_bytes:
                                logger.info(f"Compression succeeded: {current_size / 1024 / 1024:.2f}MB (Quality={quality})")
                                return buffer.getvalue()
                            # Not yet: keep adjusting
                            if quality > min_quality:
                                quality -= step
                            else:
                                # Quality cannot go lower; shrink the resolution instead
                                w, h = img.size
                                # If the image is already down to ~100px, return it (avoids an endless loop)
                                if w < 100 or h < 100:
                                    logger.warning(f"Image shrunk to {w}x{h} and still over the limit; returning as-is")
                                    return buffer.getvalue()
                                # Scale to 0.9x each round
                                img = img.resize((int(w * 0.9), int(h * 0.9)), Image.Resampling.LANCZOS)
                                logger.info(f"Quality exhausted, resizing to: {img.size}")
                                # After shrinking, bump the quality back up a bit so the picture is not too ugly
                                quality = 60
                except UnidentifiedImageError:
                    # A format Pillow cannot identify (e.g. SVG or some odd encrypted format)
                    logger.warning(f"Pillow cannot identify the image format: {img_url}; returning the original bytes")
                    return img_bytes
                except OSError:
                    # Corrupt image file
                    logger.warning(f"Image file appears to be corrupt: {img_url}")
                    return img_bytes
            except Exception as e:
                logger.warning(f"Image download/compression failed (attempt {i + 1}): {e}")
                time.sleep(1)
        raise ValueError(f"Image download ultimately failed: {img_url}")
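        # Net effect of the compression loop above: JPEG quality walks
        # 90 -> 80 -> ... -> 20; once it bottoms out, the image is resized to 90%
        # of its current dimensions and the search resumes from quality 60, until
        # the encoded size fits in 1 MB or either side drops below 100 px.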

    # ... (remaining methods elided in the diff) ...

if __name__ == "__main__":
    # Test image links
    # test_img_url = "https://soundasia.oss-cn-shenzhen.aliyuncs.com/yswg-img/SoundasiaAmazon/file/2025/1125/bdb9b06102184048b6eb9db3b39bb97e.png"
    test_img_url = "https://yswg-private-test.oss-cn-shenzhen.aliyuncs.com/SoundasiaAmazon/competitor_image/2025/1127/88e90bbd317a42ea80cc9128ea333e6c.svg"  # image-permission problem
    # test_img_url = "https://m.media-amazon.com/images/I/71IFE6W6THL._AC_UL320_.jpg"
    # test_img_url = "https://m.media-amazon.com/images/I/71IFE6W6THL._AC_SY550_.jpg"
    # test_img_url = "https://m.media-amazon.com/images/I/71G1BAeYlNL._AC_SX300_SY300_QL70_FMwebp_.jpg"
    try:
        client = AmazonImageSearch(site_name="uk")
        logger.info("\n=== Testing default mode ===")
        result = client.search(test_img_url, search_mode="default")
        # logger.success(f"Result: Success={default_result}")
        # ... (remainder of the file elided in the diff) ...

# ===== mysql_db_new.py =====
import socket
import time

import pandas as pd
from func_timeout import func_timeout, FunctionTimedOut
from sqlalchemy.exc import OperationalError
from loguru import logger

from py_spider.utils.secure_db_client import get_remote_engine

# Stop low-level handshakes from waiting forever
socket.setdefaulttimeout(30)
def sql_execute_agg(f_name, sql_or_table, data=None, site="us", db="mysql"):
    engine = get_remote_engine(site_name=site, db_type=db)
    try:
        if f_name == "to_sql":
            if data is None or not isinstance(data, pd.DataFrame):
                raise ValueError("to_sql requires a DataFrame")
            engine.to_sql(data, table=sql_or_table, if_exists="append")
            return True
        elif f_name == "read_sql":
            df = engine.read_sql(sql_or_table)
            return df
        elif f_name == "sql_execute":
            with engine.begin() as conn:
                conn.execute(sql_or_table, data)
            return True
        else:
            raise ValueError(f"Unsupported operation: {f_name}")
    except OperationalError as e:
        logger.error(f"OperationalError: {e}; sql_or_table is {sql_or_table}")
        return False
    except RuntimeError as e:
        logger.error(f"RuntimeError: {e}; sql_or_table is {sql_or_table}")
        return False
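
# Example usage (a sketch; the statements are illustrative only):
#   df = sql_execute_agg("read_sql", "SELECT 1 AS ok")   # -> DataFrame, or False on a DB error
#   sql_execute_agg("sql_execute",
#                   "UPDATE us_inv_img_result SET state = 2 WHERE id = %s",
#                   data=[(42,)])                         # -> True, or False on a DB error
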
def sql_try_again(f_name, sql_or_table, data=None, site="us", db="mysql", max_timeout=15):
    fail_count = 0
    while True:
        try:
            # func_timeout hard-caps the execution time: if sql_execute_agg does not
            # respond within max_timeout seconds, FunctionTimedOut is raised
            result = func_timeout(max_timeout, sql_execute_agg, args=(f_name, sql_or_table, data, site, db))

            # --- success handling ---
            if f_name == 'read_sql':
                if isinstance(result, pd.DataFrame):
                    # logger.success(f"SQL read OK: {sql_or_table[:30]}...")
                    return result
                else:
                    raise ValueError("return type is not a DataFrame")
            else:
                if result is True:
                    # logger.success(f"SQL executed OK: {f_name}")
                    return True
                else:
                    raise ValueError("execution returned False")
        except FunctionTimedOut:
            fail_count += 1
            logger.error(f"Database operation timed out (force-interrupted) - retry #{fail_count}")
            # The sleep is mandatory: give the network time to recover
            time.sleep(5)
            continue
        except Exception as e:
            fail_count += 1
            # Catch everything else (network drops, SQL errors, ...)
            logger.warning(f"Database error (attempt {fail_count}): {e}")
            # Key point: sleep so a dead database cannot spin the CPU at 100%.
            # Simple policy: retry quickly the first few times, then slow down.
            sleep_time = 3 if fail_count < 5 else 5
            time.sleep(sleep_time)
            continue
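
# Behavior summary: sql_try_again never raises and never gives up. Each attempt
# is capped at max_timeout seconds; timeouts back off 5s, other failures back
# off 3s for the first five attempts and 5s after that. Callers therefore only
# ever see a DataFrame (read_sql) or True.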

# ===== load-test client (file name not shown in the diff) =====
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import json
from py_spider.utils.secure_db_client import get_remote_engine

url = "http://192.168.200.210:5000/api/search_image"
# url = "http://127.0.0.1:5000/api/search_image"
# ... (elided in the diff) ...
payload = {
    # Image link
    # "image_url": "https://m.media-amazon.com/images/I/71G1BAeYlNL._AC_SX300_SY300_QL70_FMwebp_.jpg",  # no
    # "image_url": "https://m.media-amazon.com/images/I/41ryNvEnNCL._AC_SL1500_.jpg",  # ok
    # "image_url": "https://m.media-amazon.com/images/I/71IFE6W6THL._AC_SY550_.jpg",  # ok
    # "image_url": "https://soundasia.oss-cn-shenzhen.aliyuncs.com/yswg-img-test/SoundasiaAmazon/competitor_image/2025/1124/84759f5d6514435e83d0fb728a57451c.jpg",  # ok
    "image_url": "https://imgtool.net/cdn/image/2023/20230401_1.jpg",  # ok
    # Site
    "site_name": "us",
    # Mode (default: normal mode; full_image: whole-image mode, mainly for app-side recognition)
    # "search_mode": "full_image"  # omit to use the default "default"
}


def send_request(url1):
    """Send a single request; return (success?, response time)."""
    start = time.time()
    try:
        payload = {
            # Image link
            "image_url": url1,
            # Site
            "site_name": "us",
        }
        response = requests.post(url, json=payload, timeout=120, allow_redirects=False)
        print(response.json())
        return (response.status_code == 200, round(time.time() - start, 4))
    except Exception:
        return (False, 5)  # on failure, record the response time as the timeout value


try:
    print("Starting requests!")
    thread_num = 5  # number of threads

    # 2. Multithreaded test
    success = 0
    response_times = []
    start_total = time.time()

    engine = get_remote_engine(site_name='us', db_type="mysql")
    sql_get = "SELECT img_url FROM us_inv_img_info ORDER BY updated_at DESC LIMIT 10, 10"
    get_df_data = engine.read_sql(sql_get).to_dict("records")
    total_req_list = [i.get('img_url') for i in get_df_data]
    print(total_req_list)

    # Run the tasks on a thread pool
    with ThreadPoolExecutor(max_workers=thread_num) as executor:
        # Submit every request
        tasks = [executor.submit(send_request, url1) for url1 in total_req_list]
        # Collect results as they complete
        for task in as_completed(tasks):
            ok, t = task.result()
            if ok:
                success += 1
            response_times.append(t)

    total_time = round(time.time() - start_total, 2)
    success_rate = (success / len(total_req_list)) * 100
    avg_time = round(sum(response_times) / len(response_times), 4) if response_times else 0
    print(f"Total requests: {len(total_req_list)} | successes: {success} | success rate: {success_rate:.2f}%")
    print(f"Total time: {total_time}s | average response time: {avg_time}s")
    print(f"QPS (requests per second): {len(total_req_list) / total_time:.2f}")
    # Parse the result (previous single-request flow, kept commented out for reference)
    # response = requests.post(url, json=payload, timeout=120)
    # if response.status_code == 200:
    #     result = response.json()
    #     print("\n Request succeeded!")
    #     print(f"Status code: {result['code']}")
    #     print(f"Message: {result['msg']}")
    #
    #     data = result.get('data', {})
    #     print(f"Recognition success: {data.get('success')}")
    #     print(f"Items found: {data.get('total_items')}")
    #     print(f"Duration: {data.get('duration')}")
    #     print(f"Search url: {data.get('search_url')}")
    #     # Print the first few items
    #     items = data.get('items', [])
    #     if items:
    #         print("\n--- Sample items ---")
    #         for item in items:
    #             print(f"ASIN: {item['asin']} | price: {item['price']} | rating: {item['rating']} | img_url: {item['img_url']} | brand: {item['brand']} | bought: {item['bought']} | comments: {item['total_comments']} | ASIN source: {item['source']} | title: {item['title'][:50]}...")
    # else:
    #     print(f" Request failed: {response.status_code}")
    #     print(response.text)
except Exception as e:
    print(f" Error: {e}")

# ===== VPS worker (file name not shown in the diff) =====
import random
import socket
import threading
import time
import json
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from loguru import logger

from py_spider.utils.secure_db_client import get_remote_engine
from inv_img_double_search import AmazonImageSearch
from mysql_db_new import sql_try_again
def process_single_task(task_id, url, search_mode, site_name):
    thread_name = f"Thread-{threading.get_ident()}"
    start_time = time.time()
    try:
        logger.info(f"[{thread_name}] starting crawl for ID: {task_id}")

        # 1. Run the spider
        spider = AmazonImageSearch(site_name=site_name)
        result_data = spider.search(url, search_mode=search_mode)

        # 2. Store the result.
        # Store it verbatim whether or not the business logic succeeded; the Flask
        # side decides what to do with it.
        json_result = json.dumps(result_data, ensure_ascii=False)
        finish_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        finish_sql = "UPDATE us_inv_img_result SET state = 3, result_data = %s, updated_at = %s WHERE id = %s"
        sql_try_again("sql_execute", finish_sql, data=[(json_result, finish_time_str, task_id)], site=site_name)

        cost_time = time.time() - start_time
        logger.success(f"[{thread_name}] task {task_id} finished in {cost_time:.2f}s")
    except Exception as e:
        logger.error(f"[{thread_name}] task {task_id} crashed: {e}")


def worker_loop():
    logger.info("VPS worker started, listening for tasks...")
    # Thread pool
    executor = ThreadPoolExecutor(max_workers=5)
    while True:
        try:
            logger.info('Polling...')
            time.sleep(random.uniform(0.1, 0.5))  # random sleep so workers don't collide

            # 1. Claim a task, oldest first
            sql_get = "SELECT id, img_url,search_mode,site_name FROM us_inv_img_result WHERE state = 1 ORDER BY id ASC LIMIT 1"
            df = sql_try_again("read_sql", sql_get)
            get_df_data = df.to_dict("records")  # the newest state-1 row, if any
            if get_df_data:
                task = get_df_data[0]
                task_id = task['id']
                url = task['img_url']
                search_mode = task['search_mode']
                site_name = task['site_name']
                logger.info(f"Claimed task: {task_id}")

                # 2. Set state 2 (being crawled) and bump updated_at
                now_time = datetime.now()  # VPS-local time
                now_time_str = now_time.strftime("%Y-%m-%d %H:%M:%S")
                up_two_sql = "UPDATE us_inv_img_result SET state = 2, updated_at = %s WHERE id = %s AND state = 1"
                sql_try_again("sql_execute", up_two_sql, data=[(now_time_str, task_id)])

                # 3. Hand the task to the thread pool
                logger.info(f"Submitting task {task_id} -> thread pool")
                executor.submit(process_single_task, task_id, url, search_mode, site_name)
            else:
                time.sleep(2)  # no tasks; take a nap
        except Exception as e:
            logger.error(f"Database access failed: {e}")
            time.sleep(2)
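
# Note on the claim step above: "UPDATE ... WHERE id = %s AND state = 1" acts as
# a compare-and-swap, but the affected-row count is never checked (sql_try_again
# only reports True), so two workers that read the same pending row can both
# submit it. The random pre-poll sleep only makes that collision less likely.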


if __name__ == "__main__":
    # ... (elided in the diff; presumably worker_loop()) ...