Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
S
spider
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
selection-new
spider
Commits
1461772e
Commit
1461772e
authored
Jan 19, 2026
by
Peng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
取消下载成功消息通知。避免忽略报错信息。消息推送太多。
parent
fa7c47c7
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
42 additions
and
163 deletions
+42
-163
tk_video_ch_DY.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_DY.py
+0
-14
tk_video_ch_FA.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_FA.py
+0
-14
tk_video_ch_FR.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_FR.py
+14
-26
tk_video_ch_SA.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_SA.py
+13
-24
tk_video_ch_zy.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_zy.py
+0
-14
tk_video_edg_FA.py
...projects/projects/tiktok/TK_video_data/tk_video_edg_FA.py
+0
-14
tk_video_edg_ba.py
...projects/projects/tiktok/TK_video_data/tk_video_edg_ba.py
+0
-14
tk_video_sum_ch.py
...projects/projects/tiktok/TK_video_data/tk_video_sum_ch.py
+0
-14
tk_video_sum_edg.py
...rojects/projects/tiktok/TK_video_data/tk_video_sum_edg.py
+15
-29
No files found.
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_DY.py
View file @
1461772e
...
...
@@ -154,7 +154,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
8
)
# self.page_chrome.quit()
...
...
@@ -293,19 +292,6 @@ class TkVideo():
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_FA.py
View file @
1461772e
...
...
@@ -157,7 +157,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
5
)
# self.page_chrome.quit()
...
...
@@ -296,19 +295,6 @@ class TkVideo():
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_FR.py
View file @
1461772e
...
...
@@ -40,7 +40,7 @@ class TkVideo():
'sec-fetch-storage-access'
:
'active'
,
'user-agent'
:
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
,
}
self
.
key
=
"
BlissfulAbodeDecor
"
self
.
key
=
"
Juexica.shop
"
self
.
download_folder
=
r"D:\Downloads"
self
.
receiver_name
=
'pengyanbing'
# Redis 配置信息
...
...
@@ -84,7 +84,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -98,9 +98,7 @@ class TkVideo():
)
.
click
()
print
(
f
'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日'
)
time
.
sleep
(
random
.
randint
(
3
,
5
))
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -108,7 +106,7 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
...
...
@@ -128,24 +126,24 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
...
...
@@ -154,7 +152,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
5
)
# self.page_chrome.quit()
...
...
@@ -288,20 +285,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_SA.py
View file @
1461772e
...
...
@@ -83,7 +83,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -98,8 +98,7 @@ class TkVideo():
print
(
f
'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日'
)
time
.
sleep
(
random
.
randint
(
3
,
5
))
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -107,7 +106,7 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
...
...
@@ -129,24 +128,24 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
...
...
@@ -155,7 +154,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
5
)
# self.page_chrome.quit()
...
...
@@ -289,20 +287,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_zy.py
View file @
1461772e
...
...
@@ -154,7 +154,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
8
)
# self.page_chrome.quit()
...
...
@@ -293,19 +292,6 @@ class TkVideo():
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_edg_FA.py
View file @
1461772e
...
...
@@ -159,7 +159,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
5
)
self
.
page_edge
.
quit
()
...
...
@@ -299,19 +298,6 @@ class TkVideo():
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_edg_ba.py
View file @
1461772e
...
...
@@ -159,7 +159,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
5
)
self
.
page_edge
.
quit
()
...
...
@@ -299,19 +298,6 @@ class TkVideo():
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_sum_ch.py
View file @
1461772e
...
...
@@ -156,7 +156,6 @@ class TkVideo():
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
5
)
# self.page_chrome.quit()
...
...
@@ -295,19 +294,6 @@ class TkVideo():
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_sum_edg.py
View file @
1461772e
...
...
@@ -101,7 +101,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_edge
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_edge
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -115,10 +115,7 @@ class TkVideo():
)
.
click
()
print
(
f
'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日'
)
time
.
sleep
(
random
.
randint
(
3
,
5
))
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -126,13 +123,12 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_edge
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_edge
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
time
.
sleep
(
random
.
randint
(
3
,
5
))
return
True
# 成功返回
except
Exception
as
e
:
print
(
f
'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}'
)
# 往前推一天
...
...
@@ -147,33 +143,33 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_edge
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_edge
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
self
.
shop_name
=
self
.
page_edge
.
ele
(
'xpath://div[@class="text-H6-Bold"]'
)
.
text
# self.shop_name = self.page_edge.ele('xpath://div[@class="text-H6-Bold"]').text
self
.
shop_name
=
self
.
page_edge
.
ele
(
'xpath://div[@class="flex items-center"]//div[@class="text-H6-Bold"]'
)
.
text
print
(
f
'已获取店铺名: {self.shop_name}'
)
sleep
(
randint
(
5
,
10
))
self
.
save_to_redis
()
self
.
send_success_message_via_wechat
()
time
.
sleep
(
5
)
self
.
page_edge
.
quit
()
...
...
@@ -309,21 +305,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
"account"
:
self
.
receiver_name
,
"title"
:
'【TK视频数据下载成功提醒】'
,
"content"
:
f
'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("
%
Y-
%
m-
%
d
%
H:
%
M:
%
S")}'
}
response
=
requests
.
post
(
url
=
webhook_url
,
data
=
data
,
timeout
=
15
)
if
response
.
status_code
==
200
:
print
(
"已成功发送通知到企业微信"
)
else
:
print
(
f
"发送通知失败: {response.text}"
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_error_notification_via_wechat
(
self
,
error_message
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment