Commit 4825673f by Peng

修复多个线程并发读取任务队列时个别线程被挂起的问题;借助 AI 对整体代码进行优化,控制多线程之间的资源争夺。

parent 0d70b338
...@@ -2,14 +2,14 @@ import sys ...@@ -2,14 +2,14 @@ import sys
import os import os
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from amazon_params import py_ja3 # from amazon_params import py_ja3
from amazon_save_db.save_asin_detail_pg import Save_asin_detail from amazon_save_db.save_asin_detail_pg import Save_asin_detail
from utils.asin_parse import ParseAsinUs from utils.asin_parse import ParseAsinUs
from queue import Queue from amazon_params import py_ja3
from queue import Queue, Empty
import time import time
import re import re
from lxml import etree from lxml import etree
import requests
import urllib3 import urllib3
import threading import threading
from func_timeout.exceptions import FunctionTimedOut from func_timeout.exceptions import FunctionTimedOut
...@@ -17,7 +17,8 @@ import traceback ...@@ -17,7 +17,8 @@ import traceback
from datetime import datetime from datetime import datetime
import gzip import gzip
import json import json
# from curl_cffi import requests as curl from curl_cffi import requests
# import requests as requests2
from kafka.errors import KafkaTimeoutError from kafka.errors import KafkaTimeoutError
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
urllib3.disable_warnings() urllib3.disable_warnings()
...@@ -42,6 +43,7 @@ class async_asin_pg(): ...@@ -42,6 +43,7 @@ class async_asin_pg():
self.asin_not_div_id_dp_list = [] # 返回html没有包含div @id=dp,状态13 self.asin_not_div_id_dp_list = [] # 返回html没有包含div @id=dp,状态13
self.asin_list_update = [] # 3 self.asin_list_update = [] # 3
self.cookies_queue = Queue() # cookie队列 self.cookies_queue = Queue() # cookie队列
self.cookie_refill_lock = threading.Lock() # cookie重填锁
self.item_queue = Queue() # 存储 item 详情数据队列 self.item_queue = Queue() # 存储 item 详情数据队列
self.queries_asin_queue = Queue() # 需要爬取的asin队列 self.queries_asin_queue = Queue() # 需要爬取的asin队列
self.buyBox_list = [] # 卖家名称 url 列表 self.buyBox_list = [] # 卖家名称 url 列表
...@@ -70,14 +72,16 @@ class async_asin_pg(): ...@@ -70,14 +72,16 @@ class async_asin_pg():
self.topic_detail_month = f'{self.site_name}_asin_detail_month_2026_{self.month_}' self.topic_detail_month = f'{self.site_name}_asin_detail_month_2026_{self.month_}'
self.topic_asin_html = f'asin_html_2026_{self.month_}' self.topic_asin_html = f'asin_html_2026_{self.month_}'
self.asin_video_list = [] self.asin_video_list = []
# 修复:sess 改为类成员变量,只 mount 一次
self.sess = requests.Session()
self.sess.mount(self.site_url, py_ja3.DESAdapter())
def get_asin(self): def get_asin(self):
while True: while True:
if not self.queries_asin_queue.empty(): try:
querys = self.queries_asin_queue.get() querys = self.queries_asin_queue.get_nowait()
except Empty:
print(f"当前线程-已完成-爬取-跳出循环")
break
with self.cookie_refill_lock:
if self.cookies_queue.empty(): if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie() cookies_dict = self.reuests_para_val.get_cookie()
self.cookie_dict_delete_id = cookies_dict self.cookie_dict_delete_id = cookies_dict
...@@ -105,11 +109,16 @@ class async_asin_pg(): ...@@ -105,11 +109,16 @@ class async_asin_pg():
self.request_total_count_list.append(4) self.request_total_count_list.append(4)
print('scraper_url::', scraper_url) print('scraper_url::', scraper_url)
try: try:
resp = self.sess.get(scraper_url, headers=headers, # sess = requests2.Session()
timeout=10, verify=False) # sess.mount(self.site_url, py_ja3.DESAdapter())
# resp = requests.get(scraper_url, headers=headers,
# timeout=20)
resp = requests.get(scraper_url, headers=headers,
timeout=30, verify=False, impersonate="chrome")
# with open(rf'D:\新建文件夹\html_selenium_files\{self.site_name}_211123333_{asin}.html', 'w', encoding='utf-8')as f: # with open(rf'D:\新建文件夹\html_selenium_files\{self.site_name}_211123333_{asin}.html', 'w', encoding='utf-8')as f:
# f.write(resp.text) # f.write(resp.text)
if self.reuests_para_val.check_amazon_yzm(resp): if self.reuests_para_val.check_amazon_yzm(resp):
print('出现验证码::','#' * 80)
self.yzm_err_total_list.append(1) self.yzm_err_total_list.append(1)
self.headers_num_int += 1 self.headers_num_int += 1
self.requests_error_asin_list.append(query[0]) self.requests_error_asin_list.append(query[0])
...@@ -120,6 +129,7 @@ class async_asin_pg(): ...@@ -120,6 +129,7 @@ class async_asin_pg():
if 'Received response with content-encoding: gzip' in str(e): if 'Received response with content-encoding: gzip' in str(e):
self.asin_not_found_list.append(asin) self.asin_not_found_list.append(asin)
else: else:
print('2233请求错误错误::', '#' * 80)
self.requests_error_asin_list.append(query[0]) self.requests_error_asin_list.append(query[0])
continue continue
response_url = resp.url response_url = resp.url
...@@ -148,19 +158,23 @@ class async_asin_pg(): ...@@ -148,19 +158,23 @@ class async_asin_pg():
print(ingress, ' 打印 邮编 ', resp.url) print(ingress, ' 打印 邮编 ', resp.url)
if ingress: if ingress:
if self.reuests_para_val.check_amazon_ingress(ingress): if self.reuests_para_val.check_amazon_ingress(ingress):
try: ubid_list = re.findall(r'ubid-main=(.*?);', cookie_str)
cookie_ubid_main_id = re.findall(r'ubid-main=(.*?);', cookie_str)[0] if ubid_list:
except: cookie_ubid_main_id = ubid_list[0]
cookie_ubid_main_id = re.findall(r'session-id=(.*?);', cookie_str)[0] else:
session_list = re.findall(r'session-id=(.*?);', cookie_str)
cookie_ubid_main_id = session_list[0] if session_list else None
for cookie_key_value in self.cookie_dict_delete_id.items(): for cookie_key_value in self.cookie_dict_delete_id.items():
if cookie_ubid_main_id in cookie_key_value[1]: if cookie_ubid_main_id in cookie_key_value[1]:
self.delete_cookies_list.append(cookie_key_value[0]) self.delete_cookies_list.append(cookie_key_value[0])
print(ingress,'邮编 错误 ::', '#' * 80)
self.requests_error_asin_list.append(asin)
continue
else:
self.requests_error_asin_list.append(asin) self.requests_error_asin_list.append(asin)
continue continue
div_dp = response_s.xpath('//div[@id="dp"]') div_dp = response_s.xpath('//div[@id="dp"]')
if div_dp: if div_dp:
# 解析resp=_response_text, asin=asin
items = ParseAsinUs(resp=response, asin=asin, month=self.month_, date_info=date_info, items = ParseAsinUs(resp=response, asin=asin, month=self.month_, date_info=date_info,
site_name=self.site_name).xpath_html() site_name=self.site_name).xpath_html()
new_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") new_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
...@@ -225,7 +239,7 @@ class async_asin_pg(): ...@@ -225,7 +239,7 @@ class async_asin_pg():
print('第二次请求:', _url) print('第二次请求:', _url)
try: try:
_response_text = None _response_text = None
_response_text = self.reuests_para_val.requests_amazon(headers=headers, scraper_url=_url) _response_text = self.reuests_para_val.requests_amazon(headers=headers, scraper_url=_url, sess=None)
if _response_text: if _response_text:
_items = ParseAsinUs(resp=_response_text, asin=asin, month=self.month_, _items = ParseAsinUs(resp=_response_text, asin=asin, month=self.month_,
date_info=date_info, date_info=date_info,
...@@ -264,7 +278,8 @@ class async_asin_pg(): ...@@ -264,7 +278,8 @@ class async_asin_pg():
item['five_star'] = _items["five_star"] item['five_star'] = _items["five_star"]
if item['four_star'] is None: if item['four_star'] is None:
item['four_star'] = _items["four_star"] item['four_star'] = _items["four_star"]
# 修复:删除重复的 four_star 检查 if item['three_star'] is None:
item['three_star'] = _items["three_star"]
if item['two_star'] is None: if item['two_star'] is None:
item['two_star'] = _items["two_star"] item['two_star'] = _items["two_star"]
if item['one_star'] is None: if item['one_star'] is None:
...@@ -297,7 +312,7 @@ class async_asin_pg(): ...@@ -297,7 +312,7 @@ class async_asin_pg():
_url = self.site_url + 'dp/' + _to_asin + "?th=1&psc=1" _url = self.site_url + 'dp/' + _to_asin + "?th=1&psc=1"
print('请求asin 出现缺货,拿变体asin进行请求:', _url) print('请求asin 出现缺货,拿变体asin进行请求:', _url)
_response_text_var = self.reuests_para_val.requests_amazon(headers=headers, _response_text_var = self.reuests_para_val.requests_amazon(headers=headers,
scraper_url=_url) scraper_url=_url, sess=None)
_to_items = ParseAsinUs(resp=_response_text_var, asin=asin, month=self.month_, _to_items = ParseAsinUs(resp=_response_text_var, asin=asin, month=self.month_,
date_info=date_info, date_info=date_info,
site_name=self.site_name).xpath_html() site_name=self.site_name).xpath_html()
...@@ -349,6 +364,7 @@ class async_asin_pg(): ...@@ -349,6 +364,7 @@ class async_asin_pg():
if item["launch_time"] is None and item["rank"] is None and item['weight'] is None and item[ if item["launch_time"] is None and item["rank"] is None and item['weight'] is None and item[
'product_detail_json'] is None and len(items['div_id_list']) < 1: 'product_detail_json'] is None and len(items['div_id_list']) < 1:
print('上架时间 排名 重量 底部信息 如果都为None 重新抓取:::', asin) print('上架时间 排名 重量 底部信息 如果都为None 重新抓取:::', asin)
print(ingress, '上架时间 排名 重量 底部信息 如果都为None ::', '#' * 80)
self.requests_error_asin_list.append(asin) self.requests_error_asin_list.append(asin)
continue continue
if (self.reuests_para_val.check_contain_chinese(item['title'])) or ( if (self.reuests_para_val.check_contain_chinese(item['title'])) or (
...@@ -373,7 +389,7 @@ class async_asin_pg(): ...@@ -373,7 +389,7 @@ class async_asin_pg():
item['img_list'] = None item['img_list'] = None
if item['img_list'] is None: if item['img_list'] is None:
item['img_list'] = [] item['img_list'] = json.dumps([])
self.item_queue.put(item) self.item_queue.put(item)
# 获取字段值为None的字段名称写入redis进行统计 # 获取字段值为None的字段名称写入redis进行统计
none_keys = [key for key, value in item.items() if none_keys = [key for key, value in item.items() if
...@@ -412,14 +428,12 @@ class async_asin_pg(): ...@@ -412,14 +428,12 @@ class async_asin_pg():
self.reuests_para_val.send_kafka(html_data=html_data, topic=self.topic_asin_html) self.reuests_para_val.send_kafka(html_data=html_data, topic=self.topic_asin_html)
else: else:
if 'Click the button below to continue shopping' in response: if 'Click the button below to continue shopping' in response:
print(ingress, 'Click the button below to continue shopping ::', '#' * 80)
self.requests_error_asin_list.append(query[0]) self.requests_error_asin_list.append(query[0])
else: else:
print('状态13', asin) print('状态13', asin)
self.asin_not_div_id_dp_list.append(asin) self.asin_not_div_id_dp_list.append(asin)
continue continue
else:
print(f"当前线程-已完成-爬取-跳出循环")
break
# 压缩字符串 # 压缩字符串
def compress_string(self, input_string): def compress_string(self, input_string):
...@@ -474,7 +488,7 @@ class async_asin_pg(): ...@@ -474,7 +488,7 @@ class async_asin_pg():
# site_name=self.site_name).xpath_html() # site_name=self.site_name).xpath_html()
# print(items) # print(items)
asin_list = self.save_asin_detail.read_db_data() asin_list = self.save_asin_detail.read_db_data()
# asin_list = ['B0FV8W9T52|2025-01|1|1|null|null'] # asin_list = ['B0FM433BGV|2025-01|1|1|null|null']
if asin_list: if asin_list:
for asin in asin_list: for asin in asin_list:
self.queries_asin_queue.put(asin) self.queries_asin_queue.put(asin)
...@@ -484,7 +498,7 @@ class async_asin_pg(): ...@@ -484,7 +498,7 @@ class async_asin_pg():
for ck in cookies_dict.values(): for ck in cookies_dict.values():
self.cookies_queue.put(ck) self.cookies_queue.put(ck)
html_thread = [] html_thread = []
for i in range(25): for i in range(20):
thread2 = threading.Thread(target=self.get_asin) thread2 = threading.Thread(target=self.get_asin)
thread2.start() thread2.start()
html_thread.append(thread2) html_thread.append(thread2)
...@@ -557,4 +571,4 @@ class async_asin_pg(): ...@@ -557,4 +571,4 @@ class async_asin_pg():
pass pass
# if __name__ == '__main__': # if __name__ == '__main__':
# async_asin_pg(month=12, spider_int=1, week=14,site_name='us').run() # async_asin_pg(month='02', spider_int=1, week=14,site_name='us').run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment