Commit 0d70b338 by Peng

no message

parent e8e1be65
...@@ -20,7 +20,6 @@ import json ...@@ -20,7 +20,6 @@ import json
# from curl_cffi import requests as curl # from curl_cffi import requests as curl
from kafka.errors import KafkaTimeoutError from kafka.errors import KafkaTimeoutError
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
urllib3.disable_warnings() urllib3.disable_warnings()
...@@ -46,11 +45,8 @@ class async_asin_pg(): ...@@ -46,11 +45,8 @@ class async_asin_pg():
self.item_queue = Queue() # 存储 item 详情数据队列 self.item_queue = Queue() # 存储 item 详情数据队列
self.queries_asin_queue = Queue() # 需要爬取的asin队列 self.queries_asin_queue = Queue() # 需要爬取的asin队列
self.buyBox_list = [] # 卖家名称 url 列表 self.buyBox_list = [] # 卖家名称 url 列表
self.asin_detail_list = [] # 存储asin 详情的列表
self.buyBoxname_asin_list = [] # asin 卖家的列表item self.buyBoxname_asin_list = [] # asin 卖家的列表item
self.delete_cookies_list = [] # 存储出现中国邮编的cookie self.delete_cookies_list = [] # 存储出现中国邮编的cookie
self.stop_item_queue = True # 用于是否退出循环存储的条件
self.spider_de_feedback = False
self.cookie_dict_delete_id = {} self.cookie_dict_delete_id = {}
self.headers_num_int = 0 self.headers_num_int = 0
self.star_list = [] # 存储星级百分比 self.star_list = [] # 存储星级百分比
...@@ -59,7 +55,6 @@ class async_asin_pg(): ...@@ -59,7 +55,6 @@ class async_asin_pg():
self.bs_category_asin_list = [] # 存储 asin 详情 bsr 文本类目 self.bs_category_asin_list = [] # 存储 asin 详情 bsr 文本类目
self.bs_category_asin_list_pg = [] # 存储 asin 详情 bsr 文本类目 self.bs_category_asin_list_pg = [] # 存储 asin 详情 bsr 文本类目
self.month_ = month self.month_ = month
self.week = week
# 返回 对应站点的host,首页链接 # 返回 对应站点的host,首页链接
self.site_url, self.host = self.reuests_para_val.get_site_url(self.site_name) self.site_url, self.host = self.reuests_para_val.get_site_url(self.site_name)
# 验证码 1 # 验证码 1
...@@ -72,13 +67,16 @@ class async_asin_pg(): ...@@ -72,13 +67,16 @@ class async_asin_pg():
self.hour_total_count_list = [] self.hour_total_count_list = []
# 总请求 4 # 总请求 4
self.request_total_count_list = [] self.request_total_count_list = []
self.topic_detail_month = f'{self.site_name}_asin_detail_month_2025_{self.month_}' self.topic_detail_month = f'{self.site_name}_asin_detail_month_2026_{self.month_}'
self.topic_asin_html = f'asin_html_2025_{self.month_}' self.topic_asin_html = f'asin_html_2026_{self.month_}'
self.asin_video_list = [] self.asin_video_list = []
# 修复:sess 改为类成员变量,只 mount 一次
self.sess = requests.Session()
self.sess.mount(self.site_url, py_ja3.DESAdapter())
def get_asin(self): def get_asin(self):
while True: while True:
if self.queries_asin_queue.empty() == False: if not self.queries_asin_queue.empty():
querys = self.queries_asin_queue.get() querys = self.queries_asin_queue.get()
if self.cookies_queue.empty(): if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie() cookies_dict = self.reuests_para_val.get_cookie()
...@@ -98,7 +96,7 @@ class async_asin_pg(): ...@@ -98,7 +96,7 @@ class async_asin_pg():
asin=asin, scraper_url=None) asin=asin, scraper_url=None)
headers["cookie"] = cookie_str headers["cookie"] = cookie_str
self.month_ = date_info.split('-')[1] self.month_ = date_info.split('-')[1]
if self.headers_num_int > 20: if self.headers_num_int > 20: # 亚马逊出现超过20次ip已经被封锁。退出抓取切换ip。
break break
if is_variat == '1': if is_variat == '1':
scraper_url = self.site_url + 'dp/' + query[0] + "?th=1&psc=1" scraper_url = self.site_url + 'dp/' + query[0] + "?th=1&psc=1"
...@@ -107,10 +105,7 @@ class async_asin_pg(): ...@@ -107,10 +105,7 @@ class async_asin_pg():
self.request_total_count_list.append(4) self.request_total_count_list.append(4)
print('scraper_url::', scraper_url) print('scraper_url::', scraper_url)
try: try:
from urllib.parse import urlparse resp = self.sess.get(scraper_url, headers=headers,
sess.mount(self.site_url, py_ja3.DESAdapter())
resp = sess.get(scraper_url, headers=headers,
timeout=10, verify=False) timeout=10, verify=False)
# with open(rf'D:\新建文件夹\html_selenium_files\{self.site_name}_211123333_{asin}.html', 'w', encoding='utf-8')as f: # with open(rf'D:\新建文件夹\html_selenium_files\{self.site_name}_211123333_{asin}.html', 'w', encoding='utf-8')as f:
# f.write(resp.text) # f.write(resp.text)
...@@ -226,10 +221,7 @@ class async_asin_pg(): ...@@ -226,10 +221,7 @@ class async_asin_pg():
_response_text = None _response_text = None
if item['variat_num'] > 0 and is_variat == '0': if item['variat_num'] > 0 and is_variat == '0':
self.request_total_count_list.append(4) self.request_total_count_list.append(4)
if item['variat_num'] > 0: _url = self.site_url + 'dp/' + asin + "?th=1&psc=1"
_url = self.site_url + 'dp/' + asin + "?th=1&psc=1"
else:
_url = self.site_url + 'dp/' + asin + '?th=1'
print('第二次请求:', _url) print('第二次请求:', _url)
try: try:
_response_text = None _response_text = None
...@@ -272,8 +264,7 @@ class async_asin_pg(): ...@@ -272,8 +264,7 @@ class async_asin_pg():
item['five_star'] = _items["five_star"] item['five_star'] = _items["five_star"]
if item['four_star'] is None: if item['four_star'] is None:
item['four_star'] = _items["four_star"] item['four_star'] = _items["four_star"]
if item['four_star'] is None: # 修复:删除重复的 four_star 检查
item['four_star'] = _items["four_star"]
if item['two_star'] is None: if item['two_star'] is None:
item['two_star'] = _items["two_star"] item['two_star'] = _items["two_star"]
if item['one_star'] is None: if item['one_star'] is None:
...@@ -286,8 +277,10 @@ class async_asin_pg(): ...@@ -286,8 +277,10 @@ class async_asin_pg():
item['node_id'] = _items["node_id"] item['node_id'] = _items["node_id"]
if item['review_json_list'] is None: if item['review_json_list'] is None:
item['review_json_list'] = _items["review_json_list"] item['review_json_list'] = _items["review_json_list"]
if item['fbm_delivery_pric'] is None: if item['fbm_delivery_price'] is None:
item['fbm_delivery_price'] = _items["fbm_delivery_price"] item['fbm_delivery_price'] = _items["fbm_delivery_price"]
if item['review_ai_text'] is None:
item['review_ai_text'] = _items["review_ai_text"]
except: except:
pass pass
_response_text_var = None _response_text_var = None
...@@ -331,16 +324,26 @@ class async_asin_pg(): ...@@ -331,16 +324,26 @@ class async_asin_pg():
if not item['title'] or not item['img_url']: if not item['title'] or not item['img_url']:
self.asin_not_sure_list.append(asin) self.asin_not_sure_list.append(asin)
continue continue
elif len(item['img_url'].strip()) > 2: # 修复:检查 img_url 是否包含无效值
img_url_invalid = False
if item['img_url'] and len(item['img_url'].strip()) > 2:
for key in ['None', 'null', 'none']: for key in ['None', 'null', 'none']:
if key in item['img_url']: if key in item['img_url']:
self.asin_not_sure_list.append(asin) img_url_invalid = True
continue break
elif len(item['title'].strip()) > 2: if img_url_invalid:
self.asin_not_sure_list.append(asin)
continue
# 修复:检查 title 是否包含无效值
title_invalid = False
if item['title'] and len(item['title'].strip()) > 2:
for key in ['None', 'null', 'none']: for key in ['None', 'null', 'none']:
if key in item['title']: if key in item['title']:
self.asin_not_sure_list.append(asin) title_invalid = True
continue break
if title_invalid:
self.asin_not_sure_list.append(asin)
continue
print('itemitem:::', item) print('itemitem:::', item)
# 上架时间 排名 重量 底部信息 如果都为None 重新抓取 # 上架时间 排名 重量 底部信息 如果都为None 重新抓取
if item["launch_time"] is None and item["rank"] is None and item['weight'] is None and item[ if item["launch_time"] is None and item["rank"] is None and item['weight'] is None and item[
...@@ -369,10 +372,9 @@ class async_asin_pg(): ...@@ -369,10 +372,9 @@ class async_asin_pg():
else: else:
item['img_list'] = None item['img_list'] = None
self.item_queue.put(item)
if item['img_list'] is None: if item['img_list'] is None:
item['img_list'] = [] item['img_list'] = []
self.item_queue.put(item)
# 获取字段值为None的字段名称写入redis进行统计 # 获取字段值为None的字段名称写入redis进行统计
none_keys = [key for key, value in item.items() if none_keys = [key for key, value in item.items() if
(value is None) or (value == -1 and key == 'price') or ( (value is None) or (value == -1 and key == 'price') or (
...@@ -395,19 +397,19 @@ class async_asin_pg(): ...@@ -395,19 +397,19 @@ class async_asin_pg():
pass pass
self.reuests_para_val.send_kafka(items=item, topic=self.topic_detail_month) self.reuests_para_val.send_kafka(items=item, topic=self.topic_detail_month)
print(asin, 'rank 排名:', item['rank']) print(asin, 'rank 排名:', item['rank'])
if item['rank']: if item['rank'] is not None and item['rank'] < 9000:
if (item['rank'] < 9000): # requests_num 代表不同类型url请求返回的源码。
if _response_text_var: if _response_text_var: # 请求asin 出现缺货,拿变体asin进行请求
requests_num = 2 requests_num = 2
response_gzip = self.compress_string(_response_text_var) response_gzip = self.compress_string(_response_text_var)
elif _response_text: elif _response_text: # 发现有变体。导入asin没有标记。重新请求第二次请求
requests_num = 1 requests_num = 1
response_gzip = self.compress_string(_response_text) response_gzip = self.compress_string(_response_text)
else: else:
requests_num = 0 requests_num = 0 # 第一次请求返回源码
response_gzip = self.compress_string(response) response_gzip = self.compress_string(response)
html_data = f'{self.site_name}|-||=|-|=||-|{asin}|-||=|-|=||-|{response_gzip}|-||=|-|=||-|{new_date}|-||=|-|=||-|{requests_num}' html_data = f'{self.site_name}|-||=|-|=||-|{asin}|-||=|-|=||-|{response_gzip}|-||=|-|=||-|{new_date}|-||=|-|=||-|{requests_num}'
self.reuests_para_val.send_kafka(html_data=html_data, topic=self.topic_asin_html) self.reuests_para_val.send_kafka(html_data=html_data, topic=self.topic_asin_html)
else: else:
if 'Click the button below to continue shopping' in response: if 'Click the button below to continue shopping' in response:
self.requests_error_asin_list.append(query[0]) self.requests_error_asin_list.append(query[0])
...@@ -435,31 +437,19 @@ class async_asin_pg(): ...@@ -435,31 +437,19 @@ class async_asin_pg():
self.asin_not_div_id_dp_list = [] # 13 返回html没有包含div @id=dp,状态13 self.asin_not_div_id_dp_list = [] # 13 返回html没有包含div @id=dp,状态13
self.requests_error_asin_list = [] # 1 self.requests_error_asin_list = [] # 1
self.asin_list_update = [] # 3 self.asin_list_update = [] # 3
self.item_queue = Queue() # 存储 item 详情数据队列
self.queries_asin_queue = Queue() # 需要爬取的asin队列 self.queries_asin_queue = Queue() # 需要爬取的asin队列
self.buyBox_list = [] # 卖家名称 url 列表
self.asin_detail_list = [] # 存储asin 详情的列表
self.buyBoxname_asin_list = [] # asin 卖家的列表
self.delete_cookies_list = [] # 存储出现中国邮编的cookie self.delete_cookies_list = [] # 存储出现中国邮编的cookie
self.star_list = [] self.star_list = []
self.add_cart_asin_list = [] # 存储绑定购买的asin self.add_cart_asin_list = [] # 存储绑定购买的asin
self.asin_brand_list = []
self.bs_category_asin_list = []
self.bs_category_asin_list_pg = []
# 关闭redis # 关闭redis
self.redis14.close() self.redis14.close()
self.reuests_para_val.kafuka_producer_str.close(timeout=10) self.reuests_para_val.kafuka_producer_str.close(timeout=10)
self.asin_video_list = [] self.asin_video_list = []
self.cookies_queue = Queue() # cookie队列 self.cookies_queue = Queue() # cookie队列
self.item_queue = Queue() # 存储 item 详情数据队列 self.item_queue = Queue() # 存储 item 详情数据队列
self.queries_asin_queue = Queue() # 需要爬取的asin队列
self.buyBox_list = [] # 卖家名称 url 列表 self.buyBox_list = [] # 卖家名称 url 列表
self.asin_detail_list = [] # 存储asin 详情的列表
self.buyBoxname_asin_list = [] # asin 卖家的列表item self.buyBoxname_asin_list = [] # asin 卖家的列表item
self.delete_cookies_list = [] # 存储出现中国邮编的cookie
self.cookie_dict_delete_id = {} self.cookie_dict_delete_id = {}
self.star_list = [] # 存储星级百分比
self.add_cart_asin_list = [] # 存储 绑定购买的asin
self.asin_brand_list = [] # 存储asin 对应 的品牌 self.asin_brand_list = [] # 存储asin 对应 的品牌
self.bs_category_asin_list = [] # 存储 asin 详情 bsr 文本类目 self.bs_category_asin_list = [] # 存储 asin 详情 bsr 文本类目
self.bs_category_asin_list_pg = [] # 存储 asin 详情 bsr 文本类目 self.bs_category_asin_list_pg = [] # 存储 asin 详情 bsr 文本类目
...@@ -475,9 +465,16 @@ class async_asin_pg(): ...@@ -475,9 +465,16 @@ class async_asin_pg():
self.request_total_count_list = [] self.request_total_count_list = []
def run(self): def run(self):
# p = r'D:\新建文件夹\html_selenium_files\us_01_B0007ZF4OA.html'
# with open(p, 'r', encoding='utf-8')as f:
# response = f.read()
# items = ParseAsinUs(resp=response, asin='B0007ZF4OA', month=self.month_, date_info='2026_01',
# site_name=self.site_name).xpath_html()
# print(items)
asin_list = self.save_asin_detail.read_db_data() asin_list = self.save_asin_detail.read_db_data()
# asin_list = ['B0CW1ZM991|2025-01|1|1|null|null'] # asin_list = ['B0FV8W9T52|2025-01|1|1|null|null']
if asin_list: if asin_list:
for asin in asin_list: for asin in asin_list:
self.queries_asin_queue.put(asin) self.queries_asin_queue.put(asin)
...@@ -487,7 +484,7 @@ class async_asin_pg(): ...@@ -487,7 +484,7 @@ class async_asin_pg():
for ck in cookies_dict.values(): for ck in cookies_dict.values():
self.cookies_queue.put(ck) self.cookies_queue.put(ck)
html_thread = [] html_thread = []
for i in range(26): for i in range(25):
thread2 = threading.Thread(target=self.get_asin) thread2 = threading.Thread(target=self.get_asin)
thread2.start() thread2.start()
html_thread.append(thread2) html_thread.append(thread2)
...@@ -560,4 +557,4 @@ class async_asin_pg(): ...@@ -560,4 +557,4 @@ class async_asin_pg():
pass pass
# if __name__ == '__main__': # if __name__ == '__main__':
# async_asin_pg(month=12, spider_int=1, week=14,site_name='de').run() # async_asin_pg(month=12, spider_int=1, week=14,site_name='us').run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment