no message

parent 067c9808
import json
import time
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from loguru import logger
from pydantic import BaseModel, Field
from inv_img_double_search import AmazonImageSearch, SITE_CONFIG_MAPPER
from amazon_configs import site_name_secret_dict
from py_spider.utils.secure_db_client import get_remote_engine
app = FastAPI(title="Amazon Image Search", description="FastAPI 版本")
# 校验参数
class ImageSearchRequest(BaseModel):
image_url: str = Field(..., description="图片的URL地址")
site_name: str = Field(default="us", description="站点简写, 如 us")
search_mode: str = Field(default="default", description="搜索模式")
# ======================
# 首页
# ======================
@app.get("/", response_class=HTMLResponse)
def index():
html_content = """
<html>
<head><meta charset="utf-8"><title>Amazon Image Search API</title></head>
<body style="font-family: Arial; padding:40px;">
<h1>📸 Amazon Image Search API (FastAPI)</h1>
<p style="color:green;font-weight:bold;">服务正在运行</p>
<h3>接口:</h3>
<p><b>POST /api/search_image</b></p>
<pre>
{
"image_url": "https://m.media-amazon.com/images/I/51i3aMcjmOL._SL600_.jpg",
"site_name": "us", // 可选: us
"search_mode": "default" // 可选: default, full_image
}
</pre>
<h3>健康检查:</h3>
<p><a href="/health">/health</a></p>
</body>
</html>
"""
return HTMLResponse(content=html_content)
# ======================
# 搜图接口
# ======================
@app.post("/api/search_image")
def search_image_api(item: ImageSearchRequest):
image_url = item.image_url
site_name = item.site_name
search_mode = item.search_mode
if site_name not in SITE_CONFIG_MAPPER:
return JSONResponse({"code": 400, "msg": "不支持的站点"}, status_code=400)
# 数据库连接
engine = get_remote_engine(site_name=site_name, db_type="mysql")
if not engine:
logger.error(f"站点 {site_name} 数据库连接失败")
task_id = None
# ============ 1. 插入数据库 ============
try:
now = datetime.now() # 获取当前时间 mysql的时间慢20多秒
now_str = now.strftime("%Y-%m-%d %H:%M:%S")
sql_insert = """
INSERT INTO us_inv_img_result (img_url, state, search_mode, site_name, created_at)
VALUES (%s, 1, %s, %s, %s)
"""
with engine.begin() as conn:
conn.execute(sql_insert, [(image_url, search_mode, site_name, now_str)])
sql_find = f"""
SELECT id, created_at
FROM us_inv_img_result
WHERE img_url = '{image_url}' AND state = 1
ORDER BY id DESC LIMIT 1
"""
row = engine.read_sql(sql_find).to_dict(orient="records")
if row:
db_id = row[0]["id"] # 获取id
created_at = datetime.strptime(row[0]["created_at"], "%Y-%m-%dT%H:%M:%S")
diff = abs((now - created_at).total_seconds())
# 计算查出来的数据时间,和我们刚才插入的时间差
# 如果在误差范围内 -> 认定这就是刚才插的那条。
# 如果时间差很大 -> 说明插入失败了,或者查到了很久以前的历史数据,报错处理
if diff < 3:
task_id = db_id
logger.info(f"任务创建成功 ID={task_id} 等待 VPS 处理..")
else:
logger.warning(f"查到了记录 ID:{db_id},但时间相差太大({diff}s),可能是历史旧数据,判定插入失败")
except Exception as e:
logger.error(f"数据库写入失败 转兜底逻辑:{e}")
# ============ 2. 轮询等待 VPS 结果 ============
if task_id:
while True:
try:
sql_sel = f"""
SELECT state, result_data, created_at
FROM us_inv_img_result
WHERE id = {task_id}
"""
row = engine.read_sql(sql_sel).to_dict(orient="records")
except:
row = None
logger.error(f"轮询异常 查询不到该数据 转入本地: {e}")
if not row:
break
state = row[0]["state"]
created = datetime.strptime(row[0]["created_at"], "%Y-%m-%dT%H:%M:%S")
elapsed = (datetime.now() - created).total_seconds()
if state == 3:
try:
res = json.loads(row[0]["result_data"])
if res.get("success") == 1:
logger.success(f"任务 {task_id} 状态3 VPS完成 (耗时{elapsed:.1f}s)")
return JSONResponse({"code": 200, "msg": "success", "data": res},status_code=200)
else:
# VPS 标记为失败 (success=0)
logger.warning(f"任务 {task_id} 状态3 VPS返回失败(success=0). 总耗时: {elapsed:.1f}s")
# 判断时间是否超过 1 分钟
if elapsed > 60:
logger.error("VPS失败状态3且已超时超过60s,不再兜底,直接返回失败")
return JSONResponse({"code": 400, "msg": "识别处理失败", "data": res},status_code=400)
else:
logger.info("耗时未超60s,转本地兜底重试...")
break # 跳出循环,去执行本地逻辑
except:
# 极少情况:JSON解析失败,视为失败走兜底
break
break
if elapsed > 60:
logger.warning(f"任务 {task_id} 状态为{state} 超时({elapsed:.1f})执行兜底")
break
time.sleep(0.3)
# ============ 3. 本地兜底执行 ============
try:
logger.info(f"本地兜底执行: {image_url}")
client = AmazonImageSearch(site_name)
result = client.search(image_url, search_mode=search_mode)
if result.get("success") != 1:
return JSONResponse({"code": 400, "msg": "本地处理失败", "data": result},status_code=400)
if task_id:
try:
finish = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
json_res = json.dumps(result, ensure_ascii=False)
sql_up = """
UPDATE us_inv_img_result
SET state = 3, result_data = %s, updated_at = %s
WHERE id = %s
"""
with engine.begin() as conn:
conn.execute(sql_up, [(json_res, finish, task_id)])
logger.info(f"本地兜底结果已保存至数据库 ID: {task_id}")
except Exception as e:
logger.error(f"本地结果回写数据库失败: {e}")
return JSONResponse({"code": 200, "msg": "success", "data": result},status_code=200)
except Exception as e:
logger.error(f"本地兜底异常: {e}")
return JSONResponse({"code": 500, "msg": str(e)}, status_code=500)
# ======================
# 健康检查
# ======================
@app.get("/health")
async def health():
return {"status": "ok", "service": "Amazon Image Search (FastAPI)"}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment