Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
S
spider
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
selection-new
spider
Commits
3f158caf
Commit
3f158caf
authored
Jan 06, 2026
by
Peng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
694247a4
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
762 additions
and
65 deletions
+762
-65
tk_video_ch_DY.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_DY.py
+347
-0
tk_video_ch_FA.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_FA.py
+13
-11
tk_video_ch_ba.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_ba.py
+15
-14
tk_video_ch_zy.py
..._projects/projects/tiktok/TK_video_data/tk_video_ch_zy.py
+347
-0
tk_video_edg_FA.py
...projects/projects/tiktok/TK_video_data/tk_video_edg_FA.py
+13
-13
tk_video_edg_ba.py
...projects/projects/tiktok/TK_video_data/tk_video_edg_ba.py
+13
-13
tk_video_sum_ch.py
...projects/projects/tiktok/TK_video_data/tk_video_sum_ch.py
+14
-14
No files found.
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_DY.py
View file @
3f158caf
import
os
os
.
environ
[
'NO_PROXY'
]
=
'stackoverflow.com'
import
logging
logging
.
captureWarnings
(
True
)
from
DrissionPage
import
ChromiumPage
,
ChromiumOptions
import
time
from
datetime
import
datetime
,
timedelta
from
time
import
sleep
from
random
import
randint
import
requests
import
math
import
pandas
as
pd
import
redis
import
json
from
pathlib
import
Path
import
re
import
random
class
TkVideo
():
def __init__(self):
    """Set static configuration and attach to a Chrome instance.

    Stores the account key, download folder, notification receiver, HTTP
    headers and Redis connection settings, then creates the ChromiumPage
    used by the other methods.
    """
    # Account identity and local paths.
    self.key = "DanyolyHome"
    self.download_folder = r"D:\Downloads"
    self.receiver_name = 'pengyanbing'

    # Browser-like request headers.
    self.headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'no-cache',
        'content-type': 'application/json; charset=UTF-8',
        'origin': 'https://www.tiktok.com',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'referer': 'https://www.tiktok.com/',
        'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'sec-fetch-storage-access': 'active',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    }

    # Redis connection settings, unpacked into the client by connect_redis().
    # NOTE(review): the password is hard-coded in source control - consider
    # loading it from the environment or a secrets store.
    self.REDIS_CONFIG = {
        'host': '120.79.147.190',
        'port': 6379,
        'password': 'fG7#vT6kQ1pX',
        'db': 13,
        'decode_responses': True
    }

    # Chrome is driven through its remote-debugging port.
    debug_port = 9333
    chrome_options = ChromiumOptions()
    chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
    chrome_options.set_local_port(debug_port)
    self.page_chrome = ChromiumPage(addr_or_opts=chrome_options)
    print(f"Chrome 浏览器运行在端口: {debug_port}")
def get_datetime(self, today=None):
    """Compute the export date range and cache its parts on ``self``.

    The range starts on the same calendar day one year before ``today`` and
    ends three days before ``today``. The year/month/day components of both
    ends are stored separately because the date-picker XPaths and the
    download file name are built from them.

    Args:
        today: Reference ``datetime.date``; defaults to the current local
            date. Exposed so the range can be reproduced for a given day.
    """
    if today is None:
        today = datetime.today().date()

    # Start date: the same day last year.
    try:
        self.start_date = today.replace(year=today.year - 1)
    except ValueError:
        # 29 February has no counterpart in the previous year; fall back
        # to 28 February instead of crashing.
        self.start_date = today.replace(year=today.year - 1, day=28)
    self.start_year = self.start_date.year
    self.start_month = self.start_date.month
    self.start_day = self.start_date.day

    # End date: three days before the reference day.
    self.end_date = today - timedelta(days=3)
    self.end_year = self.end_date.year
    self.end_month = self.end_date.month
    self.end_day = self.end_date.day
def get_day(self):
    """Open the video-insight page, select the custom date range and export.

    Uses the start_*/end_* attributes set by ``get_datetime``. After the end
    date is clicked, ``get_data`` is invoked to download and store the data.

    Returns:
        bool: ``True`` once the end date was clicked and ``get_data`` was
        called (``get_data`` reports its own failures, so ``True`` does not
        guarantee the export succeeded); ``False`` if the end date could not
        be clicked after 10 attempts or any earlier step failed.
    """
    try:
        page = self.page_chrome
        page.get("https://www.tiktok.com/business-suite/insight/video")
        page.set.window.max()
        # Wait for the initial page load.
        time.sleep(random.randint(6, 10))

        page.ele('xpath://span[text()="自定义"]', timeout=10).click()
        print('点击自定义')
        time.sleep(random.randint(5, 10))

        # Click the start day inside the panel of the start month.
        page.ele(
            f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
            f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
            f"//div[@class='tiktok-datepicker-day valid in-this-month']"
            f"//span[text()='{self.start_day}']/parent::div"
        ).click()
        print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
        time.sleep(random.randint(5, 8))

        # The end-date locator does not change between attempts.
        xpath = (
            f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
            f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
            f"//span[text()='{self.end_day}']/parent::div"
        )
        # Retry the same end date up to 10 times with a random pause.
        for _ in range(10):
            try:
                print('结束日期 xpath::', xpath)
                ele = page.ele(f"xpath={xpath}", timeout=5)
                ele.click()
                print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
                self.get_data()
                time.sleep(random.randint(5, 8))
                return True
            except Exception as e:
                print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
                time.sleep(random.randint(5, 15))

        print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
        return False
    except Exception as e:
        print(f"get_day出现错误: {e}")
        self.send_error_notification_via_wechat(e)
        # Return an explicit failure value instead of falling through to None.
        return False
def get_data(self):
    """Click through the export dialog, read the shop name and persist the data.

    Each UI step waits up to 13 seconds for its element, clicks it, logs a
    message and pauses 5-10 seconds. Afterwards the shop name is read from
    the home page, the downloaded file is pushed to Redis via
    ``save_to_redis`` and a success notification is sent. Any exception is
    printed and forwarded to ``send_error_notification_via_wechat``.
    """
    # (locator, log message) for every click, in execution order.
    click_steps = (
        ('xpath://div[text()="更新"]', '已点击更新'),
        ('xpath://span[text()="下载数据"]', '已点击下载数据'),
        ('xpath://span[text()="Xlsx"]', '已点击Xlsx'),
        ('xpath://button[text()="下载数据"]', '已点击下载数据'),
        # The home page is opened last; the shop name is read from it.
        ('xpath://span[text()="首页"]', '已点击首页'),
    )
    try:
        for locator, done_message in click_steps:
            self.page_chrome.ele(locator, timeout=13).click()
            print(done_message)
            sleep(randint(5, 10))

        name_element = self.page_chrome.ele('xpath://div[@class="text-H6-Bold"]')
        self.shop_name = name_element.text
        print(f'已获取店铺名: {self.shop_name}')
        sleep(randint(5, 10))

        self.save_to_redis()
        self.send_success_message_via_wechat()
        time.sleep(8)
    except Exception as err:
        print(f"get_data出现错误: {err}")
        self.send_error_notification_via_wechat(err)
def connect_redis(self):
    """Create the Redis client from ``REDIS_CONFIG`` and verify it with a ping.

    The client is stored on ``self.r`` before the ping, so it stays set even
    when the connection test fails.

    Raises:
        redis.exceptions.ConnectionError: re-raised after being logged.
    """
    client = redis.StrictRedis(**self.REDIS_CONFIG)
    self.r = client
    try:
        # Connectivity check.
        client.ping()
    except redis.exceptions.ConnectionError as err:
        print(f"❌ 无法连接到 Redis: {err}")
        raise
    else:
        print("✅ 成功连接到 Redis")
def read_excel(self, file_path):
    """Load the exported workbook and return its rows as a list of dicts.

    Every column is read as ``str`` so large numeric values are not turned
    into scientific notation, and the known Chinese column headers are
    renamed to English keys.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        list[dict]: One dict per row, keyed by column name.
    """
    print(f"📄 正在读取文件:{file_path}")

    # Chinese header -> English field name.
    header_to_field = {
        '视频标题': 'video_title',
        '视频链接': 'video_url',
        '发布时间': 'publish_date',
        '视频观看次数': 'views',
        '点赞数': 'likes',
        '评论数': 'comments',
        '分享次数': 'shares',
        '添加到收藏': 'favorites'
    }

    frame = pd.read_excel(file_path, dtype=str)
    frame = frame.rename(columns=header_to_field)
    rows = frame.to_dict(orient='records')

    print(f"📊 已读取 {len(rows)} 条记录")
    return rows
