Commit e8e1be65 by Peng

no message

parent 60a70d74
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.db_connect import BaseUtils
......@@ -12,8 +12,7 @@ from queue import Queue
import time
import random
import re
import json
from curl_cffi import requests
import pandas as pd
from threading import Lock
import threading
......@@ -23,9 +22,6 @@ import gzip
import requests as requests2
from datetime import datetime
# from amazon_spider.inset_starrcoks_data import send_request
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
......@@ -40,10 +36,7 @@ class search_temp_pg(BaseUtils):
self.read_size = read_size
self.cookies_queue = Queue()  # cookie queue
self.search_term_queue = Queue()  # queue of search terms to crawl
self.search_term_priority_queue = Queue()  # priority crawl queue
self.search_term_priority_list = []  # priority crawl list
self.pppoe_ip_queue = Queue()
self.search_term_list = []  # list of search terms
self.search_term_not_found = []  # keywords that hit the "dog" error page
self.asin_not_sure_list = []  # keywords with no related results
# returns the matching site's host and homepage URL
......@@ -51,14 +44,9 @@ class search_temp_pg(BaseUtils):
self.search_term_not_next_page_list = []  # urls of keywords where no next page was found
self.stop_item_queue = True  # condition controlling when the storage loop exits
# keyword-related
self.search_term_url_list = []  # list of search_term + url pairs
self.search_term_list_update = []  # successfully scraped search terms
# initialize database-related parameters
self.week_list = []
self.db_name_change_common()  # initialize the site's table names and crawl period
self.df_read = pd.DataFrame()
self.id_tuple = ()
self.nums_success = 0  # number of keywords scraped successfully per loop
# page parsing (only zr and sp carry page and page_row)
self.zr_all_list = []
self.sp_all_list = []
......@@ -70,26 +58,27 @@ class search_temp_pg(BaseUtils):
self.buy_text_list = []
self.hr_list = []
self.sort_all_list = []
self.df_asin_detail_simply_list = []
self.columns = [
'search_term', 'asin', 'page', 'page_row', 'data_type',
'keyword_id', 'adgroupid', 'campaignid', 'adid', 'sku',
'title', 'img_url', 'price', 'rating', 'total_comments'
]
self.next_page_lock = Lock()
# new: lock for list operations, to keep them thread-safe
self.list_lock = Lock()
self.not_sp_url_kw_list = []
self.nums_no_sp = 0
self.st_list = []
self.delete_cookies_list = []  # cookies that surfaced a China zip code
self.headers_num_int = 0
self.headers_num_int_s = 0
self.search_term_html_queue = Queue()
# queue used to hand data to the database insert function
self.insert_queue = Queue()
self.keyword_html_data_list = []
# Kafka topic for raw HTML
self.search_term_html_topic = f'search_term_html_2026_{self.month}'
# Kafka topic for parsed detail data
self.search_term_month_topic = f'{self.site_name}_search_term_month_2026_{self.month}'
print('HTML storage topic:', self.search_term_html_topic)
print('Detail data topic:', self.search_term_month_topic)
self.sess = requests2.Session()
self.sess.mount(self.site_url, py_ja3.DESAdapter())
def db_name_change_common(self):
self.engine_pg = self.pg_connect()
self.kafuka_producer = self.kafuka_connect(acks=True)
......@@ -107,7 +96,7 @@ class search_temp_pg(BaseUtils):
def get_search_kw(self, t_num):
while True:
if not self.search_term_queue.empty():
keywords_scraper_url = self.search_term_queue.get()
keywords_scraper_url_list = keywords_scraper_url.split('|-|')
keywords_id = int(keywords_scraper_url_list[0])
......@@ -130,8 +119,7 @@ class search_temp_pg(BaseUtils):
resp = requests.get(scraper_url, impersonate="chrome110", headers=headers,
timeout=10, verify=False)
else:
# non-impersonated path uses the ja3-patched session mounted in __init__
resp = self.sess.get(scraper_url, headers=headers,
timeout=10, verify=False)
if self.reuests_para_val.check_amazon_yzm(resp):
self.search_term_priority_list.append(keywords_id)
......@@ -154,7 +142,7 @@ class search_temp_pg(BaseUtils):
if ingress:
if self.reuests_para_val.check_amazon_ingress(ingress):
self.search_term_priority_list.append(keywords_id)
if self.site_name != 'es' and self.site_name != 'it':
try:
cookie_ubid_main_id = re.findall(r'ubid-main=(.*?);', cookie_str)[0]
except:
......@@ -173,23 +161,21 @@ class search_temp_pg(BaseUtils):
print("***** Page Not Found 关键词搜索出现问题:", keyword, scraper_url)
self.search_term_priority_list.append(keywords_id)
# self.search_term_not_found.extend([keywords_id])
time.sleep(random.uniform(2.2, 4.5))
continue
# check whether there is a next page
no_results = 'No results for'
if no_results in response:
self.asin_not_sure_list.append(keywords_id)
num = random.randint(1, 100)
if num < 7:  # archive the raw HTML for roughly 6% of pages
new_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response_gzip = self.compress_string(response)
md5_hex_digest = self.reuests_para_val.hex_md5(keyword)
html_data = f'{md5_hex_digest}|-||=|-|=||-|{self.site_name}|-||=|-|=||-|{keyword}|-||=|-|=||-|{response_gzip}|-||=|-|=||-|{new_date}|-||=|-|=||-|{page}'
# html_data = f'{md5_hex_digest}|-||=|-|=||-|{self.site_name}|-||=|-|=||-|{keyword}|-||=|-|=||-|{response}|-||=|-|=||-|{new_date}|-||=|-|=||-|{page}'
self.send_kafka_html(html_data=html_data)
self.parse_html_page(response=response, keywords=keyword, scraper_url=scraper_url, page=page,
keywords_id=keywords_id)
else:
print(f"当前线程-{t_num} 已完成-爬取-跳出循环")
break
......@@ -211,14 +197,11 @@ class search_temp_pg(BaseUtils):
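# Hedged sketch (added for illustration, not part of the original commit): the
# html_data record built in get_search_kw above joins its fields with the literal
# separator '|-||=|-|=||-|'; a downstream consumer could split it back apart like
# this. The method name and dict keys are assumptions.
def _decode_html_record_sketch(self, record):
    sep = '|-||=|-|=||-|'
    md5_hex, site, keyword, html_gzip, spider_time, page = record.split(sep)
    return {'md5': md5_hex, 'site': site, 'keyword': keyword,
            'html_gzip': html_gzip, 'spider_time': spider_time, 'page': int(page)}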
def init_list(self):
print("=======清空变量==========")
self.item_queue = Queue() # 存储 item 详情数据队列
self.search_term_queue = Queue() # 需要爬取的asin队列
self.buyBox_list = [] # 卖家名称 url 列表
self.delete_cookies_list = [] # 存储出现中国邮编的cookie
self.search_term_not_found = [] # 没有关键词相关结果的列表
self.asin_not_sure_list = [] # 没有关键词相关结果的列表
self.search_term_not_next_page_list = [] # 存放找不到下一页关键词的url队列
self.search_term_url_list = [] # 存储要爬取的url
self.zr_all_list = []
self.sp_all_list = []
self.sb_all_list = []
......@@ -229,14 +212,7 @@ class search_temp_pg(BaseUtils):
self.buy_text_list = []
self.hr_list = []
self.sort_all_list = []
self.df_asin_detail_simply_list = []
self.cookies_list = []
self.search_term_list = []  # list of search terms
self.headers_num_int = 0
self.search_term_html_queue = Queue()
# queue used to hand data to the database insert function
self.insert_queue = Queue()
self.keyword_html_data_list = []
self.kafuka_producer.close()
def run_pol(self):
......@@ -270,22 +246,14 @@ class search_temp_pg(BaseUtils):
# clear variables
self.init_list()
def parse_html_page(self, response=None, keywords=None, scraper_url=None, page=None, keywords_id=None):
parse_search_term = ParseSearchTermUs(page_source=response, driver=None, search_term=keywords,
page=page, site_name=self.site_name)
st_list = parse_search_term.run()
zr_list, sp_list, sb_list, ac_list, bs_list, er_list, tr_list, sort_list, buy_text_list, hr_list = st_list
# append straight to the result lists (fix: no longer staged through self.st_list)
# fix: hold the lock so concurrent threads extend the lists safely
with self.list_lock:
self.zr_all_list.extend(zr_list)
self.sp_all_list.extend(sp_list)
self.sb_all_list.extend(sb_list)
......@@ -297,15 +265,24 @@ class search_temp_pg(BaseUtils):
self.hr_list.extend(hr_list)
if parse_search_term.page == 1:
self.sort_all_list.extend(sort_list)
if (len(sp_list) == 0) and (not self.search_term_queue.empty()):
self.nums_no_sp += 1
self.not_sp_url_kw_list.append(str(keywords_id) + '|-|' + keywords + '|-|' + scraper_url)
if self.nums_no_sp > 3:
print("************** 没有sq:", self.not_sp_url_kw_list)
for st_url_queue in self.not_sp_url_kw_list:
self.search_term_queue.put(st_url_queue)
# only clear after the items have been requeued for retry
self.not_sp_url_kw_list = []
self.nums_no_sp = 0
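# Hedged sketch (illustration only; helper names are hypothetical): the retry path
# above re-encodes queue items in the same flat 'id|-|search_term|-|url' format that
# db_read_data_common produces and get_search_kw consumes; a pair of helpers makes
# the round trip explicit.
def _encode_queue_item_sketch(self, keywords_id, keywords, url):
    return f'{keywords_id}|-|{keywords}|-|{url}'

def _decode_queue_item_sketch(self, item):
    keywords_id, keywords, url = item.split('|-|')
    return int(keywords_id), keywords, url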
def db_read_data_common(self):
while True:
try:
self.date_info = f'2026-{self.month}'
self.engine_pg = self.pg_connect()
sql_read = f"""SELECT id, search_term, url FROM {self.db_search_term} where state=1 and month={self.month} LIMIT {self.read_size} for update;"""
sql_read = f"""SELECT id, search_term, url FROM {self.db_search_term} where state=1 and date_info='{self.date_info}' LIMIT {self.read_size} for update;"""
print(sql_read)
# self.df_read = self.engine_pg.read_sql(sql_read)
self.df_read = self.engine_pg.read_then_update(
......@@ -315,15 +292,7 @@ class search_temp_pg(BaseUtils):
where_keys=["id"], # WHERE sku = :sku
)
if self.df_read.shape[0] > 0:
print('date_info::', self.date_info, ' month:', self.month)
search_term_list = list(
self.df_read.id.astype("U") + '|-|' + self.df_read.search_term + '|-|' + self.df_read.url)
return search_term_list
......@@ -347,10 +316,11 @@ class search_temp_pg(BaseUtils):
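# Hedged sketch (illustration only): read_then_update above claims a batch and flips
# its state inside one transaction, so concurrent workers never grab the same rows.
# Against a plain SQLAlchemy engine (an assumption; names here are hypothetical) the
# same claim-and-mark step could look like:
def _claim_batch_sketch(self, engine, table, date_info, n):
    from sqlalchemy import text, bindparam
    with engine.begin() as conn:  # one transaction: SELECT ... FOR UPDATE locks the rows
        rows = conn.execute(
            text(f"SELECT id, search_term, url FROM {table} "
                 "WHERE state=1 AND date_info=:d LIMIT :n FOR UPDATE"),
            {"d": date_info, "n": n}).fetchall()
        ids = [r.id for r in rows]
        if ids:  # mark the claimed rows as in-flight (state=2)
            conn.execute(
                text(f"UPDATE {table} SET state=2 WHERE id IN :ids")
                .bindparams(bindparam("ids", expanding=True)),
                {"ids": ids})
        return rows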
def db_change_state_common(self, state, search_term_list):
if state == 3:
df = self.df_read.loc[
~self.df_read.id.isin(self.search_term_priority_list)]  # ids NOT in the priority list get state 3
id_tuple = tuple(df.id)
else:
df = self.df_read.loc[self.df_read.id.isin(search_term_list)]  # ids in search_term_list get state 1
id_tuple = tuple(df.id)
print(f"== 存储状态 {state} 数据 ========== {len(id_tuple)} ========")
while True:
......@@ -375,32 +345,9 @@ class search_temp_pg(BaseUtils):
"""
Only executes the storage logic below when the incoming data_list is non-empty.
"""
if cate_type in ['buy']:
df = pd.DataFrame(data=data_list,
columns=['search_term', 'asin', 'page', 'buy_data', 'label', 'asin_brand'])
df.label = df.label.apply(lambda x: str(x)[:200] if x is not None else None)  # truncate to 200 chars
df.buy_data = df.buy_data.apply(lambda x: str(x)[:200] if x is not None else None)  # truncate to 200 chars
else:
......@@ -416,12 +363,6 @@ class search_temp_pg(BaseUtils):
if cate_type in ['sb', 'tr']:
df = df.loc[:, ['search_term', 'asin', 'page', 'data_type', 'date_info']]
df.drop_duplicates(['search_term', 'asin', 'page', 'data_type'], inplace=True)
else:
df = df.loc[:, ['search_term', 'asin', 'page', 'date_info']]
df.drop_duplicates(['search_term', 'asin', 'page'], inplace=True)
......@@ -445,7 +386,7 @@ class search_temp_pg(BaseUtils):
self.engine_pg = self.pg_connect()
df_being_sold = pd.DataFrame(data=self.sort_all_list,
columns=['search_term', 'quantity_being_sold',
'quantity_being_sold_str', 'result_count', 'departments'])
# collect the successfully crawled search terms so their state can be changed to 3
df_being_sold['month'] = self.month
df_being_sold['date_info'] = self.date_info
......@@ -455,8 +396,8 @@ class search_temp_pg(BaseUtils):
print(len(self.sort_all_list))
df_being_sold.drop_duplicates(['search_term', 'quantity_being_sold'], inplace=True)  # deduplicate
if df_being_sold.shape[0] > 0:
self.engine_pg.to_sql(df_being_sold, f'{self.db_brand_analytics}_{year_moth_list[0]}',
if_exists='append')
break
except Exception as e:
print('db_update_brand::', e, f"\n{traceback.format_exc()}")
......@@ -471,21 +412,23 @@ class search_temp_pg(BaseUtils):
print("消息发送失败", excp)
def send_kafka_html(self, html_data=None, items=None):
if not html_data:
return
for i in range(5):
try:
future = self.kafuka_producer.send(self.search_term_html_topic, html_data)
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
future.get(30)
with self.next_page_lock:
self.headers_num_int_s += 1
if self.headers_num_int_s % 10 == 0:
self.kafuka_producer.flush()
break
except Exception as e:
print(f"kafka send failed (attempt {i + 1}/5)", e)
time.sleep(2)
if i >= 1 and i % 2 == 1:
self.kafuka_producer = self.kafuka_connect(acks=True)
def db_save_data(self):
print('========================= preparing to store ============================')
......@@ -510,3 +453,5 @@ class search_temp_pg(BaseUtils):
self.db_save_common(cate_type='hr', data_list=self.hr_list,
db_name=f"{self.db_search_term_hr}_{year_moth}")
self.db_update_brand()
# if __name__ == '__main__':
# search_temp_pg(site_name='us', read_size=300, proxy_name=None, month=11).run_pol()
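# Hedged sketch (module level, illustration only): compress_string and the gzip
# import above suggest the page HTML is compressed before being embedded in the
# Kafka record; under that assumption, a round trip that survives the f-string
# join could use base64 like this (function names are hypothetical):
import base64

def compress_string_sketch(html):
    return base64.b64encode(gzip.compress(html.encode('utf-8'))).decode('ascii')

def decompress_string_sketch(payload):
    return gzip.decompress(base64.b64decode(payload)).decode('utf-8')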