Commit 3650247a by Peng

no message

parent 77b04aaa
...@@ -3,8 +3,6 @@ import sys ...@@ -3,8 +3,6 @@ import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import curl_cffi import curl_cffi
from lxml import etree
# from threading_spider.db_connectivity import connect_db
from DrissionPage import ChromiumPage, ChromiumOptions from DrissionPage import ChromiumPage, ChromiumOptions
from DrissionPage.common import Keys from DrissionPage.common import Keys
import json import json
...@@ -12,233 +10,231 @@ import time ...@@ -12,233 +10,231 @@ import time
from utils.secure_db_client import get_remote_engine from utils.secure_db_client import get_remote_engine
import random import random
num_list = [] RANK_LIST = [
1, 10, 30, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000,
*range(1100, 10001, 100),
# # 获取所有站点的bsr 大类名称 和 分类id,存储到us站点 *range(11000, 21000, 1000),
def get_cid(): 25000, 30000, 35000, 40000, 45000, 50000
url = 'https://www.sellersprite.com/v2/tools/sales-estimator' ]
headers = {
ACCOUNTS = [
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", ['18823832416', '18823832416qaz'],
"Accept-Encoding": "gzip, deflate, br, zstd", ['15368051270', '123456'],
"Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8", ['18307967347', 'Aa123456.'],
"Cache-Control": "no-cache", ['qq16531218653@163.com', 'qq16531218653'],
"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", ]
}
resp = curl_cffi.get(url, headers=headers, impersonate="chrome")
html = etree.HTML(resp.text) class AccountManager:
data_category = html.xpath("//script[@id='data-category']/text()")[0] def __init__(self):
print(data_category) self.index = 0
self.fail_count = 0
def current(self):
return ACCOUNTS[self.index]
def switch_next(self):
self.fail_count += 1
if self.fail_count >= len(ACCOUNTS):
print('所有账号已轮换一轮,睡眠2小时')
time.sleep(7200)
self.fail_count = 0
self.index = 0
else:
self.index = self.fail_count
print(f'切换到账号[{self.index}]:{self.current()[0]}')
def mysql_connect(site='us'):
    """Return a remote MySQL engine for ``site``.

    Thin wrapper around ``get_remote_engine`` that always requests the
    ``'mysql'`` database type.

    Args:
        site: Site code forwarded as ``site_name`` (defaults to ``'us'``).

    Returns:
        Whatever ``get_remote_engine`` returns for that site.
    """
    engine = get_remote_engine(
        site_name=site,
        db_type='mysql',
    )
    return engine
def db_cursor_connect_update(sql, site):
    """Execute a write statement against the MySQL database of ``site``.

    A fresh engine is created for every attempt and the statement runs
    inside ``engine.begin()``. Up to three attempts are made.

    Args:
        sql: The SQL statement to execute.
        site: Site code passed to ``mysql_connect``.

    Returns:
        ``True`` once an attempt succeeds, ``False`` if all attempts failed.
        (Previously the function always returned ``None``; existing callers
        ignore the return value.)
    """
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            engine = mysql_connect(site=site)
            print('更新sql:', sql)
            with engine.begin() as conn:
                conn.execute(sql)
            return True
        except Exception as e:
            # Catch Exception rather than using a bare except so that
            # Ctrl-C / SystemExit still stop the program, and show the cause.
            print(site, 'db_cursor_connect 报错:', sql)
            print(f'db_cursor_connect_update attempt {attempt}/{max_attempts} failed: {e!r}')
    # Every attempt failed: make that visible to the caller instead of
    # silently falling off the end.
    return False
def db_cursor_connect_msyql_read(site, sql):
    """Run a read query against the MySQL database of ``site``.

    A fresh engine is created for every attempt; up to three attempts are
    made. Each failure prints the full traceback and the offending SQL.

    Args:
        site: Site code passed to ``mysql_connect``.
        sql: The query handed to the engine's ``read_sql``.

    Returns:
        The result of ``engine.read_sql(sql)`` from the first successful
        attempt, or ``None`` when all three attempts raise.
    """
    import traceback

    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        try:
            return mysql_connect(site=site).read_sql(sql)
        except Exception as e:
            traceback.print_exc()
            print(e, 'db_cursor_connect_msyql_read 报错:', sql)
    return None
def sellersprite_spider(db_base): def sellersprite_login(account):
month = time.strftime("%m") """传入 [username, password],返回 cookies_dict"""
year = time.strftime("%Y") print('登录账号:', account[0])
year_month = f'{year}_{int(month)}'
category_name_sql_select = f"select `name`,c_id from all_site_category where site='{db_base}' and state =1"
print(category_name_sql_select)
category_name_list_df = db_cursor_connect_msyql_read('us', category_name_sql_select)
print(category_name_list_df)
category_name_list = list(category_name_list_df['name'] + '|-|==|' + category_name_list_df['c_id'])
cookies_dict = sellersprite_login(num=1) # 调用自动化登录账号 num使用第几个账号
for category_name_id in category_name_list:
print(category_name_id, '2323232323')
c_name = category_name_id.split('|-|==|')[0]
c_id = category_name_id.split('|-|==|')[1]
print(c_name, c_id)
name_rnak_list = []
up_sql = f"UPDATE all_site_category set state=2 WHERE site='{db_base}' and state=1 and c_id='{c_id}'"
db_cursor_connect_update(up_sql, 'us')
rank_list = [1, 10, 30, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100, 1200, 1300, 1400, 1500,
1600,
1700,
1800,
1900,
2000, 2100, 2200, 2300, 2400, 2500, 2600, 2700, 2800, 2900, 3000, 3100, 3200, 3300, 3400, 3500,
3600,
3700,
3800, 3900, 4000, 4100, 4200, 4300, 4400, 4500, 4600, 4700, 4800, 4900, 5000, 5100, 5200, 5300,
5400,
5500,
5600, 5700, 5800, 5900, 6000, 6100, 6200, 6300, 6400, 6500, 6600, 6700, 6800, 6900, 7000, 7100,
7200,
7300,
7400, 7500, 7600, 7700, 7800, 7900, 8000, 8100, 8200, 8300, 8400, 8500, 8600, 8700, 8800, 8900,
9000,
9100,
9200, 9300, 9400, 9500, 9600, 9700, 9800, 9900, 10000, 11000, 12000, 13000, 14000, 15000, 16000,
17000,
18000, 19000, 20000, 25000, 30000, 35000, 40000, 45000, 50000
]
for i in rank_list:
headers = {
"Referer": "https://www.sellersprite.com/v2/tools/sales-estimator",
"Origin": "https://www.sellersprite.com",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8",
"Cache-Control": "no-cache",
"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}
url = "https://www.sellersprite.com/v2/tools/sales-estimator/bsr.json"
data = {
"station": db_base.upper(),
"cid": c_id, # 分类id
"bsr": f"{i}" # 排名
}
print(c_name, '请求参数 data::', data)
for i1 in range(3):
try:
response = curl_cffi.post(url, headers=headers, data=data, impersonate="chrome", timeout=300,
cookies=cookies_dict)
print(response.url)
# print(response.text)
response = json.loads(response.text)
break
except:
time.sleep(random.uniform(15, 30.75))
try:
response_data = response['data']
print('code::', response['code'])
print('message::', response['message'])
print('estMonSales::', response_data['estMonSales'])
est = response_data.get('estMonSales')
if est is None:
# 没拿到数据,跳出循环
break
if est == 0.0:
print(f"{c_name} 排名{i}:销量 0,跳出循环。")
break
print(type(est))
print('获取数据:', c_name, i, est, year_month)
sales = int(est)
name_rnak_list.append((c_name, i, sales, year_month))
time.sleep(random.uniform(20, 45.75))
# break
except Exception as e:
print(e,5555555)
time.sleep(10) # # 调用自动化登录账号 报错 账号被封禁了。切换下一个账号
cookies_dict = sellersprite_login(num=0)
for i in range(4):
try:
inset_sql = f"INSERT INTO {db_base}_one_category (name, rank,orders,`year_month`) values (%s, %s, %s, %s)"
print(inset_sql)
engine_db_msyql = mysql_connect(site=db_base)
with engine_db_msyql.begin() as conn:
conn.execute(
f"INSERT INTO {db_base}_one_category (name, rank,orders,`year_month`) values (%s, %s, %s, %s)",
name_rnak_list)
up_sql = f"UPDATE all_site_category set state=3 WHERE site='{db_base}' and state=2 and c_id='{c_id}'"
print('更新状态:', up_sql)
db_cursor_connect_update(up_sql, 'us')
break
except Exception as e:
print('存储失败:', e)
time.sleep(20)
print('当前完成。获取下一个分类销量')
time.sleep(random.uniform(50, 120.5))
def sellersprite_login(num=2):
global num_list
num_list.append(num)
print('num_list',num_list)
if len(num_list) > 2:
num = 2
if len(num_list)>4:
num = 1
if len(num_list)>5:
print('睡眠')
num_list = []
num = 0
time.sleep(14400)
user_list = [['18307967347', 'Aa123456.'], ['qq16531218653@163.com', 'qq16531218653'], ['15368051270', '123456']]
print('登录账号:', user_list[num])
# 配置 Chrome 浏览器 - 端口 9222
chrome_options = ChromiumOptions() chrome_options = ChromiumOptions()
chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe') chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
chrome_options.set_local_port(9333) # 设置 Chrome 的调试端口 chrome_options.set_local_port(9333)
page_chrome = ChromiumPage(addr_or_opts=chrome_options) page_chrome = ChromiumPage(addr_or_opts=chrome_options)
print(f"Chrome 浏览器运行在端口: {9333}")
page_chrome.get("https://www.sellersprite.com/cn/w/user/login") page_chrome.get("https://www.sellersprite.com/cn/w/user/login")
page_chrome.set.window.max() page_chrome.set.window.max()
page_chrome.set.cookies.clear() page_chrome.set.cookies.clear()
time.sleep(random.randint(1, 3)) time.sleep(random.randint(1, 3))
page_chrome.refresh() page_chrome.refresh()
# 等待页面初始加载
time.sleep(random.randint(1, 3)) time.sleep(random.randint(1, 3))
page_chrome.get("https://www.sellersprite.com/cn/w/user/login") page_chrome.get("https://www.sellersprite.com/cn/w/user/login")
time.sleep(random.randint(6, 10)) time.sleep(random.randint(6, 10))
export_orders = page_chrome.ele('xpath://a[text()="账号登录"]', timeout=10) page_chrome.ele('xpath://a[text()="账号登录"]', timeout=10).click()
export_orders.click()
print('点击账号登录') print('点击账号登录')
time.sleep(random.randint(5, 10)) time.sleep(random.randint(5, 10))
email_input = page_chrome.ele('xpath://div[@id="form_signin_password"]//input[@name="email"]') email_input = page_chrome.ele('xpath://div[@id="form_signin_password"]//input[@name="email"]')
email_input.clear() # 清除任何预填充的内容 email_input.clear()
email_input.input(user_list[num][0]) # 输入文本password email_input.input(account[0])
print("已输入账号到邮箱输入框") print("已输入账号")
time.sleep(random.randint(5, 10)) time.sleep(random.randint(5, 10))
password_input = page_chrome.ele('xpath://div[@id="form_signin_password"]//input[@type="password"]') password_input = page_chrome.ele('xpath://div[@id="form_signin_password"]//input[@type="password"]')
password_input.clear() # 清除任何预填充的内容 password_input.clear()
password_input.input(user_list[num][1]) password_input.input(account[1])
time.sleep(random.randint(5, 10)) time.sleep(random.randint(5, 10))
page_chrome.actions.type(Keys.ENTER) page_chrome.actions.type(Keys.ENTER)
time.sleep(random.randint(5, 10)) time.sleep(random.randint(5, 10))
page_chrome.get('https://www.sellersprite.com/v2/tools/sales-estimator') page_chrome.get('https://www.sellersprite.com/v2/tools/sales-estimator')
time.sleep(random.randint(5, 10)) time.sleep(random.randint(5, 10))
original_cookies_list = page_chrome.cookies()
# 将 cookies 列表转换为字典 cookies = {c['name']: c['value'] for c in page_chrome.cookies()}
original_cookie_dict = {cookie['name']: cookie['value'] for cookie in original_cookies_list} print('获取到 cookies,key数量:', len(cookies))
print('original_cookie_dict::', original_cookie_dict)
page_chrome.close() page_chrome.close()
return original_cookie_dict return cookies
def fetch_rank_sales(db_base, c_name, c_id, rank, cookies_dict):
    """Request the sales estimate for a single BSR rank.

    Posts ``station`` / ``cid`` / ``bsr`` to the SellerSprite
    sales-estimator ``bsr.json`` endpoint, retrying up to three times.

    Args:
        db_base: Site code; upper-cased and sent as ``station``.
        c_name: Category name, used only for log output.
        c_id: Category id sent as ``cid``.
        rank: BSR rank sent (as a string) in ``bsr``.
        cookies_dict: Session cookies attached to the request.

    Returns:
        The decoded JSON response as a ``dict``, or ``None`` when all
        three attempts fail (request error, invalid JSON, or a JSON
        payload that is not an object).
    """
    headers = {
        "Referer": "https://www.sellersprite.com/v2/tools/sales-estimator",
        "Origin": "https://www.sellersprite.com",
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8",
        "Cache-Control": "no-cache",
        "User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    }
    url = "https://www.sellersprite.com/v2/tools/sales-estimator/bsr.json"
    data = {"station": db_base.upper(), "cid": c_id, "bsr": f"{rank}"}
    print(c_name, '请求参数 data::', data)

    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            resp = curl_cffi.post(url, headers=headers, data=data,
                                  impersonate="chrome", timeout=300,
                                  cookies=cookies_dict)
            payload = json.loads(resp.text)
        except Exception as e:
            # Exception (not a bare except) keeps Ctrl-C working and lets
            # us report why the attempt failed.
            print(f'fetch_rank_sales attempt {attempt}/{max_attempts} failed: {e!r}')
        else:
            if isinstance(payload, dict):
                return payload
            # Callers use .get() on the result, so anything but an object
            # is treated as a failed attempt.
            print(f'fetch_rank_sales attempt {attempt}/{max_attempts}: '
                  f'unexpected JSON type {type(payload).__name__}')
        # Back off before the next try; no point sleeping after the last one.
        if attempt < max_attempts:
            time.sleep(random.uniform(5, 20.75))
    return None
def sellersprite_spider(db_base):
    """Scrape estimated monthly sales per BSR rank for every pending category.

    Workflow for the site ``db_base``:

    1. Read categories with ``state in (1, 2)`` from ``all_site_category``
       (state 2 rows are leftovers from an interrupted run and are redone).
    2. Log in with the current account to obtain cookies.
    3. For each category: mark it ``state=2``, query every rank in
       ``RANK_LIST`` until the estimate is missing or zero, then insert
       the collected rows into ``{db_base}_one_category`` and mark the
       category ``state=3``. A category that produced no rows is reset to
       ``state=1``.

    When a response carries no ``data`` the account is rotated, a new
    login is performed and the same rank is retried once.

    Args:
        db_base: Site code (e.g. ``'us'``); selects the category rows, the
            ``station`` request parameter and the target table.

    Returns:
        None.
    """
    # Read the clock once so year and month cannot straddle a boundary.
    now = time.localtime()
    year_month = f'{now.tm_year}_{now.tm_mon}'

    # Fetch state IN (1,2): state=2 is left over from a crashed run.
    sql = f"select `name`,c_id from all_site_category where site='{db_base}' and state in (1,2)"
    category_df = db_cursor_connect_msyql_read('us', sql)
    print(category_df)
    # The reader returns None after three failed attempts; also avoid a
    # pointless browser login when there is nothing to process.
    if category_df is None or len(category_df) == 0:
        print(f'{db_base}: no categories to process')
        return

    # Pair the columns directly instead of joining them into one string
    # and splitting it again; c_id is kept as a string as before.
    categories = [(name, str(cid))
                  for name, cid in zip(category_df['name'], category_df['c_id'])]

    account_mgr = AccountManager()
    cookies_dict = sellersprite_login(account_mgr.current())

    insert_sql = f"INSERT INTO {db_base}_one_category (name, rank, orders, `year_month`) values (%s, %s, %s, %s)"

    for c_name, c_id in categories:
        print(c_name, c_id)
        name_rnak_list = []

        # state -> 2 marks the category as in progress.
        db_cursor_connect_update(
            f"UPDATE all_site_category set state=2 WHERE site='{db_base}' and state in (1,2) and c_id='{c_id}'",
            'us'
        )

        for rank in RANK_LIST:
            response = fetch_rank_sales(db_base, c_name, c_id, rank, cookies_dict)
            if response is None:
                print(f"{c_name} cid={c_id} rank={rank} 请求3次全部失败,跳过")
                continue

            response_data = response.get('data')
            print('code::', response.get('code'), 'message::', response.get('message'))

            if not response_data:
                # Missing data is treated as an expired/blocked session:
                # rotate the account, log in again and retry this rank once.
                print(f'{c_name} rank={rank} data为空,可能cookie失效,切换账号重试')
                account_mgr.switch_next()
                cookies_dict = sellersprite_login(account_mgr.current())
                response = fetch_rank_sales(db_base, c_name, c_id, rank, cookies_dict)
                if response is None:
                    continue
                response_data = response.get('data')
                if not response_data:
                    continue

            est = response_data.get('estMonSales')
            print('estMonSales::', est)
            if est is None:
                print(f"{c_name} 排名{rank}:estMonSales=None,跳出循环")
                break
            if est == 0.0:
                print(f"{c_name} 排名{rank}:销量 0,跳出循环")
                break

            name_rnak_list.append((c_name, rank, int(est), year_month))
            time.sleep(random.uniform(10, 25.75))

        # Nothing collected: do not insert, put the category back to state 1.
        if not name_rnak_list:
            print(f'{c_name} 无数据,跳过入库,state 重置回 1')
            db_cursor_connect_update(
                f"UPDATE all_site_category set state=1 WHERE site='{db_base}' and c_id='{c_id}'",
                'us'
            )
            continue

        stored = False
        for attempt in range(10):
            try:
                engine_db = mysql_connect(site=db_base)
                with engine_db.begin() as conn:
                    conn.executemany(insert_sql, name_rnak_list)
            except Exception as e:
                print(f'存储失败(第{attempt + 1}次):', e)
                time.sleep(10)
            else:
                stored = True
                break

        if stored:
            # Kept outside the retry loop so a problem while updating the
            # state can never trigger a second insert of the same rows.
            db_cursor_connect_update(
                f"UPDATE all_site_category set state=3 WHERE site='{db_base}' and state=2 and c_id='{c_id}'",
                'us'
            )
            print(f'{c_name} 入库完成,共 {len(name_rnak_list)} 条')
        else:
            # state stays 2, so the category is picked up again next run.
            print(f'{c_name} cid={c_id} insert failed after 10 attempts; state left at 2 for the next run')

        print('当前完成。获取下一个分类销量')
        time.sleep(random.uniform(30, 60.5))
def run():
    """Run the spider for each site in turn: us, de, uk."""
    site_codes = ('us', 'de', 'uk')
    for site_code in site_codes:
        sellersprite_spider(site_code)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment