Commit 0a9e0953 by PC-20230618BYKI\Administrator
parents 5c1045e2 badb8f44
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="PyDocumentationSettings"> <component name="PyDocumentationSettings">
<option name="format" value="PLAIN" /> <option name="format" value="EPYTEXT" />
<option name="myDocStringFormat" value="Plain" /> <option name="myDocStringFormat" value="Epytext" />
</component> </component>
</module> </module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.10" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
</project> </project>
\ No newline at end of file
import hashlib
import os
import time
import uuid
import random
from typing import Optional, Dict, Any
from loguru import logger
import requests
import json
from PIL import Image
from requests.exceptions import RequestException, JSONDecodeError
# 导入所有站点相关配置
from amazon_configs import (
site_name_secret_dict,
us_devices_list,
uk_devices_list,
de_devices_list,
us_cookie_dict,
uk_cookie_dict,
de_cookie_dict
)
# 日志目录配置
# LOG_DIR = "log" # 日志文件夹名称
# 关键新增:判断日志目录是否存在,不存在则创建
# if not os.path.exists(LOG_DIR):
# os.makedirs(LOG_DIR, exist_ok=True) # exist_ok=True 避免目录已存在时报错
# 配置日志
# logger.add("log/amazon_search_optimized.log", rotation="10 MB", level="INFO", encoding="utf-8")
# log_file = os.path.join(LOG_DIR, "amazon_search_optimized.log")
# logger.add(log_file,rotation="10 MB", level="INFO",encoding="utf-8")
# Per-site configuration: maps each site code to its device list and cookie
# dict imported from amazon_configs. The uk and de entries have not been
# verified for accuracy.
SITE_CONFIG_MAPPER = {
"us": {
"devices": us_devices_list,
"cookies": us_cookie_dict
},
"uk": {
"devices": uk_devices_list,
"cookies": uk_cookie_dict
},
"de": {
"devices": de_devices_list,
"cookies": de_cookie_dict
}
}
# Web URL template for opening the matched ASINs in a browser; the
# {bbx_asin_list} placeholder receives the "|"-joined ASIN string.
# NOTE(review): the host is fixed to www.amazon.com for every site — confirm
# whether uk/de results should link to their own marketplace domain.
AMAZON_SEARCH_BASE_URL = "https://www.amazon.com/s?rh=p_78:{bbx_asin_list}&rank=asin-scores-asc-rank&searchMethod=CameraSearch"
# Retry policy (the global retry count also covers response-parse failures)
GLOBAL_RETRY_TIMES = 5 # retries of the whole search flow (includes cookie refresh)
STEP_RETRY_TIMES = 5 # retries of a single HTTP request
RETRY_DELAY = 1 # delay between retries, in seconds
def get_image_size(image_path: str) -> Optional[Dict[str, int]]:
    """Return the pixel dimensions of the image at ``image_path``.

    :param image_path: path of the image file to inspect
    :return: ``{"width": ..., "height": ...}``, or ``None`` when the file is
        missing or cannot be read (the failure is logged, not raised)
    """
    try:
        with Image.open(image_path) as picture:
            pixel_w, pixel_h = picture.size
    except FileNotFoundError:
        logger.error(f"图片文件未找到: {image_path}")
        return None
    except Exception as exc:
        # Any other failure (corrupt file, unsupported format, ...) is
        # reported the same way: log it and signal "no size available".
        logger.error(f"获取图片尺寸失败 ({image_path}): {exc}")
        return None
    return {"width": pixel_w, "height": pixel_h}
class AmazonImageSearch:
    """Client for the StyleSnap image-search endpoint of one configured site.

    An instance holds the cookies, a randomly chosen device profile and the
    request headers for a single site, and exposes :meth:`search`, which
    uploads a local image and returns the matched ASINs.
    """

    # Search modes accepted by ``search``.
    _SEARCH_MODES = ("default", "full_image")
    # Maximum number of characters of a response body written to the log
    # when parsing fails.
    _LOG_RESPONSE_MAX_CHARS = 500

    def __init__(self, site_name: str):
        """Validate the site and load its cookie, device and header state.

        :param site_name: site code, e.g. ``uk``, ``us`` or ``de``
        :raises ValueError: if the site has no secret configuration, no
            device/cookie configuration, or an empty device list
        """
        if site_name not in site_name_secret_dict:
            raise ValueError(f"不支持的站点: {site_name},支持站点:{list(site_name_secret_dict.keys())}")
        if site_name not in SITE_CONFIG_MAPPER:
            raise ValueError(f"站点 {site_name} 缺少设备或Cookie配置")
        self.site_name = site_name
        self.site_config = site_name_secret_dict[site_name]
        self.site_specific = SITE_CONFIG_MAPPER[site_name]
        # Cookie and device must be loaded before the headers are built,
        # because the headers are derived from the cookie values.
        self._update_cookie()
        self._update_device()
        self.client_device_id = str(uuid.uuid4())
        logger.info(
            f"客户端初始化完成 - 站点: {self.site_name}, "
            f"随机设备: {self.device_info.get('clientDevice')}, "
            f"clientDeviceId: {self.client_device_id}"
        )
        self._update_headers()
        self.snap_url = f"{self.site_config['snap_url']}/style-snap/2.0"

    def _update_cookie(self) -> None:
        """Reload the cookies for the current site.

        Currently this always returns the statically configured cookies; it
        is the hook where a database-backed source could be plugged in.
        """
        # Copy so later changes never leak into the shared configuration.
        self.cookies = self.site_specific["cookies"].copy()
        self.session_id = self.cookies.get("session-id", "")
        logger.info(f"站点 {self.site_name} Cookie已更新(session-id: {self.session_id[:20]}...)")

    def _update_device(self) -> None:
        """Pick a random device profile from the site's device list.

        :raises ValueError: if the site's device list is empty
        """
        devices = self.site_specific["devices"]
        if not devices:
            raise ValueError(f"站点 {self.site_name} 的设备列表为空")
        self.device_info = random.choice(devices)
        logger.debug(f"设备已更新: {self.device_info.get('clientDevice')}")

    def _update_headers(self) -> None:
        """Rebuild the request headers; must be called after a cookie change."""
        # NOTE(review): "ubid-main" is read for every site — confirm the uk/de
        # cookie dicts actually use this key.
        self.headers = {
            "x-amz-access-token": "",
            "x-amz-lens-session-auth-token": self.cookies.get("session-token", ""),
            "x-amz-lens-session-id": self.session_id,
            "x-amz-lens-ubid": self.cookies.get("ubid-main", ""),
            "accept-encoding": "gzip",
            "user-agent": "okhttp/4.9.1",
        }

    def _generate_auth_params(self) -> Dict[str, str]:
        """Return the ``ts`` / ``authtoken`` pair sent with every request.

        ``authtoken`` is the SHA-512 hex digest of
        ``secret + username + application + ts``.
        """
        ts = str(int(time.time()))
        combined = (
            f"{self.site_config['secret']}{self.site_config['username']}"
            f"{self.site_config['application']}{ts}"
        )
        authtoken = hashlib.sha512(combined.encode("utf-8")).hexdigest()
        return {"ts": ts, "authtoken": authtoken}

    def _build_query_metadata(self, extra_params: Optional[Dict[str, str]] = None) -> str:
        """Serialize the ``query_metadata`` form field as a JSON string.

        :param extra_params: optional entries merged last, overriding both the
            base values and the device profile
        """
        base_params = {
            "amznSessionId": self.session_id,
            "clientVersion": "30.20.2.100",
            "cardsVersion": "1.0",
            "clientMessageVersion": "1.0",
            "amznDirectedCustomerId": "",
            "clientDeviceId": self.client_device_id,
            "clientId": str(uuid.uuid4()),
            "sourceType": "Photo",
            "ingressSource": "ctp",
            "uiMode": "stylesnap",
            # Inject the randomly chosen device profile.
            **self.device_info,
        }
        if extra_params:
            base_params.update(extra_params)
        return json.dumps(base_params)

    @staticmethod
    def _dedupe_asins(asins) -> list:
        """Drop duplicate ASINs while keeping the first-seen order."""
        # dict preserves insertion order, unlike set(), so the order the API
        # returned the ASINs in is kept and the output is deterministic.
        return list(dict.fromkeys(asins))

    def _parse_response(self, response_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract the ASIN list from an API response.

        :param response_json: decoded JSON body of a style-snap response
        :return: dict with ``is_app``, ``asin_list_app`` and ``search_url``,
            or ``None`` when required fields are missing or the ASIN list is
            empty (the caller treats ``None`` as "retry")
        """
        try:
            # The fields below are mandatory; their absence is a parse failure.
            if "style-snap" not in response_json:
                raise KeyError("响应缺少 'style-snap' 字段")
            if "searchResult" not in response_json["style-snap"]:
                raise KeyError("响应缺少 'style-snap.searchResult' 字段")
            if not response_json["style-snap"]["searchResult"]:
                raise IndexError("'searchResult' 列表为空")
            search_result = response_json["style-snap"]["searchResult"][0]
            bbx_asin_list = search_result.get("bbxAsinList", [])
            if not bbx_asin_list:
                logger.warning("响应中 bbxAsinList 为空")
                return None
            joined_asins = "|".join(self._dedupe_asins(bbx_asin_list))
            return {
                "is_app": 1,
                "asin_list_app": joined_asins,
                "search_url": AMAZON_SEARCH_BASE_URL.format(bbx_asin_list=joined_asins),
            }
        except (KeyError, IndexError, TypeError) as e:
            # Log only a prefix of the body so a large response cannot flood
            # the log; the trailing "..." marks the cut.
            snippet = json.dumps(response_json, ensure_ascii=False)[: self._LOG_RESPONSE_MAX_CHARS]
            logger.error(f"解析响应失败: {e}. 响应内容: {snippet}...")
            return None

    def _perform_request(self, **kwargs) -> Dict[str, Any]:
        """POST with per-step retries and return the decoded JSON body.

        Cookies are not refreshed here; only request-level failures
        (network errors, non-2xx statuses, invalid JSON) are retried.

        :param kwargs: forwarded to ``requests.post``; ``timeout`` defaults
            to 10 seconds when not supplied
        :raises RequestException: after ``STEP_RETRY_TIMES`` failed attempts
        """
        # A default instead of a hard-coded keyword, so a caller-supplied
        # timeout no longer collides with it.
        kwargs.setdefault("timeout", 10)
        for attempt in range(STEP_RETRY_TIMES):
            try:
                response = requests.post(**kwargs)
                response.raise_for_status()  # non-2xx responses raise HTTPError
                return response.json()
            except JSONDecodeError:
                logger.error(f"请求失败 (第 {attempt + 1}/{STEP_RETRY_TIMES} 次): 响应不是有效的JSON格式")
            except RequestException as e:
                logger.error(f"请求失败 (第 {attempt + 1}/{STEP_RETRY_TIMES} 次): {e}")
            if attempt < STEP_RETRY_TIMES - 1:
                logger.warning(f"将在 {RETRY_DELAY} 秒后重试...")
                time.sleep(RETRY_DELAY)
        raise RequestException(f"单步API请求在 {STEP_RETRY_TIMES} 次尝试后最终失败")

    def _default_search(self, image_path: str) -> Dict[str, Any]:
        """Step 1: upload the image for default recognition.

        :raises FileNotFoundError: if ``image_path`` does not exist
        """
        logger.info(f"开始默认识别 (站点: {self.site_name}, 图片: {image_path}), 站点链接:{self.snap_url}")
        try:
            with open(image_path, "rb") as f:
                image_data = f.read()
        except FileNotFoundError:
            logger.error(f"无法读取图片文件: {image_path}")
            raise
        auth_params = self._generate_auth_params()
        query_metadata = self._build_query_metadata({"orientation": "-1"})
        files = {
            "application": (None, self.site_config['application']),
            "query_metadata": (None, query_metadata),
            "authtoken": (None, auth_params['authtoken']),
            "lang": (None, "en_US"),
            "username": (None, self.site_config['username']),
            "ts": (None, auth_params['ts']),
            "file": ("image.jpg", image_data, "image/jpeg"),
        }
        return self._perform_request(url=self.snap_url, files=files, headers=self.headers)

    def _full_image_search(self, query_id: str, image_path: str) -> Dict[str, Any]:
        """Step 2: re-query with a bounding box covering (almost) the whole image.

        :param query_id: ``queryId`` returned by the default search
        :raises ValueError: if the image dimensions cannot be read
        """
        logger.info(f"开始全图识别 (Query ID: {query_id[:10]}...)")
        image_size = get_image_size(image_path)
        if not image_size:
            raise ValueError("无法获取图片尺寸,无法进行全图搜索")
        # Shrink the box by a random 0-2 px margin; the max() keeps the
        # bottom-right corner strictly beyond the top-left one on tiny images.
        offset = random.randint(0, 2)
        bounding_box = {
            "tlx": offset,
            "tly": offset,
            "brx": max(image_size["width"] - offset, offset + 1),
            "bry": max(image_size["height"] - offset, offset + 1),
            "imh": image_size["height"],
            "imw": image_size["width"],
        }
        auth_params = self._generate_auth_params()
        query_metadata = self._build_query_metadata()
        form_data = {
            "mainQueryId": (None, query_id),
            "uiMode": (None, "stl_bbx_reformulation"),
            "application": (None, self.site_config['application']),
            "query_metadata": (None, query_metadata),
            "authtoken": (None, auth_params['authtoken']),
            "inputBoundingBox": (None, json.dumps(bounding_box)),
            "imageHash": (None, ""),
            "lang": (None, "en_US"),
            "username": (None, self.site_config['username']),
            "ts": (None, auth_params['ts']),
        }
        return self._perform_request(url=self.snap_url, files=form_data, headers=self.headers)

    def search(self, image_path: str, search_mode: str = "default") -> Dict[str, Any]:
        """Run an image search, retrying the whole flow on failure.

        Each failed attempt refreshes cookie, device, headers and client
        device id before the next one. A missing image file and an
        unsupported mode are not retried, because retrying cannot fix them.

        :param image_path: path of the local image file
        :param search_mode: ``'default'`` or ``'full_image'``
        :return: result dict; ``is_app`` is 1 on success, and an ``error``
            key is present when the search ultimately failed
        """
        base_result = {
            "is_web": 0, "is_app": 0, "asin_list_web": "", "asin_list_app": "",
            "asin_list_join": "", "site_name": self.site_name,
            "search_url": None, "mode": search_mode,
            "retry_count": 0,  # number of retries that were needed
        }
        # Reject an unknown mode before spending any network request on it.
        if search_mode not in self._SEARCH_MODES:
            logger.error(f"不支持的搜索模式: {search_mode}")
            return base_result
        for global_attempt in range(GLOBAL_RETRY_TIMES):
            try:
                # Make sure a session id is available before every attempt.
                if not self.session_id:
                    logger.warning("Cookie中缺少'session-id',尝试更新Cookie...")
                    self._update_cookie()
                    self._update_headers()
                # Step 1 is always needed: full-image mode depends on its queryId.
                response = self._default_search(image_path)
                mode_label = "默认模式"
                if search_mode == "full_image":
                    query_id = response.get("queryId")
                    if not query_id:
                        raise ValueError("默认识别未返回queryId,无法进行全图搜索")
                    response = self._full_image_search(query_id, image_path)
                    mode_label = "全图模式"
                parsed_result = self._parse_response(response)
                if not parsed_result:
                    # A parse failure is handled like any other failed attempt.
                    raise ValueError(f"{mode_label}响应解析失败,触发重试")
                base_result.update(parsed_result)
                base_result["asin_list_join"] = parsed_result["asin_list_app"]
                base_result["retry_count"] = global_attempt
                return base_result
            except FileNotFoundError as e:
                # The file will not appear between retries; fail fast.
                logger.error(f"第 {global_attempt + 1}/{GLOBAL_RETRY_TIMES} 次尝试失败: {e}")
                base_result["error"] = str(e)
                return base_result
            except Exception as e:
                logger.error(f"第 {global_attempt + 1}/{GLOBAL_RETRY_TIMES} 次尝试失败: {e}")
                # Last attempt: give up and report the error.
                if global_attempt == GLOBAL_RETRY_TIMES - 1:
                    base_result["error"] = str(e)
                    return base_result
                # Otherwise rotate identity (cookie, device, ids) and retry.
                logger.info("准备更新Cookie和设备信息后重试...")
                self._update_cookie()
                self._update_device()
                self._update_headers()
                self.client_device_id = str(uuid.uuid4())
                logger.info(f"重试准备完成(下次是第 {global_attempt + 2} 次尝试)")
                time.sleep(RETRY_DELAY)
        return base_result
if __name__ == "__main__":
    # Sample image used for the manual smoke test
    image_file_path = "temp_img/B0BYNB2J6W.jpg"
    # (search mode, section heading, result label) for each mode exercised
    mode_plan = (
        ("default", "\n--- 测试默认识别模式 ---", "默认模式结果"),
        ("full_image", "\n--- 测试全图识别模式 ---", "全图模式结果"),
    )
    try:
        # Exercise every supported site in turn
        for site in ("us", "uk", "de"):
            logger.info(f"\n{'=' * 40}")
            logger.info(f"开始测试站点: {site}")
            logger.info(f"{'=' * 40}")
            client = AmazonImageSearch(site_name=site)
            for mode_name, heading, label in mode_plan:
                logger.info(heading)
                outcome = client.search(image_file_path, search_mode=mode_name)
                logger.info(f"{label}: {json.dumps(outcome, ensure_ascii=False, indent=2)}")
    except ValueError as e:
        logger.error(f"初始化失败: {e}")
    except Exception as e:
        logger.error(f"执行过程中发生错误: {e}")
\ No newline at end of file
import os
import time
import threading
import queue
import random
import json
from datetime import datetime
from typing import List, Dict
from loguru import logger
import pandas as pd
from app_img_search_api_new import AmazonImageSearch # 导入客户端类
from amazon_configs import (
site_name_secret_dict,
us_devices_list,
uk_devices_list,
de_devices_list
)
# Log directory configuration
LOG_DIR = "log"  # name of the folder that receives log files
# Create the folder if needed; exist_ok=True makes this a no-op when it is
# already there, so no separate (race-prone) existence check is required.
os.makedirs(LOG_DIR, exist_ok=True)
# Test configuration
TEST_FOLDER = "temp_img" # folder that holds the test images
REQUESTS_PER_IMAGE = 1 # requests per image per site (each request runs every mode once)
THREAD_COUNT = 30 # number of worker threads
RESULT_EXCEL = "multi_mode_test_results3.xlsx" # file the results are written to
ALL_SITES = ["us", "uk", "de"] # the three sites under test
ALL_MODES = ["default", "full_image"] # the two search modes
# Site code -> device list
SITE_DEVICES = {
"us": us_devices_list,
"uk": uk_devices_list,
"de": de_devices_list
}
# State shared between worker threads: the task queue, plus a lock guarding
# appends to the result list
task_queue = queue.Queue()
result_lock = threading.Lock()
test_results = []
def _build_result_row(image_name, request_idx, mode, duration, result_data,
                      thread_id, site_name, device_info):
    """Build one result record; an empty ``result_data`` yields a failure row."""
    return {
        "image": image_name,
        "request_index": request_idx,  # links the rows of both modes of one task
        "mode": mode,
        "duration": duration,
        "success": result_data.get("is_app", 0),
        "result_url": result_data.get("search_url", "None"),
        "thread_id": thread_id,
        "is_web": result_data.get("is_web", 0),
        "is_app": result_data.get("is_app", 0),
        "asin_list_web": result_data.get("asin_list_web", ""),
        "asin_list_app": result_data.get("asin_list_app", ""),
        "asin_list_join": result_data.get("asin_list_join", ""),
        "site_name": site_name,
        "device_info": json.dumps(device_info, ensure_ascii=False),
        "save_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    }


def worker():
    """Thread body: drain the task queue, running every mode for each task.

    Each task is an ``(image_path, request_idx, site_name)`` tuple. One
    client instance is created per task and reused for all modes in
    ``ALL_MODES`` so they share the same device profile. One row per mode is
    appended to ``test_results`` under ``result_lock``; a mode that raises is
    recorded as a failure row with duration 0.
    """
    thread_id = threading.current_thread().ident
    thread_name = threading.current_thread().name
    while True:
        # Fetch outside the processing try/finally: task_done() must only be
        # called for items actually taken from the queue, otherwise it raises
        # ValueError ("called too many times").
        try:
            image_path, request_idx, site_name = task_queue.get(block=False)
        except queue.Empty:
            break
        try:
            image_name = os.path.basename(image_path)
            logger.info(
                f"[{thread_name}] 开始处理 {image_name} 第 {request_idx} 次请求 "
                f"(站点:{site_name},两种模式)"
            )
            # One client for both modes keeps the device info consistent.
            client = AmazonImageSearch(site_name=site_name)
            device_info = client.device_info
            device_str = (
                f"设备: {device_info.get('clientDevice')}, "
                f"厂商: {device_info.get('deviceManufacturer')}, "
                f"系统版本: {device_info.get('clientDeviceVersion')}"
            )
            logger.debug(f"[{thread_name}] {device_str}")
            for mode in ALL_MODES:
                try:
                    mode_start = time.time()
                    result_data = client.search(
                        image_path=image_path,
                        search_mode=mode
                    )
                    duration = round(time.time() - mode_start, 4)
                    row = _build_result_row(
                        image_name, request_idx, mode, duration, result_data,
                        thread_id, site_name, device_info,
                    )
                    with result_lock:
                        test_results.append(row)
                    logger.info(f"[{thread_name}] {mode}模式完成(耗时{duration}秒)")
                except Exception as e:
                    logger.error(f"[{thread_name}] {mode}模式处理出错: {str(e)}")
                    # Record the failed mode so the summary still counts it.
                    row = _build_result_row(
                        image_name, request_idx, mode, 0, {},
                        thread_id, site_name, device_info,
                    )
                    with result_lock:
                        test_results.append(row)
        except Exception as e:
            logger.error(f"[{thread_name}] 整体任务出错: {str(e)}")
        finally:
            task_queue.task_done()
def collect_image_paths(folder: str) -> List[str]:
    """Return the paths of the image files directly inside ``folder``.

    Only names ending in a common image extension (case-insensitive) are
    kept. The result is sorted by file name so task order is reproducible
    across runs and platforms.

    :param folder: directory to scan (not recursive)
    :return: sorted list of ``folder``-joined paths; empty if ``folder`` is
        missing or is not a directory (an error is logged in that case)
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')
    # isdir rather than exists: a path that exists but is a regular file
    # would otherwise make os.listdir raise.
    if not os.path.isdir(folder):
        logger.error(f"图片文件夹不存在: {folder}")
        return []
    return [
        os.path.join(folder, name)
        for name in sorted(os.listdir(folder))
        if name.lower().endswith(image_extensions)
    ]
def init_task_queue(image_paths: List[str]):
    """Fill the shared task queue with one task per site, request and image.

    Tasks are ``(image_path, request_idx, site)`` tuples, enqueued site by
    site, then by request index, then by image. Each task later runs every
    search mode.
    """
    request_numbers = range(1, REQUESTS_PER_IMAGE + 1)
    pending = (
        (path, number, site_code)
        for site_code in ALL_SITES
        for number in request_numbers
        for path in image_paths
    )
    for job in pending:
        task_queue.put(job)
    total_tasks = task_queue.qsize()
    summary = (
        f"任务队列初始化完成 - 总任务数: {total_tasks} "
        f"(每个任务包含{len(ALL_MODES)}种模式),"
        f"站点数: {len(ALL_SITES)}, 图片数: {len(image_paths)}, "
        f"单图单站请求数: {REQUESTS_PER_IMAGE}"
    )
    logger.info(summary)
def save_results_to_excel(results: List[Dict]):
    """Write the result rows to the Excel file named by ``RESULT_EXCEL``.

    Logs a warning and does nothing when ``results`` is empty.
    """
    if not results:
        logger.warning("无结果可保存")
        return
    # Fixed column order for the sheet
    ordered_columns = [
        "image", "request_index", "mode", "duration", "success", "result_url",
        "thread_id", "is_web", "is_app", "asin_list_web", "asin_list_app",
        "asin_list_join", "site_name", "device_info", "save_time",
    ]
    frame = pd.DataFrame(results)
    frame = frame[ordered_columns]
    frame.to_excel(RESULT_EXCEL, index=False, engine="openpyxl")
    logger.info(f"结果已保存到 {RESULT_EXCEL}")
def print_summary(results: List[Dict]):
    """Log overall, per-site and per-mode statistics for the result rows.

    Nothing is logged when ``results`` is empty. A row counts as successful
    when its ``is_app`` field equals 1.
    """
    if not results:
        return

    def _total_duration(rows):
        # Sum of the "duration" field over the given rows
        return sum(row["duration"] for row in rows)

    total = len(results)
    total_duration = _total_duration(results)
    logger.info("\n===== 测试总结 =====")
    logger.info(f"总请求数: {total}(含{len(ALL_MODES)}种模式),"
                f"总耗时: {total_duration:.2f}秒,"
                f"平均耗时: {total_duration / total:.4f}秒")
    # Per-site statistics
    for site in ALL_SITES:
        site_results = [row for row in results if row["site_name"] == site]
        site_total = len(site_results)
        if not site_total:
            continue
        site_avg = _total_duration(site_results) / site_total
        logger.info(f"\n----- 站点: {site} 统计 -----")
        logger.info(f"总请求数: {site_total},平均耗时: {site_avg:.4f}秒")
        # Per-mode statistics within the site
        for mode in ALL_MODES:
            mode_results = [row for row in site_results if row["mode"] == mode]
            mode_total = len(mode_results)
            if not mode_total:
                continue
            mode_success = len([row for row in mode_results if row["is_app"] == 1])
            mode_success_rate = (mode_success / mode_total) * 100
            mode_avg = _total_duration(mode_results) / mode_total
            logger.info(
                f"模式 {mode}: "
                f"请求数 {mode_total}, 成功数 {mode_success}, "
                f"成功率 {mode_success_rate:.2f}%, "
                f"平均耗时 {mode_avg:.4f}秒"
            )
if __name__ == "__main__":
    # Set up file logging
    log_file = os.path.join(LOG_DIR, "multi_mode_stability_test.log")
    logger.add(log_file, rotation="10 MB", level="INFO", encoding="utf-8")
    # Every site under test must be configured before any work starts
    for site in ALL_SITES:
        if site not in site_name_secret_dict:
            logger.error(f"不支持的站点: {site},支持站点: {list(site_name_secret_dict.keys())}")
            exit(1)
        site_devices = SITE_DEVICES.get(site, [])
        if not site_devices:
            logger.warning(f"站点 {site} 的设备列表为空,可能影响测试结果")
    # Gather the images and enqueue the tasks
    image_paths = collect_image_paths(TEST_FOLDER)
    if not image_paths:
        logger.error("未找到图片,退出测试")
        exit(1)
    init_task_queue(image_paths)
    # Start the worker threads
    start_time = time.time()
    threads = []
    for worker_no in range(1, THREAD_COUNT + 1):
        t = threading.Thread(target=worker, name=f"Worker-{worker_no}")
        threads.append(t)
        t.start()
        logger.info(f"启动线程: {t.name}")
    # Wait until every task is processed and every thread has exited
    task_queue.join()
    for t in threads:
        t.join()
    total_time = time.time() - start_time
    logger.info(f"所有任务完成,总耗时: {total_time:.2f}秒")
    # Persist the results and log the summary
    save_results_to_excel(test_results)
    print_summary(test_results)
\ No newline at end of file
import time import json
import pandas as pd import pandas as pd
from secure_db_client import get_remote_engine
import time
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
import platform
import traceback
import json
import uuid
from sqlalchemy.exc import SQLAlchemyError
class ConnectSpider: class ConnectSpider:
def __init__(self): def __init__(self):
self.pg_port = 54328 self.db_engine = get_remote_engine(
self.pg_db = "selection" site_name='us', # -> database "selection"
self.pg_user = "postgres" db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
self.pg_pwd = "F9kL2sXe81rZq" )
self.pg_host = "61.145.136.61" self.db_engine192 = get_remote_engine(
self.db_engine = create_engine(f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}") site_name='us', # -> database "selection"
db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
pg_host = "192.168.10.223" )
self.db_engine192 = create_engine( # self.pg_port = 54328
f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}") # self.pg_db = "selection"
# self.pg_user = "postgres"
# mysql # self.pg_pwd = "F9kL2sXe81rZq"
self.sql_port = 3306 # self.pg_host = "61.145.136.61"
self.sql_db = "selection" # pg_host = "192.168.10.223"
self.sql_user = "adv_yswg" # self.db_engine192 = create_engine(
self.sql_pwd = "Gd1pGJog1ysLMLBdML8w81" # f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}")
self.sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
self.charset = 'utf8mb4'
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{self.sql_user}:{self.sql_pwd}@{self.sql_host}:{self.sql_port}/{self.sql_db}?charset={self.charset}"
self.mysql_engine = create_engine(connection_string_mysql)
# mysql
sql_port = 19030
sql_db = "test"
sql_user = "fangxingjun"
sql_pwd = "fangxingjun12345"
sql_host = "192.168.10.151"
wai_host = "113.100.143.162"
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{wai_host}:{sql_port}/{sql_db}"
self.mysql_test = create_engine(connection_string_mysql)
# mysql
sql_port = 19030
sql_db = "selection"
sql_user = "fangxingjun"
sql_pwd = "fangxingjun12345"
sql_host = "192.168.10.151"
wai_host = "113.100.143.162"
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}"
self.mysql_selection = create_engine(connection_string_mysql)
def mysql(self): def mysql(self):
sql_port = 3306 mysql_engine = get_remote_engine(
sql_db = "us_spider" site_name='us', # -> database "selection"
sql_user = "adv_yswg" db_type="mysql", # -> 服务端 alias "mysql"
sql_pwd = "Gd1pGJog1ysLMLBdML8w81" )
sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
charset = 'utf8mb4'
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}?charset={charset}"
mysql_engine = create_engine(connection_string_mysql)
return mysql_engine return mysql_engine
def mysql_us_spider(self): def save_stock_img_id(self,items):
sql_port = 19030 # sql = """
# sql_db = "us_spider" # INSERT INTO stock_image_id_wj
sql_db = "test" # (account_id, image_id, state, created_at, image_title, image_size_info)
sql_user = "fangxingjun" # VALUES (%s, %s, %s, %s, %s, %s)
sql_pwd = "fangxingjun12345" # ON DUPLICATE KEY UPDATE
sql_host = "192.168.10.151" # state = VALUES(state),
# created_at = VALUES(created_at),
# 创建数据库连接字符串 # image_title = VALUES(image_title),
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}" # image_size_info = VALUES(image_size_info)
mysql_us_spider_engine = create_engine(connection_string_mysql) # """
return mysql_us_spider_engine sql = """
def save_stock_img_id(self, items):
"""批量保存数据到数据库。"""
# 提取 image_title 和 image_size_info
processed_items = []
for item in items:
image_title = item.get('title', '')
image_size_info = json.dumps(item.get('sizes', {}))
processed_item = {
'account_id': item['account_id'],
'image_id': item['image_id'],
'state': item['state'],
'created_at': item['created_at'],
'image_title': image_title,
'image_size_info': image_size_info
}
processed_items.append(processed_item)
# # 定义DataFrame的列 旧代码
# columns = ['account_id', 'image_id', 'state', 'created_at', 'image_title', 'image_size_info']
# df = pd.DataFrame(processed_items, columns=columns)
#
# # 使用with语句管理数据库连接
# with self.db_engine192.connect() as connection:
# df.to_sql(
# name=table_name,
# con=connection,
# if_exists='append',
# index=False
# )
insert_sql = text("""
INSERT INTO stock_image_id_wj INSERT INTO stock_image_id_wj
(account_id, image_id, state, created_at, image_title, image_size_info) (account_id, image_id, state, created_at, image_title, image_size_info)
VALUES VALUES (%s, %s, %s, %s, %s, %s::jsonb)
(:account_id, :image_id, :state, :created_at, :image_title, :image_size_info) ON CONFLICT (account_id, image_id) DO UPDATE SET
ON CONFLICT (account_id, image_id) DO NOTHING state = EXCLUDED.state,
""") created_at = EXCLUDED.created_at,
print('新代码插入id',insert_sql) # 不需要担心有重复的。表里面有唯一索引。会跳过。 image_title = EXCLUDED.image_title,
# 假设 processed_items 是 [{'account_id':..., 'image_id':..., …}, …] image_size_info = EXCLUDED.image_size_info;
with self.db_engine192.begin() as conn: """
conn.execute(insert_sql, processed_items) params = [
(
def save_homedepot_projects(self, items): item['account_id'],
"""批量保存数据到数据库。""" item['image_id'],
table_name = "homedepot_projects_items" item['state'],
item['created_at'],
# 提取 image_title 和 image_size_info item.get('title', ''),
processed_items = [] json.dumps(item.get('sizes', {}))
for item in items: )
image_title = item.get('title', '') for item in items
image_size_info = json.dumps(item.get('sizes', {})) ]
processed_item = { for i in range(5):
'account_id': item['account_id'],
'image_id': item['image_id'],
'state': item['state'],
'created_at': item['created_at'],
'image_title': image_title,
'image_size_info': image_size_info
}
processed_items.append(processed_item)
# 定义DataFrame的列
columns = ['account_id', 'image_id', 'state', 'created_at', 'image_title', 'image_size_info']
df = pd.DataFrame(processed_items, columns=columns)
# 使用with语句管理数据库连接
with self.db_engine192.connect() as connection:
df.to_sql(
name=table_name,
con=connection,
if_exists='append',
index=False
)
def get_account_id(self, item_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_summary_wj"
query = text(f"SELECT account_id, id FROM {table_name} WHERE id = :item_id")
result = connection.execute(query, {"item_id": item_id})
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
try: try:
accounts = df_status.account_id.iloc[0] with self.db_engine192.begin() as conn:
except IndexError: conn.execute(sql, params)
accounts = None # 或者处理不存在的情况 print('存储更新成功')
return accounts break
except Exception as e:
def update_id_to_3(self,account_id): time.sleep(30)
with self.db_engine192.connect() as connection: print('save_stock_img_id 报错。', e)
table_name = "stock_image_summary_wj" def update_id_to_3(self, account_id):
success_id = tuple(account_id) for i in range(5):
sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE account_id IN :success_id") try:
result = connection.execute(sql_update, {"success_id": success_id}) with self.db_engine192.begin() as connection:
print('成功更新为3') table_name = "stock_image_summary_wj"
connection.close() print(account_id)
sql_update = f"UPDATE {table_name} SET state = 3 WHERE account_id='{account_id}'"
def update_all_states_to_1(self,state=1,item_id=None): print(sql_update,'成功更新为3')
try: connection.execute(sql_update)
with self.db_engine192.begin() as connection: # 使用 begin() 自动管理事务 break
table_name = "stock_image_summary_wj" except Exception as e:
if state==3: time.sleep(30)
sql_update = text(f"UPDATE {table_name} SET state = {state} where id={item_id}") print('update_id_to_3 报错。', e)
else:
sql_update = text(f"UPDATE {table_name} SET state = {state}")
print(sql_update)
result = connection.execute(sql_update)
print(f'成功更新所有状态为1,受影响行数:{result.rowcount}')
# 显式提交事务(虽然 begin() 已经自动提交)
connection.commit()
except Exception as e:
print(f'更新状态失败:{e}')
# 回滚事务
if 'connection' in locals():
connection.rollback()
def save_account(self,items):
"""批量保存数据到数据库。"""
table_name = "stock_image_summary_wj"
# 定义DataFrame的列
columns = ['account_id', 'account_secret', 'year_month', 'spider_date','state','created_time']
df = pd.DataFrame(items, columns=columns)
# 使用with语句管理数据库连接
with self.db_engine192.connect() as connection:
df.to_sql(
name=table_name,
con=connection,
if_exists='append',
index=False
)
print("保存成功!")
def delet_datails(self, image_id_list):
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
# 使用 SQLAlchemy 的 text 函数来创建 SQL 语句
query = text(f"SELECT image_id FROM {table_name} WHERE account_id = 'zhouweiqing@yswg.com.cn';")
# 使用 connection.execute() 来执行查询
result = connection.execute(query).fetchall()
# 获取表中的 image_id 列表
db_image_ids = [row[0] for row in result]
# 找出不在 image_id_list 中的 image_id
non_existent_image_ids = set(db_image_ids) - set(image_id_list)
# 删除不在 image_id_list 中的记录
for image_id in non_existent_image_ids:
delete_query = text(
f"DELETE FROM {table_name} WHERE account_id = 'zhouweiqing@yswg.com.cn' AND image_id = '{image_id}';")
connection.execute(delete_query)
# 提交更改
connection.commit()
def get_datails_image_id(self, account_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
sql_query = text(f"SELECT image_id FROM {table_name} WHERE account_id = :account_id and created_time < '2024-09-02 00:00:00'")
result = connection.execute(sql_query, {"account_id": account_id})
image_id_list = [int(row[0]) for row in result.fetchall()]
# 提交更改
# connection.commit()
return image_id_list
def update_all_states_to_1(self, state=1, item_id=None):
for i in range(5):
try:
with self.db_engine192.begin() as connection: # 使用 begin() 自动管理事务
table_name = "stock_image_summary_wj"
if state == 3:
sql_update = f"UPDATE {table_name} SET state = {state} where id={item_id}"
else:
sql_update = f"UPDATE {table_name} SET state = {state}"
print(sql_update)
connection.execute(sql_update)
break
except Exception as e:
time.sleep(30)
print(f'更新状态失败:{e}')
# 回滚事务
# 1111111111111
def save_stock_detail(self, item): def save_stock_detail(self, item):
"""批量保存数据到数据库。""" """批量保存数据到数据库。"""
table_name = "stock_image_detail_wj" table_name = "stock_image_detail_wj"
# 将item包装成列表 # 将item包装成列表
items_list = [item] items_list = [item]
# 定义DataFrame的列 # 定义DataFrame的列
columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url','state', 'created_time'] columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url', 'state',
'created_time']
df = pd.DataFrame(items_list, columns=columns) df = pd.DataFrame(items_list, columns=columns)
for i in range(5):
with self.db_engine192.connect() as connection:
df.to_sql(
name=table_name,
con=connection,
if_exists='append',
index=False
)
# print("保存成功!")
def get_pic_urls_limit1(self, account_id):
pic_data_list = [] # 创建一个空列表来存储结果
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
query = text(
f"""select image_url, image_id, image_title from {table_name} where account_id = :account_id and state = 1 limit 1""")
try: try:
result = connection.execute(query, {'account_id': account_id}) self.db_engine192.to_sql(df, table_name, if_exists='append')
for row in result: # 遍历所有的结果行 print("保存成功!")
if row is not None: break
# 将 RowProxy 转换为标准的字典
row_dict = dict(zip(result.keys(), row)) # 使用 keys 和 fetchone 的结果创建字典
# 直接构建所需格式的字符串
pic_datas = f"{row_dict['image_url']}||{row_dict['image_id']}||{row_dict['image_title']}"
pic_data_list.append(pic_datas) # 添加到列表中
if not pic_data_list:
# print("No data found for the given account_id")
return False
else:
return pic_data_list # 返回列表
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") time.sleep(30)
return False print(f'save_stock_detail 报错:{e}')
# 回滚事务
def get_pic_urls(self, account_id):
pic_data_list = [] # 创建一个空列表来存储结果
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
query = text(
f"""select image_url, image_id, image_title from {table_name} where account_id = :account_id and state = 1""")
def get_stock_images_id(self, account_id):
for i in range(5):
try: try:
result = connection.execute(query, {'account_id': account_id}) table_name = "stock_image_id_wj"
for row in result: # 遍历所有的结果行 # 修改查询语句以匹配你的数据表名称和列名称
if row is not None: query = f""" SELECT image_id,id,image_title,image_size_info FROM {table_name} where account_id ='{account_id}' and state = 1"""
# 将 RowProxy 转换为标准的字典 print(query)
row_dict = dict(zip(result.keys(), row)) # 使用 keys 和 fetchone 的结果创建字典 df_status = self.db_engine192.read_sql(query)
try:
# 直接构建所需格式的字符串 df_status['id'] = df_status['id'].astype(str)
pic_datas = f"{row_dict['image_url']}||{row_dict['image_id']}||{row_dict['image_title']}" image_id_id_pairs = list(
pic_data_list.append(pic_datas) # 添加到列表中 df_status['image_id'] + '||-||' + df_status['id'] + '||-||' + df_status['image_title'] + '||-||' +
df_status['image_size_info'])
if not pic_data_list: print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
# print("No data found for the given account_id") return image_id_id_pairs
except Exception as e:
print(e)
return False return False
else:
return pic_data_list # 返回列表
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") time.sleep(30)
return False print(f'get_stock_images_id 报错:{e}')
# 回滚事务
def update_image_id_to_3(self, item_id):
def get_stock_images_id2(self, account_id): for i in range(5):
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
# 特定的 image_id 列表
specific_image_ids = [
'1025406430',
'782084149',
'2340663257',
'2444918601',
'2481076155',
'2534369399',
'2522128969',
'2522144147',
'2482077119',
'2475085855',
'2560247125',
'1115348984',
'2555951185',
'1644852415',
'1644852424',
'258700904',
'2540342353',
'2555951245',
'2529955899',
'1309059847',
'1899316957',
'2416180707',
'1978653428',
'2520112131',
'1447499252',
'2335787565',
'1780440524',
'2316295613',
'2463106909',
'2527382733',
'2548693637',
'2460743889',
'2489123001',
'2527399543',
'2456315025',
'2469939069',
'2305915213',
'1660111006',
'2218802639',
'453729808',
'2295540279',
'2323950095',
'2323950087',
'2057817146',
'2541104423',
'231076948',
'2196541827',
'2407612765',
'2521017693',
'2554778219',
'2523427909',
'2520799267',
'2533854931',
'2498052331',
'2521798533',
'2471652945',
'2445858817',
'2449783031',
'1735869230',
'1106587370',
'2393397957',
'2527382699',
'2348771553',
'1822384931',
'2564084221'
]
# specific_image_ids = ['2509630613', '2568241787', '2568242327', '2568242443', '2568242799', '2568242949']
# 修改查询语句以匹配你的数据表名称、列名称,并加入 image_id 条件
query = text(f"""SELECT image_id,id,image_title,image_size_info FROM {table_name}
WHERE account_id = :account_id
AND image_id IN :image_ids""")
print(query)
result = connection.execute(query, {'account_id': account_id, 'image_ids': tuple(specific_image_ids)})
try:
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
df_status['id'] = df_status['id'].astype(str)
image_id_id_pairs = list(
df_status['image_id'].astype(str) + '||-||' + df_status['id'] + '||-||' + df_status[
'image_title'] + '||-||' + df_status['image_size_info'])
print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
return image_id_id_pairs
except Exception as e:
print(e)
return False
def img_size_is_1(self, account_id, image_ids):
    """Fetch the stored rows for a given set of image ids of one account.

    Queries ``stock_image_id_wj`` and packs every matching row into one
    ``'||-||'``-separated string made of ``image_id``, ``id``,
    ``image_title`` and ``image_size_info`` (in that order).

    @param account_id: value matched against the ``account_id`` column.
    @param image_ids: collection of image ids used for the ``IN`` filter.
    @return: list of packed strings; C{False} when ``image_ids`` is empty,
        when no row matches, or when the query fails.
    """
    # Nothing to look up: bail out before touching the database.
    if not image_ids:
        print("No image IDs provided.")
        return False

    # Local import keeps this block self-contained; an "expanding" bind is
    # the documented way to feed a sequence to ``IN`` in a text() statement
    # instead of relying on the DB driver to inline a tuple.
    from sqlalchemy import bindparam

    table_name = "stock_image_id_wj"
    query = text(
        f"""SELECT image_id, id, image_title, image_size_info FROM {table_name} WHERE account_id = :account_id AND image_id IN :image_ids"""
    ).bindparams(bindparam('image_ids', expanding=True))

    with self.db_engine192.connect() as connection:
        try:
            result = connection.execute(
                query, {'account_id': account_id, 'image_ids': list(image_ids)})
            # Passing the column names up front keeps an empty result from
            # failing on a column-count mismatch.
            df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
            if df_status.empty:
                return False
            image_id_id_pairs = list(
                df_status['image_id'].astype(str) + '||-||' +
                df_status['id'].astype(str) + '||-||' +
                df_status['image_title'] + '||-||' +
                df_status['image_size_info'])
            print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
            return image_id_id_pairs
        except Exception as e:
            # Best-effort lookup: report the problem and signal "nothing".
            print(e)
            return False
def get_stock_images_id(self,account_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
# 修改查询语句以匹配你的数据表名称和列名称
query = text(f""" SELECT image_id,id,image_title,image_size_info FROM {table_name} where account_id = :account_id and state = 1""")
print(query)
result = connection.execute(query, {'account_id': account_id})
try: try:
df_status = pd.DataFrame(result.fetchall()) with self.db_engine192.begin() as connection:
df_status.columns = result.keys() table_name = "stock_image_id_wj"
sql_update = f"UPDATE {table_name} SET state = 3 WHERE id = {item_id}"
connection.execute(sql_update)
df_status['id'] = df_status['id'].astype(str) break
image_id_id_pairs = list(df_status['image_id'] + '||-||' + df_status['id'] + '||-||' + df_status['image_title'] + '||-||' + df_status['image_size_info'])
print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
connection.close()
return image_id_id_pairs
except Exception as e: except Exception as e:
print(e) time.sleep(30)
return False print(f'update_image_id_to_3 报错:{e}')
# 回滚事务
def get_kong_images_id(self, account_id):
    """List the images of an account that only have an empty size record.

    Selects rows of ``stock_image_detail_wj`` whose ``image_size_info`` is
    the literal ``'{}'`` and whose ``image_id`` has no other row (for the
    same account) with a different ``image_size_info``.

    @param account_id: value matched against the ``account_id`` column.
    @return: list of ``'<image_id>||<image_type>||<image_url>'`` strings;
        an empty list when nothing matches.
    """
    table_name = "stock_image_detail_wj"
    # The sub-query filters out image ids that already have a non-empty
    # image_size_info somewhere else in the table.
    query = text(
        "SELECT image_id, image_type, image_url\n"
        f"FROM {table_name}\n"
        "WHERE account_id = :account_id\n"
        "AND image_id NOT IN (\n"
        "SELECT image_id\n"
        f"FROM {table_name}\n"
        "WHERE account_id = :account_id\n"
        "AND image_size_info != '{}'\n"
        ")\n"
        "AND image_size_info = '{}'"
    )
    print(query)
    # The context manager closes the connection; no explicit close() needed.
    with self.db_engine192.connect() as connection:
        result = connection.execute(query, {'account_id': account_id})
        df_status = pd.DataFrame(result.fetchall(), columns=result.keys())
    if df_status.empty:
        return []
    # Cast to str so the concatenation cannot fail on non-string columns.
    return list(
        df_status['image_id'].astype(str) + '||' +
        df_status['image_type'].astype(str) + '||' +
        df_status['image_url'].astype(str))
def get_stock_image_detail(self, account_id):
    """Read every ``stock_image_detail`` row of an account as packed strings.

    Each row becomes one ``'||-||'``-separated string made of
    ``account_id``, ``image_id``, ``image_size_info``, ``image_title``,
    ``image_type``, ``image_url`` and ``created_time`` (the latter formatted
    as ``YYYY-mm-dd HH:MM:SS``).

    @param account_id: value matched against the ``account_id`` column.
    @return: list of packed strings; an empty list when the account has no
        rows.
    """
    table_name = "stock_image_detail"
    query = text(
        f"""SELECT account_id, image_id, image_size_info, image_title, image_type, image_url, created_time FROM {table_name} WHERE account_id = :account_id""")
    print(query)
    with self.mysql_selection.connect() as connection:
        result = connection.execute(query, {'account_id': account_id})
        # Column names are supplied up front so an empty result still
        # yields a well-formed (empty) frame.
        df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))

    if df_status.empty:
        # The .dt accessor below is only valid on datetime data, which an
        # empty frame does not have.
        detail_datas = []
    else:
        # Timestamps must be rendered as text before concatenation.
        df_status['created_time'] = df_status['created_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
        detail_datas = list(
            df_status['account_id'] + '||-||' +
            df_status['image_id'] + '||-||' +
            df_status['image_size_info'] + '||-||' +
            df_status['image_title'] + '||-||' +
            df_status['image_type'] + '||-||' +
            df_status['image_url'] + '||-||' +
            df_status['created_time']
        )
    print(f'账号:{account_id} 一共 {len(detail_datas)} 条数据')
    return detail_datas
def save_stock_detail_move(self, data_list):
    """Append a batch of detail rows to ``stock_image_detail_wj``.

    @param data_list: row data accepted by ``pandas.DataFrame`` for the
        columns ``account_id``, ``image_id``, ``image_size_info``,
        ``image_title``, ``image_type``, ``image_url``, ``created_time``.
    """
    frame = pd.DataFrame(
        data_list,
        columns=[
            'account_id',
            'image_id',
            'image_size_info',
            'image_title',
            'image_type',
            'image_url',
            'created_time',
        ],
    )
    # Rows are appended; the DataFrame index is not written.
    with self.db_engine192.connect() as conn:
        frame.to_sql(name="stock_image_detail_wj", con=conn, if_exists='append', index=False)
    print("保存成功!")
# 11111111111
def update_image_id_to_3(self, item_id):
    """Set ``state = 3`` on the ``stock_image_id_wj`` row with the given id.

    @param item_id: primary-key value matched against the ``id`` column.
    """
    statement = text("UPDATE stock_image_id_wj SET state = 3 WHERE id = :item_id")
    with self.db_engine192.connect() as connection:
        # The transaction block commits when it exits without an error.
        with connection.begin():
            connection.execute(statement, {"item_id": item_id})
def update_url_state_to_3(self, image_id):
    """Move an image's pending rows in ``stock_image_detail_wj`` to state 3.

    Only rows currently in ``state = 1`` are touched.

    @param image_id: value matched against the ``image_id`` column.
    """
    statement = text(
        "UPDATE stock_image_detail_wj SET state = 3 WHERE image_id = :image_id and state = 1")
    with self.db_engine192.connect() as connection:
        # The transaction block commits when it exits without an error.
        with connection.begin():
            connection.execute(statement, {"image_id": image_id})
# 11111111111
def update_image_id_to_4(self, item_id): def update_image_id_to_4(self, item_id):
with self.db_engine192.connect() as connection: for i in range(5):
table_name = "stock_image_id_wj"
trans = connection.begin()
sql_update = text(f"UPDATE {table_name} SET state = 4 WHERE id = :item_id")
result = connection.execute(sql_update, {"item_id": item_id})
connection.close()
trans.commit()
def save_stock_cookie(self, item):
    """Append one cookie record to ``stock_cookie_wj``.

    @param item: a single row for the columns ``account_id``, ``cookie``,
        ``state``, ``created_at``.
    """
    # A single record is wrapped in a list so it becomes a one-row frame.
    frame = pd.DataFrame([item], columns=['account_id', 'cookie', 'state', 'created_at'])
    with self.db_engine192.connect() as conn:
        frame.to_sql(name="stock_cookie_wj", con=conn, if_exists='append', index=False)
    print("保存成功!")
def updata_ck_state(self,ck_id):
with self.db_engine192.connect() as connection:
table_name = "stock_cookie_wj"
# 使用参数化查询防止SQL注入
query = text(
f"""UPDATE {table_name} SET state = :new_state WHERE id = :ck_id;""")
try: try:
# 执行更新语句 with self.db_engine192.begin() as connection:
result = connection.execute(query, {'new_state': 3, 'ck_id': ck_id}) table_name = "stock_image_id_wj"
# 提交事务以确保更改被保存到数据库中 sql_update = f"UPDATE {table_name} SET state = 4 WHERE id = {item_id}"
connection.commit() connection.execute(sql_update)
break
# 检查是否有行受到影响
if result.rowcount > 0:
print('修改cookie状态为3')
return True # 更新成功
else:
return False # 没有找到匹配项或没有更新任何行
except Exception as e: except Exception as e:
# print(f"An error occurred: {e}") time.sleep(30)
return False print(f'update_image_id_to_4 报错:{e}')
def get_cookie_account(self, item_id):
def get_stock_cookie(self, account): for i in range(5):
with self.db_engine192.connect() as connection: try:
table_name = "stock_cookie_wj"
# 使用参数化查询
query = text(
f"""SELECT id, cookie, state FROM {table_name} WHERE account_id = :account_id AND state = :state LIMIT 1;""")
result = connection.execute(query, {'account_id': account, 'state': 1}).mappings().first()
if result is not None:
# 通过键名访问字典中的元素
cookie_id_state = f"{result['id']}||-||{result['cookie']}||-||{result['state']}"
return cookie_id_state
else:
return None # 没有找到匹配项时返回None
def get_stock_cookie_list(self, account):
    """Return every usable cookie (``state = 1``) stored for an account.

    @param account: value matched against the ``account_id`` column of
        ``stock_cookie_wj``.
    @return: list of ``cookie`` column values; an empty list when the
        account has no matching row.
    """
    table_name = "stock_cookie_wj"
    query = text(f"""SELECT cookie FROM {table_name} WHERE account_id = :account_id AND state = :state ; """)
    with self.db_engine192.connect() as connection:
        result = connection.execute(query, {'account_id': account, 'state': 1})
        # Only one column is selected, so read it straight off each row;
        # this also yields [] for an empty result instead of raising.
        return [row[0] for row in result.fetchall()]
def get_cookie_account(self,item_id):
# try:
with self.db_engine192.connect() as connection:
table_name = "stock_image_summary_wj" table_name = "stock_image_summary_wj"
# 修改查询语句以匹配你的数据表名称和列名称 # 修改查询语句以匹配你的数据表名称和列名称
query = text(f"""SELECT account_id,account_secret FROM {table_name} where id = :item_id and state= :state_int;""") query = f"""SELECT account_id,account_secret FROM {table_name} where id = {item_id} and state= 1;"""
print(query) print(query)
result = connection.execute(query, {'item_id': item_id,'state_int':1}) df_status = self.db_engine192.read_sql(query)
print(result) if len(df_status) > 0:
df_status = pd.DataFrame(result.fetchall())
if len(df_status)>0:
df_status.columns = result.keys()
account_id = df_status.account_id.iloc[0] account_id = df_status.account_id.iloc[0]
account_secret = df_status.account_secret.iloc[0] account_secret = df_status.account_secret.iloc[0]
account_list = [account_id, account_secret] account_list = [account_id, account_secret]
print(account_list,'232323====32') print(account_list, '232323====32')
# print(111111111111)
connection.close()
return account_list return account_list
else: else:
return None return None
# except Exception as e: except Exception as e:
# print(111111111) time.sleep(30)
print(f'get_cookie_account 报错:{e}')
def get_stock_test_id(self, username):
    """Return one ``image_id`` stored for the given account.

    @param username: value matched against the ``account_id`` column of
        ``stock_image_id_wj``.
    @return: the ``image_id`` of the first row returned by the query, or
        C{None} when the account has no row.
    """
    table_name = "stock_image_id_wj"
    query = text(f"""SELECT image_id FROM {table_name} WHERE account_id = :username LIMIT 1;""")
    # The context manager closes the connection; no explicit close() needed.
    with self.db_engine192.connect() as connection:
        row = connection.execute(query, {'username': username}).fetchone()
    # No row: report that explicitly rather than failing inside pandas.
    if row is None:
        return None
    return row[0]
def upload_data(self, account_id, image_id, upload_time, err_msg):
    """Record an upload attempt and its error payload for one image.

    Updates ``upload_time`` and ``err_msg`` on the rows of
    ``stock_image_detail_wj`` that match the account and image and were
    created before ``2024-09-02 00:00:00``.

    @param account_id: value matched against the ``account_id`` column.
    @param image_id: image identifier; converted to ``str`` before binding.
    @param upload_time: value written to ``upload_time``.
    @param err_msg: JSON-serialisable object; stored as its JSON text.
    """
    table_name = "stock_image_detail_wj"
    sql_update = text(
        f"UPDATE {table_name} SET upload_time = :upload_time, err_msg = :err_msg WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'")
    params = {
        "upload_time": upload_time,
        "err_msg": json.dumps(err_msg),
        "account_id": account_id,
        # image_id is always bound as a string.
        "image_id": str(image_id),
    }
    # engine.begin() commits on success and rolls back on error, so the
    # UPDATE is actually persisted (a bare connect() never committed it).
    with self.db_engine192.begin() as connection:
        connection.execute(sql_update, params)
def upload_success_data(self, account_id, image_id, upload_time):
    """Record the upload time of a successfully uploaded image.

    Updates ``upload_time`` on the rows of ``stock_image_detail_wj`` that
    match the account and image and were created before
    ``2024-09-02 00:00:00``.

    @param account_id: value matched against the ``account_id`` column.
    @param image_id: image identifier; converted to ``str`` before binding.
    @param upload_time: value written to ``upload_time``.
    """
    table_name = "stock_image_detail_wj"
    sql_update = text(
        f"UPDATE {table_name} SET upload_time = :upload_time WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'")
    params = {
        "upload_time": upload_time,
        "account_id": account_id,
        # image_id is always bound as a string.
        "image_id": str(image_id),
    }
    # engine.begin() commits on success and rolls back on error, so the
    # UPDATE is actually persisted (a bare connect() never committed it).
    with self.db_engine192.begin() as connection:
        connection.execute(sql_update, params)
def get_all_image_id(self): def get_all_image_id(self):
with self.db_engine192.connect() as connection: for i in range(5):
table_name = "stock_image_detail_wj" try:
sql_query = f"SELECT image_id FROM {table_name} " table_name = "stock_image_detail_wj"
df_status = pd.read_sql(sql_query, con=connection) sql_query = f"SELECT image_id FROM {table_name} "
image_id = list(df_status['image_id'].astype(str)) df_status = self.db_engine192.read_sql(sql_query)
image_id = list(df_status['image_id'].astype(str))
connection.close() return image_id
return image_id except Exception as e:
time.sleep(30)
print(f'get_all_image_id 报错:{e}')
def update_url_state_to_3(self, image_id, max_attempts=5, retry_delay=30):
    """Move an image's pending rows in ``stock_image_detail_wj`` to state 3.

    Only rows currently in ``state = 1`` are updated. The statement is
    retried when it raises; after the last failed attempt the method gives
    up silently (the error has already been printed).

    @param image_id: value matched against the ``image_id`` column.
    @param max_attempts: how many times the update is tried in total.
    @param retry_delay: seconds to wait between two attempts.
    @return: C{None}.
    """
    table_name = "stock_image_detail_wj"
    # NOTE(review): image_id is interpolated into the SQL text. The
    # parameter API of db_engine192 is not visible here, so this is left
    # as-is; only pass trusted ids until it can be parameterised.
    sql_update = f"UPDATE {table_name} SET state = 3 WHERE image_id ='{image_id}' and state = 1"
    for attempt in range(1, max_attempts + 1):
        try:
            with self.db_engine192.begin() as connection:
                connection.execute(sql_update)
            return
        except Exception as e:
            print(f'update_url_state_to_3 报错:{e}')
            # Waiting only makes sense if another attempt follows.
            if attempt < max_attempts:
                time.sleep(retry_delay)
def get_pic_urls(self, account_id):
    """Return the pending download entries (``state = 1``) of an account.

    Every row of ``stock_image_detail_wj`` is packed as
    ``'<image_url>||<image_id>||<image_title>'``, for example
    ``'https://download.shutterstock.com/.../shutterstock_2466029425.jpg||2466029425||Beautiful smiling model ...'``.

    @param account_id: value matched against the ``account_id`` column.
    @return: non-empty list of packed strings, or C{False} when no row
        matches or the query raises.
    """
    table_name = "stock_image_detail_wj"
    # NOTE(review): account_id is interpolated into the SQL text. The
    # parameter API of db_engine192.read_sql is not visible here, so this is
    # left as-is; only pass trusted account ids until it can be parameterised.
    query = f"""select image_url, image_id, image_title from {table_name} where account_id = '{account_id}' and state = 1"""
    try:
        result_df = self.db_engine192.read_sql(query)
        # Columns arrive in SELECT order: image_url, image_id, image_title.
        pic_data_list = [
            f"{row[0]}||{row[1]}||{row[2]}" for row in result_df.values.tolist()
        ]
    except Exception as e:
        print(f"An error occurred: {e}")
        return False
    # Callers get False rather than an empty list when nothing is pending.
    return pic_data_list or False
if __name__ == '__main__': if __name__ == '__main__':
ConnectSpider().get_cookie_account(1) ConnectSpider().get_cookie_account(10)
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys
import os import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from time import sleep from time import sleep
from random import randint from random import randint
from all_connect import ConnectSpider from all_connect import ConnectSpider
import traceback
Con = ConnectSpider() Con = ConnectSpider()
import imaplib import imaplib
import email import email
import os import os
import time
import requests
import hashlib
os.environ['NO_PROXY'] = 'stackoverflow.com' os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging import logging
logging.captureWarnings(True) logging.captureWarnings(True)
from DrissionPage import ChromiumPage from DrissionPage import ChromiumPage,ChromiumOptions
import json import json
import requests from curl_cffi import requests
import re import re
import random import random
import time import time
...@@ -24,7 +29,6 @@ import calendar ...@@ -24,7 +29,6 @@ import calendar
import sys import sys
class GetStockImgId(object): class GetStockImgId(object):
def __init__(self): def __init__(self):
self.headers = { self.headers = {
...@@ -66,17 +70,30 @@ class GetStockImgId(object): ...@@ -66,17 +70,30 @@ class GetStockImgId(object):
) )
self.headers['user-agent'] = ua self.headers['user-agent'] = ua
def get_url_month(self,page,cookie,start_date,last_date): def get_url_month(self, page, cookie, start_date, last_date):
self.random_ua() # self.random_ua()
response = requests.get( "https://www.shutterstock.com/napi/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=50&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&filter[assetStatus]=comped%2Clicensed&language=zh"
f'https://www.shutterstock.com/api/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=200&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&page[number]={page}&filter[assetStatus]=comped%2Clicensed&language=zh', url = f"https://www.shutterstock.com/napi/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=50&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&page[number]={page}&filter[assetStatus]=comped%2Clicensed&language=zh"
cookies=cookie, print('url:',url)
headers=self.headers, # url = "https://www.shutterstock.com/napi/s/dam/holdings/search"
) # params = {
# "include": "media-item,media-item.track-assets,media-item.cms-entry",
# "sort": "-licensedAt",
# "useMms": "true",
# "channel": "shutterstock",
# "page\\[size\\]": "50",
# "filter\\[licensedSince\\]": "2025-09-01T00:00:00Z",
# "filter\\[licensedUntil\\]": "2025-09-30T23:59:59Z",
# "filter\\[assetStatus\\]": "comped,licensed",
# "language": "zh"
# }
response = requests.get(url, headers=self.headers, cookies=cookie)
print(response) print(response)
print(response.url)
return response return response
def get_img_id(self,response,account_id,page): def get_img_id(self, response, account_id, page):
try: try:
# print(response.json()) # print(response.json())
data = response.json()['included'] data = response.json()['included']
...@@ -96,6 +113,7 @@ class GetStockImgId(object): ...@@ -96,6 +113,7 @@ class GetStockImgId(object):
data_list.append(datas) data_list.append(datas)
# 保存 # 保存
print('准备保存:')
Con.save_stock_img_id(data_list) Con.save_stock_img_id(data_list)
print(f"{account_id}第{page}页保存id成功,") print(f"{account_id}第{page}页保存id成功,")
return True return True
...@@ -136,14 +154,13 @@ class GetStockImgId(object): ...@@ -136,14 +154,13 @@ class GetStockImgId(object):
print(f"Start Date: {start_date}") print(f"Start Date: {start_date}")
print(f"Last Date: {last_date}") print(f"Last Date: {last_date}")
while is_continue: while is_continue:
try: try:
response = self.get_url_month(page, cookie, str(start_date), str(last_date)) response = self.get_url_month(page, cookie, str(start_date), str(last_date))
if response.status_code == 200: if response.status_code == 200:
# 更新是否继续标志位 # 更新是否继续标志位
is_continue = self.get_img_id(response, account_id, page) is_continue = self.get_img_id(response, account_id, page)
print('is_continue:',is_continue)
# 如果不再继续,则更新数据库并将当前账户标记为已完成 # 如果不再继续,则更新数据库并将当前账户标记为已完成
if not is_continue: if not is_continue:
Con.update_id_to_3(account_id) Con.update_id_to_3(account_id)
...@@ -160,11 +177,19 @@ class GetStockImgId(object): ...@@ -160,11 +177,19 @@ class GetStockImgId(object):
# 抛出异常以停止外层循环 # 抛出异常以停止外层循环
raise raise
class GetSS_details(): class GetSS_details():
def __init__(self): def __init__(self):
self.account = '' self.account = ''
self.pwd = '' self.pwd = ''
self.page = ChromiumPage() # self.page = ChromiumPage()
# 配置 Chrome 浏览器 - 端口 9222
chrome_options = ChromiumOptions()
chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
chrome_options.set_local_port(9333) # 设置 Chrome 的调试端口
self.page = ChromiumPage(addr_or_opts=chrome_options)
print(f"Chrome 浏览器运行在端口: {9333}")
self.headers = { self.headers = {
'accept': 'application/json', 'accept': 'application/json',
'accept-language': 'zh-CN,zh;q=0.9', 'accept-language': 'zh-CN,zh;q=0.9',
...@@ -192,29 +217,33 @@ class GetSS_details(): ...@@ -192,29 +217,33 @@ class GetSS_details():
'username': 'pengyanbing@yswg.com.cn', 'username': 'pengyanbing@yswg.com.cn',
'password': 'Python3.8', 'password': 'Python3.8',
} }
self.get_microservice_token()
def get_ck(self): def get_ck(self):
print('获取登录后的cookie')
try: try:
self.page.get('https://www.shutterstock.com/zh/catalog/') self.page.get('https://www.shutterstock.com/zh/catalog/licenses')
sleep(randint(2, 4)) sleep(randint(2, 4))
# 获取 cookies 列表 # 获取 cookies 列表
original_cookies_list = self.page.cookies() original_cookies_list = self.page.cookies()
# 将 cookies 列表转换为字典 # 将 cookies 列表转换为字典
original_cookie_dict = {cookie['name']: cookie['value'] for cookie in original_cookies_list} original_cookie_dict = {cookie['name']: cookie['value'] for cookie in original_cookies_list}
print('original_cookie_dict::',original_cookie_dict)
# 检查 accts_customer_sso1 是否等于 '-undefined' # # 检查 accts_customer_sso1 是否等于 '-undefined'
if 'accts_customer_sso1' in original_cookie_dict and original_cookie_dict.get( # if 'accts_customer_sso1' in original_cookie_dict and original_cookie_dict.get(
'accts_customer_sso1') == '-undefined': # 'accts_customer_sso1') == '-undefined':
# 组合成新的值并更新 accts_customer_sso1 # # 组合成新的值并更新 accts_customer_sso1
new_value = f"{original_cookie_dict.get('htjs_user_id', '')}-undefined" # new_value = f"{original_cookie_dict.get('htjs_user_id', '')}-undefined"
original_cookie_dict['accts_customer_sso1'] = new_value # original_cookie_dict['accts_customer_sso1'] = new_value
#
keys_of_interest = ['datadome', 'accts_customer_sso1', 'next.sid'] # keys_of_interest = ['datadome', 'accts_customer_sso1', 'next.sid']
cookies = {key: original_cookie_dict[key] for key in keys_of_interest if key in original_cookie_dict} # cookies = {key: original_cookie_dict[key] for key in keys_of_interest if key in original_cookie_dict}
#
# print('filtered_cookies:', cookies) # # print('filtered_cookies:', cookies)
return cookies return original_cookie_dict
except Exception as e: except Exception as e:
print('获取cookie出错:', e) print('获取cookie出错:', e)
...@@ -225,7 +254,7 @@ class GetSS_details(): ...@@ -225,7 +254,7 @@ class GetSS_details():
login_out.click() login_out.click()
sleep(randint(2, 4)) sleep(randint(2, 4))
self.page.ele('@text()=登出').click() self.page.ele('@text()=登出').click()
else : else:
login_out = self.page.ele('.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-1jeofke') login_out = self.page.ele('.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-1jeofke')
if login_out: if login_out:
login_out.click() login_out.click()
...@@ -238,7 +267,7 @@ class GetSS_details(): ...@@ -238,7 +267,7 @@ class GetSS_details():
sleep(randint(2, 4)) sleep(randint(2, 4))
self.page.ele('@text()=登出').click() self.page.ele('@text()=登出').click()
def decode_body(self,body): def decode_body(self, body):
"""尝试多种编码方式解码邮件内容""" """尝试多种编码方式解码邮件内容"""
encodings = ['utf-8', 'gb18030', 'iso-8859-1', 'latin1'] encodings = ['utf-8', 'gb18030', 'iso-8859-1', 'latin1']
for encoding in encodings: for encoding in encodings:
...@@ -323,81 +352,140 @@ class GetSS_details(): ...@@ -323,81 +352,140 @@ class GetSS_details():
except: except:
pass pass
def yxyzm(self,iframe): def yxyzm(self):
print('需要输入邮箱验证码') print('需要输入邮箱验证码 等待2分钟')
sleep(randint(62, 140))
iframe = self.page.get_frame('#login-iframe')
sleep(randint(2, 4)) sleep(randint(2, 4))
yzm = self.fetch_verification_code(self.email_value_config) yzm = self.fetch_verification_code(self.email_value_config)
yzm_input = iframe.ele('.MuiFormLabel-root MuiInputLabel-root MuiInputLabel-formControl MuiInputLabel-animated MuiInputLabel-sizeSmall MuiInputLabel-standard MuiFormLabel-colorPrimary css-17839r8') try:
sleep(randint(2, 4)) print(('验证码输入'))
yzm_input.input(yzm) yzm_input = iframe.ele('@text()=输入代码')
iframe.ele('.MuiTouchRipple-root css-w0pj6f').click() sleep(randint(2, 4))
yzm_input.input(yzm)
except:
yzm_input = iframe.ele(
'.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf')
sleep(randint(2, 4))
yzm_input.input(yzm)
print('点击验证')
iframe.ele('@text()=验证').click()
def get_microservice_token(self):
    """Request a token from the microservice endpoint and store it.

    Posts an MD5 signature of ``secret + timestamp`` to the ``getToken``
    endpoint. On a response whose ``code`` is 200 the returned token is
    stored in ``self.token``. Any failure (request error, non-200 code,
    malformed payload) is printed and retried, up to five attempts in
    total; if all of them fail ``self.token`` is left untouched.

    @return: C{None}.
    """
    url = "http://wx.yswg.com.cn:8000/microservice-system/system/admin/getToken"
    # NOTE(review): the signing secret is hard-coded in source; consider
    # loading it from configuration instead.
    secret = "dafa17fb-0e97-4246-a6b3-d574e44d212d"
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            # The signature is recomputed per attempt from a fresh timestamp.
            timestamp = str(int(time.time()))
            md5_value = hashlib.md5((secret + timestamp).encode("utf-8")).hexdigest()
            response = requests.post(url, json={
                "module": "spider",
                "weChatId": "pengyanbing",
                "secret": md5_value,
                "timestamp": timestamp
            })
            res = response.json()
            print(res)
            if res['code'] != 200:
                # Routed through the except branch below so it is retried.
                raise Exception(res['msg'])
            userinfo = res['data']
            self.token = userinfo['token']
            expireTime = userinfo['expireTime']
            print(self.token, expireTime)
            return
        except Exception as e:
            print('get_microservice_token, 报错',e)
            # Waiting only makes sense if another attempt follows.
            if attempt < max_attempts:
                time.sleep(20)
def login(self): def login(self):
try: try:
# 打开页面 # 打开页面
self.page.get('https://www.shutterstock.com/zh/catalog/') self.page.get('https://www.shutterstock.com/zh/catalog/')
sleep(randint(2, 4)) sleep(randint(12, 24))
try: try:
# print('No thanks') # print('No thanks')
print('click No thanks') print('click No thanks')
login_button = self.page.ele('xpath://a[@id="continue"]', timeout=10) login_button = self.page.ele('xpath://a[@id="continue"]', timeout=15)
login_button.click() login_button.click()
except: except:
print('No thanks 错误') print('No thanks 错误')
print('开始登录。', self.account, self.pwd)
# 判断是否在登录状态 # 判断是否在登录状态
self.login_out() # self.login_out()
# 查找并点击登录按钮 # 查找并点击登录按钮
login_button = self.page.ele('xpath://a[@data-automation="loginButton"]', timeout=10) login_button = self.page.ele('xpath://a[@data-automation="loginButton"]', timeout=15)
login_button.click() login_button.click()
sleep(randint(2, 4)) sleep(randint(12,24))
# 等待页面加载,切换到 iframe # 等待页面加载,切换到 iframe
iframe = self.page.get_frame('#login-iframe') iframe = self.page.get_frame('#login-iframe')
print('已切换到 login-iframe') print('已切换到 login-iframe')
# 查找并输入邮箱 # 查找并输入邮箱
print("正在等待邮箱输入框...") print("正在等待邮箱输入框...")
sleep(10) sleep(15)
email_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf') # email_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf')
email_input = iframe.ele('xpath://input[@name="username"]')
email_input.clear() # 清除任何预填充的内容 email_input.clear() # 清除任何预填充的内容
email_input.input(self.account) # 输入文本 email_input.input(self.account) # 输入文本
print("已输入账号到邮箱输入框") print("已输入账号到邮箱输入框")
sleep(randint(2, 4))
# 查找并输入密码 # 查找并输入密码
print("正在等待密码输入框...") print("正在等待密码输入框...")
email_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall MuiInputBase-inputAdornedEnd css-186x7cf') email_input = iframe.ele(
'.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall MuiInputBase-inputAdornedEnd css-186x7cf')
email_input.clear() # 清除任何预填充的内容 email_input.clear() # 清除任何预填充的内容
email_input.input(self.pwd) email_input.input(self.pwd)
print("已输入密码到密码输入框") print("已输入密码到密码输入框")
sleep(randint(2, 4)) sleep(randint(3, 5))
# 查找并点击登录按钮 # 查找并点击登录按钮
submit_button = iframe.ele('.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-df622d') print('查找并点击登录按钮')
submit_button.click() # submit_button = iframe.ele(
# '.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-1w8itp0')
try:
submit_button = iframe.ele('.LoginForm_bottomSpacingMd__e2Mnm')
submit_button.click()
except:
print('切换点击')
sleep(randint(3, 4))
iframe.ele('.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-1is1osn').click()
print('已点击登录...') print('已点击登录...')
sleep(randint(8, 15))
except Exception as e: except Exception as e:
print(f"出现错误: {e}") print(f"出现错误: {e}", f"\n{traceback.format_exc()}")
return False return False
try: try:
sleep(randint(5, 8)) print(33333333333)
iframe = self.page.get_frame('#login-iframe')
sleep(randint(4, 8))
h3_element = iframe.ele( h3_element = iframe.ele(
'.MuiFormLabel-root MuiInputLabel-root MuiInputLabel-formControl MuiInputLabel-animated MuiInputLabel-sizeSmall MuiInputLabel-standard MuiFormLabel-colorPrimary css-17839r8') '.FormHeader_root__fHtRy wrapper-component_center__zG6GW')
if h3_element: h3_ = iframe.ele('@text()=输入验证代码')# 要继续,请输入发送到您电子邮件中的代码
self.yxyzm(iframe) h3_1 = iframe.ele('xpath://h3[contains(text(),"输入验证代码")]')# 要继续,请输入发送到您电子邮件中的代码
P_text1 = iframe.ele('xpath://p[contains(text(),"未收到代码?单击")]', timeout=15)
P_text2 = iframe.ele('xpath://p[contains(text(),"电子邮件中")]', timeout=15)
if h3_element or h3_ or h3_1 or P_text1 or P_text2 or '输入验证代码' in iframe.html or '输入验证代码' in self.page.html:
self.yxyzm()
else: else:
print('不需要验证码') print('不需要验证码')
sleep(10) sleep(10)
self.page.refresh()
sleep(randint(5, 10))
self.page.get('https://www.shutterstock.com/zh/catalog/licenses')
sleep(randint(4, 8))
ck = self.get_ck() ck = self.get_ck()
return ck return ck
except Exception as e: except Exception as e:
print(e) print(e)
print('不需要验证码') print('不需要验证码11111111111')
sleep(randint(5, 8)) sleep(randint(5, 8))
ck = self.get_ck() ck = self.get_ck()
return ck return ck
...@@ -416,16 +504,22 @@ class GetSS_details(): ...@@ -416,16 +504,22 @@ class GetSS_details():
max_retries = 3 max_retries = 3
retries = 0 retries = 0
while retries <= max_retries: while retries <= max_retries:
headers = {
"authorization": self.token
}
try: try:
response = requests.post(url, data=data_json) response = requests.post(url, data=data_json,headers=headers)
if response.status_code == 200: if response.status_code == 200:
return response.json() return response.json()
else: else:
print(url,'2323')
print(f'请求失败,状态码: {response.status_code},重试 ({retries}/{max_retries})') print(f'请求失败,状态码: {response.status_code},重试 ({retries}/{max_retries})')
retries += 1 retries += 1
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f'请求异常: {e},重试 ({retries}/{max_retries})') print(f'请求异常: {e},重试 ({retries}/{max_retries})')
retries += 1 retries += 1
self.get_microservice_token()
raise Exception(f'请求失败,已达到最大重试次数:{max_retries} 次') raise Exception(f'请求失败,已达到最大重试次数:{max_retries} 次')
def get_jpg(self, cookies, image_id): def get_jpg(self, cookies, image_id):
...@@ -544,12 +638,12 @@ class GetSS_details(): ...@@ -544,12 +638,12 @@ class GetSS_details():
if retry > max_retries: if retry > max_retries:
logging.warning("超过重试次数,跳过该图片") logging.warning("超过重试次数,跳过该图片")
return False return False
sleep_time = [random.randint(60, 180), random.randint(180, 240), random.randint(1800, 1900)][retry - 1] sleep_time = [random.randint(60, 180), random.randint(180, 240), random.randint(1800, 1900)][
retry - 1]
logging.warning(f"未知错误,等待{sleep_time}s 第{retry}次重试...") logging.warning(f"未知错误,等待{sleep_time}s 第{retry}次重试...")
time.sleep(sleep_time) time.sleep(sleep_time)
continue # 继续下一次重试 continue # 继续下一次重试
def run_get_stock_img_id(self, account, cookie): def run_get_stock_img_id(self, account, cookie):
"""封装GetStockImgId.run()调用""" """封装GetStockImgId.run()调用"""
try: try:
...@@ -564,9 +658,9 @@ class GetSS_details(): ...@@ -564,9 +658,9 @@ class GetSS_details():
day = time.strftime("%d") day = time.strftime("%d")
for item_id in range(1, 33): for item_id in range(1, 33):
print(f"开始抓取 item_id: {item_id}") print(f"开始抓取 item_id: {item_id}")
self.page.clear_cache() # 清除浏览器缓存和session信息。下一个账号直接登录。优化上一个账号没有退出导致新账号登录失败 self.page.clear_cache() # 清除浏览器缓存和session信息。下一个账号直接登录。优化上一个账号没有退出导致新账号登录失败
if item_id == 1 and int(day)<2: if item_id == 1 and int(day) < 2:
Con.update_all_states_to_1(state=2) Con.update_all_states_to_1(state=2)
wait_time = random.uniform(6, 10) wait_time = random.uniform(6, 10)
...@@ -584,7 +678,7 @@ class GetSS_details(): ...@@ -584,7 +678,7 @@ class GetSS_details():
image_id_id_pairs = Con.get_stock_images_id(self.account) image_id_id_pairs = Con.get_stock_images_id(self.account)
if not image_id_id_pairs: if not image_id_id_pairs:
print(f'{self.account} 已全部爬取完成') print(f'{self.account} 已全部爬取完成')
Con.update_all_states_to_1(state=3,item_id=item_id) Con.update_all_states_to_1(state=3, item_id=item_id)
continue continue
counts_start = 0 counts_start = 0
...@@ -594,7 +688,8 @@ class GetSS_details(): ...@@ -594,7 +688,8 @@ class GetSS_details():
image_id, item_id_str, image_title, image_size_info = image_id_id_pairs[count].split('||-||') image_id, item_id_str, image_title, image_size_info = image_id_id_pairs[count].split('||-||')
print(f'执行 {self.account}: {image_id}, {item_id_str}, 计数: {count}') print(f'执行 {self.account}: {image_id}, {item_id_str}, 计数: {count}')
try: try:
chong_shi = self.get_pic(self.account, image_id, item_id_str, image_title, image_size_info, cookie, wait_time) chong_shi = self.get_pic(self.account, image_id, item_id_str, image_title, image_size_info,
cookie, wait_time)
if not chong_shi: if not chong_shi:
stop_flag = True stop_flag = True
break break
...@@ -615,11 +710,5 @@ class GetSS_details(): ...@@ -615,11 +710,5 @@ class GetSS_details():
break break
# Script entry point: only when this module is executed directly (not
# imported), build a GetSS_details instance and invoke its run() method.
if __name__ == '__main__':
    GetSS_details().run()
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment