Search competitor products by image

# amazon_configs.py
from loguru import logger
import json
from py_spider.utils.secure_db_client import get_remote_engine
# The uk/de sites are untested. For the camera (visual search) endpoint, the uk site's application/username is amzn-mbl-cscan-ais; the URL is unchanged: https://match-visualsearch.amazon.com/vsearch/2.0
site_name_secret_dict = {
"us": {'application':'amzn-mbl-cscan-us','username':'amzn-mbl-cscan-us','secret':'5b6874d3a20417591bd5464a25a37bc6','snap_url':'https://match-visualsearch.amazon.com'},
"uk": {'application':'amzn-mbl-cscan-uk','username':'amzn-mbl-cscan-uk','secret':'c1a79f745bbe6a8824ce3178c8b549ad','snap_url':'https://match-visualsearch-uk.amazon.com'},
# ... (remaining site entries and the us/uk/de device lists elided in this view) ...
de_devices_list = [
]
us_cookie_dict ={"i18n-prefs": "USD", "lc-main": "en_US", "session-id": "137-6850115-5627367", "session-id-time": "2082787201l", "session-token": "NDR0kD9QBkk81plbn2yEFeEJ205I0LEzv9G4Q3nG90PVEmpZEtEiL6Ixj3alIu5KBvaiePi9IOHhzSh48qBNQH2yroklDf1+rDTtJ8HNzcfae0ZEL3/szlwFWXRsxH7UwfHip9R66v9ZoO5ni9mrz+4J6QgjTmrzCW7uVjTMthdesVgOk2SVmxyfoLQnmYuScp17v+ayrztH3VFIg/VyhzNBlVbkZDIHg2WB3F+U0JHkiQfiZB1q2uCMWOEIOwRkCGhaRrWHbBgpQtvHnf+Iw2KxVkl2gv1jZga2iNQUhO6cIr2Py5V7evAyuXXhjRaBzIl/STRh58Jr9ctbhdBXnTQsbThexXLn", "skin": "noskin", "ubid-main": "133-5836273-3821350"}
uk_cookie_dict ={"i18n-prefs": "HKD", "lc-acbuk": "en_GB", "session-id": "521-9387816-0582061", "session-id-time": "2082787201l", "session-token": "\"ANvQ1SZ9VE6Y9cGcEdwhYcEpI1Jql7bOXZ7Q+qdBbb41IuuIFXvUzDSoaXOzPlcUTa3CIMoLIIzD2QOAWnN2h1E/c/iXEzpM8qd4LqR9otFV6PQecFkZKnC/i5v3QIguYgLt3KB9halOcaKi+KXcpG1b5jjYSicsMknbJMQVgfkiqMDXOLcxIns2cHs1xpXgf/DrjOHNpIB6Q24VKaqLTuKbWd8biHv86NGyoBNIs1kkjJxdwZhvWtQC/rHqoiz0e853W2lkrUrdr6Ko3dSIYojEBEgVuglCENJWLmkQzqEMxcIzNgcrpf4ZSlRHhMpmi2D14A6rwrwdo/JMozJSol+Wx3dL7j/AEGiXa1IWwrA=\"", "ubid-acbuk": "259-9451176-7460703"}
de_cookie_dict ={"i18n-prefs": "HKD", "lc-acbde": "de_DE", "session-id": "258-7262769-1488647", "session-id-time": "2082787201l", "session-token": "RxPtESM/cUQ4JAxsgMz8crm9EMbFhHLK4vPgZzTj2w23v5yWkzjXZsVVsErU5Oi8qzT3Le8nxU1zxeDEWuAg2IL+5fb2VwhE9UOQpLMmnOBWaukpW0CKFMOTvJU6Fu7+IZhWpJWt+ypu5b0KEYVi7CyTsHKh38q8N2m7tGBwb94NyT4tLHyE69XovznI5HV7y19J0aJrDyEYBfk00m6/SwxxOhRbkOXWVIagnGK4N1uQ27n9827rd7o1TpHQc8sWnn1bDAvYbCtGOaUYUHJ8HxnZPsTjKZ2b8OgzIKE9nlvpoAAQOArarZlZxneJTfUY++ucym9Te+RUvPp6SrhpPJyP7W425mA9", "ubid-acbde": "257-6077242-9988057"}
def get_cookies(site_name: str) -> dict[str, str] | None:
'''
Read the latest cookies from the site's database
:param site_name: site name (us/uk/de)
:return: cookie dict, or None on failure
'''
try:
# Check that the site has a corresponding database config
if site_name not in site_name_secret_dict:
logger.error(f"Unknown site: {site_name}")
return None
# Connect to the corresponding database
engine = get_remote_engine(
site_name=site_name, # -> database "selection"
db_type="mysql", # -> server-side alias "mysql"
)
if not engine:
logger.error('Failed to connect to the database')
return None
sql = f"select * from {site_name}_cookies order by updated_time desc limit 1;"
df_cookies = engine.read_sql(sql)
if df_cookies.empty:
logger.error("错误:未查询到 cookie 数据")
return None
cookies_dict = json.loads(df_cookies.iloc[0]['cookies'])
if not isinstance(cookies_dict, dict):
logger.error("转换结果不是字典")
return None
return cookies_dict
except Exception as e:
logger.error(f"错误:{str(e)}")
return None
# logger.success(get_cookies('de'))
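# Minimal usage sketch (assumes the remote "{site}_cookies" table is populated
# and get_remote_engine is reachable); get_cookies returns None on any failure,
# so callers should guard before use:
#   cookies = get_cookies("us")
#   if cookies:
#       session_id = cookies.get("session-id", "")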

# cookie_manager.py
import threading
import time
import random
import json
import datetime
from typing import List, Dict
from loguru import logger
from amazon_configs import site_name_secret_dict
from py_spider.utils.secure_db_client import get_remote_engine
def get_cookies_from_db(site_name: str) -> List[Dict]:
"""
Fetch the most recent cookies for the site from the database (up to 100 rows)
"""
if site_name not in site_name_secret_dict:
logger.error(f"站点 {site_name} 不在配置列表中")
return []
try:
# 1. Get a database connection
engine = get_remote_engine(site_name=site_name, db_type="mysql")
if not engine:
logger.error(f"站点 {site_name} 数据库连接失败")
return []
"""
# 2. 计算时间节点 (当前时间 - 7天)
# 格式化为 '2025-11-11 16:18:39'
seven_days_ago_dt = datetime.datetime.now() - datetime.timedelta(days=days)
seven_days_ago_str = seven_days_ago_dt.strftime("%Y-%m-%d %H:%M:%S")
logger.info(f'7天前 时间为 {seven_days_ago_dt},{seven_days_ago_str}')
# 3. 构建 SQL (注意:字段 updated_time 是 timestamp 类型)
table_name = f"{site_name}_cookies"
sql = f"SELECT * FROM {table_name} WHERE updated_time >= '{seven_days_ago_str}' ORDER BY updated_time DESC;"
"""
sql = f"select * from {site_name}_cookies order by updated_time desc limit 100;"
logger.info(f"[{site_name}] 执行SQL查询最近 100条 Cookie: {sql.strip()}")
df_cookies = engine.read_sql(sql)
if df_cookies.empty:
logger.warning(f"站点 {site_name} 无最近100条 Cookie数据")
return []
# 2. Parse the rows
valid_cookies = []
for _, row in df_cookies.iterrows():
try:
cookie_str = row.get('cookies') # cookie payload
updated_date = row.get('updated_time') # last update time
# logger.info(f'updated_time: {updated_date}, {type(updated_date)}')
if cookie_str:
if isinstance(cookie_str, str):
cookie_dict = json.loads(cookie_str)
else:
cookie_dict = cookie_str # already a dict
# Basic validity check
if "session-id" in cookie_dict:
if isinstance(updated_date, str):
clean_str = updated_date.replace('T', ' ')
updated_ts = datetime.datetime.strptime(clean_str, "%Y-%m-%d %H:%M:%S")
updated_ts = updated_ts.timestamp()
else:
updated_ts = time.time() # fall back to the current time
# logger.info(f'updated_ts: {updated_ts}')
valid_cookies.append({
'cookie': cookie_dict, # plain dict for requests
'updated_ts': updated_ts # timestamp used for the expiry check
})
except Exception as e:
logger.warning(f"解析单条Cookie失败: {e}")
continue
return valid_cookies
except Exception as e:
logger.error(f"[{site_name}] 数据库读取异常: {e}")
return []
# print(get_cookies_from_db('us'))
class CookiePoolManager:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super(CookiePoolManager, cls).__new__(cls)
cls._instance._init_pools()
return cls._instance
def _init_pools(self):
"""初始化池子状态"""
# 结构: { "us": { "data": [{’cookie‘:{},updated_ts:''},{’cookie‘:{},updated_ts:''}], "initial_count": 100, "last_refresh": timestamp } }
self.pools = {
site: {"data": [], "initial_count": 0, "last_db_check": 0}
for site in ["us", "uk", "de"]
}
self.cookie_expire_seconds = 1 * 24 * 60 * 60 # per-cookie expiry: 1 day
self.db_check_interval = 1 * 60 # DB re-check cooldown: 1 minute
def _refresh_from_db(self, site_name: str):
"""内部方法:从数据库刷新"""
new_data = get_cookies_from_db(site_name) #返回的是列表 但是数据是字典格式 # [{’cookie‘:{},updated_ts:''},{’cookie‘:{},updated_ts:''}]
if new_data:
with self._lock:
self.pools[site_name]["data"] = new_data
self.pools[site_name]["initial_count"] = len(new_data)
# Record the time of this DB check
self.pools[site_name]["last_db_check"] = time.time()
logger.success(f"[{site_name}] cookie pool refreshed, size: {len(new_data)}")
else:
# Update the check time even when nothing came back, so we do not keep issuing useless queries within the cooldown window
self.pools[site_name]["last_db_check"] = time.time()
logger.warning(f"[{site_name}] no valid rows in the database")
def get_cookie(self, site_name: str) -> Dict[str, str]:
"""
对外核心接口:获取一个可用 Cookie
包含:3天周期检查、半数阈值检查、空池初始化
"""
site_pool = self.pools.get(site_name)
if not site_pool:
# Unsupported site: initialize an empty pool structure for it
self.pools[site_name] = {"data": [], "initial_count": 0, "last_db_check": 0}
site_pool = self.pools[site_name]
# 1. Empty pool: initialize from the DB
if not site_pool["data"]:
self._refresh_from_db(site_name)
# === Strategy 2: threshold refresh (pool below half its initial size) ===
# Only check after a successful load (initial_count > 0), so an empty database cannot cause a refresh loop
current_count = len(site_pool["data"])
if site_pool["initial_count"] > 0 and current_count < (site_pool["initial_count"] / 2):
# Also rate-limited by the cooldown, in case the DB never has fresh data
if (time.time() - site_pool["last_db_check"]) > self.db_check_interval:
logger.warning( f"[{site_name}] 可用Cookie({current_count}) 低于 初始({site_pool['initial_count']}) 一半,触发强制刷新...")
self._refresh_from_db(site_name)
# logger.info(f'pool state: {site_pool["initial_count"]} {site_pool["last_db_check"]}')
wrapper = {}
# === Strategy 3: random pick ===
with self._lock:
if site_pool["data"]:
wrapper = random.choice(site_pool["data"])
if not wrapper: return {}
# 4. Core: expiry is judged per entry via its updated_ts
current_ts = time.time()
cookie_ts = wrapper.get('updated_ts', current_ts)
if (current_ts - cookie_ts) > self.cookie_expire_seconds:
last_check = site_pool["last_db_check"] # time of the last DB check
# The entry is expired; check whether we may hit the DB again
if (current_ts - last_check) > self.db_check_interval:
logger.info(f"[{site_name}] picked an expired entry (>1 day) {wrapper}, refreshing from the DB...")
self._refresh_from_db(site_name)
# Pick again after the refresh
with self._lock:
if site_pool["data"]:
wrapper = random.choice(site_pool["data"])
else:
# Within the cooldown: use the stale cookie anyway (returning {} and letting the caller fall back would also work); the cooldown caps DB queries to once per interval
logger.debug(f"[{site_name}] entry expired but within the cooldown, degrading to stale data")
return wrapper.get('cookie', {})
def mark_invalid(self, site_name: str, bad_cookie: Dict):
"""
对外接口:标记 Cookie 失效(从内存池中移除)
"""
if not bad_cookie:
return
# logger.info(f'removing cookie {bad_cookie}')
with self._lock:
pool_data = self.pools.get(site_name, {}).get("data", []) # current cookie list
bad_sid = bad_cookie.get("session-id") # session-id of the invalid cookie
if bad_sid:
original_len = len(pool_data)
# Filter out every entry with this session-id
self.pools[site_name]["data"] = [
c for c in pool_data if c.get('cookie', {}).get("session-id") != bad_sid
]
new_len = len(self.pools[site_name]["data"])
# If the list shrank, the removal succeeded
if new_len < original_len:
logger.warning(f"[{site_name}] 移除失效Cookie (SID: {bad_sid}...), 剩余: {new_len}")
# Export the singleton
cookie_manager = CookiePoolManager()
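# Minimal usage sketch (assumes the site tables hold fresh rows); get_cookie
# returns {} when the pool is empty, so callers should guard:
#   ck = cookie_manager.get_cookie("us")
#   if ck:
#       ...  # use ck as the requests cookies
#       # on a robot check / auth failure, evict it so it is not re-issued:
#       cookie_manager.mark_invalid("us", ck)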
# server.py
from flask import Flask, request, jsonify
from loguru import logger
# Import the search logic
from inv_img_double_search import AmazonImageSearch, SITE_CONFIG_MAPPER
app = Flask(__name__)
# Return non-ASCII characters in JSON as-is (no \u escapes)
app.config['JSON_AS_ASCII'] = False
# ==========================================
# Index endpoint (for direct browser access)
# ==========================================
@app.route('/', methods=['GET'])
def index():
"""
Index page: service status and minimal docs
"""
html_content = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Amazon Image Search Service</title>
<style>
body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 40px; line-height: 1.6; color: #333; }
.container { background: #f9f9fa; padding: 30px; border-radius: 10px; border: 1px solid #e1e4e8; }
h1 { color: #2c3e50; border-bottom: 2px solid #3498db; padding-bottom: 10px; }
.status { color: #27ae60; font-weight: bold; font-size: 1.2em; }
.endpoint { background: #2c3e50; color: #fff; padding: 5px 10px; border-radius: 4px; font-family: monospace; }
.method { background: #e67e22; color: white; padding: 2px 6px; border-radius: 3px; font-size: 0.8em; font-weight: bold; }
pre { background: #2d2d2d; color: #f8f8f2; padding: 15px; border-radius: 5px; overflow-x: auto; }
</style>
</head>
<body>
<div class="container">
<h1>📸 Amazon Image Search API</h1>
<p>Status: <span class="status">✅ Service is Running</span></p>
<h3>Endpoint</h3>
<p>URL: <span class="endpoint">/api/search_image</span> <span class="method">POST</span></p>
<h3>Request example (JSON)</h3>
<pre>
{
"image_url": "https://m.media-amazon.com/images/I/51i3aMcjmOL._SL600_.jpg",
"site_name": "us", // 可选: us
"search_mode": "default" // 可选: default, full_image
}
</pre>
<h3>Health check</h3>
<p>URL: <a href="/health">/health</a> <span class="method">GET</span></p>
</div>
</body>
</html>
"""
return html_content
# ==========================================
# Core business endpoint
# ==========================================
@app.route('/api/search_image', methods=['POST'])
def search_image_api():
"""
Amazon search-by-image endpoint
Method: POST
"""
# 1. Fetch and validate the JSON body
data = request.get_json(silent=True)
if not data:
return jsonify({"code": 400, "msg": "Body必须是JSON格式"}), 400
# 2. Extract parameters
image_url = data.get("image_url")
site_name = data.get("site_name", "us")
search_mode = data.get("search_mode", "default")
# 3. Validate required fields
if not image_url:
return jsonify({"code": 400, "msg": "缺少参数: image_url"}), 400
if site_name not in SITE_CONFIG_MAPPER:
return jsonify({"code": 400, "msg": f"不支持的站点: {site_name},支持: {list(SITE_CONFIG_MAPPER.keys())}"}), 400
try:
logger.info(f"收到API请求: Site={site_name}, Mode={search_mode}, URL={image_url}")
# 4. Initialize the client and run the search
client = AmazonImageSearch(site_name=site_name)
result = client.search(image_url, search_mode=search_mode)
# 5. Check business-level success
if result.get("error") or result.get("success") == 0:
return jsonify({"code": 500, "msg": "识别失败", "data": result}), 500
return jsonify({"code": 200, "msg": "success", "data": result})
except Exception as e:
logger.error(f"服务内部错误: {e}")
return jsonify({"code": 500, "msg": f"Server Error: {str(e)}"}), 500
@app.route('/health', methods=['GET'])
def health():
"""健康检查接口"""
return jsonify({"status": "ok", "service": "Amazon Image Search"})
if __name__ == "__main__":
# Start the service
logger.info("Flask server starting...")
# logger.info("Visit http://127.0.0.1:5000 for the index page")
app.run(host='0.0.0.0', port=5000, debug=False)
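# Quick smoke test once the server is running (host/port as in app.run above):
#   curl -s -X POST http://127.0.0.1:5000/api/search_image \
#        -H "Content-Type: application/json" \
#        -d '{"image_url": "https://m.media-amazon.com/images/I/51i3aMcjmOL._SL600_.jpg", "site_name": "us"}'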

# inv_img_double_search.py
import hashlib
import os
import time
import uuid
import random
import re
import json
import io # for handling images in memory
from typing import Optional, Dict, Any, List
# Thread pool for the dual-channel search and concurrent page fetching
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import requests
from PIL import Image
from scrapy import Selector
from requests.exceptions import RequestException
from loguru import logger
from cookie_manager import cookie_manager
# Import shared configs
from amazon_configs import (
site_name_secret_dict,
us_devices_list,
uk_devices_list,
de_devices_list,
us_cookie_dict, uk_cookie_dict, de_cookie_dict
)
# Per-site config mapping
SITE_CONFIG_MAPPER = {
"us": {
"devices": us_devices_list,
"cookies": us_cookie_dict,
"base_url": "https://www.amazon.com"
},
"uk": {
"devices": uk_devices_list,
"cookies": uk_cookie_dict,
"base_url": "https://www.amazon.co.uk"
},
"de": {
"devices": de_devices_list,
"cookies": de_cookie_dict,
"base_url": "https://www.amazon.de"
}
}
# Base URL templates
AMAZON_SEARCH_BASE_URL = "{base_url}/s?rh=p_78:{bbx_asin_list}&rank=asin-scores-asc-rank&searchMethod=CameraSearch"
WEB_REFERER_URL = "{base_url}/stylesnap?pd_rd_w=wvP9K&content-id=amzn1.sym.d26e24db-d6a0-41ff-bb8a-bf1969aea086%3Aamzn1.sym.d26e24db-d6a0-41ff-bb8a-bf1969aea086&pf_rd_p=d26e24db-d6a0-41ff-bb8a-bf1969aea086&pf_rd_r=3EMK072JNCNA4Z81M3GS&pd_rd_wg=So1ZB&pd_rd_r=99eed0de-82e8-4168-a76d-55fe6451ac50&qid=1732271359&ref_=sxts_snpl_1_0_d26e24db-d6a0-41ff-bb8a-bf1969aea086&dplnk=Y&dplnkCustom=Y&q=local"
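# For reference, AMAZON_SEARCH_BASE_URL filled in for the us site with two
# made-up ASINs would look like:
#   https://www.amazon.com/s?rh=p_78:B000000001|B000000002&rank=asin-scores-asc-rank&searchMethod=CameraSearch
# i.e. search() pipe-joins the candidate ASINs into the p_78 filter.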
# === Retry settings ===
GLOBAL_RETRY_TIMES = 5 # whole-search retries
STEP_RETRY_TIMES = 5 # per-request retries
RETRY_DELAY = 1
# Takes raw image bytes
def get_image_size(image_data: bytes) -> Optional[Dict[str, int]]:
try:
with Image.open(io.BytesIO(image_data)) as img:
width, height = img.size
return {"width": width, "height": height}
except Exception as e:
logger.error(f"获取图片尺寸失败: {e}")
return None
def get_page_num(total: int | None, count_per_page: int | None) -> int:
if not isinstance(total, int) or not isinstance(count_per_page, int):
return 0
if total <= 0 or count_per_page <= 0:
return 0
page_count = total // count_per_page
return page_count + 1 if total % count_per_page > 0 else page_count
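# Ceiling division in effect: get_page_num(95, 24) == 4 (3 full pages plus a
# partial one), get_page_num(96, 24) == 4, and invalid inputs return 0.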
class AmazonImageSearch:
def __init__(self, site_name: str):
if site_name not in site_name_secret_dict:
raise ValueError(f"不支持的站点: {site_name}")
self.site_name = site_name
self.site_config = site_name_secret_dict[site_name]
self.site_specific = SITE_CONFIG_MAPPER[site_name]
self.base_url = self.site_specific["base_url"]
self.snap_url = f"{self.site_config['snap_url']}/style-snap/2.0"
# Initialize the client context
self.cookies = {}
self.session_id = ""
self.device_info = {}
self.client_device_id = ""
self.headers = {}
self._refresh_client_context()
def _refresh_client_context(self) -> None:
"""刷新客户端上下文(优先从数据库池获取,失败则用本地配置兜底)"""
# 1. 尝试从管理器获取 (这里会自动处理 3天刷新 和 阈值刷新)
pool_cookie = cookie_manager.get_cookie(self.site_name)
if pool_cookie:
self.cookies = pool_cookie
logger.info(f"[{self.site_name}] 成功获取Cookie池cookie (SID: {self.cookies.get('session-id', '')[:30]}...)")
else:
# 2. Fallback: if the database path is entirely down, use the static cookies from amazon_configs
self.cookies = self.site_specific["cookies"].copy()
logger.warning(f"[{self.site_name}] cookie pool exhausted or DB error, falling back to the local defaults")
self.session_id = self.cookies.get("session-id", "")
devices = self.site_specific["devices"]
self.device_info = random.choice(devices) if devices else {}
self.client_device_id = str(uuid.uuid4())
self.headers = {
"x-amz-access-token": "",
"x-amz-lens-session-auth-token": self.cookies.get("session-token", ""),
"x-amz-lens-session-id": self.session_id,
"x-amz-lens-ubid": self.cookies.get("ubid-main", ""),
"accept-encoding": "gzip",
"user-agent": "okhttp/4.9.1",
}
def _generate_auth_params(self) -> Dict[str, str]:
ts = str(int(time.time()))
combined = (
f"{self.site_config['secret']}{self.site_config['username']}"
f"{self.site_config['application']}{ts}"
)
authtoken = hashlib.sha512(combined.encode("utf-8")).hexdigest()
return {"ts": ts, "authtoken": authtoken}
def _build_query_metadata(self, extra_params: Optional[Dict[str, str]] = None) -> str:
base_params = {
"amznSessionId": self.session_id,
"clientVersion": "30.20.2.100",
"cardsVersion": "1.0",
"clientMessageVersion": "1.0",
"amznDirectedCustomerId": "",
"clientDeviceId": self.client_device_id,
"clientId": str(uuid.uuid4()),
"sourceType": "Photo",
"ingressSource": "ctp",
"uiMode": "stylesnap",
**self.device_info
}
if extra_params:
base_params.update(extra_params)
return json.dumps(base_params)
def _get_new_proxy(self) -> Dict[str, str]:
try:
proxy_url = 'http://api.xiequ.cn/VAD/GetIp.aspx?act=getturn51&uid=83353&vkey=6FEB79CD7E8700AFCDC44CDBC3889B9D&num=1&time=6&plat=1&re=0&type=7&so=3&group=51&ow=1&spl=1&addr=&db=1'
proxy_res = requests.get(url=proxy_url, timeout=5).text.strip()
return {
"http": f"socks5://{proxy_res}",
"https": f"socks5://{proxy_res}"
}
except Exception as e:
logger.warning(f"获取代理失败: {e}")
return {}
def _retry_request(self, method: str, url: str, **kwargs) -> requests.Response:
if "headers" in kwargs:
if "user-agent" not in {k.lower() for k in kwargs["headers"]}:
kwargs["headers"]["user-agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
for attempt in range(STEP_RETRY_TIMES):
try:
# if fixed_proxies:
# current_proxies = fixed_proxies
# else:
# current_proxies = self._get_new_proxy()
if method.upper() == "POST":
# response = requests.post(url, timeout=20, **kwargs, proxies=current_proxies, verify=False)
response = requests.post(url, timeout=20, **kwargs)
# logger.info(f'POST {url} response: {response.text}')
else:
# response = requests.get(url, timeout=20, **kwargs, proxies=current_proxies, verify=False)
response = requests.get(url, timeout=20, **kwargs)
# logger.info(f'GET {url} response: {response.text}')
response.raise_for_status()
return response
except RequestException as e:
logger.warning(f"请求异常 ({method} {url}): {e} - 第 {attempt + 1}/{STEP_RETRY_TIMES} 次重试")
if attempt < STEP_RETRY_TIMES - 1:
time.sleep(RETRY_DELAY)
except Exception as e:
logger.error(f"发生未预期错误: {e}")
raise e
raise RequestException(f"请求在 {STEP_RETRY_TIMES} 次尝试后最终失败: {url}")
def _parse_app_asin_list(self, response_json: Dict[str, Any]) -> List[str]:
# 1. Check that the JSON structure is complete
if "style-snap" not in response_json or "searchResult" not in response_json["style-snap"]:
# Malformed structure: fail fast
raise ValueError("Malformed app response: missing style-snap or searchResult")
results = response_json["style-snap"]["searchResult"]
# 2. Extract the ASINs
asins = []
if results:
asins = results[0].get("bbxAsinList", [])
# If the extracted list is empty, raise to force a retry (same logic as the web side)
if not asins:
raise ValueError("App-side result is empty (bbxAsinList), forcing a retry")
return asins
# Takes image_data: bytes
def _run_app_search(self, image_data: bytes, mode: str) -> List[str]:
# No file open() here; the downloaded image bytes are used directly
auth = self._generate_auth_params()
metadata = self._build_query_metadata({"orientation": "-1"})
files = {
"application": (None, self.site_config['application']),
"query_metadata": (None, metadata),
"authtoken": (None, auth['authtoken']),
"lang": (None, "en_US"),
"username": (None, self.site_config['username']),
"ts": (None, auth['ts']),
"file": ("image.jpg", image_data, "image/jpeg"),
}
resp = self._retry_request("POST", self.snap_url, files=files, headers=self.headers, cookies=self.cookies)
resp_json = resp.json()
if mode == "full_image":
query_id = resp_json.get("queryId")
if not query_id:
raise ValueError("全图模式失败:未获取到 queryId (可能需要更换Cookie或环境)")
image_size = get_image_size(image_data) # takes bytes
if not image_size: raise ValueError("Could not read the image size")
offset = random.randint(0, 2)
bbox = {
"tlx": max(0, offset),
"tly": max(0, offset),
"brx": max(image_size["width"] - offset, max(0, offset) + 1),
"bry": max(image_size["height"] - offset, max(0, offset) + 1),
"imh": image_size["height"],
"imw": image_size["width"]
}
auth = self._generate_auth_params()
metadata = self._build_query_metadata()
form_data = {
"mainQueryId": (None, query_id),
"uiMode": (None, "stl_bbx_reformulation"),
"application": (None, self.site_config['application']),
"query_metadata": (None, metadata),
"authtoken": (None, auth['authtoken']),
"inputBoundingBox": (None, json.dumps(bbox)),
"imageHash": (None, ""),
"lang": (None, "en_US"),
"username": (None, self.site_config['username']),
"ts": (None, auth['ts']),
}
resp = self._retry_request("POST", self.snap_url, files=form_data, headers=self.headers,
cookies=self.cookies)
resp_json = resp.json()
return self._parse_app_asin_list(resp_json)
# ========================================================================
# Web side
# ========================================================================
def _web_get_token(self, strict_cookies: Dict[str, str]) -> str:
url = WEB_REFERER_URL.format(base_url=self.base_url)
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
"accept-language": "zh-CN,zh;q=0.9",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
}
resp = self._retry_request("GET", url, headers=headers, cookies=strict_cookies)
sel = Selector(text=resp.text)
token = sel.xpath(".//input[@name='stylesnap']/@value").get()
if not token:
raise ValueError("Web页面未找到 stylesnap token")
return token
# Takes image_data: bytes
def _web_upload_img(self, image_data: bytes, token: str, strict_cookies: Dict[str, str]) -> List[str]:
url = f"{self.base_url}/stylesnap/upload"
headers = {
"origin": self.base_url,
"referer": WEB_REFERER_URL.format(base_url=self.base_url),
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
}
# Use the raw bytes directly
files = {"explore-looks.jpg": ("explore-looks.jpg", image_data, "image/jpeg")}
params = {'stylesnapToken': token}
resp = self._retry_request("POST", url, headers=headers, cookies=strict_cookies,
files=files, params=params)
data = resp.json()
# logger.info(f'web asin response: {data}')
if "searchResults" not in data:
raise ValueError("Web响应缺少 searchResults")
asins = []
if data["searchResults"]:
meta_list = data["searchResults"][0].get("bbxAsinMetadataList", [])
asins = [item.get("asin") for item in meta_list if item.get("asin")]
if not asins:
raise ValueError("Web端解析结果为空 (可能被风控/IP问题),强制触发重试")
return asins
# Takes image_data: bytes
def _run_web_search(self, image_data: bytes) -> List[str]:
current_session_cookies = self.cookies.copy()
# web_session_proxies = self._get_new_proxy()
token = self._web_get_token(current_session_cookies)
asins = self._web_upload_img(image_data, token, current_session_cookies)
logger.info(f"Web端识别成功,ASIN数: {len(asins)}")
return asins
# ========================================================================
# Parse the result pages
# ========================================================================
def _parse_items(self, html: str) -> List[Dict[str, Any]]:
items = []
res = Selector(text=html)
product_divs = res.xpath('//div[@data-asin and string-length(@data-asin) > 0]')
for div in product_divs:
# Read the ASIN from this div
asin = div.xpath('./@data-asin').get()
# Skip empty placeholder divs
if not asin:
continue
# 1. Price
asin_price = div.xpath(
'.//div[contains(@data-cy, "price-recipe")]//span[contains(@class, "a-offscreen")]/text()').get("")
# 2. Rating
# Fallback note: aria-label is usually more stable than class names
asin_rating = div.xpath(
'.//div[@data-cy="reviews-block"]//span[contains(@class, "a-icon-alt")]/text()').get("")
# 3. Image
# Prefer s-image (the standard search-result image); more accurate than a generic img
asin_img_list = div.xpath(
'.//div[@data-cy="image-container"]//img[contains(@class, "s-image")]/@src').getall()
if not asin_img_list:
asin_img_list = div.xpath('.//img/@src').getall()
# Filter out gif/svg placeholders
valid_imgs = [i for i in asin_img_list if i and i.split(".")[-1] not in ["gif", "svg"]]
asin_img = valid_imgs[0] if valid_imgs else None
# 4. Title and brand
# Collect every text fragment under the h2
h2_texts = div.xpath('.//div[@data-cy="title-recipe"]//h2//text()').getall()
h2_texts = [t.strip() for t in h2_texts if t.strip()]
if len(h2_texts) >= 2:
asin_brand = h2_texts[0]
asin_title = h2_texts[1]
elif len(h2_texts) == 1:
asin_brand = None
asin_title = h2_texts[0]
else:
# No h2 text at all (layout change or sparse tile): avoid a NameError below
asin_brand = None
asin_title = None
# 5. Total review count
asin_total_comments = div.xpath(
'.//div[@data-cy="reviews-block"]//a[contains(@aria-label, "ratings")]//text() ').get("")
# 6. Monthly sales / purchases
# Matching logic: a span with the "a-color-secondary" class whose text contains "bought" or "purchased" (Amazon's usual sales wording)
# asin_bought = div.xpath('.//span[contains(@class, "a-color-secondary") and (contains(text(), "bought") or contains(text(), "purchased"))]/text()').get("")
asin_bought = div.xpath(
'.//div[@data-cy="reviews-block"]//span[contains(@class, "a-color-secondary")]//text()').get("")
# --- Data cleaning ---
re_float = re.compile(r'[\d,]+\.?\d*')
# Price
if asin_price:
match = re.search(re_float, asin_price)
asin_price = match.group() if match else None
# Rating
if asin_rating:
match = re.search(re_float, asin_rating)
asin_rating = match.group() if match else None
# Review count
if asin_total_comments:
# Strip the surrounding parentheses
# asin_total_comments = re.sub(r'[(),a-zA-Z\s]', '', asin_total_comments)
asin_total_comments = asin_total_comments.replace("(", "").replace(")", "")
item = {
"asin": asin, # ASIN
"price": asin_price, # price
"rating": asin_rating, # rating
"img_url": asin_img, # image url
"title": asin_title, # title
"brand": asin_brand, # brand
"bought": asin_bought.strip() if asin_bought else None, # monthly sales
"total_comments": asin_total_comments, # review count
}
# print(item)
items.append(item)
# logger.info(f'parsed {len(items)} items on this page')
return items
def _fetch_single_page(self, url: str, page: int, headers: Dict[str, str]) -> List[Dict[str, Any]]:
try:
resp_p = self._retry_request("GET", f"{url}&page={page}", headers=headers, cookies=self.cookies)
return self._parse_items(resp_p.text)
except Exception as e:
logger.warning(f"第{page}页失败: {e}")
return []
def _fetch_results(self, url: str) -> Dict[str, Any]:
headers = {
"accept": "text/html,application/xhtml+xml,*/*",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"viewport-width": "1343"
}
all_items = []
# logger.info("正在获取第1页...")
resp = self._retry_request("GET", url, headers=headers, cookies=self.cookies)
page1_items = self._parse_items(resp.text) # parse page 1
all_items.extend(page1_items) # collect the page-1 items
total_res = re.search(r'"totalResultCount":(\d+)', resp.text)
count_res = re.search(r'"asinOnPageCount":(\d+)', resp.text)
total = int(total_res.group(1)) if total_res else None
per_page = int(count_res.group(1)) if count_res else None
pages = get_page_num(total, per_page) # compute the page count
logger.info(f"Found {total} products across {pages} pages")
if pages > 1:
logger.info(f"开始并发爬取第 2 到 {pages} 页...")
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [
executor.submit(self._fetch_single_page, url, p, headers)
for p in range(2, pages + 1)
]
for future in as_completed(futures):
items = future.result()
if items:
all_items.extend(items)
df = pd.DataFrame(all_items)
# Deduplicate on the asin column, then convert back to dicts
final_items = df.drop_duplicates(subset=['asin']).to_dict('records') if not df.empty else []
return {"total_items": len(final_items), "items": final_items}
# === Image download helper ===
def _download_image(self, img_url: str) -> bytes:
"""下载图片并返回二进制数据"""
logger.info(f"正在下载图片: {img_url}")
for i in range(3):
try:
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'ect': '3g',
'pragma': 'no-cache',
'priority': 'u=0, i',
'sec-ch-dpr': '1',
'sec-ch-ua': '"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"',
'sec-ch-ua-form-factors': '"Desktop"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-viewport-height': '376',
'sec-ch-viewport-width': '1920',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
}
resp = requests.get(img_url, headers=headers, timeout=15)
resp.raise_for_status()
# logger.info(f'image bytes: {resp.content}')
# with open('./image.png','wb') as f:
# f.write(resp.content)
return resp.content
except Exception as e:
logger.warning(f"图片下载失败(第{i + 1}次): {e}")
time.sleep(1)
raise ValueError(f"图片下载最终失败: {img_url}")
# Entry point: takes an image_url
def search(self, image_url: str, search_mode: str = "default") -> Dict[str, Any]:
start_time = time.time()
result = {
"is_web": 0, "is_app": 0,
"site_name": self.site_name, "mode": search_mode,
"input_image_url": image_url,
"success": 0, "save_time": time.strftime("%Y-%m-%d %H:%M:%S"),
"items": [], "total_items": 0
}
# 1. Download the image up front
try:
image_data = self._download_image(image_url)
except Exception as e:
logger.error(f"图片资源获取失败: {e}")
result["error"] = str(e)
return result
# 2. Retry loop
for attempt in range(GLOBAL_RETRY_TIMES):
try:
app_asins = []
web_asins = []
with ThreadPoolExecutor(max_workers=2) as executor:
# Hand the downloaded bytes to both channels
future_app = executor.submit(self._run_app_search, image_data, search_mode)
future_web = executor.submit(self._run_web_search, image_data)
try:
app_asins = future_app.result()
app_asins = list(set(app_asins))
result["is_app"] = 1 if app_asins else 0
logger.info(f"App ({search_mode}) 获取 ASIN: {len(app_asins)}")
except Exception as e:
logger.warning(f"App端识别非关键错误: {e}")
raise e
try:
web_asins = future_web.result()
web_asins = list(set(web_asins))
result["is_web"] = 1 if web_asins else 0
except Exception as e:
logger.warning(f"{image_url} Web端识别错误: {e}")
raise e
combined = list(set(app_asins + web_asins))
if not combined:
raise ValueError("App和Web端均未获取到有效ASIN")
logger.info(
f"Dual-channel search done - app ASINs: {len(app_asins)}, web ASINs: {len(web_asins)}, merged & deduped: {len(combined)}")
url = AMAZON_SEARCH_BASE_URL.format(base_url=self.base_url, bbx_asin_list="|".join(combined))
# logger.info(f'combined search url: {url}')
data = self._fetch_results(url)
# ======================================================
processed_items = data["items"]
for item in processed_items:
asin = item.get("asin")
in_app = asin in app_asins
in_web = asin in web_asins
if in_app and in_web:
item["source"] = "app_web"
elif in_app:
item["source"] = "app"
elif in_web:
item["source"] = "web"
else:
# The crawled ASIN is not in the seed list (e.g. an ad or recommendation slot)
# Mark it as "other"
item["source"] = "other"
# ======================================================
result.update({
"app_asin": "|".join(app_asins),
"web_asin": "|".join(web_asins),
"combined_asin": "|".join(combined),
"search_url": url,
"duration": round(time.time() - start_time, 2),
"success": 1,
"device_info": self.device_info,
"total_items": data["total_items"], "items": data["items"]
})
# logger.info(f'result type: {type(result)}')
return result
except Exception as e:
logger.error(f"尝试 {attempt + 1} 失败: {e}")
# === 新增:将当前错误的 Cookie 从池子中踢出 ===
if self.cookies:
cookie_manager.mark_invalid(self.site_name, self.cookies)
if attempt < GLOBAL_RETRY_TIMES - 1:
logger.info("刷新环境(Cookie/设备)中...")
time.sleep(RETRY_DELAY)
self._refresh_client_context()
else:
result.update({
"error": str(e),
"duration": round(time.time() - start_time, 2),
"success": 0,
"device_info": self.device_info
})
return result
if __name__ == "__main__":
# Test image URL
test_img_url = "https://m.media-amazon.com/images/I/71IFE6W6THL._AC_UL320_.jpg"
# test_img_url = "https://m.media-amazon.com/images/I/71IFE6W6THL._AC_SY550_.jpg"
# test_img_url = "https://m.media-amazon.com/images/I/71G1BAeYlNL._AC_SX300_SY300_QL70_FMwebp_.jpg"
try:
client = AmazonImageSearch(site_name="us")
logger.info("\n=== 测试默认模式 ===")
result = client.search(test_img_url, search_mode="default")
# logger.success(f"Result: Success={default_result}")
data = result
print(f"识别成功: {data.get('success')}")
print(f"找到商品数: {data.get('total_items')}")
print(f"耗时: {data.get('duration')}")
print(f"搜索url: {data.get('search_url')}")
# 打印前3个商品看看
items = data.get('items', [])
if items:
print("\n--- 商品示例 ---")
for item in items:
print(f"ASIN: {item['asin']} | 价格: {item['price']} | 评分:{item['rating']} | img_url:{item['img_url']} | 品牌名:{item['brand']} | 销量:{item['bought']} | 评论数:{item['total_comments']} | asin来源:{item['source']} | 标题: {item['title'][:50]}...")
except Exception as e:
logger.error(f"测试失败: {e}")

# Test client for the Flask API
import requests
url = "http://192.168.200.210:5000/api/search_image"
# url = "http://127.0.0.1:5000/api/search_image"
# Request payload
payload = {
# Image URL
# "image_url": "https://m.media-amazon.com/images/I/31Yq1IRqKGL._SR240,220_.jpg", # ok
# "image_url": "https://m.media-amazon.com/images/I/21cOsqYwDjL._SR480,440_.jpg", # ok
# "image_url": "https://m.media-amazon.com/images/I/21cOsqYwDjL._SR480,440_.jpg", # ok
# "image_url": "https://m.media-amazon.com/images/I/41vTSMa6mNL._SR480,440_.jpg", # ok
# "image_url": "https://m.media-amazon.com/images/I/71G1BAeYlNL._AC_SX300_SY300_QL70_FMwebp_.jpg", # no
# "image_url": "https://m.media-amazon.com/images/I/41ryNvEnNCL._AC_SL1500_.jpg", # ok
# "image_url": "https://m.media-amazon.com/images/I/71IFE6W6THL._AC_SY550_.jpg", # ok
"image_url": "https://soundasia.oss-cn-shenzhen.aliyuncs.com/yswg-img-test/SoundasiaAmazon/competitor_image/2025/1124/84759f5d6514435e83d0fb728a57451c.jpg", # ok
# "image_url": "https://imgtool.net/cdn/image/2023/20230401_1.jpg", # ok
# Site
"site_name": "us",
# Mode (default: normal, full_image: whole-image) - mainly affects the app-side search
# "search_mode": "full_image" # omitted -> defaults to "default"
}
try:
print("开始请求!")
# 发送 POST 请求
response = requests.post(url, json=payload, timeout=120)
# Parse the response
if response.status_code == 200:
result = response.json()
print("\n 请求成功!")
print(f"状态码: {result['code']}")
print(f"消息: {result['msg']}")
data = result.get('data', {})
print(f"识别成功: {data.get('success')}")
print(f"找到商品数: {data.get('total_items')}")
print(f"耗时: {data.get('duration')}")
print(f"搜索url: {data.get('search_url')}")
# 打印前3个商品看看
items = data.get('items', [])
if items:
print("\n--- 商品示例 ---")
for item in items:
print(f"ASIN: {item['asin']} | 价格: {item['price']} | 评分:{item['rating']} | img_url:{item['img_url']} | 品牌名:{item['brand']} | 销量:{item['bought']} | 评论数:{item['total_comments']} | asin来源:{item['source']} | 标题: {item['title'][:50]}...")
else:
print(f" 请求失败: {response.status_code}")
print(response.text)
except Exception as e:
print(f" 发生错误: {e}")