def process_data(self, data, account):
    """Enrich and clean the raw rows before they are stored.

    For every row, ``account``, ``content_id`` (last path segment of
    ``video_url``) and ``update_time`` are added; keys already present in
    the row override these three. Missing values (None/NaN or the text
    "nan" in any case) are replaced by an empty string.

    Args:
        data: Iterable of row dicts as returned by ``read_excel``.
        account: Account/shop name stamped on every record.

    Returns:
        list[dict]: Cleaned records in input order.
    """
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    processed_data = []

    for record in data:
        video_link = record.get('video_url', '')
        # Empty cells arrive as float NaN under dtype=str, so only split
        # real strings. A trailing slash is dropped so it cannot produce an
        # empty id.
        if isinstance(video_link, str) and video_link.strip():
            content_id = video_link.strip().rstrip('/').split('/')[-1]
        else:
            content_id = ''

        processed_record = {
            'account': account,
            'content_id': content_id,
            'update_time': current_time,
            **record,  # merge the original row last
        }

        cleaned_record = {
            key: (
                ""
                if value is None
                or pd.isna(value)
                or str(value).strip().lower() == "nan"
                else value
            )
            for key, value in processed_record.items()
        }
        processed_data.append(cleaned_record)

    return processed_data
def store_data_in_redis(self, r, data):
    """Replace the Redis list for this account with ``data``.

    The key is ``tk_video_data:<self.key>:order:list``. The delete of the
    old list and the pushes of the new records are queued on one pipeline,
    so a failure while writing cannot leave the key wiped but unfilled.

    Args:
        r: Redis client (only ``exists`` and ``pipeline`` are used).
        data: Iterable of JSON-serialisable records; each becomes one list
            element.
    """
    key = f"tk_video_data:{self.key}:order:list"
    had_old_data = r.exists(key)

    pipe = r.pipeline()
    # Queue the delete with the writes so the overwrite runs as one batch.
    pipe.delete(key)
    for record in data:
        pipe.rpush(key, json.dumps(record, ensure_ascii=False))
    pipe.execute()

    if had_old_data:
        print(f"🗑️ 已清除旧数据: {key}")
    print(f"💾 已写入新数据到键: {key},共 {len(data)} 条记录")
def find_specific_file(self):
    """Locate the downloaded export for the current date range.

    The expected name starts with
    ``视频(<start Y_MM_DD>-<end Y_MM_DD>)`` (month and day zero-padded) and
    may be followed by anything. The first regular file in
    ``self.download_folder`` matching that prefix is returned.

    Returns:
        str: Path of the matching file.

    Raises:
        FileNotFoundError: if no file in the folder matches.
    """
    start_part = f"{self.start_year}_{self.start_month:02d}_{self.start_day:02d}"
    end_part = f"{self.end_year}_{self.end_month:02d}_{self.end_day:02d}"
    base_prefix = f"视频({start_part}-{end_part})"

    # Literal prefix followed by any trailing text.
    pattern = re.escape(base_prefix) + r'.*$'
    print("匹配模式:", pattern)

    candidates = (
        entry
        for entry in Path(self.download_folder).iterdir()
        if entry.is_file() and re.fullmatch(pattern, entry.name)
    )
    found = next(candidates, None)
    if found is None:
        raise FileNotFoundError(f"未找到匹配 {base_prefix} 的文件")
    return str(found)
def save_to_redis(self):
    """Load the downloaded export into Redis and delete the local file.

    Finds the file with ``find_specific_file``, reads and cleans it, writes
    the records through ``store_data_in_redis`` and finally removes the
    file. A failure to remove the file is logged but does not abort, since
    the data has already been stored by then.
    """
    excel_file = self.find_specific_file()
    print(f'保存文件:{excel_file}')

    data = self.read_excel(excel_file)
    processed_data = self.process_data(data, self.shop_name)
    self.store_data_in_redis(self.r, processed_data)

    print('删除下载文件', excel_file)
    try:
        os.remove(excel_file)
    except OSError as err:
        # Only file-system errors are expected here; report the cause
        # instead of swallowing every exception.
        print('删除数据失败', err)
def send_success_message_via_wechat(self):
    """POST a "download succeeded" message to the internal notification endpoint.

    The message names the account key, the exported date range and the
    current time. Delivery is best effort: network errors and non-200
    replies are printed and never raised, so a notification failure cannot
    turn a successful export into an error.
    """
    webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'
    sent_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = {
        "account": self.receiver_name,
        "title": '【TK视频数据下载成功提醒】',
        "content": (
            f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}'
            f'-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {sent_at}'
        ),
    }
    try:
        response = requests.post(url=webhook_url, data=data, timeout=15)
    except requests.RequestException as err:
        print(f"发送通知失败: {err}")
        return

    if response.status_code == 200:
        print("已成功发送通知到企业微信")
    else:
        print(f"发送通知失败: {response.text}")
def send_error_notification_via_wechat(self, error_message):
    """POST a failure message to the internal notification endpoint.

    Called from ``except`` handlers, so it must not raise itself: network
    errors and non-200 replies are printed only.

    Args:
        error_message: Exception or text describing the failure; it is
            interpolated into the message body.
    """
    webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'
    sent_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = {
        "account": self.receiver_name,
        'title': '【TK视频数据下载异常提醒】',
        'content': f'账号:{self.key},错误信息:{error_message}, 时间: {sent_at}',
    }
    try:
        response = requests.post(url=webhook_url, data=data, timeout=15)
    except requests.RequestException as err:
        print(f"发送错误通知失败: {err}")
        return

    if response.status_code == 200:
        print("已成功发送错误通知到企业微信")
    else:
        print(f"发送错误通知失败: {response.text}")
def run(self):
    """Run one full export: connect to Redis, compute dates, drive the browser.

    The browser is always closed, even when a step raises (for example when
    Redis is unreachable), so no Chrome instance is left behind. Exceptions
    still propagate to the caller.
    """
    try:
        self.connect_redis()
        self.get_datetime()
        self.get_day()
        print('完成关闭浏览器')
        # Short pause before the browser is closed.
        time.sleep(5)
    finally:
        self.page_chrome.quit()
# Script entry point: build the scraper and run one export cycle.
if __name__ == '__main__':
    TkVideo().run()
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_FA.py
View file @
3f158caf
...
...
@@ -85,7 +85,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -102,9 +102,7 @@ class TkVideo():
# 初始目标日期为 deadline(可能已经是上个月的某一天)
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -112,7 +110,7 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
...
...
@@ -133,24 +131,24 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
...
...
@@ -293,7 +291,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_ba.py
View file @
3f158caf
...
...
@@ -15,7 +15,7 @@ import redis
import
json
from
pathlib
import
Path
import
re
from
sqlalchemy
import
create_engine
import
random
class
TkVideo
():
...
...
@@ -84,7 +84,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -98,10 +98,7 @@ class TkVideo():
)
.
click
()
print
(
f
'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日'
)
time
.
sleep
(
random
.
randint
(
3
,
5
))
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -109,11 +106,11 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
time
.
sleep
(
random
.
randint
(
3
,
5
))
time
.
sleep
(
random
.
randint
(
5
,
8
))
return
True
# 成功返回
except
Exception
as
e
:
...
...
@@ -130,24 +127,24 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
...
...
@@ -290,7 +287,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_ch_zy.py
View file @
3f158caf
import
os
os
.
environ
[
'NO_PROXY'
]
=
'stackoverflow.com'
import
logging
logging
.
captureWarnings
(
True
)
from
DrissionPage
import
ChromiumPage
,
ChromiumOptions
import
time
from
datetime
import
datetime
,
timedelta
from
time
import
sleep
from
random
import
randint
import
requests
import
math
import
pandas
as
pd
import
redis
import
json
from
pathlib
import
Path
import
re
import
random
class
TkVideo
():
def __init__(self):
    """Set static configuration and attach to a Chrome instance.

    Stores the account key, download folder, notification receiver, HTTP
    headers and Redis connection settings, then creates the ChromiumPage
    used by the other methods.
    """
    # Account identity and local paths.
    self.key = "Zehyaanua"
    self.download_folder = r"D:\Downloads"
    self.receiver_name = 'pengyanbing'

    # Browser-like request headers.
    self.headers = {
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'no-cache',
        'content-type': 'application/json; charset=UTF-8',
        'origin': 'https://www.tiktok.com',
        'pragma': 'no-cache',
        'priority': 'u=1, i',
        'referer': 'https://www.tiktok.com/',
        'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'sec-fetch-storage-access': 'active',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    }

    # Redis connection settings, unpacked into the client by connect_redis().
    # NOTE(review): the password is hard-coded in source control - consider
    # loading it from the environment or a secrets store.
    self.REDIS_CONFIG = {
        'host': '120.79.147.190',
        'port': 6379,
        'password': 'fG7#vT6kQ1pX',
        'db': 13,
        'decode_responses': True
    }

    # Chrome is driven through its remote-debugging port.
    debug_port = 9333
    chrome_options = ChromiumOptions()
    chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
    chrome_options.set_local_port(debug_port)
    self.page_chrome = ChromiumPage(addr_or_opts=chrome_options)
    print(f"Chrome 浏览器运行在端口: {debug_port}")
def get_datetime(self, today=None):
    """Compute the export date range and cache its parts on ``self``.

    The range starts on the same calendar day one year before ``today`` and
    ends three days before ``today``. The year/month/day components of both
    ends are stored separately because the date-picker XPaths and the
    download file name are built from them.

    Args:
        today: Reference ``datetime.date``; defaults to the current local
            date. Exposed so the range can be reproduced for a given day.
    """
    if today is None:
        today = datetime.today().date()

    # Start date: the same day last year.
    try:
        self.start_date = today.replace(year=today.year - 1)
    except ValueError:
        # 29 February has no counterpart in the previous year; fall back
        # to 28 February instead of crashing.
        self.start_date = today.replace(year=today.year - 1, day=28)
    self.start_year = self.start_date.year
    self.start_month = self.start_date.month
    self.start_day = self.start_date.day

    # End date: three days before the reference day.
    self.end_date = today - timedelta(days=3)
    self.end_year = self.end_date.year
    self.end_month = self.end_date.month
    self.end_day = self.end_date.day
def get_day(self):
    """Open the video-insight page, select the custom date range and export.

    Uses the start_*/end_* attributes set by ``get_datetime``. After the end
    date is clicked, ``get_data`` is invoked to download and store the data.

    Returns:
        bool: ``True`` once the end date was clicked and ``get_data`` was
        called (``get_data`` reports its own failures, so ``True`` does not
        guarantee the export succeeded); ``False`` if the end date could not
        be clicked after 10 attempts or any earlier step failed.
    """
    try:
        page = self.page_chrome
        page.get("https://www.tiktok.com/business-suite/insight/video")
        page.set.window.max()
        # Wait for the initial page load.
        time.sleep(random.randint(6, 10))

        page.ele('xpath://span[text()="自定义"]', timeout=10).click()
        print('点击自定义')
        time.sleep(random.randint(5, 10))

        # Click the start day inside the panel of the start month.
        page.ele(
            f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
            f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
            f"//div[@class='tiktok-datepicker-day valid in-this-month']"
            f"//span[text()='{self.start_day}']/parent::div"
        ).click()
        print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
        time.sleep(random.randint(5, 8))

        # The end-date locator does not change between attempts.
        xpath = (
            f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
            f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
            f"//span[text()='{self.end_day}']/parent::div"
        )
        # Retry the same end date up to 10 times with a random pause.
        for _ in range(10):
            try:
                print('结束日期 xpath::', xpath)
                ele = page.ele(f"xpath={xpath}", timeout=5)
                ele.click()
                print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
                self.get_data()
                time.sleep(random.randint(5, 8))
                return True
            except Exception as e:
                print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
                time.sleep(random.randint(5, 15))

        print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
        return False
    except Exception as e:
        print(f"get_day出现错误: {e}")
        self.send_error_notification_via_wechat(e)
        # Return an explicit failure value instead of falling through to None.
        return False
def get_data(self):
    """Click through the export dialog, read the shop name and persist the data.

    Each UI step waits up to 13 seconds for its element, clicks it, logs a
    message and pauses 5-10 seconds. Afterwards the shop name is read from
    the home page, the downloaded file is pushed to Redis via
    ``save_to_redis`` and a success notification is sent. Any exception is
    printed and forwarded to ``send_error_notification_via_wechat``.
    """
    # (locator, log message) for every click, in execution order.
    click_steps = (
        ('xpath://div[text()="更新"]', '已点击更新'),
        ('xpath://span[text()="下载数据"]', '已点击下载数据'),
        ('xpath://span[text()="Xlsx"]', '已点击Xlsx'),
        ('xpath://button[text()="下载数据"]', '已点击下载数据'),
        # The home page is opened last; the shop name is read from it.
        ('xpath://span[text()="首页"]', '已点击首页'),
    )
    try:
        for locator, done_message in click_steps:
            self.page_chrome.ele(locator, timeout=13).click()
            print(done_message)
            sleep(randint(5, 10))

        name_element = self.page_chrome.ele('xpath://div[@class="text-H6-Bold"]')
        self.shop_name = name_element.text
        print(f'已获取店铺名: {self.shop_name}')
        sleep(randint(5, 10))

        self.save_to_redis()
        self.send_success_message_via_wechat()
        time.sleep(8)
    except Exception as err:
        print(f"get_data出现错误: {err}")
        self.send_error_notification_via_wechat(err)
def connect_redis(self):
    """Create the Redis client from ``REDIS_CONFIG`` and verify it with a ping.

    The client is stored on ``self.r`` before the ping, so it stays set even
    when the connection test fails.

    Raises:
        redis.exceptions.ConnectionError: re-raised after being logged.
    """
    client = redis.StrictRedis(**self.REDIS_CONFIG)
    self.r = client
    try:
        # Connectivity check.
        client.ping()
    except redis.exceptions.ConnectionError as err:
        print(f"❌ 无法连接到 Redis: {err}")
        raise
    else:
        print("✅ 成功连接到 Redis")
def read_excel(self, file_path):
    """Load the exported workbook and return its rows as a list of dicts.

    Every column is read as ``str`` so large numeric values are not turned
    into scientific notation, and the known Chinese column headers are
    renamed to English keys.

    Args:
        file_path: Path of the Excel file to read.

    Returns:
        list[dict]: One dict per row, keyed by column name.
    """
    print(f"📄 正在读取文件:{file_path}")

    # Chinese header -> English field name.
    header_to_field = {
        '视频标题': 'video_title',
        '视频链接': 'video_url',
        '发布时间': 'publish_date',
        '视频观看次数': 'views',
        '点赞数': 'likes',
        '评论数': 'comments',
        '分享次数': 'shares',
        '添加到收藏': 'favorites'
    }

    frame = pd.read_excel(file_path, dtype=str)
    frame = frame.rename(columns=header_to_field)
    rows = frame.to_dict(orient='records')

    print(f"📊 已读取 {len(rows)} 条记录")
    return rows
def process_data(self, data, account):
    """Enrich and clean the raw rows before they are stored.

    For every row, ``account``, ``content_id`` (last path segment of
    ``video_url``) and ``update_time`` are added; keys already present in
    the row override these three. Missing values (None/NaN or the text
    "nan" in any case) are replaced by an empty string.

    Args:
        data: Iterable of row dicts as returned by ``read_excel``.
        account: Account/shop name stamped on every record.

    Returns:
        list[dict]: Cleaned records in input order.
    """
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    processed_data = []

    for record in data:
        video_link = record.get('video_url', '')
        # Empty cells arrive as float NaN under dtype=str, so only split
        # real strings. A trailing slash is dropped so it cannot produce an
        # empty id.
        if isinstance(video_link, str) and video_link.strip():
            content_id = video_link.strip().rstrip('/').split('/')[-1]
        else:
            content_id = ''

        processed_record = {
            'account': account,
            'content_id': content_id,
            'update_time': current_time,
            **record,  # merge the original row last
        }

        cleaned_record = {
            key: (
                ""
                if value is None
                or pd.isna(value)
                or str(value).strip().lower() == "nan"
                else value
            )
            for key, value in processed_record.items()
        }
        processed_data.append(cleaned_record)

    return processed_data
def store_data_in_redis(self, r, data):
    """Replace the Redis list for this account with ``data``.

    The key is ``tk_video_data:<self.key>:order:list``. The delete of the
    old list and the pushes of the new records are queued on one pipeline,
    so a failure while writing cannot leave the key wiped but unfilled.

    Args:
        r: Redis client (only ``exists`` and ``pipeline`` are used).
        data: Iterable of JSON-serialisable records; each becomes one list
            element.
    """
    key = f"tk_video_data:{self.key}:order:list"
    had_old_data = r.exists(key)

    pipe = r.pipeline()
    # Queue the delete with the writes so the overwrite runs as one batch.
    pipe.delete(key)
    for record in data:
        pipe.rpush(key, json.dumps(record, ensure_ascii=False))
    pipe.execute()

    if had_old_data:
        print(f"🗑️ 已清除旧数据: {key}")
    print(f"💾 已写入新数据到键: {key},共 {len(data)} 条记录")
def find_specific_file(self):
    """Locate the downloaded export for the current date range.

    The expected name starts with
    ``视频(<start Y_MM_DD>-<end Y_MM_DD>)`` (month and day zero-padded) and
    may be followed by anything. The first regular file in
    ``self.download_folder`` matching that prefix is returned.

    Returns:
        str: Path of the matching file.

    Raises:
        FileNotFoundError: if no file in the folder matches.
    """
    start_part = f"{self.start_year}_{self.start_month:02d}_{self.start_day:02d}"
    end_part = f"{self.end_year}_{self.end_month:02d}_{self.end_day:02d}"
    base_prefix = f"视频({start_part}-{end_part})"

    # Literal prefix followed by any trailing text.
    pattern = re.escape(base_prefix) + r'.*$'
    print("匹配模式:", pattern)

    candidates = (
        entry
        for entry in Path(self.download_folder).iterdir()
        if entry.is_file() and re.fullmatch(pattern, entry.name)
    )
    found = next(candidates, None)
    if found is None:
        raise FileNotFoundError(f"未找到匹配 {base_prefix} 的文件")
    return str(found)
def save_to_redis(self):
    """Load the downloaded export into Redis and delete the local file.

    Finds the file with ``find_specific_file``, reads and cleans it, writes
    the records through ``store_data_in_redis`` and finally removes the
    file. A failure to remove the file is logged but does not abort, since
    the data has already been stored by then.
    """
    excel_file = self.find_specific_file()
    print(f'保存文件:{excel_file}')

    data = self.read_excel(excel_file)
    processed_data = self.process_data(data, self.shop_name)
    self.store_data_in_redis(self.r, processed_data)

    print('删除下载文件', excel_file)
    try:
        os.remove(excel_file)
    except OSError as err:
        # Only file-system errors are expected here; report the cause
        # instead of swallowing every exception.
        print('删除数据失败', err)
def send_success_message_via_wechat(self):
    """POST a "download succeeded" message to the internal notification endpoint.

    The message names the account key, the exported date range and the
    current time. Delivery is best effort: network errors and non-200
    replies are printed and never raised, so a notification failure cannot
    turn a successful export into an error.
    """
    webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'
    sent_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = {
        "account": self.receiver_name,
        "title": '【TK视频数据下载成功提醒】',
        "content": (
            f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}'
            f'-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {sent_at}'
        ),
    }
    try:
        response = requests.post(url=webhook_url, data=data, timeout=15)
    except requests.RequestException as err:
        print(f"发送通知失败: {err}")
        return

    if response.status_code == 200:
        print("已成功发送通知到企业微信")
    else:
        print(f"发送通知失败: {response.text}")
def send_error_notification_via_wechat(self, error_message):
    """POST a failure message to the internal notification endpoint.

    Called from ``except`` handlers, so it must not raise itself: network
    errors and non-200 replies are printed only.

    Args:
        error_message: Exception or text describing the failure; it is
            interpolated into the message body.
    """
    webhook_url = 'http://47.112.96.71:8082/selection/sendMessage'
    sent_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = {
        "account": self.receiver_name,
        'title': '【TK视频数据下载异常提醒】',
        'content': f'账号:{self.key},错误信息:{error_message}, 时间: {sent_at}',
    }
    try:
        response = requests.post(url=webhook_url, data=data, timeout=15)
    except requests.RequestException as err:
        print(f"发送错误通知失败: {err}")
        return

    if response.status_code == 200:
        print("已成功发送错误通知到企业微信")
    else:
        print(f"发送错误通知失败: {response.text}")
def run(self):
    """Run one full export: connect to Redis, compute dates, drive the browser.

    The browser is always closed, even when a step raises (for example when
    Redis is unreachable), so no Chrome instance is left behind. Exceptions
    still propagate to the caller.
    """
    try:
        self.connect_redis()
        self.get_datetime()
        self.get_day()
        print('完成关闭浏览器')
        # Short pause before the browser is closed.
        time.sleep(5)
    finally:
        self.page_chrome.quit()
# Script entry point: build the scraper and run one export cycle.
if __name__ == '__main__':
    TkVideo().run()
wangjing_projects/projects/tiktok/TK_video_data/tk_video_edg_FA.py
View file @
3f158caf
...
...
@@ -90,7 +90,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_edge
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_edge
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -104,11 +104,7 @@ class TkVideo():
)
.
click
()
print
(
f
'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日'
)
time
.
sleep
(
random
.
randint
(
3
,
5
))
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -116,7 +112,7 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_edge
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_edge
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
...
...
@@ -137,24 +133,24 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_edge
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_edge
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
...
...
@@ -298,7 +294,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_edg_ba.py
View file @
3f158caf
...
...
@@ -89,7 +89,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_edge
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_edge
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -104,11 +104,7 @@ class TkVideo():
print
(
f
'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日'
)
time
.
sleep
(
random
.
randint
(
3
,
5
))
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -116,7 +112,7 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_edge
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_edge
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
...
...
@@ -137,24 +133,24 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_edge
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_edge
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_edge
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_edge
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
...
...
@@ -298,7 +294,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
...
...
wangjing_projects/projects/tiktok/TK_video_data/tk_video_sum_ch.py
View file @
3f158caf
...
...
@@ -87,7 +87,7 @@ class TkVideo():
# 等待页面初始加载
time
.
sleep
(
random
.
randint
(
6
,
10
))
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
0
)
export_orders
=
self
.
page_chrome
.
ele
(
'xpath://span[text()="自定义"]'
,
timeout
=
1
3
)
export_orders
.
click
()
print
(
'点击自定义'
)
time
.
sleep
(
random
.
randint
(
5
,
10
))
...
...
@@ -101,11 +101,7 @@ class TkVideo():
)
.
click
()
print
(
f
'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日'
)
time
.
sleep
(
random
.
randint
(
3
,
5
))
max_attempts
=
31
while
max_attempts
>
0
:
for
i
in
range
(
10
):
try
:
xpath
=
(
f
"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
...
...
@@ -113,7 +109,7 @@ class TkVideo():
f
"//span[text()='{self.end_day}']/parent::div"
)
print
(
'结束日期 xpath::'
,
xpath
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
5
)
ele
=
self
.
page_chrome
.
ele
(
f
"xpath={xpath}"
,
timeout
=
8
)
ele
.
click
()
print
(
f
'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}'
)
self
.
get_data
()
...
...
@@ -134,28 +130,28 @@ class TkVideo():
def
get_data
(
self
):
try
:
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://div[text()="更新"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击更新'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="Xlsx"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击Xlsx'
)
sleep
(
randint
(
5
,
10
))
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://button[text()="下载数据"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击下载数据'
)
sleep
(
randint
(
5
,
10
))
# 点击首页 获取店铺名称
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
0
)
.
click
()
self
.
page_chrome
.
ele
(
'xpath://span[text()="首页"]'
,
timeout
=
1
3
)
.
click
()
print
(
'已点击首页'
)
sleep
(
randint
(
5
,
10
))
self
.
shop_name
=
self
.
page_chrome
.
ele
(
'xpath://div[@class="text-H6-Bold"]'
)
.
text
self
.
shop_name
=
self
.
page_chrome
.
ele
(
'xpath://div[@class="
flex items-center"]//div[@class="
text-H6-Bold"]'
)
.
text
print
(
f
'已获取店铺名: {self.shop_name}'
)
sleep
(
randint
(
5
,
10
))
...
...
@@ -294,7 +290,11 @@ class TkVideo():
data
=
self
.
read_excel
(
EXCEL_FILE
)
processed_data
=
self
.
process_data
(
data
,
self
.
shop_name
)
self
.
store_data_in_redis
(
self
.
r
,
processed_data
)
try
:
print
(
'删除下载文件'
,
EXCEL_FILE
)
os
.
remove
(
EXCEL_FILE
)
except
:
print
(
'删除数据失败'
)
def
send_success_message_via_wechat
(
self
):
webhook_url
=
'http://47.112.96.71:8082/selection/sendMessage'
# 替换为你的企业微信机器人的Webhook URL
data
=
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment