Commit ec9229bd by Peng

no message

parent 58220d8c
import time import json
import pandas as pd import pandas as pd
from secure_db_client import get_remote_engine
import time
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
import platform
import traceback
import json
import uuid
from sqlalchemy.exc import SQLAlchemyError
class ConnectSpider: class ConnectSpider:
def __init__(self): def __init__(self):
self.pg_port = 54328 self.db_engine = get_remote_engine(
self.pg_db = "selection" site_name='us', # -> database "selection"
self.pg_user = "postgres" db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
self.pg_pwd = "F9kL2sXe81rZq" )
self.pg_host = "61.145.136.61" self.db_engine192 = get_remote_engine(
self.db_engine = create_engine(f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}") site_name='us', # -> database "selection"
db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
pg_host = "192.168.10.223" )
self.db_engine192 = create_engine( # self.pg_port = 54328
f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}") # self.pg_db = "selection"
# self.pg_user = "postgres"
# mysql # self.pg_pwd = "F9kL2sXe81rZq"
self.sql_port = 3306 # self.pg_host = "61.145.136.61"
self.sql_db = "selection" # pg_host = "192.168.10.223"
self.sql_user = "adv_yswg" # self.db_engine192 = create_engine(
self.sql_pwd = "Gd1pGJog1ysLMLBdML8w81" # f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}")
self.sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
self.charset = 'utf8mb4'
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{self.sql_user}:{self.sql_pwd}@{self.sql_host}:{self.sql_port}/{self.sql_db}?charset={self.charset}"
self.mysql_engine = create_engine(connection_string_mysql)
# mysql
sql_port = 19030
sql_db = "test"
sql_user = "fangxingjun"
sql_pwd = "fangxingjun12345"
sql_host = "192.168.10.151"
wai_host = "113.100.143.162"
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{wai_host}:{sql_port}/{sql_db}"
self.mysql_test = create_engine(connection_string_mysql)
# mysql
sql_port = 19030
sql_db = "selection"
sql_user = "fangxingjun"
sql_pwd = "fangxingjun12345"
sql_host = "192.168.10.151"
wai_host = "113.100.143.162"
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}"
self.mysql_selection = create_engine(connection_string_mysql)
def mysql(self): def mysql(self):
sql_port = 3306 mysql_engine = get_remote_engine(
sql_db = "us_spider" site_name='us', # -> database "selection"
sql_user = "adv_yswg" db_type="mysql", # -> 服务端 alias "mysql"
sql_pwd = "Gd1pGJog1ysLMLBdML8w81" )
sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
charset = 'utf8mb4'
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}?charset={charset}"
mysql_engine = create_engine(connection_string_mysql)
return mysql_engine return mysql_engine
def mysql_us_spider(self): def save_stock_img_id(self,items):
sql_port = 19030 # sql = """
# sql_db = "us_spider" # INSERT INTO stock_image_id_wj
sql_db = "test" # (account_id, image_id, state, created_at, image_title, image_size_info)
sql_user = "fangxingjun" # VALUES (%s, %s, %s, %s, %s, %s)
sql_pwd = "fangxingjun12345" # ON DUPLICATE KEY UPDATE
sql_host = "192.168.10.151" # state = VALUES(state),
# created_at = VALUES(created_at),
# 创建数据库连接字符串 # image_title = VALUES(image_title),
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}" # image_size_info = VALUES(image_size_info)
mysql_us_spider_engine = create_engine(connection_string_mysql) # """
return mysql_us_spider_engine sql = """
def save_stock_img_id(self, items):
"""批量保存数据到数据库。"""
# 提取 image_title 和 image_size_info
processed_items = []
for item in items:
image_title = item.get('title', '')
image_size_info = json.dumps(item.get('sizes', {}))
processed_item = {
'account_id': item['account_id'],
'image_id': item['image_id'],
'state': item['state'],
'created_at': item['created_at'],
'image_title': image_title,
'image_size_info': image_size_info
}
processed_items.append(processed_item)
# # 定义DataFrame的列 旧代码
# columns = ['account_id', 'image_id', 'state', 'created_at', 'image_title', 'image_size_info']
# df = pd.DataFrame(processed_items, columns=columns)
#
# # 使用with语句管理数据库连接
# with self.db_engine192.connect() as connection:
# df.to_sql(
# name=table_name,
# con=connection,
# if_exists='append',
# index=False
# )
insert_sql = text("""
INSERT INTO stock_image_id_wj INSERT INTO stock_image_id_wj
(account_id, image_id, state, created_at, image_title, image_size_info) (account_id, image_id, state, created_at, image_title, image_size_info)
VALUES VALUES (%s, %s, %s, %s, %s, %s::jsonb)
(:account_id, :image_id, :state, :created_at, :image_title, :image_size_info) ON CONFLICT (account_id, image_id) DO UPDATE SET
ON CONFLICT (account_id, image_id) DO NOTHING state = EXCLUDED.state,
""") created_at = EXCLUDED.created_at,
print('新代码插入id',insert_sql) # 不需要担心有重复的。表里面有唯一索引。会跳过。 image_title = EXCLUDED.image_title,
# 假设 processed_items 是 [{'account_id':..., 'image_id':..., …}, …] image_size_info = EXCLUDED.image_size_info;
with self.db_engine192.begin() as conn: """
conn.execute(insert_sql, processed_items) params = [
(
def save_homedepot_projects(self, items): item['account_id'],
"""批量保存数据到数据库。""" item['image_id'],
table_name = "homedepot_projects_items" item['state'],
item['created_at'],
# 提取 image_title 和 image_size_info item.get('title', ''),
processed_items = [] json.dumps(item.get('sizes', {}))
for item in items:
image_title = item.get('title', '')
image_size_info = json.dumps(item.get('sizes', {}))
processed_item = {
'account_id': item['account_id'],
'image_id': item['image_id'],
'state': item['state'],
'created_at': item['created_at'],
'image_title': image_title,
'image_size_info': image_size_info
}
processed_items.append(processed_item)
# 定义DataFrame的列
columns = ['account_id', 'image_id', 'state', 'created_at', 'image_title', 'image_size_info']
df = pd.DataFrame(processed_items, columns=columns)
# 使用with语句管理数据库连接
with self.db_engine192.connect() as connection:
df.to_sql(
name=table_name,
con=connection,
if_exists='append',
index=False
) )
for item in items
]
for i in range(5):
def get_account_id(self, item_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_summary_wj"
query = text(f"SELECT account_id, id FROM {table_name} WHERE id = :item_id")
result = connection.execute(query, {"item_id": item_id})
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
try: try:
accounts = df_status.account_id.iloc[0] with self.db_engine192.begin() as conn:
except IndexError: conn.execute(sql, params)
accounts = None # 或者处理不存在的情况 print('存储更新成功')
return accounts break
except Exception as e:
def update_id_to_3(self,account_id): time.sleep(30)
with self.db_engine192.connect() as connection: print('save_stock_img_id 报错。', e)
def update_id_to_3(self, account_id):
for i in range(5):
try:
with self.db_engine192.begin() as connection:
table_name = "stock_image_summary_wj" table_name = "stock_image_summary_wj"
success_id = tuple(account_id) print(account_id)
sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE account_id IN :success_id") sql_update = f"UPDATE {table_name} SET state = 3 WHERE account_id='{account_id}'"
result = connection.execute(sql_update, {"success_id": success_id}) print(sql_update,'成功更新为3')
print('成功更新为3') connection.execute(sql_update)
connection.close() break
except Exception as e:
time.sleep(30)
print('update_id_to_3 报错。', e)
def update_all_states_to_1(self,state=1,item_id=None): def update_all_states_to_1(self, state=1, item_id=None):
for i in range(5):
try: try:
with self.db_engine192.begin() as connection: # 使用 begin() 自动管理事务 with self.db_engine192.begin() as connection: # 使用 begin() 自动管理事务
table_name = "stock_image_summary_wj" table_name = "stock_image_summary_wj"
if state==3: if state == 3:
sql_update = text(f"UPDATE {table_name} SET state = {state} where id={item_id}") sql_update = f"UPDATE {table_name} SET state = {state} where id={item_id}"
else: else:
sql_update = text(f"UPDATE {table_name} SET state = {state}") sql_update = f"UPDATE {table_name} SET state = {state}"
print(sql_update) print(sql_update)
result = connection.execute(sql_update) connection.execute(sql_update)
print(f'成功更新所有状态为1,受影响行数:{result.rowcount}') break
# 显式提交事务(虽然 begin() 已经自动提交)
connection.commit()
except Exception as e: except Exception as e:
time.sleep(30)
print(f'更新状态失败:{e}') print(f'更新状态失败:{e}')
# 回滚事务 # 回滚事务
if 'connection' in locals():
connection.rollback()
def save_account(self, items):
    """Append a batch of account rows to the ``stock_image_summary_wj`` table.

    Args:
        items: Row data handed to ``pandas.DataFrame``; it is laid out under
            the columns account_id, account_secret, year_month, spider_date,
            state and created_time.
    """
    summary_columns = [
        'account_id',
        'account_secret',
        'year_month',
        'spider_date',
        'state',
        'created_time',
    ]
    frame = pd.DataFrame(items, columns=summary_columns)
    # The connection is scoped to the write and released by the with-block.
    with self.db_engine192.connect() as conn:
        frame.to_sql(
            name="stock_image_summary_wj",
            con=conn,
            if_exists='append',
            index=False,
        )
    print("保存成功!")
def delet_datails(self, image_id_list):
    """Delete detail rows whose image_id is absent from ``image_id_list``.

    Only rows of the hard-coded account ``zhouweiqing@yswg.com.cn`` in
    ``stock_image_detail_wj`` are read and deleted.

    Args:
        image_id_list: Iterable of image ids that must be kept.
            NOTE(review): ids are compared with the values returned by the
            database, so they presumably need the same type as the
            ``image_id`` column — confirm against the caller.
    """
    table_name = "stock_image_detail_wj"
    account_id = 'zhouweiqing@yswg.com.cn'
    select_query = text(
        f"SELECT image_id FROM {table_name} WHERE account_id = :account_id")
    # Bound parameters instead of interpolating image_id into the SQL text.
    delete_query = text(
        f"DELETE FROM {table_name} "
        "WHERE account_id = :account_id AND image_id = :image_id")

    with self.db_engine192.connect() as connection:
        rows = connection.execute(
            select_query, {"account_id": account_id}).fetchall()
        # Ids present in the table but not in the caller's keep-list.
        stale_ids = {row[0] for row in rows} - set(image_id_list)
        if stale_ids:
            # One executemany call instead of one round trip per id; the
            # guard matters because an empty parameter list would execute
            # the statement without its bind values.
            connection.execute(
                delete_query,
                [{"account_id": account_id, "image_id": image_id}
                 for image_id in stale_ids],
            )
        # Commit the deletions.
        connection.commit()
def get_datails_image_id(self, account_id):
    """Return the image ids stored for an account before 2024-09-02.

    Args:
        account_id: Value bound to ``account_id`` in the query.

    Returns:
        A list with ``int(image_id)`` for every row of
        ``stock_image_detail_wj`` that matches the account and has
        ``created_time`` earlier than ``2024-09-02 00:00:00``.
    """
    stmt = text(
        "SELECT image_id FROM stock_image_detail_wj "
        "WHERE account_id = :account_id "
        "and created_time < '2024-09-02 00:00:00'"
    )
    with self.db_engine192.connect() as conn:
        records = conn.execute(stmt, {"account_id": account_id}).fetchall()
    # Read-only query, so nothing has to be committed.
    return [int(record[0]) for record in records]
# 1111111111111
def save_stock_detail(self, item): def save_stock_detail(self, item):
"""批量保存数据到数据库。""" """批量保存数据到数据库。"""
table_name = "stock_image_detail_wj" table_name = "stock_image_detail_wj"
# 将item包装成列表 # 将item包装成列表
items_list = [item] items_list = [item]
# 定义DataFrame的列 # 定义DataFrame的列
columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url','state', 'created_time'] columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url', 'state',
'created_time']
df = pd.DataFrame(items_list, columns=columns) df = pd.DataFrame(items_list, columns=columns)
for i in range(5):
with self.db_engine192.connect() as connection:
df.to_sql(
name=table_name,
con=connection,
if_exists='append',
index=False
)
# print("保存成功!")
def get_pic_urls_limit1(self, account_id):
pic_data_list = [] # 创建一个空列表来存储结果
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
query = text(
f"""select image_url, image_id, image_title from {table_name} where account_id = :account_id and state = 1 limit 1""")
try: try:
result = connection.execute(query, {'account_id': account_id}) self.db_engine192.to_sql(df, table_name, if_exists='append')
for row in result: # 遍历所有的结果行 print("保存成功!")
if row is not None: break
# 将 RowProxy 转换为标准的字典
row_dict = dict(zip(result.keys(), row)) # 使用 keys 和 fetchone 的结果创建字典
# 直接构建所需格式的字符串
pic_datas = f"{row_dict['image_url']}||{row_dict['image_id']}||{row_dict['image_title']}"
pic_data_list.append(pic_datas) # 添加到列表中
if not pic_data_list:
# print("No data found for the given account_id")
return False
else:
return pic_data_list # 返回列表
except Exception as e: except Exception as e:
print(f"An error occurred: {e}") time.sleep(30)
return False print(f'save_stock_detail 报错:{e}')
# 回滚事务
def get_pic_urls(self, account_id):
pic_data_list = [] # 创建一个空列表来存储结果
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
query = text(
f"""select image_url, image_id, image_title from {table_name} where account_id = :account_id and state = 1""")
def get_stock_images_id(self, account_id):
for i in range(5):
try: try:
result = connection.execute(query, {'account_id': account_id})
for row in result: # 遍历所有的结果行
if row is not None:
# 将 RowProxy 转换为标准的字典
row_dict = dict(zip(result.keys(), row)) # 使用 keys 和 fetchone 的结果创建字典
# 直接构建所需格式的字符串
pic_datas = f"{row_dict['image_url']}||{row_dict['image_id']}||{row_dict['image_title']}"
pic_data_list.append(pic_datas) # 添加到列表中
if not pic_data_list:
# print("No data found for the given account_id")
return False
else:
return pic_data_list # 返回列表
except Exception as e:
print(f"An error occurred: {e}")
return False
def get_stock_images_id2(self, account_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj" table_name = "stock_image_id_wj"
# 特定的 image_id 列表 # 修改查询语句以匹配你的数据表名称和列名称
specific_image_ids = [ query = f""" SELECT image_id,id,image_title,image_size_info FROM {table_name} where account_id ='{account_id}' and state = 1"""
'1025406430',
'782084149',
'2340663257',
'2444918601',
'2481076155',
'2534369399',
'2522128969',
'2522144147',
'2482077119',
'2475085855',
'2560247125',
'1115348984',
'2555951185',
'1644852415',
'1644852424',
'258700904',
'2540342353',
'2555951245',
'2529955899',
'1309059847',
'1899316957',
'2416180707',
'1978653428',
'2520112131',
'1447499252',
'2335787565',
'1780440524',
'2316295613',
'2463106909',
'2527382733',
'2548693637',
'2460743889',
'2489123001',
'2527399543',
'2456315025',
'2469939069',
'2305915213',
'1660111006',
'2218802639',
'453729808',
'2295540279',
'2323950095',
'2323950087',
'2057817146',
'2541104423',
'231076948',
'2196541827',
'2407612765',
'2521017693',
'2554778219',
'2523427909',
'2520799267',
'2533854931',
'2498052331',
'2521798533',
'2471652945',
'2445858817',
'2449783031',
'1735869230',
'1106587370',
'2393397957',
'2527382699',
'2348771553',
'1822384931',
'2564084221'
]
# specific_image_ids = ['2509630613', '2568241787', '2568242327', '2568242443', '2568242799', '2568242949']
# 修改查询语句以匹配你的数据表名称、列名称,并加入 image_id 条件
query = text(f"""SELECT image_id,id,image_title,image_size_info FROM {table_name}
WHERE account_id = :account_id
AND image_id IN :image_ids""")
print(query) print(query)
result = connection.execute(query, {'account_id': account_id, 'image_ids': tuple(specific_image_ids)}) df_status = self.db_engine192.read_sql(query)
try: try:
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
df_status['id'] = df_status['id'].astype(str)
image_id_id_pairs = list(
df_status['image_id'].astype(str) + '||-||' + df_status['id'] + '||-||' + df_status[
'image_title'] + '||-||' + df_status['image_size_info'])
print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
return image_id_id_pairs
except Exception as e:
print(e)
return False
def img_size_is_1(self, account_id, image_ids):
# 确保image_ids列表非空
if not image_ids:
print("No image IDs provided.")
return False
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
query = text(f"""SELECT image_id, id, image_title, image_size_info FROM {table_name} WHERE account_id = :account_id AND image_id IN :image_ids""")
try:
result = connection.execute(query, {'account_id': account_id, 'image_ids': tuple(image_ids)})
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
df_status['id'] = df_status['id'].astype(str) df_status['id'] = df_status['id'].astype(str)
image_id_id_pairs = list( image_id_id_pairs = list(
df_status['image_id'] + '||-||' + df_status['id'] + '||-||' + df_status['image_title'] + '||-||' + df_status['image_id'] + '||-||' + df_status['id'] + '||-||' + df_status['image_title'] + '||-||' +
...@@ -456,273 +139,104 @@ class ConnectSpider: ...@@ -456,273 +139,104 @@ class ConnectSpider:
except Exception as e: except Exception as e:
print(e) print(e)
return False return False
def get_stock_images_id(self,account_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
# 修改查询语句以匹配你的数据表名称和列名称
query = text(f""" SELECT image_id,id,image_title,image_size_info FROM {table_name} where account_id = :account_id and state = 1""")
print(query)
result = connection.execute(query, {'account_id': account_id})
try:
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
df_status['id'] = df_status['id'].astype(str)
image_id_id_pairs = list(df_status['image_id'] + '||-||' + df_status['id'] + '||-||' + df_status['image_title'] + '||-||' + df_status['image_size_info'])
print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
connection.close()
return image_id_id_pairs
except Exception as e: except Exception as e:
print(e) time.sleep(30)
return False print(f'get_stock_images_id 报错:{e}')
# 回滚事务
def get_kong_images_id(self, account_id):
    """Return detail rows of an account that only carry an empty size payload.

    Selects rows of ``stock_image_detail_wj`` whose ``image_size_info`` is
    ``'{}'`` and whose ``image_id`` has no other row (same account) with a
    different ``image_size_info``.

    Args:
        account_id: Value bound to ``account_id`` in the query.

    Returns:
        A list of ``"image_id||image_type||image_url"`` strings, or ``[]``
        when nothing matches.
        NOTE(review): the ``+`` concatenation assumes all three columns are
        string-typed — confirm against the table schema.
    """
    table_name = "stock_image_detail_wj"
    # The sub-query filters out image ids that already have size info.
    query = text(
        "SELECT image_id, image_type, image_url\n"
        f"FROM {table_name}\n"
        "WHERE account_id = :account_id\n"
        "AND image_id NOT IN (\n"
        "SELECT image_id\n"
        f"FROM {table_name}\n"
        "WHERE account_id = :account_id\n"
        "AND image_size_info != '{}'\n"
        ")\n"
        "AND image_size_info = '{}'"
    )
    print(query)
    # The with-block releases the connection; no explicit close() is needed.
    with self.db_engine192.connect() as connection:
        result = connection.execute(query, {'account_id': account_id})
        df_status = pd.DataFrame(result.fetchall(), columns=result.keys())

    if df_status.empty:
        return []
    return list(
        df_status['image_id'] + '||' + df_status['image_type'] + '||' + df_status['image_url'])
def get_stock_image_detail(self, account_id):
    """Fetch all ``stock_image_detail`` rows of an account as joined strings.

    The rows are read through ``self.mysql_selection``.

    Args:
        account_id: Value bound to ``account_id`` in the query.

    Returns:
        A list with one string per row; the fields account_id, image_id,
        image_size_info, image_title, image_type, image_url and created_time
        (formatted ``%Y-%m-%d %H:%M:%S``) are joined with ``'||-||'``.
        Returns ``[]`` when the account has no rows.
    """
    table_name = "stock_image_detail"
    query = text(
        f"""SELECT account_id, image_id, image_size_info, image_title, image_type, image_url, created_time FROM {table_name} WHERE account_id = :account_id""")
    print(query)
    with self.mysql_selection.connect() as connection:
        result = connection.execute(query, {'account_id': account_id})
        # Passing the column names to the constructor keeps an empty result
        # well-formed; assigning ``.columns`` afterwards raises ValueError
        # when no rows came back.
        df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))

    if df_status.empty:
        # ``.dt`` is unavailable on an empty object column, so stop here.
        detail_datas = []
    else:
        # Convert the timestamps to text so they can be concatenated.
        df_status['created_time'] = df_status['created_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
        detail_datas = list(
            df_status['account_id'] + '||-||' +
            df_status['image_id'] + '||-||' +
            df_status['image_size_info'] + '||-||' +
            df_status['image_title'] + '||-||' +
            df_status['image_type'] + '||-||' +
            df_status['image_url'] + '||-||' +
            df_status['created_time']
        )
    print(f'账号:{account_id} 一共 {len(detail_datas)} 条数据')
    return detail_datas
def save_stock_detail_move(self, data_list):
    """Append already-collected detail rows to ``stock_image_detail_wj``.

    Args:
        data_list: Row data handed to ``pandas.DataFrame``; it is laid out
            under the columns account_id, image_id, image_size_info,
            image_title, image_type, image_url and created_time.
    """
    detail_columns = [
        'account_id',
        'image_id',
        'image_size_info',
        'image_title',
        'image_type',
        'image_url',
        'created_time',
    ]
    frame = pd.DataFrame(data_list, columns=detail_columns)
    with self.db_engine192.connect() as conn:
        frame.to_sql(
            name="stock_image_detail_wj",
            con=conn,
            if_exists='append',
            index=False,
        )
    print("保存成功!")
# 11111111111
def update_image_id_to_3(self, item_id): def update_image_id_to_3(self, item_id):
with self.db_engine192.connect() as connection: for i in range(5):
try:
with self.db_engine192.begin() as connection:
table_name = "stock_image_id_wj" table_name = "stock_image_id_wj"
trans = connection.begin() sql_update = f"UPDATE {table_name} SET state = 3 WHERE id = {item_id}"
sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE id = :item_id") connection.execute(sql_update)
result = connection.execute(sql_update, {"item_id": item_id}) break
trans.commit() except Exception as e:
time.sleep(30)
def update_url_state_to_3(self,image_id): print(f'update_image_id_to_3 报错:{e}')
with self.db_engine192.connect() as connection: # 回滚事务
table_name = "stock_image_detail_wj"
trans = connection.begin()
sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE image_id = :image_id and state = 1")
result = connection.execute(sql_update, {"image_id": image_id})
trans.commit()
# 11111111111
def update_image_id_to_4(self, item_id): def update_image_id_to_4(self, item_id):
with self.db_engine192.connect() as connection: for i in range(5):
table_name = "stock_image_id_wj"
trans = connection.begin()
sql_update = text(f"UPDATE {table_name} SET state = 4 WHERE id = :item_id")
result = connection.execute(sql_update, {"item_id": item_id})
connection.close()
trans.commit()
def save_stock_cookie(self,item):
table_name = "stock_cookie_wj"
# 将item包装成列表
items_list = [item]
# 定义DataFrame的列
columns = ['account_id', 'cookie', 'state', 'created_at']
df = pd.DataFrame(items_list, columns=columns)
with self.db_engine192.connect() as connection:
df.to_sql(
name=table_name,
con=connection,
if_exists='append',
index=False
)
print("保存成功!")
def updata_ck_state(self,ck_id):
with self.db_engine192.connect() as connection:
table_name = "stock_cookie_wj"
# 使用参数化查询防止SQL注入
query = text(
f"""UPDATE {table_name} SET state = :new_state WHERE id = :ck_id;""")
try: try:
# 执行更新语句 with self.db_engine192.begin() as connection:
result = connection.execute(query, {'new_state': 3, 'ck_id': ck_id}) table_name = "stock_image_id_wj"
# 提交事务以确保更改被保存到数据库中 sql_update = f"UPDATE {table_name} SET state = 4 WHERE id = {item_id}"
connection.commit() connection.execute(sql_update)
break
# 检查是否有行受到影响
if result.rowcount > 0:
print('修改cookie状态为3')
return True # 更新成功
else:
return False # 没有找到匹配项或没有更新任何行
except Exception as e: except Exception as e:
# print(f"An error occurred: {e}") time.sleep(30)
return False print(f'update_image_id_to_4 报错:{e}')
def get_stock_cookie(self, account): def get_cookie_account(self, item_id):
with self.db_engine192.connect() as connection: for i in range(5):
table_name = "stock_cookie_wj" try:
# 使用参数化查询
query = text(
f"""SELECT id, cookie, state FROM {table_name} WHERE account_id = :account_id AND state = :state LIMIT 1;""")
result = connection.execute(query, {'account_id': account, 'state': 1}).mappings().first()
if result is not None:
# 通过键名访问字典中的元素
cookie_id_state = f"{result['id']}||-||{result['cookie']}||-||{result['state']}"
return cookie_id_state
else:
return None # 没有找到匹配项时返回None
def get_stock_cookie_list(self, account):
with self.db_engine192.connect() as connection:
table_name = "stock_cookie_wj"
# 使用参数化查询
query = text(f"""SELECT cookie FROM {table_name} WHERE account_id = :account_id AND state = :state ; """)
result = connection.execute(query, {'account_id': account, 'state': 1})
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
cookie_list = df_status['cookie'].tolist() if 'cookie' in df_status.columns else []
return cookie_list
def get_cookie_account(self,item_id):
# try:
with self.db_engine192.connect() as connection:
table_name = "stock_image_summary_wj" table_name = "stock_image_summary_wj"
# 修改查询语句以匹配你的数据表名称和列名称 # 修改查询语句以匹配你的数据表名称和列名称
query = text(f"""SELECT account_id,account_secret FROM {table_name} where id = :item_id and state= :state_int;""") query = f"""SELECT account_id,account_secret FROM {table_name} where id = {item_id} and state= 1;"""
print(query) print(query)
result = connection.execute(query, {'item_id': item_id,'state_int':1}) df_status = self.db_engine192.read_sql(query)
print(result) if len(df_status) > 0:
df_status = pd.DataFrame(result.fetchall())
if len(df_status)>0:
df_status.columns = result.keys()
account_id = df_status.account_id.iloc[0] account_id = df_status.account_id.iloc[0]
account_secret = df_status.account_secret.iloc[0] account_secret = df_status.account_secret.iloc[0]
account_list = [account_id, account_secret] account_list = [account_id, account_secret]
print(account_list,'232323====32') print(account_list, '232323====32')
# print(111111111111)
connection.close()
return account_list return account_list
else: else:
return None return None
# except Exception as e: except Exception as e:
# print(111111111) time.sleep(30)
print(f'get_cookie_account 报错:{e}')
def get_stock_test_id(self, username):
    """Return one image_id stored for ``username`` in ``stock_image_id_wj``.

    Args:
        username: Value bound to ``account_id`` in the query.

    Returns:
        The ``image_id`` of the first matching row, or ``None`` when the
        account has no rows (the DataFrame-based version raised in that case).
    """
    table_name = "stock_image_id_wj"
    query = text(f"""SELECT image_id FROM {table_name} WHERE account_id = :username LIMIT 1;""")
    # The with-block releases the connection; no explicit close() is needed.
    with self.db_engine192.connect() as connection:
        row = connection.execute(query, {'username': username}).first()
    return None if row is None else row[0]
def upload_data(self, account_id, image_id, upload_time, err_msg):
    """Record an upload time and error message on matching detail rows.

    Updates rows of ``stock_image_detail_wj`` that match the account and
    image id and have ``created_time`` earlier than ``2024-09-02 00:00:00``.

    Args:
        account_id: Value bound to ``account_id``.
        image_id: Image identifier; converted with ``str()`` before binding.
        upload_time: Value stored in ``upload_time``.
        err_msg: Object serialised with ``json.dumps`` and stored in
            ``err_msg``.
    """
    table_name = "stock_image_detail_wj"
    sql_update = text(
        f"UPDATE {table_name} SET upload_time = :upload_time, err_msg = :err_msg WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'")
    params = {
        "upload_time": upload_time,
        "err_msg": json.dumps(err_msg),
        "account_id": account_id,
        # Force image_id to a string before binding.
        "image_id": str(image_id),
    }
    # begin() commits on success and rolls back on error; a plain connect()
    # without commit() leaves the UPDATE uncommitted under commit-as-you-go.
    with self.db_engine192.begin() as connection:
        connection.execute(sql_update, params)
def upload_success_data(self, account_id, image_id, upload_time):
    """Record the upload time on matching detail rows.

    Updates rows of ``stock_image_detail_wj`` that match the account and
    image id and have ``created_time`` earlier than ``2024-09-02 00:00:00``.

    Args:
        account_id: Value bound to ``account_id``.
        image_id: Image identifier; converted with ``str()`` before binding.
        upload_time: Value stored in ``upload_time``.
    """
    table_name = "stock_image_detail_wj"
    sql_update = text(
        f"UPDATE {table_name} SET upload_time = :upload_time WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'")
    params = {
        "upload_time": upload_time,
        "account_id": account_id,
        # Force image_id to a string before binding.
        "image_id": str(image_id),
    }
    # begin() commits on success and rolls back on error; a plain connect()
    # without commit() leaves the UPDATE uncommitted under commit-as-you-go.
    with self.db_engine192.begin() as connection:
        connection.execute(sql_update, params)
def get_all_image_id(self): def get_all_image_id(self):
with self.db_engine192.connect() as connection: for i in range(5):
try:
table_name = "stock_image_detail_wj" table_name = "stock_image_detail_wj"
sql_query = f"SELECT image_id FROM {table_name} " sql_query = f"SELECT image_id FROM {table_name} "
df_status = pd.read_sql(sql_query, con=connection) df_status = self.db_engine192.read_sql(sql_query)
image_id = list(df_status['image_id'].astype(str)) image_id = list(df_status['image_id'].astype(str))
connection.close()
return image_id return image_id
except Exception as e:
time.sleep(30)
print(f'get_all_image_id 报错:{e}')
def update_url_state_to_3(self, image_id, *, max_retries=5, retry_delay=30):
    """Set ``state = 3`` on pending detail rows of one image, with retries.

    Runs an UPDATE on ``stock_image_detail_wj`` for rows with the given
    ``image_id`` and ``state = 1``. Every failure is printed; after
    ``max_retries`` failed attempts the method gives up silently.

    Args:
        image_id: Image identifier interpolated into the SQL text.
        max_retries: Maximum number of attempts (default 5).
        retry_delay: Seconds to wait between two attempts (default 30).
    """
    table_name = "stock_image_detail_wj"
    # NOTE(review): image_id is interpolated into the statement; switch to a
    # bound parameter if the engine returned by get_remote_engine supports it.
    sql_update = f"UPDATE {table_name} SET state = 3 WHERE image_id ='{image_id}' and state = 1"
    for attempt in range(1, max_retries + 1):
        try:
            with self.db_engine192.begin() as connection:
                connection.execute(sql_update)
            return
        except Exception as e:
            # Report immediately, then back off only if another attempt follows.
            print(f'update_url_state_to_3 报错:{e}')
            if attempt < max_retries:
                time.sleep(retry_delay)
def get_pic_urls(self, account_id):
    """Return the pending picture entries of an account.

    Reads ``image_url``, ``image_id`` and ``image_title`` from
    ``stock_image_detail_wj`` for rows of the account with ``state = 1``.

    Args:
        account_id: Account identifier interpolated into the SQL text.

    Returns:
        A list of ``"image_url||image_id||image_title"`` strings, or ``False``
        when no row matches or the query fails (the error is printed).
    """
    table_name = "stock_image_detail_wj"
    # NOTE(review): account_id is interpolated into the statement; switch to a
    # bound parameter if the engine returned by get_remote_engine supports it.
    query = f"""select image_url, image_id, image_title from {table_name} where account_id = '{account_id}' and state = 1"""
    try:
        result_df = self.db_engine192.read_sql(query)
        # Each entry looks like "<download url>||<image id>||<image title>".
        pic_data_list = [
            f"{row[0]}||{row[1]}||{row[2]}"
            for row in result_df.values.tolist()
        ]
    except Exception as e:
        print(f"An error occurred: {e}")
        return False
    # An empty result is reported as False rather than an empty list.
    return pic_data_list if pic_data_list else False
if __name__ == '__main__': if __name__ == '__main__':
ConnectSpider().get_cookie_account(1) ConnectSpider().get_cookie_account(10)
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys
import os import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from time import sleep from time import sleep
from random import randint from random import randint
from all_connect import ConnectSpider from all_connect import ConnectSpider
import traceback
Con = ConnectSpider() Con = ConnectSpider()
import imaplib import imaplib
import email import email
import os import os
import time
import requests
import hashlib
os.environ['NO_PROXY'] = 'stackoverflow.com' os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging import logging
logging.captureWarnings(True) logging.captureWarnings(True)
from DrissionPage import ChromiumPage from DrissionPage import ChromiumPage,ChromiumOptions
import json import json
import requests from curl_cffi import requests
import re import re
import random import random
import time import time
...@@ -24,7 +29,6 @@ import calendar ...@@ -24,7 +29,6 @@ import calendar
import sys import sys
class GetStockImgId(object): class GetStockImgId(object):
def __init__(self): def __init__(self):
self.headers = { self.headers = {
...@@ -66,17 +70,30 @@ class GetStockImgId(object): ...@@ -66,17 +70,30 @@ class GetStockImgId(object):
) )
self.headers['user-agent'] = ua self.headers['user-agent'] = ua
def get_url_month(self,page,cookie,start_date,last_date): def get_url_month(self, page, cookie, start_date, last_date):
self.random_ua() # self.random_ua()
response = requests.get( "https://www.shutterstock.com/napi/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=50&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&filter[assetStatus]=comped%2Clicensed&language=zh"
f'https://www.shutterstock.com/api/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=200&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&page[number]={page}&filter[assetStatus]=comped%2Clicensed&language=zh', url = f"https://www.shutterstock.com/napi/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=50&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&page[number]={page}&filter[assetStatus]=comped%2Clicensed&language=zh"
cookies=cookie, print('url:',url)
headers=self.headers, # url = "https://www.shutterstock.com/napi/s/dam/holdings/search"
) # params = {
# "include": "media-item,media-item.track-assets,media-item.cms-entry",
# "sort": "-licensedAt",
# "useMms": "true",
# "channel": "shutterstock",
# "page\\[size\\]": "50",
# "filter\\[licensedSince\\]": "2025-09-01T00:00:00Z",
# "filter\\[licensedUntil\\]": "2025-09-30T23:59:59Z",
# "filter\\[assetStatus\\]": "comped,licensed",
# "language": "zh"
# }
response = requests.get(url, headers=self.headers, cookies=cookie)
print(response) print(response)
print(response.url)
return response return response
def get_img_id(self,response,account_id,page): def get_img_id(self, response, account_id, page):
try: try:
# print(response.json()) # print(response.json())
data = response.json()['included'] data = response.json()['included']
...@@ -96,6 +113,7 @@ class GetStockImgId(object): ...@@ -96,6 +113,7 @@ class GetStockImgId(object):
data_list.append(datas) data_list.append(datas)
# 保存 # 保存
print('准备保存:')
Con.save_stock_img_id(data_list) Con.save_stock_img_id(data_list)
print(f"{account_id}第{page}页保存id成功,") print(f"{account_id}第{page}页保存id成功,")
return True return True
...@@ -136,14 +154,13 @@ class GetStockImgId(object): ...@@ -136,14 +154,13 @@ class GetStockImgId(object):
print(f"Start Date: {start_date}") print(f"Start Date: {start_date}")
print(f"Last Date: {last_date}") print(f"Last Date: {last_date}")
while is_continue: while is_continue:
try: try:
response = self.get_url_month(page, cookie, str(start_date), str(last_date)) response = self.get_url_month(page, cookie, str(start_date), str(last_date))
if response.status_code == 200: if response.status_code == 200:
# 更新是否继续标志位 # 更新是否继续标志位
is_continue = self.get_img_id(response, account_id, page) is_continue = self.get_img_id(response, account_id, page)
print('is_continue:',is_continue)
# 如果不再继续,则更新数据库并将当前账户标记为已完成 # 如果不再继续,则更新数据库并将当前账户标记为已完成
if not is_continue: if not is_continue:
Con.update_id_to_3(account_id) Con.update_id_to_3(account_id)
...@@ -160,11 +177,19 @@ class GetStockImgId(object): ...@@ -160,11 +177,19 @@ class GetStockImgId(object):
# 抛出异常以停止外层循环 # 抛出异常以停止外层循环
raise raise
class GetSS_details(): class GetSS_details():
def __init__(self): def __init__(self):
self.account = '' self.account = ''
self.pwd = '' self.pwd = ''
self.page = ChromiumPage() # self.page = ChromiumPage()
# 配置 Chrome 浏览器 - 端口 9222
chrome_options = ChromiumOptions()
chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
chrome_options.set_local_port(9333) # 设置 Chrome 的调试端口
self.page = ChromiumPage(addr_or_opts=chrome_options)
print(f"Chrome 浏览器运行在端口: {9333}")
self.headers = { self.headers = {
'accept': 'application/json', 'accept': 'application/json',
'accept-language': 'zh-CN,zh;q=0.9', 'accept-language': 'zh-CN,zh;q=0.9',
...@@ -192,29 +217,33 @@ class GetSS_details(): ...@@ -192,29 +217,33 @@ class GetSS_details():
'username': 'pengyanbing@yswg.com.cn', 'username': 'pengyanbing@yswg.com.cn',
'password': 'Python3.8', 'password': 'Python3.8',
} }
self.get_microservice_token()
def get_ck(self): def get_ck(self):
print('获取登录后的cookie')
try: try:
self.page.get('https://www.shutterstock.com/zh/catalog/') self.page.get('https://www.shutterstock.com/zh/catalog/licenses')
sleep(randint(2, 4)) sleep(randint(2, 4))
# 获取 cookies 列表 # 获取 cookies 列表
original_cookies_list = self.page.cookies() original_cookies_list = self.page.cookies()
# 将 cookies 列表转换为字典 # 将 cookies 列表转换为字典
original_cookie_dict = {cookie['name']: cookie['value'] for cookie in original_cookies_list} original_cookie_dict = {cookie['name']: cookie['value'] for cookie in original_cookies_list}
print('original_cookie_dict::',original_cookie_dict)
# 检查 accts_customer_sso1 是否等于 '-undefined' # # 检查 accts_customer_sso1 是否等于 '-undefined'
if 'accts_customer_sso1' in original_cookie_dict and original_cookie_dict.get( # if 'accts_customer_sso1' in original_cookie_dict and original_cookie_dict.get(
'accts_customer_sso1') == '-undefined': # 'accts_customer_sso1') == '-undefined':
# 组合成新的值并更新 accts_customer_sso1 # # 组合成新的值并更新 accts_customer_sso1
new_value = f"{original_cookie_dict.get('htjs_user_id', '')}-undefined" # new_value = f"{original_cookie_dict.get('htjs_user_id', '')}-undefined"
original_cookie_dict['accts_customer_sso1'] = new_value # original_cookie_dict['accts_customer_sso1'] = new_value
#
keys_of_interest = ['datadome', 'accts_customer_sso1', 'next.sid'] # keys_of_interest = ['datadome', 'accts_customer_sso1', 'next.sid']
cookies = {key: original_cookie_dict[key] for key in keys_of_interest if key in original_cookie_dict} # cookies = {key: original_cookie_dict[key] for key in keys_of_interest if key in original_cookie_dict}
#
# print('filtered_cookies:', cookies) # # print('filtered_cookies:', cookies)
return cookies return original_cookie_dict
except Exception as e: except Exception as e:
print('获取cookie出错:', e) print('获取cookie出错:', e)
...@@ -225,7 +254,7 @@ class GetSS_details(): ...@@ -225,7 +254,7 @@ class GetSS_details():
login_out.click() login_out.click()
sleep(randint(2, 4)) sleep(randint(2, 4))
self.page.ele('@text()=登出').click() self.page.ele('@text()=登出').click()
else : else:
login_out = self.page.ele('.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-1jeofke') login_out = self.page.ele('.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-1jeofke')
if login_out: if login_out:
login_out.click() login_out.click()
...@@ -238,7 +267,7 @@ class GetSS_details(): ...@@ -238,7 +267,7 @@ class GetSS_details():
sleep(randint(2, 4)) sleep(randint(2, 4))
self.page.ele('@text()=登出').click() self.page.ele('@text()=登出').click()
def decode_body(self,body): def decode_body(self, body):
"""尝试多种编码方式解码邮件内容""" """尝试多种编码方式解码邮件内容"""
encodings = ['utf-8', 'gb18030', 'iso-8859-1', 'latin1'] encodings = ['utf-8', 'gb18030', 'iso-8859-1', 'latin1']
for encoding in encodings: for encoding in encodings:
...@@ -323,81 +352,140 @@ class GetSS_details(): ...@@ -323,81 +352,140 @@ class GetSS_details():
except: except:
pass pass
def yxyzm(self):
    """Enter the e-mail verification code into the login iframe and submit it.

    Waits for the verification mail to arrive, re-acquires the
    ``#login-iframe`` frame, fetches the code through
    ``self.fetch_verification_code(self.email_value_config)`` and types it
    into the code field. The field is located by its visible label first;
    if that lookup or the typing fails, a CSS-class selector is used as a
    fallback. An error raised by the fallback or by the final click
    propagates to the caller.
    """
    print('需要输入邮箱验证码 等待2分钟')
    # Give the verification e-mail time to be delivered before polling for it.
    sleep(randint(62, 140))
    # Look the frame up again after the long wait instead of reusing an
    # older reference.
    iframe = self.page.get_frame('#login-iframe')
    sleep(randint(2, 4))
    yzm = self.fetch_verification_code(self.email_value_config)
    try:
        print('验证码输入')
        yzm_input = iframe.ele('@text()=输入代码')
        sleep(randint(2, 4))
        yzm_input.input(yzm)
    except Exception as e:
        # Only ordinary errors trigger the fallback; KeyboardInterrupt and
        # SystemExit are no longer swallowed here.
        print('验证码输入失败,改用备用输入框:', e)
        yzm_input = iframe.ele(
            '.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf')
        sleep(randint(2, 4))
        yzm_input.input(yzm)
    print('点击验证')
    iframe.ele('@text()=验证').click()
def get_microservice_token(self):
    """Request an access token from the microservice gateway.

    Posts a signed request (MD5 of the shared secret concatenated with the
    current Unix timestamp) to the ``getToken`` endpoint, trying up to five
    times with a 20-second pause between failed attempts. On success the
    token is stored in ``self.token``.

    If every attempt fails, ``self.token`` keeps its previous value, or is
    set to ``None`` when it was never assigned, so that code reading
    ``self.token`` afterwards does not raise ``AttributeError``.

    Returns:
        None. The result is communicated through ``self.token``.
    """
    url = "http://wx.yswg.com.cn:8000/microservice-system/system/admin/getToken"
    # NOTE(review): the shared secret is hard-coded in source; consider
    # loading it from configuration or the environment.
    secret = "dafa17fb-0e97-4246-a6b3-d574e44d212d"
    max_attempts = 5

    # Guarantee the attribute exists even when no attempt succeeds.
    if not hasattr(self, 'token'):
        self.token = None

    for attempt in range(1, max_attempts + 1):
        try:
            # The signature depends on the timestamp, so both are rebuilt
            # on every attempt.
            timestamp = str(int(time.time()))
            md5_value = hashlib.md5((secret + timestamp).encode("utf-8")).hexdigest()
            response = requests.post(
                url,
                json={
                    "module": "spider",
                    "weChatId": "pengyanbing",
                    "secret": md5_value,
                    "timestamp": timestamp,
                },
                # Without a timeout a stalled connection would block forever.
                timeout=30,
            )
            res = response.json()
            print(res)
            if res['code'] != 200:
                raise Exception(res['msg'])
            userinfo = res['data']
            self.token = userinfo['token']
            expireTime = userinfo['expireTime']
            print(self.token, expireTime)
            return
        except Exception as e:
            print('get_microservice_token, 报错', e)
            # No point in sleeping after the final attempt.
            if attempt < max_attempts:
                time.sleep(20)
    print(f'get_microservice_token, 已重试{max_attempts}次仍未获取到token')
def login(self):
    """Log in to Shutterstock through the browser and return the session cookies.

    Step 1 opens the catalog page, dismisses the optional "continue" prompt,
    opens the login iframe, fills in ``self.account`` / ``self.pwd`` and
    clicks the submit button. Any failure in this step is printed together
    with its traceback and the method returns ``False``.

    Step 2 checks whether the e-mail verification form is shown and, if so,
    runs ``self.yxyzm()``. It then reloads, opens the licenses page and
    returns the result of ``self.get_ck()``. If step 2 raises, the error is
    printed and ``self.get_ck()`` is still attempted.

    Returns:
        ``False`` when filling or submitting the login form failed,
        otherwise whatever ``self.get_ck()`` returns.
    """
    try:
        # Open the landing page and let it settle.
        self.page.get('https://www.shutterstock.com/zh/catalog/')
        sleep(randint(12, 24))
        try:
            print('click No thanks')
            login_button = self.page.ele('xpath://a[@id="continue"]', timeout=15)
            login_button.click()
        except Exception:
            # The prompt is optional; its absence is not an error.
            print('No thanks 错误')
        # The password is deliberately kept out of the log output.
        print('开始登录。', self.account)

        # Open the login dialog.
        login_button = self.page.ele('xpath://a[@data-automation="loginButton"]', timeout=15)
        login_button.click()
        sleep(randint(12, 24))

        # The login form lives inside an iframe.
        iframe = self.page.get_frame('#login-iframe')
        print('已切换到 login-iframe')

        # E-mail / account field.
        print("正在等待邮箱输入框...")
        sleep(15)
        email_input = iframe.ele('xpath://input[@name="username"]')
        email_input.clear()  # drop any pre-filled content
        email_input.input(self.account)
        print("已输入账号到邮箱输入框")
        sleep(randint(2, 4))

        # Password field.
        print("正在等待密码输入框...")
        pwd_input = iframe.ele(
            '.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall MuiInputBase-inputAdornedEnd css-186x7cf')
        pwd_input.clear()  # drop any pre-filled content
        pwd_input.input(self.pwd)
        print("已输入密码到密码输入框")
        sleep(randint(3, 5))

        # Submit: try the form-level selector first, then the button classes.
        print('查找并点击登录按钮')
        try:
            submit_button = iframe.ele('.LoginForm_bottomSpacingMd__e2Mnm')
            submit_button.click()
        except Exception:
            print('切换点击')
            sleep(randint(3, 4))
            iframe.ele('.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-1is1osn').click()
        print('已点击登录...')
        sleep(randint(8, 15))
    except Exception as e:
        print(f"出现错误: {e}", f"\n{traceback.format_exc()}")
        return False

    try:
        print(33333333333)
        iframe = self.page.get_frame('#login-iframe')
        sleep(randint(4, 8))
        # Several independent probes for the "enter verification code" form;
        # any single hit is treated as "verification required".
        h3_element = iframe.ele(
            '.FormHeader_root__fHtRy wrapper-component_center__zG6GW')
        h3_by_text = iframe.ele('@text()=输入验证代码')
        h3_by_xpath = iframe.ele('xpath://h3[contains(text(),"输入验证代码")]')
        p_resend = iframe.ele('xpath://p[contains(text(),"未收到代码?单击")]', timeout=15)
        p_email_hint = iframe.ele('xpath://p[contains(text(),"电子邮件中")]', timeout=15)
        if (h3_element or h3_by_text or h3_by_xpath or p_resend or p_email_hint
                or '输入验证代码' in iframe.html or '输入验证代码' in self.page.html):
            self.yxyzm()
        else:
            print('不需要验证码')
        # NOTE(review): indentation was not recoverable for the next lines;
        # the wait and reload are applied on both branches — confirm.
        sleep(10)
        self.page.refresh()
        sleep(randint(5, 10))
        self.page.get('https://www.shutterstock.com/zh/catalog/licenses')
        sleep(randint(4, 8))
        ck = self.get_ck()
        return ck
    except Exception as e:
        # Best effort: even if the verification probe failed, the session
        # may already be logged in, so still try to read the cookies.
        print(e)
        print('不需要验证码11111111111')
        sleep(randint(5, 8))
        ck = self.get_ck()
        return ck
...@@ -416,16 +504,22 @@ class GetSS_details(): ...@@ -416,16 +504,22 @@ class GetSS_details():
max_retries = 3 max_retries = 3
retries = 0 retries = 0
while retries <= max_retries: while retries <= max_retries:
headers = {
"authorization": self.token
}
try: try:
response = requests.post(url, data=data_json) response = requests.post(url, data=data_json,headers=headers)
if response.status_code == 200: if response.status_code == 200:
return response.json() return response.json()
else: else:
print(url,'2323')
print(f'请求失败,状态码: {response.status_code},重试 ({retries}/{max_retries})') print(f'请求失败,状态码: {response.status_code},重试 ({retries}/{max_retries})')
retries += 1 retries += 1
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f'请求异常: {e},重试 ({retries}/{max_retries})') print(f'请求异常: {e},重试 ({retries}/{max_retries})')
retries += 1 retries += 1
self.get_microservice_token()
raise Exception(f'请求失败,已达到最大重试次数:{max_retries} 次') raise Exception(f'请求失败,已达到最大重试次数:{max_retries} 次')
def get_jpg(self, cookies, image_id): def get_jpg(self, cookies, image_id):
...@@ -544,12 +638,12 @@ class GetSS_details(): ...@@ -544,12 +638,12 @@ class GetSS_details():
if retry > max_retries: if retry > max_retries:
logging.warning("超过重试次数,跳过该图片") logging.warning("超过重试次数,跳过该图片")
return False return False
sleep_time = [random.randint(60, 180), random.randint(180, 240), random.randint(1800, 1900)][retry - 1] sleep_time = [random.randint(60, 180), random.randint(180, 240), random.randint(1800, 1900)][
retry - 1]
logging.warning(f"未知错误,等待{sleep_time}s 第{retry}次重试...") logging.warning(f"未知错误,等待{sleep_time}s 第{retry}次重试...")
time.sleep(sleep_time) time.sleep(sleep_time)
continue # 继续下一次重试 continue # 继续下一次重试
def run_get_stock_img_id(self, account, cookie): def run_get_stock_img_id(self, account, cookie):
"""封装GetStockImgId.run()调用""" """封装GetStockImgId.run()调用"""
try: try:
...@@ -566,7 +660,7 @@ class GetSS_details(): ...@@ -566,7 +660,7 @@ class GetSS_details():
print(f"开始抓取 item_id: {item_id}") print(f"开始抓取 item_id: {item_id}")
self.page.clear_cache() # 清除浏览器缓存和session信息。下一个账号直接登录。优化上一个账号没有退出导致新账号登录失败 self.page.clear_cache() # 清除浏览器缓存和session信息。下一个账号直接登录。优化上一个账号没有退出导致新账号登录失败
if item_id == 1 and int(day)<2: if item_id == 1 and int(day) < 2:
Con.update_all_states_to_1(state=2) Con.update_all_states_to_1(state=2)
wait_time = random.uniform(6, 10) wait_time = random.uniform(6, 10)
...@@ -584,7 +678,7 @@ class GetSS_details(): ...@@ -584,7 +678,7 @@ class GetSS_details():
image_id_id_pairs = Con.get_stock_images_id(self.account) image_id_id_pairs = Con.get_stock_images_id(self.account)
if not image_id_id_pairs: if not image_id_id_pairs:
print(f'{self.account} 已全部爬取完成') print(f'{self.account} 已全部爬取完成')
Con.update_all_states_to_1(state=3,item_id=item_id) Con.update_all_states_to_1(state=3, item_id=item_id)
continue continue
counts_start = 0 counts_start = 0
...@@ -594,7 +688,8 @@ class GetSS_details(): ...@@ -594,7 +688,8 @@ class GetSS_details():
image_id, item_id_str, image_title, image_size_info = image_id_id_pairs[count].split('||-||') image_id, item_id_str, image_title, image_size_info = image_id_id_pairs[count].split('||-||')
print(f'执行 {self.account}: {image_id}, {item_id_str}, 计数: {count}') print(f'执行 {self.account}: {image_id}, {item_id_str}, 计数: {count}')
try: try:
chong_shi = self.get_pic(self.account, image_id, item_id_str, image_title, image_size_info, cookie, wait_time) chong_shi = self.get_pic(self.account, image_id, item_id_str, image_title, image_size_info,
cookie, wait_time)
if not chong_shi: if not chong_shi:
stop_flag = True stop_flag = True
break break
...@@ -615,11 +710,5 @@ class GetSS_details(): ...@@ -615,11 +710,5 @@ class GetSS_details():
break break
if __name__ == '__main__': if __name__ == '__main__':
GetSS_details().run() GetSS_details().run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment