Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
S
spider
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
selection-new
spider
Commits
1ca4f487
Commit
1ca4f487
authored
Nov 20, 2025
by
Peng
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
aa95175d
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
0 additions
and
1353 deletions
+0
-1353
all_connect.py
...s/picture_material/stock_summery/new_stock/all_connect.py
+0
-728
xnj_url.py
...jects/picture_material/stock_summery/new_stock/xnj_url.py
+0
-625
No files found.
wangjing_projects/projects/picture_material/stock_summery/new_stock/all_connect.py
deleted
100644 → 0
View file @
aa95175d
import
time
import
pandas
as
pd
from
sqlalchemy
import
create_engine
from
sqlalchemy.pool
import
NullPool
from
sqlalchemy
import
text
from
sqlalchemy.orm
import
sessionmaker
import
platform
import
traceback
import
json
import
uuid
from
sqlalchemy.exc
import
SQLAlchemyError
class ConnectSpider:
    """Database gateway used by the stock-image spiders.

    ``__init__`` builds SQLAlchemy engines for one PostgreSQL database and
    for several MySQL-protocol databases.  Every query method below runs
    against ``self.db_engine192`` except ``get_stock_image_detail``, which
    reads from ``self.mysql_selection``.

    NOTE(review): hosts, user names and passwords are hard-coded in this
    class.  They should be loaded from the environment or a secret store
    (and rotated, since they are committed to version control).
    """

    # Image ids queried by get_stock_images_id2 (hard-coded in the original).
    _SPECIFIC_IMAGE_IDS = (
        '1025406430', '782084149', '2340663257', '2444918601', '2481076155',
        '2534369399', '2522128969', '2522144147', '2482077119', '2475085855',
        '2560247125', '1115348984', '2555951185', '1644852415', '1644852424',
        '258700904', '2540342353', '2555951245', '2529955899', '1309059847',
        '1899316957', '2416180707', '1978653428', '2520112131', '1447499252',
        '2335787565', '1780440524', '2316295613', '2463106909', '2527382733',
        '2548693637', '2460743889', '2489123001', '2527399543', '2456315025',
        '2469939069', '2305915213', '1660111006', '2218802639', '453729808',
        '2295540279', '2323950095', '2323950087', '2057817146', '2541104423',
        '231076948', '2196541827', '2407612765', '2521017693', '2554778219',
        '2523427909', '2520799267', '2533854931', '2498052331', '2521798533',
        '2471652945', '2445858817', '2449783031', '1735869230', '1106587370',
        '2393397957', '2527382699', '2348771553', '1822384931', '2564084221',
    )

    def __init__(self):
        """Store the connection settings on ``self`` and build the engines."""
        # --- PostgreSQL ---------------------------------------------------
        self.pg_port = 54328
        self.pg_db = "selection"
        self.pg_user = "postgres"
        self.pg_pwd = "F9kL2sXe81rZq"
        self.pg_host = "61.145.136.61"
        pg_url = (
            f"postgresql://{self.pg_user}:{self.pg_pwd}"
            f"@{self.pg_host}:{self.pg_port}/{self.pg_db}"
        )
        self.db_engine = create_engine(pg_url)
        # NOTE(review): the original assigned a local ``pg_host =
        # "192.168.10.223"`` here but still formatted the URL with
        # ``self.pg_host``, so db_engine192 has always pointed at the same
        # host as db_engine.  That behaviour is kept; confirm which host is
        # actually intended before changing it.
        self.db_engine192 = create_engine(pg_url)

        # --- MySQL (settings kept on self) ----------------------------------
        self.sql_port = 3306
        self.sql_db = "selection"
        self.sql_user = "adv_yswg"
        self.sql_pwd = "Gd1pGJog1ysLMLBdML8w81"
        self.sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
        self.charset = 'utf8mb4'
        self.mysql_engine = create_engine(
            f"mysql+pymysql://{self.sql_user}:{self.sql_pwd}"
            f"@{self.sql_host}:{self.sql_port}/{self.sql_db}?charset={self.charset}"
        )

        # --- MySQL-protocol server on port 19030 ----------------------------
        sql_port = 19030
        sql_user = "fangxingjun"
        sql_pwd = "fangxingjun12345"
        sql_host = "192.168.10.151"
        wai_host = "113.100.143.162"
        # The "test" database is reached through wai_host ...
        self.mysql_test = create_engine(
            f"mysql+pymysql://{sql_user}:{sql_pwd}@{wai_host}:{sql_port}/test"
        )
        # ... while the "selection" database is reached through sql_host.
        self.mysql_selection = create_engine(
            f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/selection"
        )

    # ------------------------------------------------------------------
    # Engine factories
    # ------------------------------------------------------------------
    def mysql(self):
        """Return a new engine for the ``us_spider`` MySQL database."""
        sql_port = 3306
        sql_db = "us_spider"
        sql_user = "adv_yswg"
        sql_pwd = "Gd1pGJog1ysLMLBdML8w81"
        sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
        charset = 'utf8mb4'
        return create_engine(
            f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}?charset={charset}"
        )

    def mysql_us_spider(self):
        """Return a new engine for the ``test`` database on port 19030."""
        sql_port = 19030
        sql_db = "test"  # the original keeps "us_spider" as a commented-out alternative
        sql_user = "fangxingjun"
        sql_pwd = "fangxingjun12345"
        sql_host = "192.168.10.151"
        return create_engine(
            f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}"
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_image_items(items):
        """Map scraped items onto the columns of the image-id tables."""
        return [
            {
                'account_id': item['account_id'],
                'image_id': item['image_id'],
                'state': item['state'],
                'created_at': item['created_at'],
                'image_title': item.get('title', ''),
                'image_size_info': json.dumps(item.get('sizes', {})),
            }
            for item in items
        ]

    def _append_rows(self, table_name, rows, columns):
        """Append ``rows`` to ``table_name`` with ``DataFrame.to_sql``."""
        frame = pd.DataFrame(rows, columns=columns)
        with self.db_engine192.connect() as connection:
            frame.to_sql(name=table_name, con=connection, if_exists='append', index=False)

    def _execute_write(self, statement, params=None):
        """Run one DML statement in its own transaction; return ``rowcount``.

        ``Engine.begin()`` commits when the block exits normally and rolls
        back when it raises, so callers need no explicit commit/rollback.
        """
        with self.db_engine192.begin() as connection:
            result = connection.execute(statement, params or {})
            return result.rowcount

    def _fetch_pic_urls(self, account_id, limit_one):
        """Shared body of ``get_pic_urls`` and ``get_pic_urls_limit1``."""
        table_name = "stock_image_detail_wj"
        sql = (
            f"select image_url, image_id, image_title from {table_name} "
            f"where account_id = :account_id and state = 1"
        )
        if limit_one:
            sql += " limit 1"
        query = text(sql)
        with self.db_engine192.connect() as connection:
            try:
                result = connection.execute(query, {'account_id': account_id})
                # Columns arrive in SELECT order: image_url, image_id, image_title.
                pic_data_list = [f"{row[0]}||{row[1]}||{row[2]}" for row in result]
            except Exception as e:
                print(f"An error occurred: {e}")
                return False
        return pic_data_list or False

    @staticmethod
    def _image_id_pairs(result, account_id, cast_image_id=False):
        """Join image_id, id, image_title and image_size_info with ``||-||``.

        Returns ``False`` when the result set is empty (the original reached
        the same return value through a length-mismatch exception).
        """
        df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        if df_status.empty:
            return False
        df_status['id'] = df_status['id'].astype(str)
        image_ids = df_status['image_id']
        if cast_image_id:
            image_ids = image_ids.astype(str)
        image_id_id_pairs = list(
            image_ids + '||-||' + df_status['id'] + '||-||'
            + df_status['image_title'] + '||-||' + df_status['image_size_info']
        )
        print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
        return image_id_id_pairs

    # ------------------------------------------------------------------
    # stock_image_id_wj / homedepot_projects_items writers
    # ------------------------------------------------------------------
    def save_stock_img_id(self, items):
        """Bulk-insert image ids into ``stock_image_id_wj``.

        Rows whose ``(account_id, image_id)`` already exists are skipped by
        the ``ON CONFLICT ... DO NOTHING`` clause.  Each item must provide
        ``account_id``, ``image_id``, ``state`` and ``created_at``;
        ``title`` and ``sizes`` are optional.
        """
        processed_items = self._normalize_image_items(items)
        if not processed_items:
            # Nothing to insert; skip the round trip to the database.
            return
        insert_sql = text("""
            INSERT INTO stock_image_id_wj
                (account_id, image_id, state, created_at, image_title, image_size_info)
            VALUES
                (:account_id, :image_id, :state, :created_at, :image_title, :image_size_info)
            ON CONFLICT (account_id, image_id) DO NOTHING
        """)
        print('新代码插入id', insert_sql)
        with self.db_engine192.begin() as conn:
            conn.execute(insert_sql, processed_items)

    def save_homedepot_projects(self, items):
        """Append items (same shape as ``save_stock_img_id``) to ``homedepot_projects_items``."""
        columns = ['account_id', 'image_id', 'state', 'created_at', 'image_title', 'image_size_info']
        self._append_rows("homedepot_projects_items", self._normalize_image_items(items), columns)

    # ------------------------------------------------------------------
    # stock_image_summary_wj
    # ------------------------------------------------------------------
    def get_account_id(self, item_id):
        """Return ``account_id`` of the summary row ``id = item_id``, or ``None``."""
        table_name = "stock_image_summary_wj"
        query = text(f"SELECT account_id, id FROM {table_name} WHERE id = :item_id")
        with self.db_engine192.connect() as connection:
            result = connection.execute(query, {"item_id": item_id})
            # Passing the column names up front keeps an empty result from
            # raising a length-mismatch error.
            df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        if df_status.empty:
            return None
        return df_status.account_id.iloc[0]

    def update_id_to_3(self, account_id):
        """Set ``state = 3`` for the given account id(s).

        ``account_id`` may be one id string or an iterable of ids.  A single
        string used to be split into characters by ``tuple()``, which made
        the UPDATE match nothing.
        """
        if isinstance(account_id, str):
            success_id = (account_id,)
        else:
            success_id = tuple(account_id)
        if not success_id:
            return
        table_name = "stock_image_summary_wj"
        sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE account_id IN :success_id")
        self._execute_write(sql_update, {"success_id": success_id})
        print('成功更新为3')

    def update_all_states_to_1(self, state=1, item_id=None):
        """Update ``state`` in the summary table.

        With ``state == 3`` only the row ``id = item_id`` is updated; for any
        other value EVERY row of the table is set to ``state``.  Failures are
        printed, not raised.
        """
        table_name = "stock_image_summary_wj"
        if state == 3:
            sql_update = text(f"UPDATE {table_name} SET state = :state where id = :item_id")
            params = {"state": state, "item_id": item_id}
        else:
            sql_update = text(f"UPDATE {table_name} SET state = :state")
            params = {"state": state}
        print(sql_update)
        try:
            rowcount = self._execute_write(sql_update, params)
        except Exception as e:
            # The transaction has already been rolled back by Engine.begin().
            print(f'更新状态失败:{e}')
            return
        print(f'成功更新所有状态为1,受影响行数:{rowcount}')

    def save_account(self, items):
        """Append account rows to ``stock_image_summary_wj``."""
        columns = ['account_id', 'account_secret', 'year_month', 'spider_date', 'state', 'created_time']
        self._append_rows("stock_image_summary_wj", items, columns)
        print("保存成功!")

    def get_cookie_account(self, item_id):
        """Return ``[account_id, account_secret]`` for summary row ``item_id`` with state 1, else ``None``.

        NOTE(review): the debug print below writes the account secret to stdout.
        """
        table_name = "stock_image_summary_wj"
        query = text(
            f"""SELECT account_id,account_secret FROM {table_name} where id = :item_id and state= :state_int;"""
        )
        print(query)
        with self.db_engine192.connect() as connection:
            result = connection.execute(query, {'item_id': item_id, 'state_int': 1})
            print(result)
            row = result.first()
        if row is None:
            return None
        account_list = [row[0], row[1]]
        print(account_list, '232323====32')
        return account_list

    # ------------------------------------------------------------------
    # stock_image_detail_wj
    # ------------------------------------------------------------------
    def delet_datails(self, image_id_list):
        """Delete detail rows of one hard-coded account whose image_id is not in ``image_id_list``.

        NOTE(review): membership is decided with Python set arithmetic, so
        ids in ``image_id_list`` presumably must have the same type as the
        column values returned by the driver; verify against the caller.
        """
        table_name = "stock_image_detail_wj"
        account = 'zhouweiqing@yswg.com.cn'
        select_sql = text(f"SELECT image_id FROM {table_name} WHERE account_id = :account_id")
        delete_sql = text(
            f"DELETE FROM {table_name} WHERE account_id = :account_id AND image_id = :image_id"
        )
        wanted = set(image_id_list)
        with self.db_engine192.begin() as connection:
            rows = connection.execute(select_sql, {"account_id": account}).fetchall()
            stale_ids = {row[0] for row in rows} - wanted
            if stale_ids:
                connection.execute(
                    delete_sql,
                    [{"account_id": account, "image_id": image_id} for image_id in stale_ids],
                )

    def get_datails_image_id(self, account_id):
        """Return image ids (as ``int``) of the account's rows created before 2024-09-02."""
        table_name = "stock_image_detail_wj"
        sql_query = text(
            f"SELECT image_id FROM {table_name} WHERE account_id = :account_id and created_time < '2024-09-02 00:00:00'"
        )
        with self.db_engine192.connect() as connection:
            result = connection.execute(sql_query, {"account_id": account_id})
            return [int(row[0]) for row in result.fetchall()]

    def save_stock_detail(self, item):
        """Append a single detail record to ``stock_image_detail_wj``."""
        columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url',
                   'state', 'created_time']
        self._append_rows("stock_image_detail_wj", [item], columns)

    def get_pic_urls_limit1(self, account_id):
        """Return at most one ``"image_url||image_id||image_title"`` string (state 1), or ``False``."""
        return self._fetch_pic_urls(account_id, limit_one=True)

    def get_pic_urls(self, account_id):
        """Return all ``"image_url||image_id||image_title"`` strings (state 1), or ``False``.

        ``False`` is returned both when no row matches and when the query fails.
        """
        return self._fetch_pic_urls(account_id, limit_one=False)

    def get_kong_images_id(self, account_id):
        """Return ``"image_id||image_type||image_url"`` for images whose size info is only ever ``'{}'``.

        Image ids that also have a row with non-empty ``image_size_info`` for
        the same account are excluded by the sub-query.  Returns ``[]`` when
        nothing matches.
        """
        table_name = "stock_image_detail_wj"
        query = text(
            f"""SELECT image_id, image_type, image_url
            FROM {table_name}
            WHERE account_id = :account_id
            AND image_id NOT IN (
                SELECT image_id
                FROM {table_name}
                WHERE account_id = :account_id
                AND image_size_info != '{{}}'
            )
            AND image_size_info = '{{}}'"""
        )
        print(query)
        with self.db_engine192.connect() as connection:
            result = connection.execute(query, {'account_id': account_id})
            df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        if df_status.empty:
            return []
        return list(df_status['image_id'] + '||' + df_status['image_type'] + '||' + df_status['image_url'])

    def save_stock_detail_move(self, data_list):
        """Append migrated detail rows to ``stock_image_detail_wj``."""
        columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url',
                   'created_time']
        self._append_rows("stock_image_detail_wj", data_list, columns)
        print("保存成功!")

    def update_url_state_to_3(self, image_id):
        """Set ``state = 3`` on detail rows of ``image_id`` that are currently in state 1."""
        table_name = "stock_image_detail_wj"
        sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE image_id = :image_id and state = 1")
        self._execute_write(sql_update, {"image_id": image_id})

    def upload_data(self, account_id, image_id, upload_time, err_msg):
        """Record ``upload_time`` and a JSON-encoded ``err_msg`` on matching rows created before 2024-09-02.

        The original ran this UPDATE without committing; it now runs inside
        a transaction that is committed on success.
        """
        table_name = "stock_image_detail_wj"
        sql_update = text(
            f"UPDATE {table_name} SET upload_time = :upload_time, err_msg = :err_msg WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'"
        )
        self._execute_write(sql_update, {
            "upload_time": upload_time,
            "err_msg": json.dumps(err_msg),
            "account_id": account_id,
            "image_id": str(image_id),  # image_id is always bound as a string
        })

    def upload_success_data(self, account_id, image_id, upload_time):
        """Record ``upload_time`` on matching rows created before 2024-09-02 (committed)."""
        table_name = "stock_image_detail_wj"
        sql_update = text(
            f"UPDATE {table_name} SET upload_time = :upload_time WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'"
        )
        self._execute_write(sql_update, {
            "upload_time": upload_time,
            "account_id": account_id,
            "image_id": str(image_id),  # image_id is always bound as a string
        })

    def get_all_image_id(self):
        """Return every ``image_id`` in ``stock_image_detail_wj`` as a list of strings."""
        table_name = "stock_image_detail_wj"
        sql_query = f"SELECT image_id FROM {table_name} "
        with self.db_engine192.connect() as connection:
            df_status = pd.read_sql(sql_query, con=connection)
        return list(df_status['image_id'].astype(str))

    # ------------------------------------------------------------------
    # stock_image_id_wj readers / state updates
    # ------------------------------------------------------------------
    def get_stock_images_id2(self, account_id):
        """Return ``"image_id||-||id||-||image_title||-||image_size_info"`` strings for a fixed id list.

        Only the ids in ``_SPECIFIC_IMAGE_IDS`` are considered.  Returns
        ``False`` when nothing matches or post-processing fails.
        """
        table_name = "stock_image_id_wj"
        query = text(
            f"""SELECT image_id,id,image_title,image_size_info FROM {table_name}
            WHERE account_id = :account_id
            AND image_id IN :image_ids"""
        )
        print(query)
        with self.db_engine192.connect() as connection:
            result = connection.execute(
                query, {'account_id': account_id, 'image_ids': tuple(self._SPECIFIC_IMAGE_IDS)}
            )
            try:
                return self._image_id_pairs(result, account_id, cast_image_id=True)
            except Exception as e:
                print(e)
                return False

    def img_size_is_1(self, account_id, image_ids):
        """Return the ``||-||``-joined rows of ``account_id`` whose image_id is in ``image_ids``.

        Returns ``False`` for an empty ``image_ids``, an empty result, or any error.
        """
        if not image_ids:
            print("No image IDs provided.")
            return False
        table_name = "stock_image_id_wj"
        query = text(
            f"""SELECT image_id, id, image_title, image_size_info FROM {table_name} WHERE account_id = :account_id AND image_id IN :image_ids"""
        )
        with self.db_engine192.connect() as connection:
            try:
                result = connection.execute(
                    query, {'account_id': account_id, 'image_ids': tuple(image_ids)}
                )
                return self._image_id_pairs(result, account_id)
            except Exception as e:
                print(e)
                return False

    def get_stock_images_id(self, account_id):
        """Return the ``||-||``-joined rows of ``account_id`` that are in state 1, or ``False``."""
        table_name = "stock_image_id_wj"
        query = text(
            f""" SELECT image_id,id,image_title,image_size_info FROM {table_name} where account_id = :account_id and state = 1"""
        )
        print(query)
        with self.db_engine192.connect() as connection:
            result = connection.execute(query, {'account_id': account_id})
            try:
                return self._image_id_pairs(result, account_id)
            except Exception as e:
                print(e)
                return False

    def update_image_id_to_3(self, item_id):
        """Set ``state = 3`` on the ``stock_image_id_wj`` row ``id = item_id``."""
        table_name = "stock_image_id_wj"
        sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE id = :item_id")
        self._execute_write(sql_update, {"item_id": item_id})

    def update_image_id_to_4(self, item_id):
        """Set ``state = 4`` on the ``stock_image_id_wj`` row ``id = item_id``.

        The original closed the connection before committing, which discarded
        the update; the statement now runs in a committed transaction.
        """
        table_name = "stock_image_id_wj"
        sql_update = text(f"UPDATE {table_name} SET state = 4 WHERE id = :item_id")
        self._execute_write(sql_update, {"item_id": item_id})

    def get_stock_test_id(self, username):
        """Return one ``image_id`` stored for ``username``, or ``None`` when there is none."""
        table_name = "stock_image_id_wj"
        query = text(f"""SELECT image_id FROM {table_name} WHERE account_id = :username LIMIT 1;""")
        with self.db_engine192.connect() as connection:
            row = connection.execute(query, {'username': username}).first()
        return row[0] if row is not None else None

    # ------------------------------------------------------------------
    # stock_image_detail (MySQL) reader
    # ------------------------------------------------------------------
    def get_stock_image_detail(self, account_id):
        """Return all detail rows of ``account_id`` from ``mysql_selection`` as ``||-||``-joined strings.

        Field order: account_id, image_id, image_size_info, image_title,
        image_type, image_url, created_time (formatted ``%Y-%m-%d %H:%M:%S``).
        Returns ``[]`` when the account has no rows.
        """
        table_name = "stock_image_detail"
        query = text(
            f"""SELECT account_id, image_id, image_size_info, image_title, image_type, image_url, created_time FROM {table_name} WHERE account_id = :account_id"""
        )
        print(query)
        with self.mysql_selection.connect() as connection:
            result = connection.execute(query, {'account_id': account_id})
            df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        if df_status.empty:
            detail_datas = []
        else:
            # Timestamps must become strings before they can be concatenated.
            df_status['created_time'] = df_status['created_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
            detail_datas = list(
                df_status['account_id'] + '||-||' +
                df_status['image_id'] + '||-||' +
                df_status['image_size_info'] + '||-||' +
                df_status['image_title'] + '||-||' +
                df_status['image_type'] + '||-||' +
                df_status['image_url'] + '||-||' +
                df_status['created_time']
            )
        print(f'账号:{account_id} 一共 {len(detail_datas)} 条数据')
        return detail_datas

    # ------------------------------------------------------------------
    # stock_cookie_wj
    # ------------------------------------------------------------------
    def save_stock_cookie(self, item):
        """Append a single cookie record to ``stock_cookie_wj``."""
        columns = ['account_id', 'cookie', 'state', 'created_at']
        self._append_rows("stock_cookie_wj", [item], columns)
        print("保存成功!")

    def updata_ck_state(self, ck_id):
        """Set ``state = 3`` on cookie row ``ck_id``.

        Returns ``True`` when at least one row was updated, ``False`` when no
        row matched or the statement failed.
        """
        table_name = "stock_cookie_wj"
        query = text(f"""UPDATE {table_name} SET state = :new_state WHERE id = :ck_id;""")
        try:
            updated = self._execute_write(query, {'new_state': 3, 'ck_id': ck_id})
        except Exception as e:
            # Best effort, as before, but no longer silent.
            print(f"An error occurred: {e}")
            return False
        if updated > 0:
            print('修改cookie状态为3')
            return True
        return False

    def get_stock_cookie(self, account):
        """Return ``"id||-||cookie||-||state"`` for one state-1 cookie of ``account``, or ``None``."""
        table_name = "stock_cookie_wj"
        query = text(
            f"""SELECT id, cookie, state FROM {table_name} WHERE account_id = :account_id AND state = :state LIMIT 1;"""
        )
        with self.db_engine192.connect() as connection:
            row = connection.execute(query, {'account_id': account, 'state': 1}).mappings().first()
        if row is None:
            return None
        return f"{row['id']}||-||{row['cookie']}||-||{row['state']}"

    def get_stock_cookie_list(self, account):
        """Return every state-1 cookie value stored for ``account`` (possibly an empty list)."""
        table_name = "stock_cookie_wj"
        query = text(
            f"""SELECT cookie FROM {table_name} WHERE account_id = :account_id AND state = :state ; """
        )
        with self.db_engine192.connect() as connection:
            result = connection.execute(query, {'account_id': account, 'state': 1})
            return [row[0] for row in result]
if __name__ == '__main__':
    # Manual smoke check: look up the account stored under summary id 1.
    spider_db = ConnectSpider()
    spider_db.get_cookie_account(1)
wangjing_projects/projects/picture_material/stock_summery/new_stock/xnj_url.py
deleted
100644 → 0
View file @
aa95175d
# -*- coding: utf-8 -*-
import calendar
import email
import imaplib
import json
import logging
import os
import random
import re
import sys
import time
from datetime import datetime, timedelta
from random import randint
from time import sleep

# Make the parent directory importable before loading project modules.
sys.path.append(os.path.dirname(sys.path[0]))

from all_connect import ConnectSpider

# Shared database gateway used by the spider classes in this module.
Con = ConnectSpider()

# Set before the HTTP/browser libraries are imported, as in the original order.
os.environ['NO_PROXY'] = 'stackoverflow.com'
logging.captureWarnings(True)

from DrissionPage import ChromiumPage
import requests
class GetStockImgId(object):
    """Collects the ids of images licensed on Shutterstock during the previous month.

    Pages of the holdings-search API are fetched with a caller-supplied
    cookie and stored through the module-level ``Con`` database gateway.
    """

    # Seconds to wait for the holdings API before giving up on a request.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Prepare the request headers sent with every API call."""
        self.headers = {
            'accept': 'application/json',
            'accept-language': 'zh-CN,zh;q=0.9',
            'content-type': 'application/json',
            'newrelic': 'eyJ2IjpbMCwxXSwiZCI6eyJ0eSI6IkJyb3dzZXIiLCJhYyI6Ijk2NzIzMiIsImFwIjoiMTU4ODYzMjc5MiIsImlkIjoiMjgzNzAxYzA5ODljNWI4YiIsInRyIjoiMDYwYTQwMzI4MjhiMGNlM2ZkZmJlYzAxNDU5NTVhZDUiLCJ0aSI6MTczNTg4NTk5ODcxOX19',
            'origin': 'https://www.shutterstock.com',
            'priority': 'u=1, i',
            'referer': 'https://www.shutterstock.com/zh/catalog/licenses?startDate=2024-12-01&endDate=2024-12-31',
            'sec-ch-ua': '"Not)A;Brand";v="99", "Google Chrome";v="127", "Chromium";v="127"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'traceparent': '00-060a4032828b0ce3fdfbec0145955ad5-283701c0989c5b8b-01',
            'tracestate': '967232@nr=0-1-967232-1588632792-283701c0989c5b8b----1735885998719',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
            'x-end-app-name': 'next-web',
            'x-end-app-version': '837034fdc61',
            'x-newrelic-id': 'XQAAU1VRGwIEVVhaBgYGUlI=',
            'x-request-id': 'c3a36b63-ff03-4c2f-9a94-5381cd4485a7',
        }

    def random_ua(self):
        """Replace the ``user-agent`` header with a randomly assembled Chrome UA string."""
        first_num = random.randint(55, 62)
        third_num = random.randint(0, 3200)
        fourth_num = random.randint(0, 140)
        os_type = [
            '(Windows NT 6.1; WOW64)', '(Windows NT 10.0; WOW64)', '(X11; Linux x86_64)',
            '(Macintosh; Intel Mac OS X 10_12_6)'
        ]
        chrome_version = 'Chrome/{}.0.{}.{}'.format(first_num, third_num, fourth_num)
        ua = ' '.join(['Mozilla/5.0', random.choice(os_type), 'AppleWebKit/537.36',
                       '(KHTML, like Gecko)', chrome_version, 'Safari/537.36'])
        self.headers['user-agent'] = ua

    def get_url_month(self, page, cookie, start_date, last_date):
        """Fetch one page (200 items) of holdings licensed between the two dates.

        Args:
            page: 1-based page number.
            cookie: Cookies passed to ``requests.get``.
            start_date: First day, ``YYYY-MM-DD``.
            last_date: Last day, ``YYYY-MM-DD``.

        Returns:
            The ``requests`` response object (status is not checked here).
        """
        self.random_ua()
        response = requests.get(
            f'https://www.shutterstock.com/api/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=200&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&page[number]={page}&filter[assetStatus]=comped%2Clicensed&language=zh',
            cookies=cookie,
            headers=self.headers,
            # Without a timeout a stalled connection would block the crawl forever.
            timeout=self.REQUEST_TIMEOUT,
        )
        print(response)
        return response

    def get_img_id(self, response, account_id, page):
        """Store the image ids found in one API response.

        Returns:
            ``True`` when the page contained items and they were saved,
            ``False`` when the ``included`` list is missing or empty (treated
            as "past the last page").

        Raises:
            Any error from JSON decoding, item parsing or the database.  The
            original swallowed these and returned a falsy value, which made
            ``run`` mark the account as finished after a failure.
        """
        payload = response.json()
        data = payload.get('included') if isinstance(payload, dict) else None
        if not data:
            print('最后一页,全部保存成功')
            return False

        created_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        data_list = [
            {
                'account_id': account_id,
                'image_id': int(item['id']),
                'title': item['attributes']['title'],
                'sizes': item['attributes']['sizes'],
                'state': 1,
                'created_at': created_at,
            }
            for item in data
        ]
        Con.save_stock_img_id(data_list)
        print(f"{account_id}第{page}页保存id成功,")
        return True

    def get_last_month_start_end(self):
        """Return ``(first_day, last_day)`` of the previous calendar month as ``YYYY-MM-DD`` strings."""
        first_day_of_this_month = datetime.today().replace(day=1)
        # The day before the 1st of this month is the last day of last month.
        last_day_of_last_month = first_day_of_this_month - timedelta(days=1)
        first_day_of_last_month = last_day_of_last_month.replace(day=1)
        start_date = first_day_of_last_month.strftime('%Y-%m-%d')
        last_date = last_day_of_last_month.strftime('%Y-%m-%d')
        return start_date, last_date

    def run(self, account_id, cookie):
        """Crawl every page of last month's licenses for one account.

        Pages are requested until ``get_img_id`` reports an empty page; the
        account is then marked via ``Con.update_id_to_3``.  Any failure is
        printed and re-raised so the caller can stop its outer loop.

        Raises:
            RuntimeError: when the API answers with a non-200 status.
        """
        page = 1
        start_date, last_date = self.get_last_month_start_end()
        print(f"Start Date: {start_date}")
        print(f"Last Date: {last_date}")
        while True:
            try:
                response = self.get_url_month(page, cookie, str(start_date), str(last_date))
                if response.status_code != 200:
                    print(f'状态码为{response.status_code}, 请求失败')
                    # The original used a bare ``raise`` here, which surfaced
                    # as RuntimeError("No active exception to reraise").
                    raise RuntimeError(f'状态码为{response.status_code}, 请求失败')
                if not self.get_img_id(response, account_id, page):
                    # Nothing left to fetch: flag the account as completed.
                    Con.update_id_to_3(account_id)
                    break
                # Pause between requests.
                time.sleep(random.randint(3, 6))
                page += 1
            except Exception as e:
                print(e)
                raise
class
GetSS_details
():
def __init__(self):
    """Start with empty credentials, open a browser page and set request/mailbox settings."""
    self.account = self.pwd = ''
    self.page = ChromiumPage()
    request_headers = {
        'accept': 'application/json',
        'accept-language': 'zh-CN,zh;q=0.9',
        'content-type': 'application/json',
        'newrelic': 'eyJ2IjpbMCwxXSwiZCI6eyJ0eSI6IkJyb3dzZXIiLCJhYyI6Ijk2NzIzMiIsImFwIjoiMTU4ODYzMjc5MiIsImlkIjoiMDdjNDZhYTI3ZTBlMTAyZiIsInRyIjoiOGI4ODQ3MzNiNjFjNDNlY2YxMGEzOTQ2MzQ4MDE2NzQiLCJ0aSI6MTczNTk5NzEzNjEyOH19',
        'origin': 'https://www.shutterstock.com',
        'priority': 'u=1, i',
        'referer': 'https://www.shutterstock.com/zh/catalog/',
        'sec-ch-ua': '"Not)A;Brand";v="99", "Google Chrome";v="127", "Chromium";v="127"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'traceparent': '00-8b884733b61c43ecf10a394634801674-07c46aa27e0e102f-01',
        'tracestate': '967232@nr=0-1-967232-1588632792-07c46aa27e0e102f----1735997136128',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36',
        'x-end-app-name': 'next-web',
        'x-end-app-version': '5ca4a4c05d8',
        'x-newrelic-id': 'XQAAU1VRGwIEVVhaBgYGUlI=',
        'x-request-id': '15754a73-f152-4983-99b4-6af058379880',
    }
    self.headers = request_headers
    # IMAP mailbox settings (server, user, password).
    # NOTE(review): the mailbox password is hard-coded in source.
    mailbox = {
        'imap_server': 'imap.exmail.qq.com',
        'username': 'pengyanbing@yswg.com.cn',
        'password': 'Python3.8',
    }
    self.email_value_config = mailbox
def get_ck(self):
    """Open the catalog page and return the cookies needed for API calls.

    Returns:
        A dict holding whichever of ``datadome``, ``accts_customer_sso1``
        and ``next.sid`` the browser has, or ``None`` when anything fails
        (the error is printed).
    """
    try:
        self.page.get('https://www.shutterstock.com/zh/catalog/')
        sleep(randint(2, 4))

        # Flatten the browser's cookie list into a name -> value mapping.
        jar = {}
        for entry in self.page.cookies():
            jar[entry['name']] = entry['value']

        # A bare '-undefined' value is rebuilt as '<htjs_user_id>-undefined'.
        if jar.get('accts_customer_sso1') == '-undefined':
            user_id = jar.get('htjs_user_id', '')
            jar['accts_customer_sso1'] = f"{user_id}-undefined"

        wanted = ('datadome', 'accts_customer_sso1', 'next.sid')
        return {name: jar[name] for name in wanted if name in jar}
    except Exception as e:
        print('获取cookie出错:', e)
def login_out(self):
    """Log out through the avatar menu.

    Three avatar selectors are tried in order; the first one that yields
    an element is clicked, followed by the "登出" menu entry.  Nothing
    happens when none of them is found.
    """
    avatar_selectors = (
        '.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-9jj0tt-avatarSize',
        '.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-1jeofke',
        '.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-1ki7tcg',
    )
    for selector in avatar_selectors:
        avatar = self.page.ele(selector)
        if not avatar:
            continue
        avatar.click()
        sleep(randint(2, 4))
        self.page.ele('@text()=登出').click()
        return
def decode_body(self, body):
    """Decode a raw e-mail payload to text, trying several encodings in turn.

    Args:
        body: Payload bytes.  ``None`` (no payload) and ``str`` (already
            decoded) are tolerated instead of raising ``AttributeError``.

    Returns:
        The decoded text; ``''`` for ``None``.
    """
    if body is None:
        return ''
    if isinstance(body, str):
        return body
    # 'iso-8859-1' accepts every byte sequence, so it acts as the catch-all;
    # the original list also contained its alias 'latin1', which could
    # never be reached.
    for encoding in ('utf-8', 'gb18030', 'iso-8859-1'):
        try:
            return body.decode(encoding)
        except UnicodeDecodeError:
            continue
    # Defensive fallback: decode as latin1, replacing undecodable characters.
    return body.decode('latin1', errors='replace')
def extract_verification_code(self, text_body):
    """Return the six-digit verification code found in ``text_body``.

    The patterns are tried in the order listed; the first pattern that matches
    anywhere in the text wins. Returns ``None`` when no pattern matches.
    """
    candidates = (
        r'以验证您的身份:(\d{6})',
        r'一次性密码:(\d{6})',
        r'验证码:(\d{6})',
        r'(\d{6})\s*此密码仅可使用一次',
    )
    hits = (re.search(regex, text_body) for regex in candidates)
    found = next((hit for hit in hits if hit), None)
    return found.group(1) if found else None
def fetch_verification_code(self, email_value_config):
    """Fetch the newest Shutterstock verification code from an IMAP inbox.

    Messages from ``noreply@shutterstock.com`` are scanned newest first. For
    each message, every ``text/plain`` part (or the whole body for a
    non-multipart message) is decoded; the first body that contains
    ``'shutterstock'`` and yields a code is used.

    Args:
        email_value_config: Mapping with the keys ``'imap_server'``,
            ``'username'`` and ``'password'``.

    Returns:
        The verification code as a string, or ``None`` when no code is found
        or an error occurs (errors are printed, not raised).
    """
    mail = None  # bound up front so the cleanup below is safe if connecting fails
    try:
        mail = imaplib.IMAP4_SSL(email_value_config['imap_server'])
        mail.login(email_value_config["username"], email_value_config["password"])
        mail.select('inbox')
        result, data = mail.search(None, '(FROM "noreply@shutterstock.com")')
        if result != 'OK':
            print("没有找到邮件")
            return None

        # Walk the matches from the most recent message backwards.
        for email_id in reversed(data[0].split()):
            result, data = mail.fetch(email_id, "(RFC822)")
            if result != 'OK' or not data or not isinstance(data[0], tuple):
                # One unreadable message should not abort the whole scan.
                continue
            email_message = email.message_from_bytes(data[0][1])

            if email_message.is_multipart():
                parts = [part for part in email_message.walk()
                         if part.get_content_type() == 'text/plain']
            else:
                parts = [email_message]

            for part in parts:
                body = part.get_payload(decode=True)
                if body is None:
                    continue
                decoded_body = self.decode_body(body)
                if 'shutterstock' not in decoded_body:
                    continue
                verification_code = self.extract_verification_code(decoded_body)
                if verification_code:
                    print("验证码是:", verification_code)
                    return verification_code

        print("没有找到符合条件的邮件")
        return None
    except imaplib.IMAP4.error as e:
        print(f"IMAP4 error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        # Single cleanup point. close() fails when no mailbox was selected, so
        # each call is attempted on its own to make sure logout() still runs.
        if mail is not None:
            for cleanup in (mail.close, mail.logout):
                try:
                    cleanup()
                except Exception:
                    # Best-effort teardown; the connection may already be gone.
                    pass
    return None
def yxyzm(self, iframe):
    """Complete the e-mail verification-code step inside the login iframe.

    Prints a notice, fetches the code via ``fetch_verification_code`` using
    ``self.email_value_config``, types it into the element located by the
    label selector, then clicks the confirm element. Pauses 2-4 seconds
    before fetching the code and again before typing it.
    """
    label_selector = '.MuiFormLabel-root MuiInputLabel-root MuiInputLabel-formControl MuiInputLabel-animated MuiInputLabel-sizeSmall MuiInputLabel-standard MuiFormLabel-colorPrimary css-17839r8'
    confirm_selector = '.MuiTouchRipple-root css-w0pj6f'

    print('需要输入邮箱验证码')
    sleep(randint(2, 4))
    code = self.fetch_verification_code(self.email_value_config)
    code_field = iframe.ele(label_selector)
    sleep(randint(2, 4))
    code_field.input(code)
    iframe.ele(confirm_selector).click()
def login(self):
    """Log in to Shutterstock with ``self.account`` / ``self.pwd`` and return cookies.

    Steps: open the catalog page, dismiss the optional "continue" link, log
    out any existing session, open the login iframe, fill in account and
    password, submit, then handle the e-mail verification step if its input
    label is present.

    Returns:
        Whatever ``self.get_ck()`` returns after the login attempt, or
        ``False`` when opening or filling the login form raises.
    """
    try:
        # Open the catalog page.
        self.page.get('https://www.shutterstock.com/zh/catalog/')
        sleep(randint(2, 4))
        try:
            print('click No thanks')
            self.page.ele('xpath://a[@id="continue"]', timeout=10).click()
        except Exception:
            # The link is optional. Catch Exception rather than everything so
            # Ctrl-C / SystemExit still stop the program.
            print('No thanks 错误')

        # Make sure no previous account is still logged in.
        self.login_out()

        # Find and click the login button.
        self.page.ele('xpath://a[@data-automation="loginButton"]', timeout=10).click()
        sleep(randint(2, 4))

        # The login form lives inside an iframe.
        iframe = self.page.get_frame('#login-iframe')
        print('已切换到 login-iframe')

        # Fill in the account.
        print("正在等待邮箱输入框...")
        sleep(10)
        email_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf')
        email_input.clear()  # drop any pre-filled content
        email_input.input(self.account)
        print("已输入账号到邮箱输入框")

        # Fill in the password.
        print("正在等待密码输入框...")
        pwd_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall MuiInputBase-inputAdornedEnd css-186x7cf')
        pwd_input.clear()  # drop any pre-filled content
        pwd_input.input(self.pwd)
        print("已输入密码到密码输入框")
        sleep(randint(2, 4))

        # Submit the form.
        submit_button = iframe.ele('.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-df622d')
        submit_button.click()
        print('已点击登录...')
    except Exception as e:
        print(f"出现错误: {e}")
        return False

    try:
        sleep(randint(5, 8))
        # The verification-code label only shows up when a code is required.
        code_label = iframe.ele('.MuiFormLabel-root MuiInputLabel-root MuiInputLabel-formControl MuiInputLabel-animated MuiInputLabel-sizeSmall MuiInputLabel-standard MuiFormLabel-colorPrimary css-17839r8')
        if code_label:
            self.yxyzm(iframe)
        else:
            print('不需要验证码')
        sleep(10)
        return self.get_ck()
    except Exception as e:
        # Any failure in the verification step is treated as "no code needed".
        print(e)
        print('不需要验证码')
        sleep(randint(5, 8))
        return self.get_ck()
def transmission_api(self, account_id, image_id, image_size_info, image_title, image_type, image_url, timeout=600):
    """POST one image's details to the ``saveImageDetail`` endpoint, with retries.

    Args:
        account_id, image_id, image_size_info, image_title, image_type,
        image_url: Values sent as the JSON fields ``accountId``, ``imageId``,
            ``imageSizeInfo``, ``imageTitle``, ``imageType`` and ``imageUrl``.
        timeout: Seconds to wait for each request. Defaults to 600, the same
            value the other ``requests.post`` calls in this file use, so a
            stalled connection is retried instead of hanging forever.

    Returns:
        The decoded JSON body of the first response with status 200.

    Raises:
        Exception: When the initial attempt and all ``max_retries`` retries fail.
    """
    # url = 'http://192.168.2.97:6661/microservice-visual/visual/fileSystem/saveImageDetail?token=dacce869-0471-4ec7-ac50-3b3b1ec22c87'
    url = 'http://wx.yswg.com.cn:8000/microservice-visual/visual/fileSystem/saveImageDetail?token=dacce869-0471-4ec7-ac50-3b3b1ec22c87'
    data_json = json.dumps({
        'accountId': account_id,
        'imageId': image_id,
        'imageSizeInfo': image_size_info,
        'imageTitle': image_title,
        'imageType': image_type,
        'imageUrl': image_url,
    })
    max_retries = 3
    # One initial attempt plus max_retries retries.
    for retries in range(max_retries + 1):
        try:
            response = requests.post(url, data=data_json, timeout=timeout)
            if response.status_code == 200:
                return response.json()
            print(f'请求失败,状态码: {response.status_code},重试 ({retries}/{max_retries})')
        except requests.exceptions.RequestException as e:
            # Covers connection errors and timeouts alike.
            print(f'请求异常: {e},重试 ({retries}/{max_retries})')
    raise Exception(f'请求失败,已达到最大重试次数:{max_retries} 次')
def get_jpg(self, cookies, image_id):
    """Request a re-download of ``image_id`` as a 'huge' JPG and return its URL.

    Args:
        cookies: Cookie mapping sent with the request.
        image_id: Shutterstock content id; it is formatted into a string.

    Returns:
        ``meta.licensedContent[0].downloadUrl`` from the JSON response.

    Raises:
        KeyError: When the response JSON lacks ``'meta'`` (or a nested key).
            ``get_pic`` in this file checks for ``'meta'`` in the error text.
    """
    content_spec = {
        'content_id': f'{image_id}',
        'content_type': 'photo',
        'content_size': 'huge',
        'content_format': 'jpg',
        'license_name': 'standard',
        'show_modal': True,
    }
    payload = {'required_cookies': '', 'content': [content_spec]}
    response = requests.post(
        'https://www.shutterstock.com/napi/licensees/current/redownload',
        cookies=cookies,
        headers=self.headers,
        json=payload,
        timeout=600,
    )
    parsed = json.loads(response.text)
    licensed = parsed['meta']['licensedContent']
    return licensed[0]['downloadUrl']
def get_png(self, cookie, image_id):
    """Request a re-download of ``image_id`` as a 'large' PNG and return its URL.

    Args:
        cookie: Cookie mapping sent with the request.
        image_id: Shutterstock content id; it is formatted into a string.

    Returns:
        ``meta.licensedContent[0].downloadUrl`` from the JSON response.

    Raises:
        KeyError: When the response JSON lacks ``'meta'`` (or a nested key).
    """
    content_spec = {
        'content_id': f'{image_id}',
        'content_type': 'photo',
        'content_size': 'large',
        'content_format': 'png',
        'include_shadows': True,
        'angle': 'G03',
        'license_name': 'standard',
        'show_modal': True,
    }
    payload = {'required_cookies': '', 'content': [content_spec]}
    response = requests.post(
        'https://www.shutterstock.com/napi/licensees/current/redownload',
        cookies=cookie,
        headers=self.headers,
        json=payload,
        timeout=600,
    )
    parsed = json.loads(response.text)
    licensed = parsed['meta']['licensedContent']
    return licensed[0]['downloadUrl']
def get_pic(self, account_id, image_id, item_id, image_title, image_size_info, cookie, wait_time):
    """Resolve one image's download URL, store it, and push it to the API, with retries.

    Args:
        account_id: Account identifier stored with the record and sent to the API.
        image_id: Shutterstock image id to download.
        item_id: Row id passed to ``Con.update_image_id_to_3`` / ``_to_4``.
        image_title: Title stored with the record (first 38 chars are printed).
        image_size_info: Size info stored with the record.
        cookie: Cookies passed to ``get_jpg`` / ``get_png``.
        wait_time: Seconds to sleep after a successful download.

    Returns:
        ``True`` on success. ``False`` when the error text contains
        ``'image_title'`` (the row is updated via ``update_image_id_to_4``)
        or when more than ``max_retries`` attempts have failed.
    """
    retry = 0
    max_retries = 3
    while retry <= max_retries:
        try:
            start_time = datetime.now().strftime("%m-%d %H:%M:%S")
            all_image_id = Con.get_all_image_id()
            if str(image_id) in set(all_image_id):
                print(f'{image_id}已上传过')
                # Already present in the stored ids: record it with state 3.
                state = 3
            else:
                state = 1
            # Try the JPG variant first.
            try:
                image_url = self.get_jpg(cookie, image_id)
                image_type = 'jpg'
            except Exception as e:
                if 'meta' in str(e):
                    # JPG failed with a 'meta' error; fall back to PNG.
                    image_url = self.get_png(cookie, image_id)
                    image_type = 'png'
                else:
                    raise
            # Build the record to persist.
            item = {
                'account_id': account_id,
                'image_id': image_id,
                'image_size_info': image_size_info,
                'image_title': image_title,
                'image_type': image_type,
                'image_url': image_url,
                'state': state,
                'created_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }
            # Persist the record, update the source row, then call the API.
            Con.save_stock_detail(item)
            Con.update_image_id_to_3(item_id)
            self.transmission_api(account_id, image_id, image_size_info, image_title, image_type, image_url)
            now_time = datetime.now().strftime("%m-%d %H:%M:%S")
            print(f'pic_name:{image_title[:38]},time:{start_time}——{now_time}爬取成功')
            time.sleep(wait_time)
            return True
        except Exception as e:
            logging.error(f"发生错误: {e}")
            retry += 1
            if 'image_title' in str(e):
                # Error text mentions image_title: update the row via
                # update_image_id_to_4 and give up on this image.
                Con.update_image_id_to_4(item_id)
                print(f'{image_id}过期 修改为4')
                return False
            elif 'meta' in str(e):
                if retry > max_retries:
                    logging.warning("超过重试次数,跳过该图片")
                    return False
                logging.warning(f"meta 错误,等待两小时刷新页面 第{retry}次重试...")
                # Wait two hours, reload the catalog page, then retry.
                time.sleep(7200)
                self.page.get('https://www.shutterstock.com/zh/catalog/')
                continue  # go on to the next retry
            else:
                if retry > max_retries:
                    logging.warning("超过重试次数,跳过该图片")
                    return False
                # Back-off grows with the attempt number: retry is 1..3 here,
                # so retry - 1 indexes the three ranges below.
                sleep_time = [random.randint(60, 180), random.randint(180, 240), random.randint(1800, 1900)][retry - 1]
                logging.warning(f"未知错误,等待{sleep_time}s 第{retry}次重试...")
                time.sleep(sleep_time)
                continue  # go on to the next retry
def run_get_stock_img_id(self, account, cookie):
    """Run ``GetStockImgId().run(account, cookie)`` and report the outcome.

    Returns:
        ``True`` when the call completes, ``False`` when it raises (the
        error is logged rather than propagated).
    """
    try:
        GetStockImgId().run(account, cookie)
    except Exception as e:
        logging.error(f"Error occurred in GetStockImgId.run(): {e}")
        return False
    return True
def run(self):
    """Iterate over account slots 1..32, log in to each, and download its pending images.

    For every ``item_id`` the browser cache is cleared, the account is looked
    up via ``Con.get_cookie_account``, logged in, its image ids are refreshed
    through ``run_get_stock_img_id``, and each pending image is handed to
    ``get_pic``. The whole run stops when ``get_pic`` returns a falsy value,
    and the process exits with status 1 when refreshing image ids fails.
    """
    day = time.strftime("%d")
    for item_id in range(1, 33):
        print(f"开始抓取 item_id: {item_id}")
        # Clear browser cache and session data so the next account can log in
        # directly; avoids login failures when the previous account was not
        # logged out.
        self.page.clear_cache()
        # Only for the first slot, and only when the day of month is below 2.
        if item_id == 1 and int(day) < 2:
            Con.update_all_states_to_1(state=2)
        wait_time = random.uniform(6, 10)
        account_list = Con.get_cookie_account(item_id)
        if account_list:
            self.account = account_list[0]
            self.pwd = account_list[1]
            cookie = self.login()  # log in and obtain cookies
            # cookie = self.get_ck()
            if not self.run_get_stock_img_id(self.account, cookie):
                logging.critical("Stopping the entire program due to critical error.")
                sys.exit(1)  # terminate the whole program
            image_id_id_pairs = Con.get_stock_images_id(self.account)
            if not image_id_id_pairs:
                print(f'{self.account} 已全部爬取完成')
                Con.update_all_states_to_1(state=3, item_id=item_id)
                continue
            counts_start = 0
            counts_last = len(image_id_id_pairs)
            stop_flag = False  # initialise the flag
            for count in range(counts_start, counts_last):
                # Each entry is four fields joined by the '||-||' separator.
                image_id, item_id_str, image_title, image_size_info = image_id_id_pairs[count].split('||-||')
                print(f'执行 {self.account}: {image_id}, {item_id_str}, 计数: {count}')
                try:
                    chong_shi = self.get_pic(self.account, image_id, item_id_str, image_title, image_size_info, cookie, wait_time)
                    if not chong_shi:
                        # get_pic gave up on this image; stop the whole run.
                        stop_flag = True
                        break
                except Exception as e:
                    if 'Expected axis has 0 elements, new values have 2 elements' in str(e):
                        print(f'{self.account} 已全部爬取完成')
                        time.sleep(10)
                    else:
                        logging.error(f'发生错误: {e}, 停止循环')
                        break
                # Last image of this account reached.
                if count == counts_last - 1:
                    print(f'{self.account} 全部爬取完成1122==')
                    Con.update_all_states_to_1(state=3, item_id=item_id)
            if stop_flag:
                print('超过重试次数,暂停')
                logging.warning('超过重试次数,暂停')
                break
# Script entry point: build the spider and start the crawl.
if __name__ == '__main__':
    spider = GetSS_details()
    spider.run()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment