VPS runs the image-based competitor search; the server acts as the fallback

parent 96e7eb0b
# server.py
import json
import time
from datetime import datetime

import pymysql
from flask import Flask, request, jsonify
from loguru import logger

# Import the search logic
from inv_img_double_search import AmazonImageSearch, SITE_CONFIG_MAPPER
from amazon_configs import site_name_secret_dict

TIMEOUT = 25  # timeout in seconds

app = Flask(__name__)
# Keep non-ASCII characters in JSON responses from being escaped
app.config['JSON_AS_ASCII'] = False
DB_CONFIG = {
    'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com',  # database host (use the public IP for a remote database)
    'port': 3306,
    'user': 'adv_test',                     # database user
    'password': 'jBVQe0FAwZgY3YsQqfwzUd',   # database password
    'db': 'selection',                      # database name
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}
# Helper that opens a new database connection
def get_db_conn():
    return pymysql.connect(**DB_CONFIG)

# ==========================================
# New: index endpoint (for direct browser access)
# ==========================================
@app.route('/', methods=['GET'])
def index():
    """
@@ -74,33 +88,115 @@ def search_image_api():
    if not data:
        return jsonify({"code": 400, "msg": "Request body must be JSON"}), 400

    # 2. Extract parameters
    image_url = data.get("image_url")
    site_name = data.get("site_name", "us")
    search_mode = data.get("search_mode", "default")

    # 3. Validate required fields
    if not image_url:
        return jsonify({"code": 400, "msg": "Missing parameter: image_url"}), 400
    if site_name not in SITE_CONFIG_MAPPER:
        return jsonify({"code": 400, "msg": "Unsupported site"}), 400
    conn = None
    task_id = None
    # ======================================================
    # 2. Write to the database (publish the task)
    # ======================================================
    try:
        conn = get_db_conn()
        with conn.cursor() as cursor:
            now_time = datetime.now()  # take the current time locally: the MySQL clock runs 20+ seconds slow
            sql = "INSERT INTO us_inv_img_result (img_url, state, created_at) VALUES (%s, 1, %s)"
            cursor.execute(sql, (image_url, now_time))
            conn.commit()
            task_id = cursor.lastrowid
            logger.info(f"Task created, id={task_id}; waiting for the VPS to process it...")
    except Exception as e:
        logger.error(f"Database insert failed, switching straight to local execution: {e}")
        # If the database is down, do not fail the request; fall through to the local fallback
    finally:
        if conn: conn.close()
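    # Assumed shape of us_inv_img_result (the table definition is outside this diff);
    # this file and the VPS worker below only rely on these columns:
    #   id          auto-increment primary key
    #   img_url     image URL submitted by the caller
    #   state       1 = pending, 2 = being crawled on the VPS, 3 = finished
    #   result_data serialized JSON search result
    #   created_at  set by the Flask server when the task is published
    #   updated_at  refreshed by whichever side last touched the row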
    # ======================================================
    # 3. Poll for the result
    # ======================================================
    if task_id:
        while True:
            row = None
            conn = None
            try:
                conn = get_db_conn()
                with conn.cursor() as cursor:
                    # Fetch state, result_data and created_at on every poll
                    sql = "SELECT state, result_data, created_at FROM us_inv_img_result WHERE id = %s"
                    cursor.execute(sql, (task_id,))
                    row = cursor.fetchone()
            except Exception as e:
                logger.error(f"Polling error: {e}")
            finally:
                if conn: conn.close()

            # If the task can no longer be found, break out and run the local fallback
            if not row:
                break

            state = row['state']
            created_at = row['created_at']  # a datetime instance

            # Elapsed time = now minus the creation time stored in the database
            elapsed_seconds = 0
            if created_at:
                elapsed_seconds = (datetime.now() - created_at).total_seconds()

            # Case 1: state == 3 (finished) -> return the result
            if state == 3:
                logger.success(f"Task {task_id} finished on the VPS ({elapsed_seconds:.1f}s)")
                try:
                    res_data = json.loads(row['result_data'])
                    return jsonify({"code": 200, "msg": "success", "data": res_data})
                except Exception:
                    # Rare case: JSON parsing failed, treat it as a failure and fall back
                    break

            # Case 2: state == 1 (pending) for more than 5 seconds -> time out, fall back
            if state == 1 and elapsed_seconds > 5:
                logger.warning(f"Task {task_id} [pending] timed out ({elapsed_seconds:.1f}s > 5s) -> local fallback")
                break

            # Case 3: state == 2 (being crawled) for more than 30 seconds -> time out, fall back
            if state == 2 and elapsed_seconds > 30:
                logger.warning(f"Task {task_id} [in progress] timed out ({elapsed_seconds:.1f}s > 30s) -> local fallback")
                break

            # Neither finished nor timed out: sleep 0.3s and poll again
            time.sleep(0.3)
    # ======================================================
    # 4. Local fallback
    # ======================================================
    try:
        logger.info("Starting the local crawler fallback")
        logger.info(f"API request: Site={site_name}, Mode={search_mode}, URL={image_url}")

        # Initialise the client and run the search
        client = AmazonImageSearch(site_name=site_name)
        result = client.search(image_url, search_mode='default')

        # Check whether the local run succeeded
        if result.get("error") or result.get("success") == 0:
            return jsonify({"code": 500, "msg": "Local processing failed", "data": result}), 500

        # Write the local result back so the task record stays consistent
        if task_id:
            conn = None
            try:
                conn = get_db_conn()
                with conn.cursor() as cursor:
                    json_result = json.dumps(result, ensure_ascii=False)
                    finish_time = datetime.now()
                    # Set state to 3 (finished), store the result and refresh the timestamp
                    cursor.execute(
                        "UPDATE us_inv_img_result SET state = 3, result_data = %s, updated_at = %s WHERE id = %s",
                        (json_result, finish_time, task_id)
                    )
                    conn.commit()
                    logger.info(f"Local fallback result saved to the database, id={task_id}")
            except Exception as save_e:
                # A failed write-back is only logged; the result is still returned to the caller
                logger.error(f"Failed to write the local result back to the database: {save_e}")
            finally:
                if conn: conn.close()

        return jsonify({"code": 200, "msg": "success", "data": result})
    except Exception as e:
        logger.error(f"Local execution failed: {e}")
        return jsonify({"code": 500, "msg": f"Server Error: {str(e)}"}), 500
attrs==25.4.0
Automat==25.4.16
blinker==1.9.0
certifi==2025.10.5
cffi==2.0.0
charset-normalizer==3.4.4
click==8.3.1
colorama==0.4.6
constantly==23.10.4
cryptography==46.0.3
cssselect==1.3.0
curl_cffi==0.13.0
defusedxml==0.7.1
et_xmlfile==2.0.0
fake-useragent==2.2.0
filelock==3.20.0
Flask==3.1.2
frida==16.0.0
frida-tools==12.0.0
hyperlink==21.0.0
idna==3.11
incremental==24.7.2
itemadapter==0.12.2
itemloaders==1.3.2
itsdangerous==2.2.0
Jinja2==3.1.6
jmespath==1.0.1
loguru==0.7.3
lxml==6.0.2
MarkupSafe==3.0.3
numpy==1.23.5
openpyxl==3.1.5
orjson==3.11.4
packaging==25.0
pandas==1.5.3
parsel==1.10.0
pillow==12.0.0
prompt_toolkit==3.0.52
Protego==0.5.0
pyasn1==0.6.1
pyasn1_modules==0.4.2
pycparser==2.23
PyDispatcher==2.0.7
Pygments==2.19.2
pyOpenSSL==25.3.0
PySocks==1.7.1
python-dateutil==2.9.0.post0
pytz==2025.2
queuelib==1.8.0
requests==2.32.5
requests-file==3.0.1
Scrapy==2.13.4
service-identity==24.2.0
six==1.17.0
tldextract==5.3.0
tomli==2.3.0
Twisted==25.5.0
typing_extensions==4.15.0
tzdata==2025.2
urllib3==2.5.0
w3lib==2.3.1
wcwidth==0.2.14
Werkzeug==3.1.3
win32_setctime==1.2.0
zope.interface==8.1.1
import time
import json
from datetime import datetime

import pymysql
from loguru import logger

from inv_img_double_search import AmazonImageSearch

DB_CONFIG = {
    'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com',  # database host (use the public IP for a remote database)
    'port': 3306,
    'user': 'adv_test',                     # database user
    'password': 'jBVQe0FAwZgY3YsQqfwzUd',   # database password
    'db': 'selection',                      # database name
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

def get_db_conn():
    return pymysql.connect(**DB_CONFIG)
def worker_loop():
    logger.info("VPS worker started, waiting for tasks...")
    while True:
        conn = None
        try:
            conn = get_db_conn()
            with conn.cursor() as cursor:
                # 1. Claim a task, oldest first
                sql_get = "SELECT id, img_url FROM us_inv_img_result WHERE state = 1 ORDER BY id ASC LIMIT 1"
                cursor.execute(sql_get)
                task = cursor.fetchone()

                if task:
                    task_id = task['id']
                    url = task['img_url']
                    logger.info(f"Claimed task {task_id}")

                    # ===================================================
                    # Phase A: mark as in progress (update updated_at manually)
                    # ===================================================
                    now_time = datetime.now()  # current time on the VPS
                    cursor.execute(
                        "UPDATE us_inv_img_result SET state = 2, updated_at = %s WHERE id = %s",
                        (now_time, task_id)
                    )
                    conn.commit()

                    try:
                        # Run the crawler
                        spider = AmazonImageSearch(site_name="us")
                        result_data = spider.search(url)
                        json_result = json.dumps(result_data, ensure_ascii=False)

                        # ===================================================
                        # Phase B: mark as finished (update updated_at manually)
                        # ===================================================
                        finish_time = datetime.now()
                        cursor.execute(
                            "UPDATE us_inv_img_result SET state = 3, result_data = %s, updated_at = %s WHERE id = %s",
                            (json_result, finish_time, task_id)
                        )
                        conn.commit()
                        logger.success(f"Task {task_id} finished")
                    except Exception as e:
                        logger.error(f"Task {task_id} failed: {e}")
                        # ===================================================
                        # Phase C: mark as failed (update updated_at manually).
                        # Left disabled: the row stays in state 2, so the Flask
                        # server's 30-second timeout triggers the local fallback.
                        # ===================================================
                        # error_json = json.dumps({"success": 0, "msg": str(e)}, ensure_ascii=False)
                        # fail_time = datetime.now()
                        # cursor.execute(
                        #     "UPDATE us_inv_img_result SET state = 3, result_data = %s, updated_at = %s WHERE id = %s",
                        #     (error_json, fail_time, task_id)
                        # )
                        # conn.commit()
                else:
                    time.sleep(0.5)
        except Exception as e:
            logger.error(f"Database connection failed: {e}")
            time.sleep(3)
        finally:
            if conn: conn.close()


if __name__ == "__main__":
    worker_loop()
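The claim step above (SELECT the oldest state = 1 row, then UPDATE it to state = 2) assumes a single worker process, which is how this loop is written. If several VPS workers ever shared the table, the claim would need to be atomic; a minimal sketch of one way to do that with pymysql and an InnoDB row lock, reusing the same table and state values, is:

def claim_task_atomically(conn):
    """Sketch only: lock the oldest pending row so two workers cannot claim it twice."""
    with conn.cursor() as cursor:
        conn.begin()  # explicit transaction so FOR UPDATE holds the row lock
        cursor.execute(
            "SELECT id, img_url FROM us_inv_img_result "
            "WHERE state = 1 ORDER BY id ASC LIMIT 1 FOR UPDATE"
        )
        task = cursor.fetchone()
        if task is None:
            conn.rollback()
            return None
        cursor.execute(
            "UPDATE us_inv_img_result SET state = 2, updated_at = %s WHERE id = %s",
            (datetime.now(), task['id'])
        )
        conn.commit()
        return task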