Commit f56f31df by Peng

no message

parent 409ca255
@@ -3,30 +3,28 @@ import sys
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from utils.db_connect import BaseUtils
from amazon_params import py_ja3
from utils.requests_param import Requests_param_val
from utils.parse_search_term_xpath import ParseSearchTermUs
from amazon_params.params import DB_SEARCH_TERM_PARAMS_SPIDER
from lxml import etree
from queue import Queue
from queue import Queue, Empty
import time
import random
import re
from curl_cffi import requests
import pandas as pd
from threading import Lock
import threading
import urllib3
import traceback
import gzip
import requests as requests2
from curl_cffi import requests
from datetime import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class search_temp_pg(BaseUtils):
def __init__(self, site_name='us', read_size=300, proxy_name=None, week=None, month=None):
def __init__(self, site_name='us', read_size=100, proxy_name=None, week=None, month=None):
super().__init__()
self.site_name = site_name # site
self.month = month
@@ -60,16 +58,17 @@ class search_temp_pg(BaseUtils):
self.sort_all_list = []
self.columns = [
'search_term', 'asin', 'page', 'page_row', 'data_type',
'keyword_id', 'adgroupid', 'campaignid', 'adid', 'sku',
'time_batch',
'title', 'img_url', 'price', 'rating', 'total_comments'
]
self.next_page_lock = Lock()
# New: lock for list operations, keeps them thread-safe
self.list_lock = Lock()
# New: cookie refill lock, prevents a multi-thread stampede
self.cookie_refill_lock = Lock()
self.not_sp_url_kw_list = []
self.nums_no_sp = 0
self.delete_cookies_list = [] # stores cookies that returned a Chinese zip code
self.headers_num_int = 0
self.headers_num_int_s = 0
# topic for storing HTML
self.search_term_html_topic = f'search_term_html_2026_{self.month}'
@@ -77,8 +76,23 @@ class search_temp_pg(BaseUtils):
self.search_term_month_topic = f'{self.site_name}_search_term_month_2026_{self.month}'
print('HTML storage topic:', self.search_term_html_topic)
print('Data detail storage topic:', self.search_term_month_topic)
self.sess = requests2.Session()
self.sess.mount(self.site_url, py_ja3.DESAdapter())
# ===== New: thread-local storage, each thread gets its own Session =====
self._thread_local = threading.local()
# ===== New: preload cookies so scraping does not block on fetching them =====
self._preload_cookies()
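# --- Illustrative sketch only, not part of this commit: one way the threading.local()
# declared above could back a per-thread Session. The helper name _get_session and the
# reuse of py_ja3.DESAdapter() are assumptions for illustration. ---
def _get_session(self):
    # Lazily build and cache one requests2.Session per worker thread so that
    # connection state and the JA3 adapter are never shared across threads.
    sess = getattr(self._thread_local, 'sess', None)
    if sess is None:
        sess = requests2.Session()
        sess.mount(self.site_url, py_ja3.DESAdapter())
        self._thread_local.sess = sess
    return sess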
def _preload_cookies(self):
"""启动时预加载Cookie,避免爬取时阻塞"""
cookies_dict = self.reuests_para_val.get_cookie(num=500)
self.cookie_dict_delete_id = cookies_dict
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
print(f'Cookie preload finished, {len(cookies_dict)} cookies in total')
def db_name_change_common(self):
self.engine_pg = self.pg_connect()
self.kafuka_producer = self.kafuka_connect(acks=True)
@@ -95,36 +109,43 @@ class search_temp_pg(BaseUtils):
self.db_brand_analytics = self.site_name + DB_SEARCH_TERM_PARAMS_SPIDER[f"us_brand_analytics"][2:] + '_month'
def get_search_kw(self, t_num):
# ===== Fix: use a per-thread Session to avoid contention between threads =====
while True:
if not self.search_term_queue.empty():
keywords_scraper_url = self.search_term_queue.get()
try:
keywords_scraper_url = self.search_term_queue.get(timeout=10)
except Empty:
print(f"当前线程-{t_num} 已完成-爬取-跳出循环")
break
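# Note: a blocking get(timeout=10) plus the Empty exception lets each worker drain the
# queue and exit cleanly once no work arrives for 10s, rather than racing other threads
# between an empty() check and a separate get() call.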
keywords_scraper_url_list = keywords_scraper_url.split('|-|')
keywords_id = int(keywords_scraper_url_list[0])
keyword = keywords_scraper_url_list[1]
scraper_url = keywords_scraper_url_list[2]
if scraper_url not in self.search_term_not_next_page_list:
page = int(re.findall(r"&page=(\d+)", scraper_url)[0])
if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie()
self.cookie_dict_delete_id = cookies_dict
# ===== Fix: refill cookies when running low, with a lock to prevent a thread stampede =====
if self.cookies_queue.qsize() < 20:
with self.cookie_refill_lock:
if self.cookies_queue.qsize() < 20: # double-check: another thread may have refilled the queue while we waited for the lock
cookies_dict = self.reuests_para_val.get_cookie(num=200)
self.cookie_dict_delete_id.update(cookies_dict)
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
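# Note: the qsize() < 20 check is repeated inside the lock (double-checked locking), so
# only the first thread to acquire the lock pays for fetching 200 fresh cookies; threads
# queued behind it see the refilled queue and skip the fetch.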
# fetch and assemble the cookie string
cookie_str = self.reuests_para_val.get_cookie_str(self.cookies_queue)
headers = self.reuests_para_val.requests_amazon_headers(host=self.host, site_url=self.site_url,
asin=None, scraper_url=scraper_url)
headers["cookie"] = cookie_str
try:
if self.headers_num_int > 120:
resp = requests.get(scraper_url, impersonate="chrome110", headers=headers,
timeout=10, verify=False)
else:
resp = self.sess.get(scraper_url, headers=headers,
timeout=10, verify=False)
# ===== Fix: use a thread-independent Session =====
resp = requests.get(scraper_url, headers=headers,
timeout=10, verify=False, impersonate="chrome")
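# Note: curl_cffi's impersonate="chrome" makes the request's TLS/HTTP fingerprint match
# a recent Chrome build, serving a similar purpose to the JA3 adapter
# (py_ja3.DESAdapter) that the removed requests2 Session mounted.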
if self.reuests_para_val.check_amazon_yzm(resp):
self.search_term_priority_list.append(keywords_id)
self.headers_num_int += 1
continue
# with open(rf'D:\新建文件夹\html_selenium_files\{self.site_name}_211123333_{keyword}_{page}.html', 'w', encoding='utf-8')as f:
# f.write(resp.text)
except Exception as e:
print('Request error:', e)
self.search_term_priority_list.append(keywords_id)
@@ -143,18 +164,20 @@ class search_temp_pg(BaseUtils):
if self.reuests_para_val.check_amazon_ingress(ingress):
self.search_term_priority_list.append(keywords_id)
if self.site_name != 'es' and self.site_name != 'it':
try:
cookie_ubid_main_id = re.findall(r'ubid-main=(.*?);', cookie_str)[0]
except:
cookie_ubid_main_id = re.findall(r'session-id=(.*?);', cookie_str)[0]
for cookie_key_value in self.cookie_dict_delete_id.items():
if cookie_ubid_main_id in cookie_key_value[1]:
ubid_list = re.findall(r'ubid-main=(.*?);', cookie_str)
if ubid_list:
cookie_ubid_main_id = ubid_list[0]
else:
session_list = re.findall(r'session-id=(.*?);', cookie_str)
cookie_ubid_main_id = session_list[0] if session_list else None
for cookie_key_value in list(self.cookie_dict_delete_id.items()):
if cookie_ubid_main_id and cookie_ubid_main_id in cookie_key_value[1]:
with self.list_lock:
self.delete_cookies_list.append(cookie_key_value[0])
continue
else:
print("没有获取到邮编,", keyword, scraper_url)
self.search_term_priority_list.append(keywords_id)
self.headers_num_int += 1
continue
if (self.reuests_para_val.check_amazon_not_page(response)) and (
'404' not in keyword and 'page not found' not in keyword):
@@ -166,19 +189,16 @@ class search_temp_pg(BaseUtils):
no_results = 'No results for'
if no_results in response:
self.asin_not_sure_list.append(keywords_id)
continue
num = random.randint(1, 100)
if num < 7:
new_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response_gzip = self.compress_string(response)
md5_hex_digest = self.reuests_para_val.hex_md5(keyword)
html_data = f'{md5_hex_digest}|-||=|-|=||-|{self.site_name}|-||=|-|=||-|{keyword}|-||=|-|=||-|{response_gzip}|-||=|-|=||-|{new_date}|-||=|-|=||-|{page}'
# html_data = f'{md5_hex_digest}|-||=|-|=||-|{self.site_name}|-||=|-|=||-|{keyword}|-||=|-|=||-|{response}|-||=|-|=||-|{new_date}|-||=|-|=||-|{page}'
self.send_kafka_html(html_data=html_data)
self.parse_html_page(response=response, keywords=keyword, scraper_url=scraper_url, page=page,
keywords_id=keywords_id)
else:
print(f"当前线程-{t_num} 已完成-爬取-跳出循环")
break
keywords_id=keywords_id, etree_html=etree_html)
# compress a string
def compress_string(self, input_string):
@@ -212,7 +232,6 @@ class search_temp_pg(BaseUtils):
self.buy_text_list = []
self.hr_list = []
self.sort_all_list = []
self.headers_num_int = 0
self.kafuka_producer.close()
def run_pol(self):
@@ -226,7 +245,7 @@ class search_temp_pg(BaseUtils):
for search_url in search_term_list:
self.search_term_queue.put(search_url)
html_thread = []
for i in range(17):
for i in range(10):
thread2 = threading.Thread(target=self.get_search_kw, args=(i,))
html_thread.append(thread2)
for ti in html_thread:
@@ -246,9 +265,9 @@ class search_temp_pg(BaseUtils):
# clear variables
self.init_list()
def parse_html_page(self, response=None, keywords=None, scraper_url=None, page=None, keywords_id=None):
def parse_html_page(self, response=None, keywords=None, scraper_url=None, page=None, keywords_id=None, etree_html=None):
parse_search_term = ParseSearchTermUs(page_source=response, driver=None, search_term=keywords,
page=page, site_name=self.site_name)
page=page, site_name=self.site_name, etree_html=etree_html)
st_list = parse_search_term.run()
zr_list, sp_list, sb_list, ac_list, bs_list, er_list, tr_list, sort_list, buy_text_list, hr_list = st_list
# append directly to the result lists (fix: no longer staged through self.st_list)
@@ -266,23 +285,24 @@ class search_temp_pg(BaseUtils):
if parse_search_term.page == 1:
self.sort_all_list.extend(sort_list)
if (len(sp_list) == 0) and (not self.search_term_queue.empty()):
self.nums_no_sp += 1
self.not_sp_url_kw_list.append(str(keywords_id) + '|-|' + keywords + '|-|' + scraper_url)
if self.nums_no_sp > 3:
print("************** 没有sq:", self.not_sp_url_kw_list)
for st_url_queue in self.not_sp_url_kw_list:
self.search_term_queue.put(st_url_queue)
## only cleared after the retry
self.not_sp_url_kw_list = []
self.nums_no_sp = 0
# if (len(sp_list) == 0) and (not self.search_term_queue.empty()):
# self.nums_no_sp += 1
# self.not_sp_url_kw_list.append(str(keywords_id) + '|-|' + keywords + '|-|' + scraper_url)
# if self.nums_no_sp > 3:
# print("************** 没有sq:", self.not_sp_url_kw_list)
# for st_url_queue in self.not_sp_url_kw_list:
# self.search_term_queue.put(st_url_queue)
# ## only cleared after the retry
# self.not_sp_url_kw_list = []
# self.nums_no_sp = 0
def db_read_data_common(self):
current_read_size = self.read_size
while True:
try:
self.date_info = f'2026-{self.month}'
self.engine_pg = self.pg_connect()
sql_read = f"""SELECT id, search_term, url FROM {self.db_search_term} where state=1 and date_info='{self.date_info}' LIMIT {self.read_size} for update;"""
sql_read = f"""SELECT id, search_term, url FROM {self.db_search_term} where state=1 and date_info='{self.date_info}' LIMIT {current_read_size} for update;"""
print(sql_read)
# self.df_read = self.engine_pg.read_sql(sql_read)
self.df_read = self.engine_pg.read_then_update(
@@ -301,7 +321,7 @@ class search_temp_pg(BaseUtils):
return []
except Exception as e:
time.sleep(random.uniform(5, 17.5))
self.read_size = 100
current_read_size = 100
print("读取数据出bug并等待5s继续", e)
continue
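# Note: SELECT ... LIMIT {n} FOR UPDATE is a work-claiming pattern: the row locks stop
# concurrent workers from grabbing the same keywords, and read_then_update presumably
# flips their state inside the same transaction before the locks are released. On a read
# error the batch size is reduced to 100 before retrying.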
@@ -347,7 +367,9 @@ class search_temp_pg(BaseUtils):
"""
if cate_type in ['buy']:
df = pd.DataFrame(data=data_list,
columns=['search_term', 'asin', 'page', 'buy_data', 'label', 'asin_brand'])
columns=['search_term', 'asin', 'page', 'buy_data', 'label', 'asin_brand',
'time_batch'])
df = df.loc[:, ['search_term', 'asin', 'page', 'buy_data', 'label', 'asin_brand']]
df.label = df.label.apply(lambda x: str(x)[:200] if x is not None else None) # truncate the string
df.buy_data = df.buy_data.apply(lambda x: str(x)[:200] if x is not None else None) # truncate the string
else:
@@ -414,21 +436,15 @@ class search_temp_pg(BaseUtils):
def send_kafka_html(self, html_data=None, items=None):
if not html_data:
return
for i in range(5):
try:
future = self.kafuka_producer.send(self.search_term_html_topic, html_data)
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
future.get(30)
with self.next_page_lock:
# ===== Fix: drop the blocking future.get(30) and send asynchronously =====
self.headers_num_int_s += 1
if self.headers_num_int_s % 10 == 0:
if self.headers_num_int_s % 50 == 0: # flush once every 50 messages
self.kafuka_producer.flush()
break
except Exception as e:
print(f"kafka发送失败(第{i + 1}/5次)", e)
time.sleep(2)
if i >= 1 and i % 2 == 1:
self.kafuka_producer = self.kafuka_connect(acks=True)
print("kafka发送失败", e)
def db_save_data(self):
print('========================= preparing to save ============================')
@@ -453,5 +469,5 @@ class search_temp_pg(BaseUtils):
self.db_save_common(cate_type='hr', data_list=self.hr_list,
db_name=f"{self.db_search_term_hr}_{year_moth}")
self.db_update_brand()
# if __name__ == '__main__':
# search_temp_pg(site_name='us', read_size=300, proxy_name=None, month=11).run_pol()
if __name__ == '__main__':
search_temp_pg(site_name='us', read_size=20, proxy_name=None, month='03').run_pol()