Commit 16efe939 by Peng

update: multiple optimizations and feature extensions in the utils module

[asin_parse.py]
- ParseAsinUs now accepts a response_s parameter so an externally built etree parse tree can be passed in, avoiding duplicate parsing (see the sketch after this list)
- Added parsing of the search-box category field (search_category)
- Category path now includes all_nodeid (node IDs along the full path joined together) plus empty-value protection
- initialSeenAsins parsing now guards against an empty list to prevent IndexError
- AI review button parsing now locates elements via data-testid and filters out invalid short texts
- Fixed a variable-name conflict bug in brand parsing on the es site
- Fixed the bracket-stripping logic for review counts and the incorrect Sternebewertung check on the de site
- Fixed a bug where the variable name data_time was misused in date parsing
- Removed the dynamic coupon XPATH appending logic to prevent duplicate appends
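A minimal sketch of how the new response_s parameter could be used to share one parse tree between the fetch step and ParseAsinUs; the html_text variable and the remaining constructor arguments are assumptions, only the response_s argument comes from this commit:

    from lxml import etree

    tree = etree.HTML(html_text)  # html_text: the fetched ASIN page (assumed to exist)
    # Passing the pre-built tree lets ParseAsinUs skip its own etree.HTML() call.
    parser = ParseAsinUs(response_s=tree)  # other constructor arguments omitted; they are not shown in this diff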

[db_connect.py]
- BaseUtils.__init__ now takes a site_name parameter (default us) instead of hard-coding it
- Added doris_connect_adv() to connect to the Doris advertising database (advertising_manager)
- Added doris_adv_direct_connect() for a direct pymysql connection to the Doris advertising database (usage sketch below)
- Cleaned up legacy commented-out SQLAlchemy code and debug print statements
- Kafka: request timeout 30000→40000 ms, linger_ms 150→350, retries 10→5
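A rough usage sketch based on the signatures visible in this diff; the SELECT statement is illustrative only, and DORIS_ADV_DIRECT_CONN must already be defined in amazon_params.params:

    from utils.db_connect import BaseUtils

    base = BaseUtils(site_name='de')        # the site is now injected instead of hard-coded to 'us'
    engine_adv = base.doris_connect_adv()   # remote-engine handle bound to advertising_manager
    conn = base.doris_adv_direct_connect()  # raw pymysql connection that bypasses the proxy
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT 1")         # illustrative query
            print(cur.fetchone())
    finally:
        conn.close()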

[requests_param.py]
- Added a next_page_lock thread lock and a headers_num_int_s attribute to support multi-threaded pagination (see the sketch after this list)
- Fixed a bug where the Chinese-text check raised an error when check_str was None
- Cookie load limit raised from 300 to 350 entries
- Fixed the accept-Encodin typo in the request headers and removed the authority field
- Request timeout raised from 10 to 30 seconds to improve the success rate on slow pages
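The diff only shows the lock and the counter being created; below is a sketch of how a shared pagination counter could be guarded with them. The next_page method and the counter semantics are assumptions, not code from this commit:

    def next_page(self):
        # Serialize access to the shared counter so concurrent page workers
        # never hand out the same header index twice.
        with self.next_page_lock:
            self.headers_num_int_s += 1
            return self.headers_num_int_s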

[params_asin_xpath.py]
- All sites: added data-testid XPaths to review_ai_list / review_button_list for the new AI-review page structure (illustrated below)
- All sites: added the fbm_delivery_price field to capture the FBM delivery fee
- All sites: added the search_category field to capture the category currently selected in the search box
- US/UK/DE/FR sites: added several td_0_text XPaths to match Amazon's latest page structure
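For reference, a hedged sketch of what a data-testid based selector might look like in params_asin_xpath.py and how it would be evaluated with lxml; the data-testid value and the 10-character cutoff are illustrative, not copied from the config:

    from lxml import etree

    # Hypothetical entry in the style of the new data-testid selectors.
    review_ai_list = ['//div[@data-testid="ai-review-summary"]//text()']

    tree = etree.HTML(html_text)  # html_text: assumed page source
    texts = [t.strip() for xp in review_ai_list for t in tree.xpath(xp)]
    texts = [t for t in texts if len(t) > 10]  # drop invalid short fragments, as described above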

[check_columns.py]
- Uncommented the __main__ block so the script can be run directly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 4c8ae1db
......@@ -301,7 +301,7 @@ class spider_check(BaseUtils):
bytes(json.dumps(item), 'utf-8').decode('unicode_escape'))
# if __name__ == '__main__':
# spider_check('us').send_ms()
if __name__ == '__main__':
spider_check('us').send_ms()
# spider_check('de').send_ms()
# spider_check('uk').send_ms()
......@@ -14,78 +14,43 @@ import redis
from utils.secure_db_client import get_remote_engine
class BaseUtils(object):
def __init__(self):
self.site_name = 'us'
def __init__(self, site_name='us'):
self.site_name = site_name
self.engine = self.mysql_connect()
def pg_connect(self):
# db_type_alias_map = {
# "mysql": "mysql", # Alibaba Cloud MySQL
# "postgresql_14": "postgresql_14", # pg14 spider DB - intranet
# "postgresql_14_outer": "postgresql_14_outer", # pg14 spider DB - external network
# "postgresql_15": "postgresql_15", # pg15 production DB - intranet
# "postgresql_15_outer": "postgresql_15_outer", # pg15 production DB - external network
# "postgresql_cluster": "postgresql_cluster", # pg cluster - intranet
# "postgresql_cluster_outer": "postgresql_cluster_outer", # pg cluster - external network
# "doris": "doris", # Doris cluster - intranet
# }
engine_pg = get_remote_engine(
site_name=self.site_name, # -> database "selection"
db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
)
print('engine_pg::',engine_pg)
return engine_pg
# while True:
# try:
# if self.site_name == 'us':
# db = 'selection'
# else:
# db = f'selection_{self.site_name}'
# engine_pg = create_engine(
# f"postgresql+psycopg2://{PG_CONN_DICT['pg_user']}:{PG_CONN_DICT['pg_pwd']}@{PG_CONN_DICT['pg_host']}:{PG_CONN_DICT['pg_port']}/{db}",
# encoding='utf-8', connect_args={"connect_timeout": 10, "keepalives": 1,
# "keepalives_idle": 40, # 40s 空闲后开始发 心跳链接
# "keepalives_interval": 20, # 每 20s 发一次
# "keepalives_count": 10}, #在空闲 40 秒后,每 20 秒发一次探测,总共探测 10 次
# pool_recycle=900, # 太老的连接(15mi)强制回收,避免中间网络设备回收后无法用,池中连接存活 15 min 后丢弃
# pool_size=60, # 根据并发量适当设置
# max_overflow=40)
# return engine_pg
# except Exception as e:
# print("pg_connect 14 t11111111111111111111111:", e, f"\n{traceback.format_exc()}")
# time.sleep(3)
# continue
def doris_connect_adv(self):
engine_doris = get_remote_engine(
site_name=self.site_name,
db_type="doris_adv", # -> server-side db_type alias
database="advertising_manager", # -> target database on the Doris advertising cluster
)
return engine_doris
def doris_adv_direct_connect(self):
"""直连 Doris 广告库(不走代理,用于高速读取和更新)"""
import pymysql
from amazon_params.params import DORIS_ADV_DIRECT_CONN
conn = pymysql.connect(**DORIS_ADV_DIRECT_CONN)
return conn
def doris_connect(self):
engine_doris = get_remote_engine(
site_name=self.site_name, # -> database "selection"
db_type="doris", # -> 服务端 alias "mysql"
)
print('engine_doris::', engine_doris)
return engine_doris
# nums = 0
# while True:
# nums += 1
# try:
# db = 'us_spider'
# connection-argument dict, including the connect timeout
# connect_args = {
# "connect_timeout": 10
# }
# return create_engine(
# f'mysql+pymysql://{DORIS_CONN["mysql_user"]}:' + f'{DORIS_CONN["mysql_pwd"]}@{DORIS_CONN["mysql_host"]}:{DORIS_CONN["mysql_port"]}/{db}?charset=utf8mb4',
# connect_args=connect_args, poolclass=NullPool)
# except Exception as e:
# print("doris_connect22222222222222222222222222:", e, f"\n{traceback.format_exc()}")
# time.sleep(3)
# continue
def pg_connect_6(self):
engine_pg15 = get_remote_engine(
site_name=self.site_name, # -> database "selection"
db_type="postgresql_15_outer", # -> 服务端 alias "mysql"
)
print('engine_pg15::', engine_pg15)
return engine_pg15
def pg_reconnect(self, table_name=None, e=None):
......@@ -153,7 +118,6 @@ class BaseUtils(object):
time.sleep(5)
def kafuka_connect(self, kafka_html_connect=None, bootstrap_servers=None, acks=None, connections_max_idle_ms=60000):
request_timeout_ms = 30000
if kafka_html_connect:
bootstrap_servers = '61.145.136.61:20092'
else:
......@@ -172,17 +136,17 @@ class BaseUtils(object):
sasl_plain_password='R8@xY3pL!qz',
value_serializer=str.encode,
max_request_size=10000120,
request_timeout_ms=request_timeout_ms,
max_block_ms=30000, # block timeout set to 30 s
request_timeout_ms=40000,
max_block_ms=40000, # block timeout set to 40 s
compression_type='gzip', # enable message compression
acks=1 if acks else 0, # set acks as needed: 1 = wait for the leader's acknowledgement, 0 = fire-and-forget
connections_max_idle_ms=connections_max_idle_ms, # release idle connections after one minute
max_in_flight_requests_per_connection=1000,
linger_ms=150, # increase linger time
linger_ms=350, # increase linger time
batch_size=16384 if acks else 0, # larger batch size
api_version=(2, 4, 1), # broker runs Kafka 2.4.1
retries=10, # automatic retries
retry_backoff_ms=500
retries=5, # automatic retries
retry_backoff_ms=600
)
return producer
except Exception as e:
......
......@@ -115,25 +115,26 @@ class ParseSearchTermUs(object):
def parse_asin_zr(self):
"""
From all asins matched via data-asin, the sb/sp asins are excluded --> the remainder are the zr asins
Collect ZR (organic-rank) ASINs from divs that carry data-index + data-asin and do not contain AdHolder
"""
asin_all = self.etree_html.xpath('//div[@data-asin]/@data-asin')
asin_all_str = "-".join(asin_all).replace('/', '')
asin_all = re.findall("(\w+)", asin_all_str)
self.asin_all = asin_all # keep the raw list for parse_buy
# use a set to exclude SB/SP ASINs; list.remove() only drops the first match and would miss duplicates
# dedupe first to keep the page order, then exclude the already identified SB and SP ASINs
exclude_set = set(self.sb_list_all) | set(self.sp_list_all)
asin_unique = list(dict.fromkeys(asin_all))
zr_list = [a for a in asin_unique if a not in exclude_set]
return zr_list
# keep the raw full list for parse_buy
asin_all_raw = self.etree_html.xpath('//div[@data-asin]/@data-asin')
asin_all_str = "-".join(asin_all_raw).replace('/', '')
self.asin_all = re.findall("(\w+)", asin_all_str)
# ZR: items in the main search results without the AdHolder class
zr_items = self.etree_html.xpath('//div[@data-index and @data-asin and not(contains(@class, "AdHolder"))]')
zr_asin_list = []
for item in zr_items:
asin = item.get('data-asin', '').strip()
if asin and len(asin) >= 9 and asin not in zr_asin_list:
zr_asin_list.append(asin)
return zr_asin_list
def parse_type_common(self, asin_list=None, cate_type=None):
"""
asin_list: list
"""
asin_list = list(dict.fromkeys(asin_list)) # dedupe while preserving order
asin_list.sort(key=lambda a: self.asin_position_map.get(a, 9999)) # sort by data-index page position; ASINs without data-index go last
asin_list = list(dict.fromkeys(asin_list)) # dedupe while preserving the page order returned by the xpath
asin_detail_all_list = []
cate_type_copy = 1
asin_detail_dict = {
......@@ -319,6 +320,21 @@ class ParseSearchTermUs(object):
print(self.search_term,' 页数:',self.page,'广告asin:',asin)
if asin and len(asin) >= 9 and asin not in sp_asin_list:
sp_asin_list.append(asin)
# SP ad slots under labelled sections (e.g. Customers frequently viewed, Today's deals)
if self.site_name == 'de':
sp_label = 'Gesponsert'
elif self.site_name == 'us' or self.site_name == 'uk':
sp_label = 'Sponsored'
else:
sp_label = 'Sponsored'
tag_asin_list = self.etree_html.xpath(
f'//span[@class="a-declarative"]/span[contains(text(),"{sp_label}")]/../../../../../../../../div/following-sibling::span[2]//div/@data-asin|//span/a[contains(text(),"{sp_label}")]/../../../../../../../../div/following-sibling::span[2]//div/@data-asin')
if tag_asin_list:
for asin in tag_asin_list:
if asin and len(asin) >= 9 and asin not in sp_asin_list:
sp_asin_list.append(asin)
self.sp_list_all = sp_asin_list.copy() # kept so parse_asin_zr can exclude these
if sp_asin_list:
sp_asin_list.sort(key=lambda a: self.asin_position_map.get(a, 9999))
......@@ -632,3 +648,4 @@ if __name__ == '__main__':
print('sb_list:' ,sb_list)
import hashlib
# import requests
import json
import os
import random
......@@ -7,11 +6,11 @@ import re
import sys
import time
import uuid
from urllib.parse import urlparse
from threading import Lock
import urllib3
from lxml import etree
# py -3.9 -m pip  install using the pip of a specific Python version
# py -3.10 -m pip install -r E:\Git_new\spider\yswg-agent\requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn
sys.path.append(os.path.dirname(sys.path[0])) # parent directory
from amazon_params.params import DB_REQUESTS_ASIN_PARAMS
from utils.db_connect import BaseUtils
......@@ -35,6 +34,8 @@ class Requests_param_val(BaseUtils):
print("站点名称:", self.site_name, '抓取项目', "代理ip:", self.proxy_name)
self.cookies_queue = Queue() # cookie queue
self.kafuka_producer_str = self.kafuka_connect()
self.next_page_lock = Lock()
self.headers_num_int_s = 0
def init_db_names(self):
self.engine_pg = self.pg_connect()
......@@ -67,7 +68,7 @@ class Requests_param_val(BaseUtils):
"""
Check whether the fetched text contains Chinese characters
"""
if check_str != '无':
if check_str and check_str != '无':
for c in check_str:
if '\u4e00' <= c <= '\u9fa5':
print('--是中文,说明该cookie有问题,或者改数据有问题--')
......@@ -117,7 +118,7 @@ class Requests_param_val(BaseUtils):
if num:
sql_read = f'SELECT cookies,id FROM {self.db_cookies} limit {num};'
else:
sql_read = f'SELECT cookies,id FROM {self.db_cookies} limit 300;'
sql_read = f'SELECT cookies,id FROM {self.db_cookies} limit 350;'
print("获取cookie:", sql_read)
df_read = self.engine.read_sql(sql_read)
clientPriceList = list(df_read.cookies + "|-|" + df_read.id.astype("U"))
......@@ -195,31 +196,26 @@ class Requests_param_val(BaseUtils):
# Build the request headers
def requests_amazon_headers(self, host=None, site_url=None, asin=None, scraper_url=None):
n = random.randint(120, 142)
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 6900)}.{random.randint(1, 181)} Safari/537.36'
# Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36
ua = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{n}.0.{random.randint(1000, 5000)}.{random.randint(1, 181)} Safari/537.36'
# ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
headers = {
'connection': 'close',
'authority': urlparse(self.site_url).hostname,
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'accept-Encodin': 'gzip, deflate, br, zstd',
'accept-encoding': 'gzip, deflate, br, zstd',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded;charset=UTF-8',
'sec-ch-ua-mobile': '?0',
'user-agent': ua,
"pragma": "no-cache",
}
if asin:
headers['origin'] = f'{site_url}dp/{asin}'
headers['referer'] = f'{site_url}?th=1'
if scraper_url:
headers['origin'] = scraper_url
headers['referer'] = scraper_url
alphabet = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r',
's', 't', 'u', 'v', 'w', 'x', 'y', 'z']
k = ""
......@@ -229,11 +225,11 @@ class Requests_param_val(BaseUtils):
return headers
# Second request attempt.
def requests_amazon(self, headers=None, scraper_url=None):
def requests_amazon(self, headers=None, scraper_url=None, sess=None):
for i in range(2):
try:
resp = requests.get(scraper_url, headers=headers, impersonate="chrome",
timeout=10, verify=False)
timeout=30, verify=False)
if self.check_amazon_yzm(resp):
print('验证码2222222222222222')
continue
......@@ -262,22 +258,7 @@ class Requests_param_val(BaseUtils):
# Get the numeric value for the given hour (stored in a Redis list)
def get_hour(self, new_date_hour):
# new_date_hour = datetime.now().strftime("%Y-%m-%d:%H")
# get the current date
current_date = datetime.now()
# zero out the hour, minute and second of the current time
current_date = current_date.replace(hour=0, minute=0, second=0, microsecond=0)
# generate the 24 hourly timestamps of the day
hourly_times = [current_date + timedelta(hours=i) for i in range(24)]
hour_dict = {}
# map each hour string to its numeric hour value
for hour_time in hourly_times:
hour = hour_time.strftime("%Y-%m-%d:%H")
num = re.findall(r':(\d+)', hour)[0]
hour_dict[hour] = num
print(new_date_hour, hour_dict)
n = hour_dict[new_date_hour]
return n
return str(datetime.now().hour)
# Build the cookie string
def get_cookie_str(self, cookies_queue):
......@@ -305,11 +286,11 @@ class Requests_param_val(BaseUtils):
break
if self.site_name == 'uk':
cookie_str = cookie_str.replace('i18n-prefs=HKD;', 'i18n-prefs=GBP;').replace('i18n-prefs=USD;', 'i18n-prefs=GBP;')
cookie_str = cookie_str.replace('i18n-prefs=HKD', 'i18n-prefs=GBP').replace('i18n-prefs=USD', 'i18n-prefs=GBP')
elif self.site_name == 'de':
cookie_str = cookie_str.replace('i18n-prefs=HKD;', 'i18n-prefs=EUR;').replace('i18n-prefs=USD;', 'i18n-prefs=EUR;')
cookie_str = cookie_str.replace('i18n-prefs=HKD', 'i18n-prefs=EUR').replace('i18n-prefs=USD', 'i18n-prefs=EUR')
elif self.site_name == 'us':
cookie_str = cookie_str.replace('i18n-prefs=HKD;', 'i18n-prefs=USD;')
cookie_str = cookie_str.replace('i18n-prefs=HKD', 'i18n-prefs=USD')
return cookie_str
# Get the auto-increment id range: look up the ids of the monthly/weekly syn tables for the given site
......@@ -353,7 +334,7 @@ class Requests_param_val(BaseUtils):
def hex_md5(self, input_string):
# create an MD5 hash object
md5_hash = hashlib.md5()
# update the hash object with the bytes of the input string
md5_hash.update(input_string.encode('utf-8'))
# get the hexadecimal digest of the hash
md5_hex_digest = md5_hash.hexdigest()
......@@ -367,24 +348,25 @@ class Requests_param_val(BaseUtils):
def send_kafka(self, items=None, html_data=None, topic=None):
print('向Kafka发送数据')
for i in range(5):
if items:
items.pop('div_id_list', None)
for i in range(3):
try:
if items:
print('232323232323')
del items['div_id_list']
future = self.kafuka_producer_str.send(topic, json.dumps(items))
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
if html_data:
future = self.kafuka_producer_str.send(topic, html_data)
future.add_callback(self.on_send_success).add_errback(self.on_send_error)
print('向Kafka发送数据 发送成功')
break
except KafkaTimeoutError:
print(f'Kafka flush超时,第{i+1}次重试')
if i >= 2:
self.kafuka_producer_str = self.kafuka_connect()
except Exception as e:
print(e)
if i >= 1:
self.kafuka_producer_str = self.kafuka_connect() # rebuild the Kafka producer
try:
self.kafuka_producer_str.flush(timeout=30)
except KafkaTimeoutError as e:
print("flush 超时,跳过这次等待:", e)
if i >= 2:
self.kafuka_producer_str = self.kafuka_connect()
if __name__ == '__main__':
Requests_param_val().get_cookie(num=1)
\ No newline at end of file
DEFAULT_USER = "fangxingjun"
DEFAULT_USER_TOKEN = "fxj_token_123"
DEFAULT_USER = "pengyanbing"
DEFAULT_USER_TOKEN = "8f3b9d2a4c7e58b1"