Commit 80347e49 by Peng

no message

parent 68e0f97e
......@@ -54,21 +54,14 @@ class dow_category_Product():
print("强制关闭chrome.exe失败:", e)
port = 9222
params_ = ""
# params_ = "--blink-settings=imagesEnabled=false"
os.system(f'start Chrome {params_} --remote-debugging-port={port}')
chrome_options = Options()
# Disable image loading
chrome_options.add_argument('--blink-settings=imagesEnabled=false')
chrome_options.add_experimental_option("debuggerAddress", f"127.0.0.1:{port}")
# Headless mode
# chrome_options.add_argument('--headless')
# Disable the GPU to speed up rendering under headless
chrome_options.add_argument('--disable-gpu')
# Disable the sandbox to sidestep permission issues
chrome_options.add_argument('--no-sandbox')
# Use /tmp instead of /dev/shm (avoids running out of shared memory)
chrome_options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(r'chromedriver.exe', options=chrome_options)
self.get_category(site, driver)
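# A minimal sketch (ours, not part of this commit) of the attach-to-a-running-
# Chrome pattern used above. One caveat worth noting: once "debuggerAddress"
# points at an already-running browser, add_argument() flags are ignored, so
# launch flags belong on the `start Chrome ...` command line instead.
def _attach_driver_sketch(port=9222):
    opts = Options()
    opts.add_experimental_option("debuggerAddress", f"127.0.0.1:{port}")
    return webdriver.Chrome(options=opts)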
def get_category(self, site, driver):
......@@ -138,21 +131,6 @@ class dow_category_Product():
if Product_name[0] in self.click_product_name_list:
print(product_nums, "already scraped::", Product_name[0].upper())
continue
driver.execute_script("localStorage.clear();") # Clear localStorage
time.sleep(0.5)
driver.execute_script(
"caches.keys().then(function(names) { for (let name of names) { caches.delete(name); } });")
driver.execute_script("window.performance.clearResourceTimings();")
time.sleep(0.5)
# Assumes a driver object already exists
# First enable the heap profiler
driver.execute_cdp_cmd('HeapProfiler.enable', {})
# Then force a garbage collection
driver.execute_cdp_cmd('HeapProfiler.collectGarbage', {})
# Finally, optionally disable it again
driver.execute_cdp_cmd('HeapProfiler.disable', {})
time.sleep(0.5)
self.click_product_name_list.append(Product_name[0])
self.update_cagetory_state = False
driver.execute_script(f"document.querySelector('#{Product_name[0]} > kat-radiobutton').click()")
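# Sketch (ours, not in this commit): the clear-storage / clear-caches / force-GC
# steps above could live in one helper so future call sites stay in sync.
def purge_page_state(driver, pause=0.5):
    """Clear per-page storage and force a CDP garbage collection (sketch)."""
    driver.execute_script("localStorage.clear();")
    driver.execute_script(
        "caches.keys().then(names => { for (const n of names) caches.delete(n); });")
    driver.execute_script("window.performance.clearResourceTimings();")
    time.sleep(pause)
    driver.execute_cdp_cmd('HeapProfiler.enable', {})
    driver.execute_cdp_cmd('HeapProfiler.collectGarbage', {})
    driver.execute_cdp_cmd('HeapProfiler.disable', {})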
......@@ -395,36 +373,6 @@ class dow_category_Product():
while True:
try:
if save_Category_list:
# with self.engine_mysql.begin() as conn_mysql:
# for i in save_Category_list:
# dele_sql = f"DELETE from {site}_aba_profit_category_insights where category='{i[0]}' and product_type='{i[1]}' and item_type_keyword='{i[2]}' and year_week='{self.y_w}'"
# print('Deleting from mysql:', dele_sql)
# conn_mysql.execute(dele_sql)
# df = pd.DataFrame(data=save_Category_list,
# columns=['category', "product_type", "item_type_keyword",
# "search_ratio", "product_average", "return_ratio",
# "return_product_average", "year_week", 'sellers',
# 'new_brands',
# 'asin', 'new_asin', 'per_asin', 'advertisement_spend',
# 'star_ratings', 'new_brands_int', 'asin_int',
# 'new_asin_int', 'per_asin_int', 'five_star',
# 'three_star', 'two_star', 'one_star', 'ad_spend',
# 'majority_spend', 'most_popular_keywords_item',
# 'reasons_returns_json', 'top_data_json',
# 'news_data_json',
# 'top_sales_amount', 'top_sales_volume',
# 'top_search_ratio',
# 'top_return_ratio', 'top_adv_spend',
# 'top_majority_spend',
# 'news_sales_amount',
# 'news_sales_volume',
# 'news_search_ratio', 'news_return_ratio',
# 'news_adv_spend',
# 'news_majority_spend'
# ])
# self.engine_mysql.to_sql(df, f'{site}_aba_profit_category_insights',
# if_exists="append")
# print('Saved to mysql successfully')
with self.engine_pg.begin() as conn_pg:
for i in save_Category_list:
dele_sql = f"DELETE from {site}_aba_profit_category_insights where category='{i[0]}' and product_type='{i[1]}' and item_type_keyword='{i[2]}' and year_week='{self.y_w}'"
......
......@@ -3,28 +3,23 @@ import sys
import traceback
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from amazon_params import py_ja3
from utils.requests_param import Requests_param_val
from utils.db_connect import BaseUtils
import random
import time
from lxml import etree
import re
import urllib3
from queue import Queue
import threading
import pandas as pd
import datetime
import requests
from amazon_spider.VPS_IP import pppoe_ip
from curl_cffi import requests as requests2
from curl_cffi import requests
from utils.secure_db_client import get_remote_engine
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
import json
import uuid
from threading_spider.db_connectivity import connect_db
"""抓取亚马逊 bsr分类,和 bsr asin top100"""
......@@ -55,6 +50,21 @@ class bsr_catgory(BaseUtils):
self.headers_num_int = 0
self.columns = ['cate_current_id', 'asin', 'bsr_rank', 'price', 'rating', 'total_comments', "week",
"year_month", 'category_id']
self._ip_switch_lock = threading.Lock()
self._last_ip_switch_time = 0
def _safe_switch_ip(self, min_interval=60):
"""线程安全的IP切换,避免多线程重复切换"""
with self._ip_switch_lock:
now = time.time()
if now - self._last_ip_switch_time < min_interval:
print(f'距离上次切换IP不到{min_interval}秒,跳过')
return False
print('切换IP...')
pppoe_ip()
self._last_ip_switch_time = time.time()
time.sleep(5)
return True
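# Standalone illustration (ours, not part of this commit) of why the lock plus
# the timestamp check matter: with several worker threads racing after a
# captcha, only the first call dials; the rest see a fresh timestamp and
# return False. Pass an instance whose pppoe_ip is stubbed out.
def _demo_safe_switch(spider, workers=5):
    threads = [threading.Thread(target=spider._safe_switch_ip) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # exactly one thread prints 'Switching IP...'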
def db_engine_us(self, site_name, db_type):
engine_mysql = get_remote_engine(
......@@ -117,6 +127,7 @@ class bsr_catgory(BaseUtils):
print(sele_sql)
df_year_week = self.db_cursor_connect_msyql_read(site='us', select_state1_sql=sele_sql)
self.year_week = list(df_year_week['week'])[-1]
# self.year_week = 6
print(self.year_week, '==== current week ====')
def safeIndex(self, list: list, index: int, default: object = None):
......@@ -188,19 +199,6 @@ class bsr_catgory(BaseUtils):
"category_first_id": category_first_id,
"category_parent_id": category_parent_id
}
# keys_to_check = ['category_id', 'category_first_id', 'category_parent_id']
# # 使用列表推导式检查多个键的值是否为空字符串或None
# empty_or_none_keys = [key for key in keys_to_check if items.get(key) in ('', None)]
# if empty_or_none_keys:
# print('Parse failed')
# try:
# account = 'pengyanbing'
# title = self.site_name + ' bsr ranking'
# content = f'bsr ranking URL parse failed, node count: {nodes_num} \n parsed url: {url}'
# db_class = connect_db(self.site_name)
# db_class.send_mg(account, title, content)
# except:
# pass
return items
def html_4(self, bum):
......@@ -213,36 +211,7 @@ class bsr_catgory(BaseUtils):
bsr_class_name = querys[1]
bsr_id = querys[0]
category_id = querys[3]
cookie_str = self.get_cookeis()
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
......@@ -265,23 +234,21 @@ class bsr_catgory(BaseUtils):
headers["cookie"] = cookie_str
num = 0
while True:
try:
resp = requests.get(bsr_url, headers=headers,
timeout=20, impersonate="chrome")
if ("Enter the characters you see below" in resp.text) or (
"Geben Sie die Zeichen unten ein" in resp.text) or (
"Introduce los caracteres que se muestran" in resp.text) or (
"Saisissez les caractères que vous voyez" in resp.text) or (
"Inserisci i caratteri visualizzati nello spazio" in resp.text):
time.sleep(random.uniform(1.5, 5.5))
print('Captcha hit on request:')
num += 1
if num > 20:
self._safe_switch_ip()
num = 0
continue
if resp.status_code == 200 or resp.status_code == 201:
response = etree.HTML(resp.text)
response.xpath("//span[@id='glow-ingress-line2']/text()")
......@@ -400,48 +367,20 @@ class bsr_catgory(BaseUtils):
'//a[contains(text(), "Weiter")]/@href|//a[contains(text(), "Next")]/@href|//a[contains(text(), "Page suivante")]/@href|//a[contains(text(), "Suivant")]/@href|//a[contains(text(), "Avanti")]/@href|//a[contains(text(), "Siguiente")]/@href|//a[contains(text(), "Seite")]/@href|//a[contains(text(), "siguiente")]/@href|//a[contains(text(), "Pagina")]/@href')
data_client_recs_list = response.xpath(
"//div[@class='p13n-desktop-grid']/@data-client-recs-list")
div_list = response.xpath("//div[@id='gridItemRoot']")
for div in div_list:
bsr_rank_list = div.xpath(".//span[@class='zg-bdg-text']/text()")
if bsr_rank_list:
bsr_rank = re.findall(r'#(\d+)', bsr_rank_list[0])[0]
else:
bsr_rank = None
href_list = div.xpath(".//a[@role='link']/@href")
if href_list:
try:
asin = re.findall('dp/(.*)/ref', href_list[0])[0]
except:
asin = None
else:
asin = None
price = None
rating = None
reviews = None
if asin:
self.asin_dict_[asin] = [bsr_id, asin, bsr_rank, price, rating, reviews, self.week,
self.year_month, bsr_url_category_id_dict['category_id']]
if data_client_recs_list:
data_list = eval(data_client_recs_list[0])
print('ASINs fetched from page 1:', len(data_list))
for data in data_list:
asin = data['id']
rank = data['metadataMap']['render.zg.rank']
if self.asin_dict_.get(asin):
tp_item = tuple(self.asin_dict_.get(asin))
self.item_list.append(tp_item)
else:
self.item_list.append(
(bsr_id, asin, int(rank), None, None, None, self.week, self.year_month,
bsr_url_category_id_dict['category_id']))
print(len(data_client_recs_list))
if ele_next and data_client_recs_list:
self.get_bsr_asin_data(ele_next, bsr_id, bsr_url_category_id_dict['category_id'], asin_dict_={})
else:
break
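# Sketch (ours): data-client-recs-list appears to be a JSON attribute, so
# json.loads is a safer drop-in for the eval() calls applied to it above
# and in get_bsr_asin_data below.
def parse_recs_list(attr_value):
    try:
        return json.loads(attr_value)
    except json.JSONDecodeError:
        raise ValueError(f"unparseable data-client-recs-list: {attr_value[:80]!r}")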
def get_cookeis(self):
if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie()
self.cookie_dict_delete_id = cookies_dict
......@@ -464,106 +403,99 @@ class bsr_catgory(BaseUtils):
cookie_str = ''
for k, v in cookie_dic.items():
cookie_str = cookie_str + str(k) + '=' + str(v) + ';'
break
return cookie_str
except:
cookie_str = ''
for k, v in cookie_lsit.items():
cookie_str = cookie_str + str(k) + '=' + str(v) + ';'
break
return cookie_str
else:
break
return ''
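# Sketch (ours, not in this commit): the queued cookie payloads are either a
# Selenium-style list of {'name': ..., 'value': ...} dicts or a plain
# {name: value} dict; one converter would replace the repeated 'k=v;' string
# building in get_cookeis().
def cookie_header(raw):
    try:
        data = json.loads(raw)
    except Exception:
        data = eval(raw)  # legacy entries appear to have been saved with repr()
    if isinstance(data, list):
        pairs = {c["name"]: c["value"] for c in data if c}
    else:
        pairs = data
    return ''.join(f'{k}={v};' for k, v in pairs.items())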
def get_bsr_asin_data(self, ele_next, c_id, bsr_url_category_id, asin_dict_):
next_url = self.site_url + ele_next[0]
num = 0
while True:
try:
cookie_str = self.get_cookeis()
if not cookie_str:
cookie_str = self.get_cookeis()
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'connection': 'close',
'accept': 'text/html,*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'origin': f'{next_url}',
'referer': f'{next_url}',
'sec-ch-ua-mobile': '?0',
'user-agent': ua
}
headers['Sec-Fetch-Site'] = 'same-origin'
headers['Sec-Fetch-Mode'] = 'cors'
headers['Sec-Fetch-Dest'] = 'empty'
headers['Accept'] = '*/*'
headers['X-Requested-With'] = 'XMLHttpRequest'
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
k = ""
for i in (0, random.randint(0, 5)):
k += random.choice(alphabet)
headers[k] = str(uuid.uuid4())
headers["cookie"] = cookie_str
print('Requesting page 2::', next_url)
resp = requests.get(next_url, headers=headers, timeout=15, impersonate="chrome")
if ("Enter the characters you see below" in resp.text) or (
"Geben Sie die Zeichen unten ein" in resp.text) or (
"Introduce los caracteres que se muestran" in resp.text) or (
"Saisissez les caractères que vous voyez" in resp.text) or (
"Inserisci i caratteri visualizzati nello spazio" in resp.text):
print("出现验证码")
self.headers_num_int += 1
if self.headers_num_int > 150:
self.headers_num_int = 0
pppoe_ip()
time.sleep(5)
num += 1
if num > 20:
self._safe_switch_ip()
num = 0
time.sleep(random.uniform(3.5, 5.5))
continue
if resp.status_code == 200 or resp.status_code == 201:
response = etree.HTML(resp.text)
response.xpath("//span[@id='glow-ingress-line2']/text()")
break
else:
print('resp.status_code:L:', resp.status_code)
num += 1
if resp.status_code == 503:
self._safe_switch_ip()
num = 0
except Exception as e:
print("报错 继续 请求", e)
time.sleep(random.uniform(5.5, 15.5))
num += 1
if num > 20:
self._safe_switch_ip()
num = 0
continue
data_client_recs_list = response.xpath("//div[@class='p13n-desktop-grid']/@data-client-recs-list")
div_list = response.xpath("//div[@id='gridItemRoot']")
for div in div_list:
bsr_rank_list = div.xpath(".//span[@class='zg-bdg-text']/text()")
if bsr_rank_list:
bsr_rank = re.findall(r'#(\d+)', bsr_rank_list[0])[0]
else:
bsr_rank = None
href_list = div.xpath(".//a[@role='link']/@href")
if href_list:
try:
asin = re.findall('dp/(.*)/ref', href_list[0])[0]
except:
asin = None
else:
asin = None
price = None
rating = None
reviews = None
if asin:
asin_dict_[asin] = [c_id, asin, bsr_rank, price, rating, reviews, self.week, self.year_month,
bsr_url_category_id]
if data_client_recs_list:
data_list = eval(data_client_recs_list[0])
print('ASINs fetched from page 2:', len(data_list))
for data in data_list:
asin = data['id']
rank = data['metadataMap']['render.zg.rank']
if asin_dict_.get(asin):
tp_item = tuple(asin_dict_.get(asin))
self.item_list.append(tp_item)
else:
self.item_list.append(
(c_id, asin, int(rank), None, None, None, self.week, self.year_month, bsr_url_category_id))
else:
print('No ASINs fetched from page 2::', next_url)
def save_asin_data(self):
while True:
try:
self.engine = self.mysql_connect()
print('Saving::', len(self.item_list))
df = pd.DataFrame(data=self.item_list, columns=self.columns)
df['date_info'] = self.time_strftime_
df.drop_duplicates(['asin', 'bsr_rank', 'cate_current_id'], inplace=True) # de-duplicate
......@@ -700,10 +632,11 @@ class bsr_catgory(BaseUtils):
continue
# # insert the new rows
print('save_name_num_list::', save_name_num_list)
if save_name_num_list:
with self.engine.begin() as conn:
conn.execute(
f"insert into {self.site_name}_bs_category (p_id, en_name, nodes_num,path, category_id, category_parent_id) values (%s, %s,%s, %s,%s, %s)",
save_name_num_list)
self.init_db(self.site_name)
for id_en_mane in self.id_and_en_name_list:
while True:
......@@ -796,7 +729,7 @@ class bsr_catgory(BaseUtils):
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
html_thread = []
for i in range(30):
thread2 = threading.Thread(target=self.html_4, args=(b_num + 1,))
html_thread.append(thread2)
for ti in html_thread:
......@@ -971,21 +904,53 @@ class bsr_catgory(BaseUtils):
time.sleep(20)
continue
# def sele_msyql_category(self, site):
# engine_mysql = self.db_engine_us(site, 'mysql')
# sql = f'select path, nodes_num,id from {site}_bs_category where nodes_num>1'
# df = engine_mysql.read_sql(sql)
# values_list = df.values.tolist()
# with engine_mysql.begin() as conn_6:
# for value in values_list:
# print(value)
# items = self.parse_url(value[1], value[0])
# items['id'] = value[2]
# print(items)
# try:
# # {'category_id': '1722264031', 'category_first_id': 'baby', 'category_parent_id': '60244031', 'id': 67478}
# ai_sql1 = f"update {site}_bs_category set category_id = '{items['category_id']}',category_parent_id='{items['category_parent_id']}',category_first_id='{items['category_first_id']}' where id={items['id']}"
# print(ai_sql1)
# conn_6.execute(ai_sql1)
# except:
# self.try_sql(ai_sql1)
def sele_msyql_category(self, site):
engine_mysql = self.db_engine_us(site, 'mysql')
sql = f"select path, nodes_num, id from {site}_bs_category where nodes_num>1"
df = engine_mysql.read_sql(sql)
values_list = df.values.tolist()
try:
with engine_mysql.begin() as conn_6:
for value in values_list:
items = self.parse_url(value[1], value[0])
items['id'] = value[2]
ai_sql1 = f"update {site}_bs_category set category_id = '{items['category_id']}',category_parent_id='{items['category_parent_id']}',category_first_id='{items['category_first_id']}' where id={items['id']}"
try:
conn_6.execute(ai_sql1)
except Exception:
self.try_sql(site, ai_sql1)
except Exception as e:
# This is where the RuntimeError raised by __exit__ gets caught
print("Error on transaction exit:", repr(e))
def try_sql(self, site, ai_sql):
for i in range(5):
try:
engine_mysql = self.db_engine_us(site, 'mysql')
with engine_mysql.begin() as conn_6:
conn_6.execute(ai_sql)
return
except:
time.sleep(10)
def dele_self_real_spider(self):
print("Nightly scheduled job: delete He Zhe's scrape tables; the user has unfavorited the stores")
......@@ -1021,6 +986,8 @@ class bsr_catgory(BaseUtils):
if __name__ == '__main__':
pppoe_ip()
time.sleep(5)
print("如果运行 run 函数 有个别类目 别名没有抓取到,结束之后 运行 run_start 抓取,获取所有 and_en_name 为空的类目id,path")
for site in ['us', 'de', 'uk']:
spider_us = bsr_catgory(site_name=site)
......