Commit b15406ae by Peng

no message

parent 067652bd
import base64
import json
import os
import secrets
import uuid
from datetime import timedelta

import pymysql
import redis
from flask import Flask, request, jsonify, g
from flask_bcrypt import Bcrypt
from pymysql.err import OperationalError
# Flask application object; the routes and the before_request hook defined
# later in this module are registered on it.
app = Flask(__name__)
# Keep non-ASCII characters (e.g. Chinese) unescaped in JSON output.
# NOTE(review): newer Flask releases appear to ignore this config key in
# favour of `app.json.ensure_ascii` — confirm against the Flask version in use.
app.config['JSON_AS_ASCII'] = False
# Connection settings for MySQL and Redis.
#
# Every value can be overridden through an environment variable so that
# credentials no longer have to be edited in (or read from) the source tree.
#
# NOTE(review): the literal fallbacks are kept only so existing deployments
# keep working unchanged.  They have been committed to version control, so the
# passwords should be rotated and the fallbacks removed once the environment
# variables are provisioned.
DB_CONFIG = {
    'host': os.environ.get('INVENTORY_DB_HOST', '120.77.232.73'),
    'port': int(os.environ.get('INVENTORY_DB_PORT', '3306')),
    'user': os.environ.get('INVENTORY_DB_USER', 'yswg_it_cangchu'),
    'password': os.environ.get('INVENTORY_DB_PASSWORD', 'Yswg@inv-cangchu241011420'),
    'db': os.environ.get('INVENTORY_DB_NAME', 'inventory'),
}

# Redis holds the login tokens (token -> serialized user info).
REDIS_CONN = {
    "redis_host": os.environ.get('INVENTORY_REDIS_HOST', "113.100.143.162"),
    "redis_port": int(os.environ.get('INVENTORY_REDIS_PORT', '6379')),
    "redis_pwd": os.environ.get('INVENTORY_REDIS_PASSWORD', "fG7#vT6kQ1pX"),
    "redis_db": int(os.environ.get('INVENTORY_REDIS_DB', '14')),
}
def mysql_db_conn():
    """Open a new pymysql connection using the values in ``DB_CONFIG``.

    The connection is returned open; this function never closes it.
    """
    return pymysql.connect(
        host=DB_CONFIG['host'],
        port=DB_CONFIG['port'],
        user=DB_CONFIG['user'],
        password=DB_CONFIG['password'],
        database=DB_CONFIG['db'],
        charset="utf8mb4",
    )
def redis_db_conn():
    """Create a ``redis.Redis`` client from the values in ``REDIS_CONN``.

    The client is handed back to the caller as-is; this function never
    closes it.
    """
    return redis.Redis(
        host=REDIS_CONN['redis_host'],
        port=REDIS_CONN['redis_port'],
        password=REDIS_CONN['redis_pwd'],
        db=REDIS_CONN['redis_db'],
    )
# Initialise Flask-Bcrypt on the app; login() uses it to check a submitted
# password against the stored hash.
dbcrypt = Bcrypt(app)
# Generate a login token
def generate_token():
    """Return a new random, URL-safe login token.

    The token is 16 random bytes from the operating system's CSPRNG,
    base64url-encoded with the ``=`` padding stripped, i.e. a 22-character
    string made of ``A-Z a-z 0-9 - _``.  This is the same length and alphabet
    as the previous ``uuid4``-based token, but ``secrets`` is the module
    intended for security tokens and all 128 bits are random (a UUID4 fixes
    6 of them for its version/variant fields).

    Returns:
        str: the token.
    """
    return secrets.token_urlsafe(16)
# Shared helper: fetch the user info attached to the current request
def get_current_user():
    """
    Return the current user's info (a dict) stored on ``flask.g``,
    or ``None`` when no ``current_user`` attribute has been set.
    """
    try:
        return g.current_user
    except AttributeError:
        return None
# Middleware: validate the token before every request (login excepted)
@app.before_request
def verify_token_middleware():
    """Authenticate the request through its ``inventory-token`` header.

    The token is looked up in Redis; the stored value is the JSON-serialized
    user info written by ``login()``.  On success the decoded dict is stored
    on ``g.current_user`` and the request proceeds.

    Returns:
        ``None`` to let the request continue, otherwise a JSON error body.
        Errors are reported through the body's ``code`` field (401 for a
        missing/unknown/expired token, 500 when the stored value cannot be
        decoded); no explicit HTTP status is set on the response.
    """
    # The login endpoint is where a client obtains its token, so it is exempt.
    if request.endpoint == 'login':
        return None

    token = request.headers.get('inventory-token')
    # The token is a bearer credential, so it is deliberately not printed.
    if not token:
        return jsonify({'code': 401, 'error': '缺少 inventory-token'})

    redis_client = redis_db_conn()
    try:
        user_json = redis_client.get(token)
    finally:
        # Release the client even when the lookup raises.
        redis_client.close()
    print('根据 token 去redis查询 是否过期')
    if not user_json:
        return jsonify({'code': 401, 'error': '无效或已过期的 token'})

    try:
        g.current_user = json.loads(user_json)
    except ValueError:
        # ValueError covers json.JSONDecodeError and also the
        # UnicodeDecodeError raised for bytes that are not valid UTF-8.
        return jsonify({'code': 500, 'error': '解析信息失败'})
    return None
@app.route('/index', methods=['GET'])
def index_():
    """Greet the user attached to the current request and echo their id."""
    current = get_current_user()
    print('打印请求用户的信息:', current)
    payload = {
        'message': f'欢迎,{current["name"]}!',
        'user_id': current['id'],
    }
    return jsonify(payload)
@app.route('/user/members/index', methods=['GET'])
def user_index():
    """Return a welcome message and the id of the user making the request."""
    member = get_current_user()
    print('打印请求用户的信息:', member)
    greeting = f'欢迎,{member["name"]}!'
    member_id = member['id']
    return jsonify({'message': greeting, 'user_id': member_id})
@app.route('/login', methods=['POST'])
def login():
    """Check a username/password pair and issue a login token.

    Expects a JSON object with ``username`` (matched against
    ``users.wechat_id``) and ``password``.  On success a fresh token is stored
    in Redis for 23 hours, mapped to the user's id, name and email, and
    returned to the client.

    Returns:
        A JSON body whose ``code`` field is 200 (with ``token``), 400 (missing
        credentials), 404 (unknown user), 401 (wrong password) or 500
        (database connection problem).

    Credentials, the password hash and the token are deliberately not printed.
    """
    # silent=True: a missing or malformed JSON body is treated like missing
    # credentials instead of crashing on `.get`.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    if not username or not password:
        return jsonify({"code": 400, 'error': '用户名和密码不能为空'})

    # Look the user up.  The value is passed as a query parameter so the
    # driver escapes it; it must never be interpolated into the SQL text.
    conn = None
    try:
        conn = mysql_db_conn()
        # DictCursor returns the row as a dict keyed by column name.
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(
                "SELECT `password`,id,name,email FROM users WHERE wechat_id=%s LIMIT 1",
                (username,),
            )
            user = cursor.fetchone()
    except OperationalError:
        # Could not reach / talk to the database — check the configuration.
        return jsonify({"code": 500, 'error': '服务端 错误'})
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                # Best effort: a failure while closing must not mask the
                # outcome of the query.
                pass

    # NOTE(review): answering 404 for an unknown user and 401 for a wrong
    # password lets a client tell which usernames exist — confirm this is
    # intended.
    if user is None:
        return jsonify({"code": 404, 'error': '用户不存在'})
    if not dbcrypt.check_password_hash(user['password'], password):
        return jsonify({"code": 401, 'error': '密码 或 用户名 错误'})

    # Create the token and store the user info under it in Redis.
    token = generate_token()
    user_obj = {
        'id': user['id'],
        'name': user['name'],
        'email': user['email'],
    }
    redis_client = redis_db_conn()
    try:
        # ensure_ascii=False keeps Chinese characters readable in the stored value.
        redis_client.setex(token, timedelta(hours=23), json.dumps(user_obj, ensure_ascii=False))
    finally:
        redis_client.close()
    return jsonify({"code": 200, 'token': token}), 200
if __name__ == '__main__':
    # The Werkzeug interactive debugger lets anyone who can reach the port
    # execute arbitrary code, and this server listens on every interface.
    # Debug mode is therefore off unless explicitly requested via FLASK_DEBUG.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)
import os
import sys
from sqlalchemy import text
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.db_connect import BaseUtils
from flask import Flask, request, jsonify
import pandas as pd
# Flask application object; the /workflow_progress route below is registered on it.
app = Flask(__name__)
def db_mysql_connect():
    """Return whatever ``BaseUtils().mysql_connect()`` produces.

    A new ``BaseUtils`` instance is created on every call.
    """
    return BaseUtils().mysql_connect()
def select_workflow_progress(sql_select):
    """Run a SELECT statement and return the first row's ``status_val``.

    Args:
        sql_select: SQL text handed to ``pandas.read_sql``; its result must
            contain a ``status_val`` column.

    Returns:
        The ``status_val`` of the first row as a plain Python value, or
        ``None`` when the query returns no rows.
    """
    print('查询语句:', sql_select)
    mysql_connect = db_mysql_connect()
    df = pd.read_sql(sql_select, con=mysql_connect)
    # The previous `if df.values:` asked NumPy for the truth value of an
    # array: that raises for results with more than one element, and for a
    # single cell it tests the cell itself, so a status of 0 was reported as
    # "no row".  Test for emptiness explicitly instead.
    if df.empty:
        return None
    # tolist() yields native Python scalars, which jsonify can serialize.
    return df['status_val'].tolist()[0]
def update_workflow_progress(sql_update):
    """Execute ``sql_update`` inside the context opened by ``.begin()``.

    The statement is wrapped with SQLAlchemy's ``text`` and run on the
    connection yielded by ``db_mysql_connect().begin()``.
    """
    print('更新语句:', sql_update)
    engine = db_mysql_connect()
    statement = text(sql_update)
    with engine.begin() as connection:
        connection.execute(statement)
@app.route('/workflow_progress', methods=['POST'])
def get_args():
    """Run the SQL statement submitted in the form and report the outcome.

    Form fields read:
        sql: the statement to run.
        sql_type: ``'select'`` or ``'update'``.

    Returns:
        A JSON body with ``code`` 200 and ``status_val`` (the selected value,
        or 1 after an update), or ``code`` 400 with ``"Parameter error"`` when
        ``sql_type`` is not recognised or ``sql`` is missing/blank.

    WARNING(security): this endpoint executes SQL text supplied by the client
    verbatim.  Anyone who can reach it can read or modify the database.  That
    is the endpoint's contract and is not changed here, but it should be
    restricted to trusted callers or replaced by named, parameterized
    operations.
    """
    print(request.form)  # log every submitted form field
    sql = request.form.get('sql')
    sql_type = request.form.get('sql_type')

    # Reject requests without a statement up front instead of letting the
    # database layer fail on None / an empty string.
    if sql_type not in ('select', 'update') or not sql or not sql.strip():
        return jsonify({"code": 400, "message": "Parameter error"})

    if sql_type == 'select':
        status_val = select_workflow_progress(sql)
        items = {"code": 200, "message": "select success", 'status_val': status_val}
    else:
        update_workflow_progress(sql)
        items = {"code": 200, "message": "update success", 'status_val': 1}
    return jsonify(items)
if __name__ == '__main__':
    # Serve on every network interface, port 10249.
    app.run(host='0.0.0.0', port=10249)
...@@ -3,25 +3,35 @@ import sys ...@@ -3,25 +3,35 @@ import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import curl_cffi import curl_cffi
from DrissionPage import ChromiumPage, ChromiumOptions from curl_cffi import requests as cffi_requests
from DrissionPage.common import Keys import hashlib
import json import json
import time import time
from utils.secure_db_client import get_remote_engine from utils.secure_db_client import get_remote_engine
import random import random
# ===== 原始排名(50000以内)=====
# RANK_LIST = [
# 1, 10, 30, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000,
# *range(1100, 10001, 100),
# *range(11000, 21000, 1000),
# 25000, 30000, 35000, 40000, 45000, 50000
# ]
# ACCOUNTS = [
# ['18823832416', '18823832416qaz'],
# ['15368051270', '123456'],
# ['18307967347', 'Aa123456.'],
# ['qq16531218653@163.com', 'qq16531218653'],
# ]
# ===== 大排名(50000以上,只用18823832416账号)=====
RANK_LIST = [ RANK_LIST = [
1, 10, 30, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 55000, 60000, 65000, 70000, 75000, 80000, 85000, 90000, 95000, 100000,
*range(1100, 10001, 100), 150000, 200000, 250000, 300000, 350000, 400000, 450000, 500000, 550000,
*range(11000, 21000, 1000), 600000, 650000, 700000, 750000, 800000, 850000, 900000, 950000, 1000000
25000, 30000, 35000, 40000, 45000, 50000
] ]
ACCOUNTS = [ ACCOUNTS = [
['18823832416', '18823832416qaz'], ['18823832416', '18823832416qaz'],
['15368051270', '123456'],
['18307967347', 'Aa123456.'],
['qq16531218653@163.com', 'qq16531218653'],
] ]
...@@ -73,46 +83,49 @@ def db_cursor_connect_msyql_read(site, sql): ...@@ -73,46 +83,49 @@ def db_cursor_connect_msyql_read(site, sql):
def sellersprite_login(account): def sellersprite_login(account):
"""传入 [username, password],返回 cookies_dict""" """纯 Python 登录卖家精灵,返回 cookies_dict
print('登录账号:', account[0]) 密码加密逻辑(来自 JS 逆向):
chrome_options = ChromiumOptions() password 字段 = MD5(原始密码)
chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe') salt 字段 = MD5(邮箱 + MD5(原始密码))
chrome_options.set_local_port(9333) POST 到 /w/user/signin
"""
page_chrome = ChromiumPage(addr_or_opts=chrome_options) def md5(s):
page_chrome.get("https://www.sellersprite.com/cn/w/user/login") return hashlib.md5(s.encode()).hexdigest()
page_chrome.set.window.max()
page_chrome.set.cookies.clear() email = account[0]
time.sleep(random.randint(1, 3)) raw_pwd = account[1]
page_chrome.refresh() password_md5 = md5(raw_pwd)
time.sleep(random.randint(1, 3)) salt = md5(email + password_md5)
page_chrome.get("https://www.sellersprite.com/cn/w/user/login")
time.sleep(random.randint(6, 10)) print(f'登录账号:{email}')
session = cffi_requests.Session(impersonate="chrome")
page_chrome.ele('xpath://a[text()="账号登录"]', timeout=10).click()
print('点击账号登录') session.get("https://www.sellersprite.com/cn/w/user/login", timeout=30)
time.sleep(random.randint(5, 10))
resp = session.post(
email_input = page_chrome.ele('xpath://div[@id="form_signin_password"]//input[@name="email"]') "https://www.sellersprite.com/w/user/signin",
email_input.clear() data={
email_input.input(account[0]) "email": email,
print("已输入账号") "password": password_md5,
time.sleep(random.randint(5, 10)) "salt": salt,
"autoLogin": "Y",
password_input = page_chrome.ele('xpath://div[@id="form_signin_password"]//input[@type="password"]') "callback": ""
password_input.clear() },
password_input.input(account[1]) headers={
time.sleep(random.randint(5, 10)) "Referer": "https://www.sellersprite.com/cn/w/user/login",
page_chrome.actions.type(Keys.ENTER) "Origin": "https://www.sellersprite.com",
time.sleep(random.randint(5, 10)) },
timeout=30,
page_chrome.get('https://www.sellersprite.com/v2/tools/sales-estimator') allow_redirects=True
time.sleep(random.randint(5, 10)) )
cookies = {c['name']: c['value'] for c in page_chrome.cookies()} cookies_dict = dict(session.cookies)
print('获取到 cookies,key数量:', len(cookies)) print(f'登录完成,cookies 数量:{len(cookies_dict)}')
page_chrome.close()
return cookies if 'rank-login-user' not in cookies_dict and 'Sprite-X-Token' not in cookies_dict:
print(f'登录可能失败,响应URL: {resp.url}, 状态码: {resp.status_code}')
return cookies_dict
def fetch_rank_sales(db_base, c_name, c_id, rank, cookies_dict): def fetch_rank_sales(db_base, c_name, c_id, rank, cookies_dict):
...@@ -160,6 +173,7 @@ def sellersprite_spider(db_base): ...@@ -160,6 +173,7 @@ def sellersprite_spider(db_base):
print(c_name, c_id) print(c_name, c_id)
name_rnak_list = [] name_rnak_list = []
zero_sales_break = False
# state → 2 标记处理中 # state → 2 标记处理中
db_cursor_connect_update( db_cursor_connect_update(
...@@ -198,13 +212,17 @@ def sellersprite_spider(db_base): ...@@ -198,13 +212,17 @@ def sellersprite_spider(db_base):
break break
if est == 0.0: if est == 0.0:
print(f"{c_name} 排名{rank}:销量 0,跳出循环") print(f"{c_name} 排名{rank}:销量 0,跳出循环")
zero_sales_break = True
break break
name_rnak_list.append((c_name, rank, int(est), year_month)) name_rnak_list.append((c_name, rank, int(est), year_month))
time.sleep(random.uniform(10, 25.75)) time.sleep(random.uniform(10, 25.75))
# 空列表不入库,state 重置回 1 等待下次重跑 # 空列表不入库
if not name_rnak_list: if not name_rnak_list:
if zero_sales_break:
print(f'{c_name} 销量为0,真实无数据,跳过')
else:
print(f'{c_name} 无数据,跳过入库,state 重置回 1') print(f'{c_name} 无数据,跳过入库,state 重置回 1')
db_cursor_connect_update( db_cursor_connect_update(
f"UPDATE all_site_category set state=1 WHERE site='{db_base}' and c_id='{c_id}'", f"UPDATE all_site_category set state=1 WHERE site='{db_base}' and c_id='{c_id}'",
...@@ -217,7 +235,7 @@ def sellersprite_spider(db_base): ...@@ -217,7 +235,7 @@ def sellersprite_spider(db_base):
try: try:
engine_db = mysql_connect(site=db_base) engine_db = mysql_connect(site=db_base)
with engine_db.begin() as conn: with engine_db.begin() as conn:
conn.executemany(insert_sql, name_rnak_list) conn.execute(insert_sql, name_rnak_list)
db_cursor_connect_update( db_cursor_connect_update(
f"UPDATE all_site_category set state=3 WHERE site='{db_base}' and state=2 and c_id='{c_id}'", f"UPDATE all_site_category set state=3 WHERE site='{db_base}' and state=2 and c_id='{c_id}'",
'us' 'us'
...@@ -233,7 +251,7 @@ def sellersprite_spider(db_base): ...@@ -233,7 +251,7 @@ def sellersprite_spider(db_base):
def run(): def run():
for site in ['us', 'de', 'uk']: for site in ['us','de', 'uk']:
sellersprite_spider(site) sellersprite_spider(site)
......
import os
import json
import html as html_module
from lxml import etree
# Batch script: walk the .html files in root_dir, parse each page with lxml
# and gather lists of ASINs keyed by the heading text they are found under.
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of the statements below cannot be confirmed from here — check the original
# before relying on any comment about loop or branch scope.
# NOTE(review): root_dir is a hard-coded path on one workstation.
root_dir = r'C:\Users\ASUS\Desktop\新建文件夹\新建文件夹 (2)'
for fname in os.listdir(root_dir):
# Skip anything whose name does not end in .html (case-insensitive).
if not fname.lower().endswith('.html'):
continue
file_path = os.path.join(root_dir, fname)
# Skip directory entries that are not regular files.
if not os.path.isfile(file_path):
continue
# 1) Read and parse the HTML
print(file_path)
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
response_s = etree.HTML(content)
# Collect, separately, every carousel's data-a-carousel-options attribute and
# the h2 heading texts found under the same divs.
data_options_list = response_s.xpath(
"//div[@data-marketplaceid='ATVPDKIKX0DER']/@data-a-carousel-options")
h2_list = response_s.xpath("//div[@data-marketplaceid='ATVPDKIKX0DER']//h2/text()")
# result / result_sp map a heading to a list of ASINs; result_list collects
# the non-empty dicts.
result = {}
result_sp = {}
result_list = []
# Customers also search us_B0D4QGW5RX.html
# (presumably an example page containing this kind of carousel — confirm)
data_sp_list = response_s.xpath(
"//div[@class='a-column a-span8']/h2[contains(@class,'carousel-heading')]/text()")
for sp_h2 in data_sp_list:
print(sp_h2)
# The 'Videos' heading is excluded.
if sp_h2 != 'Videos':
# Climb four ancestor levels from the heading's parent div to reach the
# element that carries the carousel options attribute.
data_sp = response_s.xpath(
f"""//div[@class='a-column a-span8']/h2[contains(text(),"{sp_h2}")]/parent::div/parent::div/parent::div/parent::div/@data-a-carousel-options""")
if data_sp:
# The attribute value is HTML-escaped JSON: unescape first, then parse.
decoded_sp = html_module.unescape(data_sp[0])
decoded_sp = json.loads(decoded_sp)
if decoded_sp.get('ajax'):
inner_sp_h2_list = decoded_sp.get('ajax', {}).get('id_list', [])
# Each id_list entry is split on '|' and only the first part is kept.
sp_h2_asin_list = [item.split('|')[0] for item in inner_sp_h2_list]
if sp_h2_asin_list:
result_sp[sp_h2] = sp_h2_asin_list
if result_sp:
result_list.append(result_sp)
if h2_list and data_options_list:
# Options and headings are paired by position; the shorter list sets the count.
count = min(len(data_options_list), len(h2_list))
for i in range(count):
raw_json_str = data_options_list[i]
title = h2_list[i].strip()
# Unescape → parse JSON → extract the id list
decoded = html_module.unescape(raw_json_str)
outer = json.loads(decoded)
inner_list = outer.get('ajax', {}).get('id_list', [])
# Here each id_list entry is itself a JSON string with an 'id' key.
asin_list = [json.loads(item)['id'] for item in inner_list]
result[title] = asin_list
if result:
result_list.append(result)
h2_str_list = response_s.xpath(
'//h2[contains(@class,"a-spacing-medium")]/text()|//div[@class="a-column a-span8"]/h2[contains(@class,"carousel-heading")]/text()')
if h2_str_list:
for h2_str in h2_str_list:
if h2_str != 'Videos':
# Gather every data-asin attribute under the heading's second- or
# third-level ancestor div.
data_asin_list = response_s.xpath(
f"""//h2[contains(text(),"{h2_str}")]/parent::div/parent::div//@data-asin|//h2[contains(text(),"{h2_str}")]/parent::div/parent::div/parent::div//@data-asin""")
print('h2_str_list::', h2_str, data_asin_list)
if data_asin_list:
result[h2_str] = data_asin_list
# NOTE(review): `result` is appended here and also a few statements earlier;
# depending on the original nesting the same dict object may end up in
# result_list twice — verify.
result_list.append(result)
print('result_list 广告流量ASIN:', result_list)
# Serialize the collected data, keeping non-ASCII characters unescaped;
# None when nothing was found.
# NOTE(review): result_list_json is not used anywhere in the visible code.
if result_list:
result_list_json = json.dumps(result_list, ensure_ascii=False)
else:
result_list_json = None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment