Commit 42358832 by Peng

no message

parent ab22cc42
import pymysql
import requests
import sys
import os
import traceback
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from amazon_params import py_ja3
from utils.requests_param import Requests_param_val
from utils.db_connect import BaseUtils
from amazon_params.params import DB_CONN_DICT
from amazon_spider.VPS_IP import pppoe_ip
import uuid
import random
import time
from lxml import etree
import re
import urllib3
from queue import Queue
import threading
......@@ -21,15 +17,34 @@ import pandas as pd
import datetime
import json
from utils.secure_db_client import get_remote_engine
from curl_cffi import requests
from threading_spider.post_to_dolphin import DolphinschedulerHelper
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
import traceback
"""抓取亚马逊 new_releases ,和 new_releases asin top100"""
class nsr_catgory(BaseUtils):
# Site configuration table: marketplace code -> (base URL, Host header value).
# Replaces the six-branch if/elif chain that used to live in init_db.
_SITE_CONFIG = {
    'us': ('https://www.amazon.com', 'www.amazon.com'),
    'uk': ('https://www.amazon.co.uk', 'www.amazon.co.uk'),
    'de': ('https://www.amazon.de', 'www.amazon.de'),
    'fr': ('https://www.amazon.fr', 'www.amazon.fr'),
    'es': ('https://www.amazon.es', 'www.amazon.es'),
    'it': ('https://www.amazon.it', 'www.amazon.it'),
}
# Localized <h1> title prefixes used to extract the NSR category name in
# html_4 (one entry per supported marketplace); replaces per-site if/elif.
_NSR_TITLE_PREFIX = {
    'us': 'Best Sellers in',
    'uk': 'Best Sellers in',
    'de': 'Bestseller in',
    'fr': 'Les meilleures ventes en',
    'es': 'Los más vendidos en',
    'it': 'Bestseller in',
}
def __init__(self, site_name):
super().__init__()
self.site_name = site_name
......@@ -53,14 +68,24 @@ class nsr_catgory(BaseUtils):
self.item_queue = Queue()
self.columns = ['cate_current_id', 'asin', 'bsr_rank', 'price', 'rating', 'total_comments', "week",
"year_month", 'category_id']
self.int_parse()
self._ip_switch_lock = threading.Lock()
self._last_ip_switch_time = 0
def _safe_switch_ip(self, min_interval=60):
    """Thread-safe, rate-limited PPPoE IP rotation.

    Only one thread may trigger a switch at a time; switches requested
    less than ``min_interval`` seconds after the previous one are skipped
    so that many worker threads hitting captchas do not rotate the IP
    over and over.

    :param min_interval: minimum seconds between two successive switches
    :return: True if the IP was switched, False if the request was skipped
    """
    with self._ip_switch_lock:
        now = time.time()
        if now - self._last_ip_switch_time < min_interval:
            print(f'距离上次切换IP不到{min_interval}秒,跳过')
            return False
        print('切换IP...')
        pppoe_ip()
        self._last_ip_switch_time = time.time()
    # BUG FIX: the original slept while still holding the lock (despite a
    # comment claiming otherwise), blocking every other thread for 5 s.
    # Sleep after releasing the lock so only this thread waits for the
    # new connection to settle.
    time.sleep(5)
    return True
def db_engine_us(self, site_name, db_type):
    """Return a remote DB engine for *site_name* via server alias *db_type*.

    :param site_name: target site / database selector (e.g. 'us')
    :param db_type: server-side engine alias, e.g. 'mysql'
    :return: engine object produced by get_remote_engine
    """
    # The diff left two return statements here (the second unreachable);
    # only the direct delegating call survives.
    return get_remote_engine(site_name=site_name, db_type=db_type)
def db_cursor_connect_update(self, sql, site):
for i in range(3):
......@@ -80,16 +105,19 @@ class nsr_catgory(BaseUtils):
engine_us_mysql = self.db_engine_us(site, 'mysql')
else:
engine_us_mysql = self.db_engine_us(self.site_name, 'mysql')
print(select_state1_sql)
df = engine_us_mysql.read_sql(select_state1_sql)
return df
except Exception as e:
import traceback
traceback.print_exc() # ★ 打印完整栈到终端
traceback.print_exc()
print(e, 'db_cursor_connect_msyql_read 报错:', select_state1_sql)
def int_parse(self):
def init_db(self, site_name):
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
# ① 使用字典替代 if/elif
self.site_url, self.host = self._SITE_CONFIG[site_name]
# 合并原 int_parse 的初始化逻辑
self.reuests_para_val = Requests_param_val(site_name=self.site_name, spider="seller_account_product")
self.year_month = f'{self.year}_{self.month}'
sele_sql = f"SELECT week FROM week_20_to_30 WHERE `year_month`='{self.year}_{self.month}'"
......@@ -98,49 +126,64 @@ class nsr_catgory(BaseUtils):
self.year_week = list(df_year_week['week'])[-1]
print(self.year_week, '====当前周===1232333')
def init_db(self, site_name):
    """(Re)create the DB engines and set the site base URL / Host header.

    :param site_name: marketplace code ('us', 'uk', 'de', 'fr', 'es', 'it')
    :raises KeyError: if *site_name* is not a configured marketplace
        (the old if/elif chain silently left the attributes unset instead,
        which only deferred the failure to the first request)
    """
    self.engine = self.mysql_connect()
    self.engine_pg = self.pg_connect()
    # Table-driven lookup replaces the six-branch if/elif chain and keeps
    # this method consistent with the class-level _SITE_CONFIG constant.
    self.site_url, self.host = self._SITE_CONFIG[site_name]
def safeIndex(self, list: list, index: int, default: object = None):
    """Safely fetch ``list[index]``, returning *default* when out of range.

    NOTE(review): the parameter name ``list`` shadows the builtin; it is
    kept unchanged for caller compatibility. Negative indices within range
    still resolve normally (``index <= len(list) - 1`` admits them), which
    matches the original behavior.

    :param list: sequence to read from
    :param index: position to read
    :param default: value returned when *index* is past the end
    :return: the element at *index*, or *default*
    """
    # The diff had two interleaved copies of this body; this is the single
    # de-duplicated version with identical behavior.
    if index <= len(list) - 1:
        return list[index]
    return default
# Shared header construction for html_4 and get_bsr_asin_data (removes the
# duplicated inline code both methods used to carry).
def _build_headers(self, url):
    """Build randomized anti-bot request headers for *url*.

    Generates a random Chrome user-agent and adds one decoy header with a
    random short name and a UUID value. The cookie header is NOT set here;
    callers attach it afterwards.

    :param url: page URL, used as both origin and referer
    :return: dict of HTTP headers
    """
    major = random.randint(70, 114)
    ua = (f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
          f'(KHTML, like Gecko) Chrome/{major}.0.{random.randint(1000, 5000)}'
          f'.{random.randint(1, 181)} Safari/537.36')
    headers = {
        'connection': 'close',
        'authority': self.host,
        'accept': 'text/html,*/*',
        'accept-language': 'zh-CN,zh;q=0.9',
        'cache-control': 'no-cache',
        'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
        'origin': url,
        'referer': url,
        'sec-ch-ua-mobile': '?0',
        'user-agent': ua,
    }
    # BUG FIX: the original looped over the literal tuple
    # ``(0, random.randint(0, 15))`` -- exactly two iterations, so the decoy
    # name was always 2 characters. Use a genuinely random length (>= 2 so
    # the header name is never empty).
    alphabet = 'abcdefghijklmn'
    decoy_name = ''.join(random.choices(alphabet, k=random.randint(2, 16)))
    headers[decoy_name] = str(uuid.uuid4())
    return headers
# Shared captcha detection for html_4 and get_bsr_asin_data (replaces the
# two duplicated five-way `or` chains).
def _is_captcha(self, text):
    """Return True when *text* contains a localized Amazon captcha prompt."""
    captcha_markers = (
        "Enter the characters you see below",
        "Geben Sie die Zeichen unten ein",
        "Introduce los caracteres que se muestran",
        "Saisissez les caractères que vous voyez",
        "Inserisci i caratteri visualizzati nello spazio",
    )
    for marker in captcha_markers:
        if marker in text:
            return True
    return False
# Shared <h1> parsing for html_4 (replaces the per-site if/elif chain).
def _parse_h1_name(self, response):
    """Parse the NSR category name from the page <h1>.

    Looks up the localized title prefix for the current site, finds an
    <h1> containing it, and returns the remainder of the title stripped
    of the prefix. Returns None for unknown sites or when no matching
    <h1> exists.
    """
    prefix = self._NSR_TITLE_PREFIX.get(self.site_name, '')
    if not prefix:
        return None
    title_nodes = response.xpath(f"//h1[contains(text(),'{prefix}')]/text()")
    if not title_nodes:
        return None
    return title_nodes[0].replace(prefix, '').strip()
def parse_url(self, nodes_num: int, url: str):
"""
统一解析链接获取 bsr 分类id等数据
:param nodes_num:
:param url:
:return:
"""
"""统一解析链接获取 nsr 分类id等数据"""
arr = url.split("/")
if not "ref=" in url:
if "ref=" not in url:
ref_suffix = None
category_id = self.safeIndex(arr, len(arr) - 1, None)
category_first_id = self.safeIndex(arr, len(arr) - 2, None)
......@@ -148,6 +191,7 @@ class nsr_catgory(BaseUtils):
ref_suffix = self.safeIndex(arr, len(arr) - 1, None)
category_first_id = self.safeIndex(arr, len(arr) - 3, None)
category_id = self.safeIndex(arr, len(arr) - 2, None)
if nodes_num == 1:
level = 1
elif url.endswith("_0"):
......@@ -157,29 +201,17 @@ class nsr_catgory(BaseUtils):
else:
level = 4
# 获取 parent id
if level == 1:
# 根节点
category_id = "0"
category_first_id = None
category_parent_id = None
elif level == 2:
# 一级节点
category_id = category_id
category_first_id = category_id
category_parent_id = "0"
elif level == 3:
# 一级节点下的次级节点
category_id = category_id
category_first_id = category_first_id
category_parent_id = category_first_id
elif level == 4:
category_id = category_id
category_first_id = category_first_id
if ref_suffix is not None:
category_parent_id = ref_suffix[ref_suffix.rfind("_") + 1:]
else:
category_parent_id = None
category_parent_id = ref_suffix[ref_suffix.rfind("_") + 1:] if ref_suffix else None
else:
category_id = None
category_first_id = None
......@@ -191,75 +223,56 @@ class nsr_catgory(BaseUtils):
"category_parent_id": category_parent_id
}
# Renamed from get_cookeis; empty pool entries now yield '' explicitly.
def get_cookies(self):
    """Pop one cookie from the pool and normalize it to ``k=v;k=v;`` form.

    Refills the queue from Requests_param_val when it is empty. Pool
    entries of 50 characters or fewer are treated as placeholders and
    yield an empty string.

    :return: cookie header string, or '' for a junk pool entry
    """
    if self.cookies_queue.empty():
        cookies_dict = self.reuests_para_val.get_cookie()
        self.cookie_dict_delete_id = cookies_dict
        for ck in cookies_dict.values():
            self.cookies_queue.put(ck)
    # The original `while 1` executed at most one iteration (every branch
    # returned), so a single get() is equivalent.
    cookie_str = self.cookies_queue.get()
    if len(cookie_str) <= 50:
        return ''
    try:
        cookie_list = json.loads(cookie_str)
    except (ValueError, TypeError):
        # Not JSON -- stored as a Python literal. SECURITY FIX: the
        # original ran eval() on this externally sourced string;
        # ast.literal_eval only accepts literals and cannot execute code.
        import ast
        cookie_list = ast.literal_eval(cookie_str)
    try:
        cookie_dic = {}
        for entry in cookie_list:
            if entry:
                cookie_dic[entry["name"]] = entry["value"]
        return ''.join(f'{k}={v};' for k, v in cookie_dic.items())
    except (TypeError, KeyError, IndexError):
        # cookie_list was already a flat {name: value} mapping.
        return ''.join(f'{k}={v};' for k, v in cookie_list.items())
def html_4(self, bum):
print('bum 当前请求的层级:', bum)
while True:
if self.name_path_queue.empty() == False:
if not self.name_path_queue.empty():
querys = self.name_path_queue.get()
print('请求=====:', querys)
bsr_url = querys[2]
bsr_class_name = querys[1]
bsr_id = querys[0]
if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie()
self.cookie_dict_delete_id = cookies_dict
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
while 1:
cookie_str = self.cookies_queue.get()
if len(cookie_str) > 50:
try:
cookie_lsit = json.loads(cookie_str)
except:
cookie_lsit = eval(cookie_str)
cookie_dic = {}
try:
for i in cookie_lsit:
if i:
cookie_dic[i["name"]] = i["value"]
else:
continue
cookie_str = ''
for k, v in cookie_dic.items():
cookie_str = cookie_str + str(k) + '=' + str(v) + ';'
break
except:
cookie_str = ''
for k, v in cookie_lsit.items():
cookie_str = cookie_str + str(k) + '=' + str(v) + ';'
break
else:
break
n = random.randint(70, 114)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
headers = {
'connection': 'close',
'authority': self.host,
'accept': 'text/html,*/*',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'origin': f'{bsr_url}',
'referer': f'{bsr_url}',
'sec-ch-ua-mobile': '?0',
'user-agent': ua
}
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n']
k = ""
for i in (0, random.randint(0, 15)):
k += random.choice(alphabet)
headers[k] = str(uuid.uuid4())
cookie_str = self.get_cookies() # ⑥ 修正拼写
headers = self._build_headers(bsr_url) # ④ 使用抽取的方法
headers["cookie"] = cookie_str
num = 0
while True:
try:
sess.mount(self.site_url, py_ja3.DESAdapter())
resp = sess.get(bsr_url, headers=headers,
timeout=20, verify=False)
if ("Enter the characters you see below" in resp.text) or (
"Geben Sie die Zeichen unten ein" in resp.text) or (
"Introduce los caracteres que se muestran" in resp.text) or (
"Saisissez les caractères que vous voyez" in resp.text) or (
"Inserisci i caratteri visualizzati nello spazio" in resp.text):
resp = requests.get(bsr_url, headers=headers, timeout=20, impersonate="chrome")
if self._is_captcha(resp.text): # ⑤ 使用抽取的方法
print('请求出现验证码:')
time.sleep(random.uniform(1.5, 5.5))
num += 1
if num > 20:
self._safe_switch_ip()
num = 0
continue
if resp.status_code == 200 or resp.status_code == 201:
response = etree.HTML(resp.text)
......@@ -268,252 +281,168 @@ class nsr_catgory(BaseUtils):
except Exception as e:
print("报错 继续 请求", e)
time.sleep(random.uniform(1.5, 5.5))
num += 1
if num > 20:
self._safe_switch_ip()
num = 0
continue
xpath_herf_list = [
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/following-sibling::div[@role="group"]/div[@role="treeitem"]/a'
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name[0:5]}")]/parent::div/parent::div/parent::div//a/@href']
for xpath_herf in xpath_herf_list:
a_2_list = response.xpath(xpath_herf)
break
if self.site_name == 'us' or self.site_name == 'uk':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Best Sellers in')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Best Sellers in", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'de':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Bestseller in')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Bestseller in", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'fr':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Les meilleures ventes en')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Les meilleures ventes en", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'es':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Los más vendidos en')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Los más vendidos en", '').strip()
else:
h1_bsr_name = None
elif self.site_name == 'it':
h1_bsr_name_lsit = response.xpath("//h1[contains(text(),'Bestseller in')]/text()")
if h1_bsr_name_lsit:
h1_bsr_name = h1_bsr_name_lsit[0].replace("Bestseller in", '').strip()
else:
h1_bsr_name = None
# self.id_and_en_name_list 1 代表还有 子分类 2代表没有子分类,为最终节点
bsr_url_category_id_list = self.parse_url(bum - 1, bsr_url)
# ⑦ 修复 Bug:原来 a_2_list 赋值后立即被 for...break 覆盖,现在使用单一 xpath
a_2_list = response.xpath(
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div'
f'/following-sibling::div[@role="group"]/div[@role="treeitem"]/a')
h1_bsr_name = self._parse_h1_name(response) # ② 使用抽取的方法
bsr_url_category_id_dict = self.parse_url(bum - 1, bsr_url)
print('bsr_url_category_id_dict', bsr_url_category_id_dict)
print('获取下一级数量:', len(a_2_list))
if a_2_list:
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 1, bsr_url_category_id_list['category_id'],
bsr_url_category_id_list['category_parent_id']))
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 1,
bsr_url_category_id_dict['category_id'],
bsr_url_category_id_dict['category_parent_id']))
name_2_href_list = []
for a_2 in a_2_list:
name_2 = a_2.xpath('./text()')[0].strip()
name_2_href = a_2.xpath('./@href')[0].strip()
print('下一级类目链接::', name_2_href)
href = self.site_url + name_2_href
id_pid_data_list = self.parse_url(bum, href)
print(id_pid_data_list)
id_pid_data_dict = self.parse_url(bum, href)
print(id_pid_data_dict)
self.insert_id_pid_list.append(
[name_2, href, id_pid_data_list['category_id'], id_pid_data_list['category_parent_id']])
# 父级类目自增id 类目名称 二级类目, 链接
self.insert_list.append((bsr_id, name_2, bum, href, id_pid_data_list['category_id'],
id_pid_data_list['category_parent_id']))
# 存储当前分类id。,和页面获取的下一级url
[name_2, href, id_pid_data_dict['category_id'], id_pid_data_dict['category_parent_id']])
self.insert_list.append((bsr_id, name_2, bum, href,
id_pid_data_dict['category_id'],
id_pid_data_dict['category_parent_id']))
name_2_href_list.append(href)
self.catgory_next_path_list.append([bsr_id, '|-|'.join(name_2_href_list)])
# ⑧ 修复 Bug:原来用 bsr_id(自增id),改为 category_id 才能正确匹配 update_delete_time
self.catgory_next_path_list.append(
[bsr_url_category_id_dict['category_id'], '|-|'.join(name_2_href_list)])
else:
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 2, bsr_url_category_id_list['category_id'],
bsr_url_category_id_list['category_parent_id']))
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 2,
bsr_url_category_id_dict['category_id'],
bsr_url_category_id_dict['category_parent_id']))
print(self.year_week, '====当前周===', self.week)
ele_next = response.xpath(
'//a[contains(text(), "Weiter")]/@href|//a[contains(text(), "Next")]/@href|//a[contains(text(), "Page suivante")]/@href|//a[contains(text(), "Suivant")]/@href|//a[contains(text(), "Avanti")]/@href|//a[contains(text(), "Siguiente")]/@href|//a[contains(text(), "Seite")]/@href|//a[contains(text(), "siguiente")]/@href|//a[contains(text(), "Pagina")]/@href')
'//a[contains(text(), "Weiter")]/@href|//a[contains(text(), "Next")]/@href'
'|//a[contains(text(), "Page suivante")]/@href|//a[contains(text(), "Suivant")]/@href'
'|//a[contains(text(), "Avanti")]/@href|//a[contains(text(), "Siguiente")]/@href'
'|//a[contains(text(), "Seite")]/@href|//a[contains(text(), "siguiente")]/@href'
'|//a[contains(text(), "Pagina")]/@href')
data_client_recs_list = response.xpath(
"//div[@class='p13n-desktop-grid']/@data-client-recs-list")
div_list = response.xpath("//div[@id='gridItemRoot']")
for div in div_list:
bsr_rank_list = div.xpath(".//span[@class='zg-bdg-text']/text()")
if bsr_rank_list:
bsr_rank = re.findall(r'#(\d+)', bsr_rank_list[0])[0]
else:
bsr_rank = None
href_list = div.xpath(".//a[@role='link']/@href")
if href_list:
try:
asin = re.findall('dp/(.*)/ref', href_list[0])[0]
except:
asin = None
else:
asin = None
price = None
rating = None
reviews = None
if asin:
self.asin_dict_[asin] = [bsr_id, asin, bsr_rank, price, rating, reviews, self.week,
self.year_month, bsr_url_category_id_list['category_id']]
if data_client_recs_list:
data_list = eval(data_client_recs_list[0])
data_list = json.loads(data_client_recs_list[0]) # ⑨ eval → json.loads
print('第一页获取asin:', len(data_list))
for data in data_list:
asin = data['id']
rank = data['metadataMap']['render.zg.rank']
if self.asin_dict_.get(asin):
tp_item = tuple(self.asin_dict_.get(asin))
self.item_list.append(tp_item)
else:
self.item_list.append(
(bsr_id, asin, int(rank), None, None, None, self.week, self.year_month,
bsr_url_category_id_list['category_id']))
self.item_list.append(
(bsr_id, asin, int(rank), None, None, None, self.week, self.year_month,
bsr_url_category_id_dict['category_id']))
if ele_next and data_client_recs_list:
self.get_bsr_asin_data(ele_next, bsr_id, bsr_url_category_id_list['category_id'], asin_dict_={})
self.get_bsr_asin_data(ele_next, bsr_id, bsr_url_category_id_dict['category_id'], asin_dict_={})
else:
break
def get_bsr_asin_data(self, ele_next, c_id, bsr_url_category_id, asin_dict_):
    """Fetch and parse page 2+ of an NSR category listing.

    NOTE(review): this span was a diff with the pre- and post-commit
    bodies interleaved; this is the reconstructed post-commit version
    (cookie/header building via the shared helpers, curl_cffi request,
    json.loads instead of eval) -- confirm against the repository HEAD.

    :param ele_next: xpath result holding the relative "next page" href
    :param c_id: parent category auto-increment id
    :param bsr_url_category_id: category id parsed from the listing URL
    :param asin_dict_: scratch dict mapping asin -> row fields for merging
        grid data with the data-client-recs-list ranking payload
    """
    next_url = self.site_url + ele_next[0]
    cookie_str = self.get_cookies()
    if not cookie_str:
        # One retry: the pool may have handed out a placeholder entry.
        cookie_str = self.get_cookies()
    headers = self._build_headers(next_url)
    # Extra fetch-metadata headers used for the paginated XHR-style request.
    headers['Sec-Fetch-Site'] = 'same-origin'
    headers['Sec-Fetch-Mode'] = 'cors'
    headers['Sec-Fetch-Dest'] = 'empty'
    headers['Accept'] = '*/*'
    headers['X-Requested-With'] = 'XMLHttpRequest'
    headers["cookie"] = cookie_str
    num = 0
    while True:
        try:
            print('请求第二页::', next_url)
            resp = requests.get(next_url, headers=headers, timeout=15, impersonate="chrome")
            if self._is_captcha(resp.text):
                print("出现验证码")
                num += 1
                if num > 20:
                    self._safe_switch_ip()
                    num = 0
                time.sleep(random.uniform(3.5, 5.5))
                continue
            if resp.status_code == 200 or resp.status_code == 201:
                response = etree.HTML(resp.text)
                response.xpath("//span[@id='glow-ingress-line2']/text()")
                break
            else:
                print('resp.status_code:L:', resp.status_code)
                num += 1
                if resp.status_code == 503:
                    # Throttled -- rotate IP before retrying.
                    self._safe_switch_ip()
                    num = 0
        except Exception as e:
            print("报错 继续 请求", e)
            time.sleep(random.uniform(5.5, 15.5))
            num += 1
            if num > 20:
                self._safe_switch_ip()
                num = 0
            continue
    data_client_recs_list = response.xpath("//div[@class='p13n-desktop-grid']/@data-client-recs-list")
    div_list = response.xpath("//div[@id='gridItemRoot']")
    for div in div_list:
        # Rank badge like '#12'; absent for some grid cells.
        bsr_rank_list = div.xpath(".//span[@class='zg-bdg-text']/text()")
        if bsr_rank_list:
            bsr_rank = re.findall(r'#(\d+)', bsr_rank_list[0])[0]
        else:
            bsr_rank = None
        href_list = div.xpath(".//a[@role='link']/@href")
        if href_list:
            try:
                asin = re.findall('dp/(.*)/ref', href_list[0])[0]
            except IndexError:
                asin = None
        else:
            asin = None
        if asin:
            # price / rating / reviews are unknown at this stage -> None.
            asin_dict_[asin] = [c_id, asin, bsr_rank, None, None, None,
                                self.week, self.year_month, bsr_url_category_id]
    if data_client_recs_list:
        # SECURITY FIX kept from the commit: json.loads, not eval, on the
        # page-supplied attribute.
        data_list = json.loads(data_client_recs_list[0])
        print('第二页获取asin:', len(data_list))
        for data in data_list:
            asin = data['id']
            rank = data['metadataMap']['render.zg.rank']
            if asin_dict_.get(asin):
                self.item_list.append(tuple(asin_dict_.get(asin)))
            else:
                self.item_list.append(
                    (c_id, asin, int(rank), None, None, None, self.week, self.year_month, bsr_url_category_id))
    else:
        print('没有获取到第二页获取asin::', next_url)
def save_asin_data(self):
    """Persist collected top-100 rows to PostgreSQL in resumable batches.

    NOTE(review): reconstructed post-commit version of a diff-interleaved
    span (the pre-commit MySQL write path was removed by the commit) --
    confirm against the repository HEAD.

    The DataFrame is built exactly once (``df_ready`` guard) so retries
    after a mid-run failure do not recompute or re-deduplicate, and
    ``pg_inserted`` tracks how far insertion got so retries resume at the
    first unsaved batch instead of duplicating rows.
    """
    retry_count = 0
    max_retries = 50
    batch_size = 5000
    df = None
    pg_inserted = 0
    df_ready = False
    while True:
        try:
            self.engine_pg = self.pg_connect()
            if not df_ready:
                print('存储:====', len(self.item_list), f'{self.site_name}_nsr_top100_asin_2026')
                df = pd.DataFrame(data=self.item_list, columns=self.columns)
                df['date_info'] = self.time_strftime_
                print(df['date_info'])
                df.drop_duplicates(['asin', 'bsr_rank', 'cate_current_id'], inplace=True)
                print(df.shape, '111111111111')
                df_ready = True
            if df is not None:
                # Resume from pg_inserted: already-written batches are skipped.
                for start in range(pg_inserted, len(df), batch_size):
                    batch_df = df.iloc[start:start + batch_size]
                    self.engine_pg.to_sql(batch_df, f'{self.site_name}_nsr_top100_asin_2026', if_exists="append")
                    pg_inserted = start + len(batch_df)
                    print(f'pg 批次插入成功: {start}-{pg_inserted}')
            self.item_list = []
        except Exception as e:
            print(e, '报错存储 pg')
            retry_count += 1
            if retry_count > max_retries:
                print(f'超过最大重试次数 {max_retries},跳过pg存储')
                break
            if 'Network is unreachable' in str(e) or 'All servers failed' in str(e):
                print('网络不可达,尝试切换IP恢复...')
                pppoe_ip()
                time.sleep(15)
            else:
                self.pg_reconnect(table_name=f'{self.site_name}_nsr_top100_asin_2026', e=e)
                time.sleep(10)
            self.engine_pg = self.pg_connect()
            continue
        break
def init_list(self):
......@@ -536,11 +465,11 @@ class nsr_catgory(BaseUtils):
category_url_list = category_path[1].split('|-|')
category_id = category_path[0]
path_sql = f"""
select id, category_id, category_parent_id, en_name, `path`, delete_time
from {self.site_name}_new_releases
where (category_parent_id = "{category_id}")
order by {self.site_name}_new_releases.category_id, category_parent_id;
"""
select id, category_id, category_parent_id, en_name, `path`, delete_time
from {self.site_name}_new_releases
where (category_parent_id = "{category_id}")
order by {self.site_name}_new_releases.category_id, category_parent_id;
"""
print('path_sql:', path_sql)
df_exist_rows = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=path_sql)
exist_rows = df_exist_rows.values.tolist()
......@@ -555,17 +484,15 @@ class nsr_catgory(BaseUtils):
_strftime_ = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print(_strftime_)
if group1_id:
if len(group1_id) == 1:
sql1 = f'update {self.site_name}_new_releases set delete_time = null where id in ({group1_id[0]})'
else:
sql1 = f"update {self.site_name}_new_releases set delete_time = null where id in {tuple(group1_id)}"
print('sql1::', sql1)
sql1 = (f'update {self.site_name}_new_releases set delete_time = null where id in ({group1_id[0]})'
if len(group1_id) == 1 else
f"update {self.site_name}_new_releases set delete_time = null where id in {tuple(group1_id)}")
print('sql1::', sql1)
self.db_cursor_connect_update(sql1, self.site_name)
if group2_id:
if len(group2_id) == 1:
sql2 = f"update {self.site_name}_new_releases set delete_time = '{_strftime_}' where id in ({group2_id[0]})"
else:
sql2 = f"update {self.site_name}_new_releases set delete_time = '{_strftime_}' where id in {tuple(group2_id)}"
sql2 = (f"update {self.site_name}_new_releases set delete_time = '{_strftime_}' where id in ({group2_id[0]})"
if len(group2_id) == 1 else
f"update {self.site_name}_new_releases set delete_time = '{_strftime_}' where id in {tuple(group2_id)}")
print('sql2::', sql2)
self.db_cursor_connect_update(sql2, self.site_name)
except Exception as e:
......@@ -575,22 +502,18 @@ class nsr_catgory(BaseUtils):
for name_num_path in self.insert_list:
save_name_num_list = []
while True:
# 不存在就插入
try:
select_sql_id = f'''SELECT id FROM {self.site_name}_new_releases WHERE `path`="{name_num_path[3]}"'''
select_sql_id = f'''SELECT id FROM {self.site_name}_new_releases WHERE `path`="{name_num_path[3]}"'''
print('select_sql_id:', select_sql_id)
df_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_id)
if not df_id.empty:
save_name_num_list.append(name_num_path)
else:
select_sql_name = f'''SELECT en_name FROM {self.site_name}_new_releases WHERE `path`="{name_num_path[3]}" order by id desc'''
select_sql_name = f'''SELECT en_name FROM {self.site_name}_new_releases WHERE `path`="{name_num_path[3]}" order by id desc'''
print('select_sql_name:', select_sql_name)
df_en_name = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_name)
print(df_en_name['en_name'][0], '33333333333333333333333', name_num_path[1])
if df_en_name['en_name'][0] == name_num_path[1]:
pass
else:
_strftime_ = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
if df_en_name['en_name'][0] != name_num_path[1]:
update_name_sql = f'''update {self.site_name}_new_releases set delete_time = '2023-06-19 00:00:00' WHERE `path`="{name_num_path[3]}" and delete_time is null'''
print('更新 en_name:', update_name_sql)
self.db_cursor_connect_update(update_name_sql, self.site_name)
......@@ -601,13 +524,13 @@ class nsr_catgory(BaseUtils):
time.sleep(10)
self.init_db(self.site_name)
continue
# # 插入新的数据
print('save_name_num_list::', save_name_num_list)
with self.engine.begin() as conn:
conn.execute(
f"insert into {self.site_name}_new_releases (p_id, en_name, nodes_num,path, category_id, category_parent_id) values (%s, %s,%s, %s,%s, %s)",
save_name_num_list)
if save_name_num_list:
with self.engine.begin() as conn:
conn.execute(
f"insert into {self.site_name}_new_releases (p_id, en_name, nodes_num, path, category_id, category_parent_id) values (%s, %s, %s, %s, %s, %s)",
save_name_num_list)
def run(self):
print(" run 函数 是抓取 分类节点,只新增,不删除")
......@@ -618,14 +541,14 @@ class nsr_catgory(BaseUtils):
name_1_list = self.db_read_data(num)
print('name_1_list:::', len(name_1_list))
if name_1_list:
if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie()
self.cookie_dict_delete_id = cookies_dict
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
if num < 15:
for name_1 in name_1_list:
self.name_path_queue.put(name_1)
if self.cookies_queue.empty():
cookies_dict = self.reuests_para_val.get_cookie()
self.cookie_dict_delete_id = cookies_dict
for ck in cookies_dict.values():
self.cookies_queue.put(ck)
html_thread = []
for i in range(50):
thread2 = threading.Thread(target=self.html_4, args=(b_num + 1,))
......@@ -648,32 +571,30 @@ class nsr_catgory(BaseUtils):
elif num > 15:
break
num += 1
self.select_id_1()
def select_id_1(self):
    """Propagate each level-2 node's id to its descendants as one_category_id.

    NOTE(review): reconstructed post-commit version of a diff-interleaved
    span (``id``/``df_id_lsit`` renamed to ``id_``/``df_id_list``) --
    confirm against the repository HEAD.

    For every node with nodes_num=2, a recursive-CTE-style MySQL session
    variable query collects all descendant ids, which are then stamped
    with the ancestor's id in one UPDATE.
    """
    # 查询 子节点的顶级父类id
    print('查询 子节点的顶级父类id')
    select_sql_1 = f'select id from {self.site_name}_new_releases where nodes_num=2'
    df_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_1)
    df_id_list = df_id.values.tolist()
    print(df_id_list)
    for id_ in df_id_list:
        try:
            en_name_id_list = []
            select_p_id = f"select t3.id,t4.en_name from (select t1.id,t1.parent_id,if(find_in_set(parent_id, @pids) > 0, @pids := concat(@pids, ',',id), 0) as ischild from (select id,p_id as parent_id from {self.site_name}_new_releases t order by p_id,id) t1,(select @pids := {id_[0]}) t2) t3 LEFT JOIN {self.site_name}_new_releases t4 on t3.id = t4.id where ischild != 0;"
            print('select_p_id::', select_p_id)
            df_all_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_p_id)
            if not df_all_id.empty:
                all_id_list = df_all_id.values.tolist()
                for en_name_id in all_id_list:
                    en_name_id_list.append(en_name_id[0])
                id_tuple = tuple(en_name_id_list)
                print(len(id_tuple))
                # A 1-tuple would render as "(x,)" in SQL, so single ids are
                # formatted explicitly.
                if len(id_tuple) == 1:
                    update_sql = f"""UPDATE {self.site_name}_new_releases set one_category_id={id_[0]} where id in ('{id_tuple[0]}')"""
                else:
                    update_sql = f'update {self.site_name}_new_releases set one_category_id={id_[0]} where id in {id_tuple}'
                self.db_cursor_connect_update(update_sql, self.site_name)
        except Exception:
            # Best-effort per node: a failed subtree update is skipped and
            # the loop continues (narrowed from the original bare except).
            pass
......@@ -697,9 +618,7 @@ class nsr_catgory(BaseUtils):
print(sql_update)
conn.execute(sql_update)
categorys_list = list(df_read.id.astype("U") + '|-|' + df_read.en_name + '|-|' + df_read.path)
category_url_list = []
for i in categorys_list:
category_url_list.append(i.split('|-|'))
category_url_list = [i.split('|-|') for i in categorys_list]
return category_url_list
except Exception as e:
print(e)
......@@ -707,55 +626,46 @@ class nsr_catgory(BaseUtils):
self.init_db(self.site_name)
continue
def send_ms(self):
    """Notify the selection workflow service that the NR crawl finished.

    Posts a completion event; retries up to 3 times, re-initializing the
    DB connections between attempts (the service call and DB can fail
    together on network loss).
    """
    for i in range(3):
        try:
            url = 'http://selection.yswg.com.cn:8080/soundasia_selection/workflow/emit'
            data = {"dateType": "day", "reportDate": self.time_strftime_, "statusVal": 3,
                    "siteName": self.site_name,
                    "remark": "new_releases 榜单爬取完毕 ", "isEnd": "是",
                    "tableName": f"{self.site_name}_new_releases_top100_asin",
                    "status": "new_releases 榜单爬取完毕"}
            headers = {
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            }
            # ROBUSTNESS FIX: timeout added so a hung endpoint cannot stall
            # the run forever (matches send_ms_count_data_num's timeout=15).
            t = requests.post(url, headers=headers, json=data, timeout=15)
            print(t.text)
            break
        except Exception:
            # Narrowed from a bare except; retry after re-initializing DB.
            self.init_db(self.site_name)
            time.sleep(10)
            continue
def send_ms_count_data_num(self, site, num, day_time):
    """Send a crawl-complete message with the day's row count.

    NOTE(review): the diff showed both the old recipient list
    ('pengyanbing,chenjianyun,wujicang') and the new single recipient;
    the post-commit value is kept. The large commented-out send_ms
    variant that sat here was dead code and has been removed.

    :param site: site code used in the message title
    :param num: number of rows crawled today
    :param day_time: date string included in the message body
    """
    account = 'pengyanbing'
    title = site + ' new 榜单'
    content = f'{day_time} 新品 榜单 抓取完成 当日抓取总数 {num}'
    url = 'http://47.112.96.71:8082/selection/sendMessage'
    data = {'account': account, 'title': title, 'content': content}
    requests.post(url=url, data=data, timeout=15)
def updata_category_state(self):
for i in range(3):
try:
update_sql = f'UPDATE {self.site_name}_new_releases set category_state=1;'
print(update_sql)
self.db_cursor_connect_update(update_sql, self.site_name)
days = ((datetime.datetime.now()) + datetime.timedelta(days=-6)).strftime("%Y-%m-%d")
delect_sql = f"DELETE FROM {self.site_name}_new_releases_top100_asin WHERE date_info < '{days}';"
print(delect_sql)
self.db_cursor_connect_update(delect_sql, self.site_name)
self.engine_pg = self.pg_connect()
_0_days = ((datetime.datetime.now()) + datetime.timedelta(days=0)).strftime("%Y-%m-%d")
select_sql = f"select count(id) FROM {self.site_name}_new_releases_top100_asin WHERE date_info = '{_0_days}';"
select_sql = f"select count(id) AS cnt FROM {self.site_name}_nsr_top100_asin_2026 WHERE date_info = '{_0_days}';"
print(select_sql)
df_count_data_num = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql)
count_data_num = df_count_data_num['count(id)'][0]
df_count_data_num = self.engine_pg.read_sql(select_sql)
count_data_num = df_count_data_num['cnt'][0]
print('count_data_num::', count_data_num)
self.send_ms_count_data_num(self.site_name,count_data_num,_0_days)
self.send_ms_count_data_num(self.site_name, count_data_num, _0_days)
break
except Exception as e:
print(e, '222222222')
......@@ -764,7 +674,6 @@ class nsr_catgory(BaseUtils):
updata_sql_none = f"UPDATE {self.site_name}_new_releases set and_en_name=NULL WHERE and_en_name='None'"
self.db_cursor_connect_update(updata_sql_none, self.site_name)
def update_sql(self):
with self.engine_pg.begin() as conn:
up_sql = "update user_collection_syn set state=1 where state=2"
......@@ -777,17 +686,8 @@ if __name__ == '__main__':
for site in ['us', 'de', 'uk']:
spider_us = nsr_catgory(site_name=site)
spider_us.run()
spider_us.send_ms()
if site == 'us':
spider_us.update_sql()
for site in ['us', 'de', 'uk']:
spider_us = nsr_catgory(site_name=site)
spider_us.updata_category_state()
for site in ['us', 'de', 'uk']:
from amazon_every_day_spider.get_BsrCatgory_asin_rank import bsr_catgory
spider_us = bsr_catgory(site_name=site)
spider_us.run()
spider_us.update_category_state()
spider_us.run_update_redirect_flag()
spider_us.updata_category_first_id()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment