Commit 58d7dda5 by Peng

no message

parent 354a62ee
# -*- coding: utf-8 -*-
import random
import time
import traceback
import json
import redis
from DrissionPage import ChromiumPage, ChromiumOptions
from lxml import etree
class get_1688_order_data():
def __init__(self):
self.account = 'szcrgs'
self.pwd = 'aass369874.'
self.redis_db22 = self.redis_db()
self.alipay_data_list = []
def redis_db(self):
nums = 0
while True:
nums += 1
try:
redis_db22 = redis.Redis(host='r-wz9hlm6rorysnh17n7pd.redis.rds.aliyuncs.com', port=6379,
password='redis120:oHmd0NBYso1bXkYR3e00qU', db=22)
return redis_db22
except Exception as e:
print("redis_db 链接报错:", e, f"\n{traceback.format_exc()}")
time.sleep(3)
continue
def get_1688(self, list_orders):
# 配置 Chrome 浏览器 - 端口 9333
chrome_options = ChromiumOptions()
chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
chrome_options.set_local_port(9333) # 设置 Chrome 的调试端口
chrom_page = ChromiumPage(addr_or_opts=chrome_options)
print(f"Chrome 浏览器运行在端口: {9333}")
self.get_order_tab(chrom_page, list_orders) # 请求订单页面。判断是否保持登录状态
def get_order_tab(self, chrom_page, list_orders):
url = 'https://air.1688.com/app/ctf-page/trade-order-list/buyer-order-list.html?tradeStatus=waitbuyerpay&spm=a260k.home2025.topmenu.dmyorder_popup_ddaifukuan&page=1&pageSize=10'
chrom_page.get(url)
time.sleep(random.randint(3, 6))
print('请求订单页面检测是否需要登录')
iframe = chrom_page.get_frame('#alibaba-login-box')
if iframe:
time.sleep(random.randint(1, 2))
h3_ = iframe.ele('xpath://input[contains(@aria-label,"请输入验证码")]') # 要继续,请输入发送到您电子邮件中的代码
if h3_:
print('检测需要登录。调用登录接口。')
self.login_1688(chrom_page)
if '已买到的货品' in chrom_page.html:
print('保持登录状态')
self.get_order_data(chrom_page, list_orders)
else:
print('需要人工进行手动确认。是否登录')
def login_1688(self, chrom_page):
chrom_page.get(
'https://login.taobao.com/?redirect_url=httpshttps://login.taobao.com/%3A%2F%2Flogin.1688.com%2Fmember%2Fjump.htm%3Ftarget%3Dhttps%253A%252F%252Flogin.1688.com%252Fmember%252FmarketSigninJump.htm%253FDone%253Dhttps%25253A%25252F%25252Fwww.1688.com%25253A443%25252F_____tmd_____%25252Fpage%25252Flogin_jump%25253Frand%25253DS3WxGHAgAt756EpznwfNzJq2AFA2qBNla3j6EINUS8We9dazM_iKElp8DwVSHZUevpC41Bx7RzivXIj9RnZgdg%252526_lgt_%25253D6b36f87a1938864b4e4d7e2396e81231___323066___61ca74e102fbceff8562f216f5565aa8___837b211a0c5c4d0311617da5fff37e257cad377890b4c45ccdf2cb0fd662949d0116866dff8a1320c7757932b793220a13c46e566f12ff28abc9bc86bbfd47676192af987253fb967084f2fe7063a67fb91cf1d97d7c03718d68e1ef493a2535c00d7d23ca3d1c435aa4532fb803554b6f04e307de4626a5a8d4138f2b0177e07b501633e2b96cd1d746a9b8ed94be932ffb746c5161b5bad21d9f1905d6eb902e2e5f6e3bc4089a61b81b6820ff47c1d181f94b131e5e19c785ea01a20d985fa3143bc70f7b40669fd876c1b8445641c35794192085452a88f6b88a40b38f45219a53e031ec6d1b176451e64f2fc695fd2621df67503eb624a099b86eeae8133e9e4aa20d6c048c55ca311f686a5da590bd22562b42cc57ee7b4734e99a60f2a8dd6bbac7f55fc3e2e9cb50d023a600b239ce59eb9c3b119a3aad7871959b14&style=tao_custom&from=1688web')
# 查找并点击登录按钮
email_input = chrom_page.ele('xpath://input[@name="fm-login-id"]')
email_input.clear() # 清除任何预填充的内容
email_input.input(self.account) # 输入文本
print("已输入账号到邮箱输入框")
time.sleep(random.randint(2, 4))
pwd_input = chrom_page.ele('xpath://input[@name="fm-login-password"]')
pwd_input.clear() # 清除任何预填充的内容
pwd_input.input(self.pwd) # 输入文本
print("已输入密码到密码输入框")
time.sleep(random.randint(2, 4))
try:
submit_button = chrom_page.ele('.fm-button fm-submit password-login ')
submit_button.click()
except:
submit_button = chrom_page.ele('xpath://button[contains(text(),"登录")]', timeout=15)
submit_button.click()
time.sleep(random.randint(2, 4))
try:
submit_button = chrom_page.ele('xpath://button[contains(text(),"保持")]', timeout=15)
submit_button.click()
except:
print('点击保持登录失败。直接请求订单页面')
time.sleep(random.randint(2, 4))
chrom_page.get(
'https://air.1688.com/app/ctf-page/trade-order-list/buyer-order-list.html?tradeStatus=waitbuyerpay&spm=a260k.home2025.topmenu.dmyorder_popup_ddaifukuan&page=1&pageSize=10')
time.sleep(random.randint(12, 14))
def get_order_data(self, chrom_page, list_orders):
url = 'https://air.1688.com/app/ctf-page/trade-order-list/buyer-order-list.html?tradeStatus=waitbuyerpay&spm=a260k.home2025.topmenu.dmyorder_popup_ddaifukuan&page=1&pageSize=10'
chrom_page.get(url)
for order_num in list_orders:
print()
print('当前执行查询订单号:', order_num)
print('---------------------------------------------------------------------------------')
js = '''
// 定义输入函数
function typeReal(elem, text) {
if (!elem) return false;
elem.focus();
elem.value = "";
for (let char of text) {
elem.dispatchEvent(new InputEvent("beforeinput", {
data: char,
inputType: "insertText",
bubbles: true
}));
elem.value += char;
elem.dispatchEvent(new Event("input", { bubbles: true }));
elem.dispatchEvent(new KeyboardEvent("keydown", { bubbles: true }));
elem.dispatchEvent(new KeyboardEvent("keyup", { bubbles: true }));
}
elem.dispatchEvent(new Event("change", { bubbles: true }));
elem.blur();
return true;
}
'''
js2 = f'''
// 获取 shadow DOM 深层 input
const realInput = document
.querySelector("body > article > app-root").shadowRoot
.querySelector("div > main > q-theme > order-search").shadowRoot
.querySelector("section > order-search-keywords").shadowRoot
.querySelector("div > q-input").shadowRoot
.querySelector("input");
// 执行输入
typeReal(realInput, "{order_num}");
'''
js = js + js2
chrom_page.run_js(js) # 填写订单号。定位搜索框
time.sleep(random.randint(3, 5))
print('点击搜索')
js_click = """
document.querySelector("body > article > app-root").shadowRoot.querySelector("div > main > q-theme > order-search").shadowRoot.querySelector("section > order-search-actions").shadowRoot.querySelector("div > q-button:nth-child(3)").click()
"""
chrom_page.run_js(js_click)
time.sleep(random.randint(3, 5))
print('循环 获取订单是否存在 结果')
for i in range(3):
js_get_attr = '''
const emptyBlock = document
.querySelector("body > article > app-root").shadowRoot
.querySelector("div > main > q-theme > order-list").shadowRoot
.querySelector("div.order-list-content > empty-block");
const text = emptyBlock?.shadowRoot ? emptyBlock.shadowRoot.textContent : emptyBlock?.textContent;
return text;
'''
attr_value = chrom_page.run_js(js_get_attr)
if attr_value:
break
time.sleep(2)
if attr_value:
print('待付款中没有查询到该订单:', order_num)
items = {
"account": self.account,
"order_id": order_num,
"type": "异常",
"json": {
"value": '待付款中没有查询到该订单'
}
}
self.alipay_data_list.append(items)
else:
print('获取订单成功')
js_click_2 = '''
const a = document
.querySelector("body > article > app-root").shadowRoot
.querySelector("div > main > q-theme > order-list").shadowRoot
.querySelector("order-item").shadowRoot
.querySelector("order-item-operator").shadowRoot
.querySelector("buyer-pay q-button.pc")
.shadowRoot.querySelector("a");
a.click();
'''
chrom_page.run_js(js_click_2)
time.sleep(random.randint(2, 5))
# 重点:等待新标签页
new_tab = chrom_page.wait.new_tab(timeout=7)
print("新标签页对象:", new_tab) # 获取最新开标签页对象
chrom_page_tab = chrom_page.get_tab(new_tab)
chrom_page_tab.set.activate() # 激活新标签页
time.sleep(random.randint(2, 5))
print('寻找是否有网银对公支付')
try:
bank_transfer_group = chrom_page_tab.ele('xpath://div[@data-channel="bank_transfer_group"]')
bank_transfer_group.click()
except:
js = '''document.querySelector("#root > div > div.global-payment-channel > div:nth-child(2) > div > div.channel-card-group.available > div > div:nth-child(2) > div").click()'''
chrom_page_tab.run_js(js)
time.sleep(random.randint(1, 2))
print('寻找支付方式')
# 对公支付
channel_name = chrom_page_tab.ele('xpath://span[contains(text(),"对公支付")]', timeout=5)
if channel_name:
chrom_page_tab.ele('xpath://span[contains(text(),"对公支付")]', timeout=5).click()
chrom_page_tab.ele('xpath://div[contains(text(),"网银或柜台转账")]', timeout=5).click()
print('找到网银。开始选择网银')
self.click_pay(chrom_page_tab)
self.online_banking(chrom_page_tab, order_num)
else:
zfb = chrom_page_tab.ele('xpath://span[contains(text(),"支付宝")]', timeout=5).click()
if zfb:
print('找到支付宝。开始选择支付宝')
self.click_pay(chrom_page_tab)
self.online_zfb(chrom_page_tab, order_num)
else:
print('没有找到可以支付方式。退出 订单号。', order_num)
items = {
"account": self.account,
"order_id": order_num,
"type": "无",
"json": {
"value": None
}
}
self.alipay_data_list.append(items)
chrom_page_tab.close() # 关闭标签页
def online_zfb(self, chrom_page_tab, order_num):
print('解析支付宝支付页面信息')
chrom_page_tab.ele('xpath://span[contains(text(),"去网商银行付款")]', timeout=5).click()
time.sleep(random.randint(5, 10))
html = etree.HTML(chrom_page_tab.html)
data_list = html.xpath('//div[contains(@class,"order-info-container")]//text()')
print(data_list)
del data_list[0]
data_dict = {data_list[i]: data_list[i + 1] for i in range(0, len(data_list), 2)}
items = {
"account": self.account,
"order_id": order_num,
"type": "支付宝",
"json": {
"merchant_name": data_dict['商户名称']
}
}
self.alipay_data_list.append(items)
def click_pay(self, chrom_page_tab):
time.sleep(random.randint(1, 2))
print('点击立即付款')
js_click_cashier = '''
const a = document.querySelector("body>div> #root>div>div>div>q-button").shadowRoot
.querySelector("div");
a.click();
'''
# 获取
chrom_page_tab.run_js(js_click_cashier)
time.sleep(random.randint(5, 10))
def online_banking(self, chrom_page_tab, order_num):
print('解析网银支付账户信息')
html = etree.HTML(chrom_page_tab.html)
data_list = html.xpath("//div[@class='account-info']//span//text()")
print(data_list)
while '复制' in data_list:
data_list.remove('复制')
data_dict = {data_list[i]: data_list[i + 1] for i in range(0, len(data_list), 2)}
print(data_dict)
items = {
"account": self.account,
"order_id": order_num,
"type": "对公支付",
"json": {
"bank_account": data_dict['收款账号'].replace(" ", ""),
"name": data_dict['收款户名'],
"bank_name": data_dict['收款银行'].replace('”', '').replace('“', ''),
"bank_loaction": data_dict['所在地'],
"price": data_dict['转账金额'].replace('¥', ''),
"bank_clearing_number": data_dict["联行号"].replace('(选填)', '')
}
}
self.alipay_data_list.append(items)
def get_account(self):
start_index = 0 # 起始索引
end_index = -1 # 结束索引,-1 表示获取整个列表
list_data_b = self.redis_db22.lrange(f'alipay:{self.account}', start_index, end_index)
list_orders = [item.decode('utf-8') for item in list_data_b]
if list_orders:
return list_orders
else:
return None
def run(self):
list_orders = self.get_account()
print('list_orders:', list_orders)
if list_orders:
self.get_1688(list_orders)
else:
print(self.account, ' :该店铺下没有可查询的订单')
print(self.alipay_data_list)
for alipay_data in self.alipay_data_list:
while True:
try:
json_data = json.dumps(alipay_data, ensure_ascii=False)
print(json_data)
self.redis_db22.rpush('alipay:response', json_data)
break
except:
print('写入redis报错:重试')
time.sleep(5)
# 删除
#self.redis_db22.lrem(f"alipay:{self.account}", 1, "dsaofi3232e232328938928")
if __name__ == '__main__':
get_1688_order_data().run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment