Commit a311edcd by Peng

Revert "no message"

This reverts commit ec9229bd.
parent ec9229bd
import json
import time
import pandas as pd
from secure_db_client import get_remote_engine
import time
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
import platform
import traceback
import json
import uuid
from sqlalchemy.exc import SQLAlchemyError
class ConnectSpider:
def __init__(self):
self.db_engine = get_remote_engine(
site_name='us', # -> database "selection"
db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
)
self.db_engine192 = get_remote_engine(
site_name='us', # -> database "selection"
db_type="postgresql_14_outer", # -> 服务端 alias "mysql"
)
# self.pg_port = 54328
# self.pg_db = "selection"
# self.pg_user = "postgres"
# self.pg_pwd = "F9kL2sXe81rZq"
# self.pg_host = "61.145.136.61"
# pg_host = "192.168.10.223"
# self.db_engine192 = create_engine(
# f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}")
self.pg_port = 54328
self.pg_db = "selection"
self.pg_user = "postgres"
self.pg_pwd = "F9kL2sXe81rZq"
self.pg_host = "61.145.136.61"
self.db_engine = create_engine(f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}")
pg_host = "192.168.10.223"
self.db_engine192 = create_engine(
f"postgresql://{self.pg_user}:{self.pg_pwd}@{self.pg_host}:{self.pg_port}/{self.pg_db}")
# mysql
self.sql_port = 3306
self.sql_db = "selection"
self.sql_user = "adv_yswg"
self.sql_pwd = "Gd1pGJog1ysLMLBdML8w81"
self.sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
self.charset = 'utf8mb4'
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{self.sql_user}:{self.sql_pwd}@{self.sql_host}:{self.sql_port}/{self.sql_db}?charset={self.charset}"
self.mysql_engine = create_engine(connection_string_mysql)
# mysql
sql_port = 19030
sql_db = "test"
sql_user = "fangxingjun"
sql_pwd = "fangxingjun12345"
sql_host = "192.168.10.151"
wai_host = "113.100.143.162"
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{wai_host}:{sql_port}/{sql_db}"
self.mysql_test = create_engine(connection_string_mysql)
# mysql
sql_port = 19030
sql_db = "selection"
sql_user = "fangxingjun"
sql_pwd = "fangxingjun12345"
sql_host = "192.168.10.151"
wai_host = "113.100.143.162"
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}"
self.mysql_selection = create_engine(connection_string_mysql)
def mysql(self):
mysql_engine = get_remote_engine(
site_name='us', # -> database "selection"
db_type="mysql", # -> 服务端 alias "mysql"
)
sql_port = 3306
sql_db = "us_spider"
sql_user = "adv_yswg"
sql_pwd = "Gd1pGJog1ysLMLBdML8w81"
sql_host = "rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com"
charset = 'utf8mb4'
# 创建数据库连接字符串
connection_string_mysql = f"mysql+pymysql://{sql_user}:{sql_pwd}@{sql_host}:{sql_port}/{sql_db}?charset={charset}"
mysql_engine = create_engine(connection_string_mysql)
return mysql_engine
def save_stock_img_id(self,items):
# sql = """
# INSERT INTO stock_image_id_wj
# (account_id, image_id, state, created_at, image_title, image_size_info)
# VALUES (%s, %s, %s, %s, %s, %s)
# ON DUPLICATE KEY UPDATE
# state = VALUES(state),
# created_at = VALUES(created_at),
# image_title = VALUES(image_title),
# image_size_info = VALUES(image_size_info)
# """
sql = """
def mysql_us_spider(self):
    """Build an engine for the MySQL-protocol spider database.

    Every connection setting can be overridden with an environment
    variable (``US_SPIDER_DB_HOST``, ``US_SPIDER_DB_PORT``,
    ``US_SPIDER_DB_NAME``, ``US_SPIDER_DB_USER``,
    ``US_SPIDER_DB_PASSWORD``).  When a variable is unset the value that
    used to be hard-coded here is used, so existing callers are unaffected.

    Returns:
        The object returned by ``create_engine`` for the assembled
        ``mysql+pymysql`` URL.
    """
    import os
    from urllib.parse import quote_plus

    # NOTE(review): the fallbacks below keep real credentials in source
    # control; rotate them and remove the fallbacks once the environment
    # variables are provisioned.
    sql_host = os.environ.get("US_SPIDER_DB_HOST", "192.168.10.151")
    sql_port = os.environ.get("US_SPIDER_DB_PORT", "19030")
    # The database was previously switched from "us_spider" to "test".
    sql_db = os.environ.get("US_SPIDER_DB_NAME", "test")
    sql_user = os.environ.get("US_SPIDER_DB_USER", "fangxingjun")
    sql_pwd = os.environ.get("US_SPIDER_DB_PASSWORD", "fangxingjun12345")

    # Escape user and password so characters such as '@' or '/' cannot
    # corrupt the URL; the current values are unchanged by the escaping.
    connection_string_mysql = (
        f"mysql+pymysql://{quote_plus(sql_user)}:{quote_plus(sql_pwd)}"
        f"@{sql_host}:{sql_port}/{sql_db}"
    )
    return create_engine(connection_string_mysql)
def save_stock_img_id(self, items):
"""批量保存数据到数据库。"""
# 提取 image_title 和 image_size_info
processed_items = []
for item in items:
image_title = item.get('title', '')
image_size_info = json.dumps(item.get('sizes', {}))
processed_item = {
'account_id': item['account_id'],
'image_id': item['image_id'],
'state': item['state'],
'created_at': item['created_at'],
'image_title': image_title,
'image_size_info': image_size_info
}
processed_items.append(processed_item)
# # 定义DataFrame的列 旧代码
# columns = ['account_id', 'image_id', 'state', 'created_at', 'image_title', 'image_size_info']
# df = pd.DataFrame(processed_items, columns=columns)
#
# # 使用with语句管理数据库连接
# with self.db_engine192.connect() as connection:
# df.to_sql(
# name=table_name,
# con=connection,
# if_exists='append',
# index=False
# )
insert_sql = text("""
INSERT INTO stock_image_id_wj
(account_id, image_id, state, created_at, image_title, image_size_info)
VALUES (%s, %s, %s, %s, %s, %s::jsonb)
ON CONFLICT (account_id, image_id) DO UPDATE SET
state = EXCLUDED.state,
created_at = EXCLUDED.created_at,
image_title = EXCLUDED.image_title,
image_size_info = EXCLUDED.image_size_info;
"""
params = [
(
item['account_id'],
item['image_id'],
item['state'],
item['created_at'],
item.get('title', ''),
json.dumps(item.get('sizes', {}))
)
for item in items
]
for i in range(5):
try:
VALUES
(:account_id, :image_id, :state, :created_at, :image_title, :image_size_info)
ON CONFLICT (account_id, image_id) DO NOTHING
""")
print('新代码插入id',insert_sql) # 不需要担心有重复的。表里面有唯一索引。会跳过。
# 假设 processed_items 是 [{'account_id':..., 'image_id':..., …}, …]
with self.db_engine192.begin() as conn:
conn.execute(sql, params)
print('存储更新成功')
break
except Exception as e:
time.sleep(30)
print('save_stock_img_id 报错。', e)
def update_id_to_3(self, account_id):
for i in range(5):
conn.execute(insert_sql, processed_items)
def save_homedepot_projects(self, items):
    """Append a batch of items to the ``homedepot_projects_items`` table.

    Args:
        items: iterable of dicts.  ``account_id``, ``image_id``, ``state``
            and ``created_at`` are required keys; ``title`` defaults to
            ``''`` and ``sizes`` defaults to ``{}`` (stored as JSON text).
    """
    column_order = ['account_id', 'image_id', 'state', 'created_at', 'image_title', 'image_size_info']

    rows = []
    for entry in items:
        # Optional fields are read first, then the required ones.
        title = entry.get('title', '')
        sizes_json = json.dumps(entry.get('sizes', {}))
        rows.append(dict(
            account_id=entry['account_id'],
            image_id=entry['image_id'],
            state=entry['state'],
            created_at=entry['created_at'],
            image_title=title,
            image_size_info=sizes_json,
        ))

    frame = pd.DataFrame(rows, columns=column_order)
    write_options = {'name': "homedepot_projects_items", 'if_exists': 'append', 'index': False}
    # The connection is scoped to the write and released afterwards.
    with self.db_engine192.connect() as connection:
        frame.to_sql(con=connection, **write_options)
def get_account_id(self, item_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_summary_wj"
query = text(f"SELECT account_id, id FROM {table_name} WHERE id = :item_id")
result = connection.execute(query, {"item_id": item_id})
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
try:
with self.db_engine192.begin() as connection:
accounts = df_status.account_id.iloc[0]
except IndexError:
accounts = None # 或者处理不存在的情况
return accounts
def update_id_to_3(self,account_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_summary_wj"
print(account_id)
sql_update = f"UPDATE {table_name} SET state = 3 WHERE account_id='{account_id}'"
print(sql_update,'成功更新为3')
connection.execute(sql_update)
break
except Exception as e:
time.sleep(30)
print('update_id_to_3 报错。', e)
success_id = tuple(account_id)
sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE account_id IN :success_id")
result = connection.execute(sql_update, {"success_id": success_id})
print('成功更新为3')
connection.close()
def update_all_states_to_1(self, state=1, item_id=None):
for i in range(5):
def update_all_states_to_1(self,state=1,item_id=None):
try:
with self.db_engine192.begin() as connection: # 使用 begin() 自动管理事务
table_name = "stock_image_summary_wj"
if state == 3:
sql_update = f"UPDATE {table_name} SET state = {state} where id={item_id}"
if state==3:
sql_update = text(f"UPDATE {table_name} SET state = {state} where id={item_id}")
else:
sql_update = f"UPDATE {table_name} SET state = {state}"
sql_update = text(f"UPDATE {table_name} SET state = {state}")
print(sql_update)
connection.execute(sql_update)
break
result = connection.execute(sql_update)
print(f'成功更新所有状态为1,受影响行数:{result.rowcount}')
# 显式提交事务(虽然 begin() 已经自动提交)
connection.commit()
except Exception as e:
time.sleep(30)
print(f'更新状态失败:{e}')
# 回滚事务
if 'connection' in locals():
connection.rollback()
def save_account(self,items):
    """Append account rows to the ``stock_image_summary_wj`` table.

    Args:
        items: row data accepted by ``pandas.DataFrame`` for the columns
            account_id, account_secret, year_month, spider_date, state,
            created_time.
    """
    frame = pd.DataFrame(
        items,
        columns=['account_id', 'account_secret', 'year_month', 'spider_date', 'state', 'created_time'],
    )
    write_options = {'name': "stock_image_summary_wj", 'if_exists': 'append', 'index': False}
    # The connection only lives for the duration of the write.
    with self.db_engine192.connect() as connection:
        frame.to_sql(con=connection, **write_options)
        print("保存成功!")
def delet_datails(self, image_id_list, account_id='zhouweiqing@yswg.com.cn'):
    """Delete an account's detail rows whose image_id is not in *image_id_list*.

    Args:
        image_id_list: image ids that must be kept.
        account_id: account whose rows are pruned.  Defaults to the
            account that was previously hard-coded in the SQL.

    Returns:
        None.
    """
    table_name = "stock_image_detail_wj"

    # Compare ids as strings: if the caller passes ints while the table
    # returns strings (or the reverse), a plain set difference would treat
    # every stored id as missing and delete all of the account's rows.
    ids_to_keep = {str(image_id) for image_id in image_id_list}

    with self.db_engine192.connect() as connection:
        select_query = text(f"SELECT image_id FROM {table_name} WHERE account_id = :account_id")
        stored_rows = connection.execute(select_query, {"account_id": account_id}).fetchall()

        stale_ids = {row[0] for row in stored_rows if str(row[0]) not in ids_to_keep}
        if not stale_ids:
            # Only a SELECT ran, so there is nothing to commit.
            return

        # Bound parameters instead of interpolating ids into the SQL text;
        # a list of parameter sets runs the statement once per id.
        delete_query = text(
            f"DELETE FROM {table_name} WHERE account_id = :account_id AND image_id = :image_id")
        connection.execute(
            delete_query,
            [{"account_id": account_id, "image_id": stale_id} for stale_id in stale_ids],
        )
        # Persist the deletions.
        connection.commit()
def get_datails_image_id(self, account_id):
    """Return the image ids of *account_id* created before 2024-09-02.

    Args:
        account_id: value bound to the ``account_id`` filter.

    Returns:
        A list with each selected ``image_id`` converted to ``int``.
    """
    table_name = "stock_image_detail_wj"
    sql_query = text(f"SELECT image_id FROM {table_name} WHERE account_id = :account_id and created_time < '2024-09-02 00:00:00'")
    with self.db_engine192.connect() as connection:
        fetched = connection.execute(sql_query, {"account_id": account_id}).fetchall()
        # Read-only query, so no commit is required.
        return [int(record[0]) for record in fetched]
# 1111111111111
def save_stock_detail(self, item):
"""批量保存数据到数据库。"""
table_name = "stock_image_detail_wj"
# 将item包装成列表
items_list = [item]
# 定义DataFrame的列
columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url', 'state',
'created_time']
columns = ['account_id', 'image_id', 'image_size_info', 'image_title', 'image_type', 'image_url','state', 'created_time']
df = pd.DataFrame(items_list, columns=columns)
for i in range(5):
with self.db_engine192.connect() as connection:
df.to_sql(
name=table_name,
con=connection,
if_exists='append',
index=False
)
# print("保存成功!")
def get_pic_urls_limit1(self, account_id):
pic_data_list = [] # 创建一个空列表来存储结果
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
query = text(
f"""select image_url, image_id, image_title from {table_name} where account_id = :account_id and state = 1 limit 1""")
try:
self.db_engine192.to_sql(df, table_name, if_exists='append')
print("保存成功!")
break
result = connection.execute(query, {'account_id': account_id})
for row in result: # 遍历所有的结果行
if row is not None:
# 将 RowProxy 转换为标准的字典
row_dict = dict(zip(result.keys(), row)) # 使用 keys 和 fetchone 的结果创建字典
# 直接构建所需格式的字符串
pic_datas = f"{row_dict['image_url']}||{row_dict['image_id']}||{row_dict['image_title']}"
pic_data_list.append(pic_datas) # 添加到列表中
if not pic_data_list:
# print("No data found for the given account_id")
return False
else:
return pic_data_list # 返回列表
except Exception as e:
time.sleep(30)
print(f'save_stock_detail 报错:{e}')
# 回滚事务
print(f"An error occurred: {e}")
return False
def get_pic_urls(self, account_id):
pic_data_list = [] # 创建一个空列表来存储结果
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
query = text(
f"""select image_url, image_id, image_title from {table_name} where account_id = :account_id and state = 1""")
def get_stock_images_id(self, account_id):
for i in range(5):
try:
result = connection.execute(query, {'account_id': account_id})
for row in result: # 遍历所有的结果行
if row is not None:
# 将 RowProxy 转换为标准的字典
row_dict = dict(zip(result.keys(), row)) # 使用 keys 和 fetchone 的结果创建字典
# 直接构建所需格式的字符串
pic_datas = f"{row_dict['image_url']}||{row_dict['image_id']}||{row_dict['image_title']}"
pic_data_list.append(pic_datas) # 添加到列表中
if not pic_data_list:
# print("No data found for the given account_id")
return False
else:
return pic_data_list # 返回列表
except Exception as e:
print(f"An error occurred: {e}")
return False
def get_stock_images_id2(self, account_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
# 修改查询语句以匹配你的数据表名称和列名称
query = f""" SELECT image_id,id,image_title,image_size_info FROM {table_name} where account_id ='{account_id}' and state = 1"""
# 特定的 image_id 列表
specific_image_ids = [
'1025406430',
'782084149',
'2340663257',
'2444918601',
'2481076155',
'2534369399',
'2522128969',
'2522144147',
'2482077119',
'2475085855',
'2560247125',
'1115348984',
'2555951185',
'1644852415',
'1644852424',
'258700904',
'2540342353',
'2555951245',
'2529955899',
'1309059847',
'1899316957',
'2416180707',
'1978653428',
'2520112131',
'1447499252',
'2335787565',
'1780440524',
'2316295613',
'2463106909',
'2527382733',
'2548693637',
'2460743889',
'2489123001',
'2527399543',
'2456315025',
'2469939069',
'2305915213',
'1660111006',
'2218802639',
'453729808',
'2295540279',
'2323950095',
'2323950087',
'2057817146',
'2541104423',
'231076948',
'2196541827',
'2407612765',
'2521017693',
'2554778219',
'2523427909',
'2520799267',
'2533854931',
'2498052331',
'2521798533',
'2471652945',
'2445858817',
'2449783031',
'1735869230',
'1106587370',
'2393397957',
'2527382699',
'2348771553',
'1822384931',
'2564084221'
]
# specific_image_ids = ['2509630613', '2568241787', '2568242327', '2568242443', '2568242799', '2568242949']
# 修改查询语句以匹配你的数据表名称、列名称,并加入 image_id 条件
query = text(f"""SELECT image_id,id,image_title,image_size_info FROM {table_name}
WHERE account_id = :account_id
AND image_id IN :image_ids""")
print(query)
df_status = self.db_engine192.read_sql(query)
result = connection.execute(query, {'account_id': account_id, 'image_ids': tuple(specific_image_ids)})
try:
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
df_status['id'] = df_status['id'].astype(str)
image_id_id_pairs = list(
df_status['image_id'].astype(str) + '||-||' + df_status['id'] + '||-||' + df_status[
'image_title'] + '||-||' + df_status['image_size_info'])
print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
return image_id_id_pairs
except Exception as e:
print(e)
return False
def img_size_is_1(self, account_id, image_ids):
# 确保image_ids列表非空
if not image_ids:
print("No image IDs provided.")
return False
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
query = text(f"""SELECT image_id, id, image_title, image_size_info FROM {table_name} WHERE account_id = :account_id AND image_id IN :image_ids""")
try:
result = connection.execute(query, {'account_id': account_id, 'image_ids': tuple(image_ids)})
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
df_status['id'] = df_status['id'].astype(str)
image_id_id_pairs = list(
df_status['image_id'] + '||-||' + df_status['id'] + '||-||' + df_status['image_title'] + '||-||' +
......@@ -139,104 +456,273 @@ class ConnectSpider:
except Exception as e:
print(e)
return False
def get_stock_images_id(self,account_id):
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
# 修改查询语句以匹配你的数据表名称和列名称
query = text(f""" SELECT image_id,id,image_title,image_size_info FROM {table_name} where account_id = :account_id and state = 1""")
print(query)
result = connection.execute(query, {'account_id': account_id})
try:
df_status = pd.DataFrame(result.fetchall())
df_status.columns = result.keys()
df_status['id'] = df_status['id'].astype(str)
image_id_id_pairs = list(df_status['image_id'] + '||-||' + df_status['id'] + '||-||' + df_status['image_title'] + '||-||' + df_status['image_size_info'])
print(f'账号:{account_id}需爬取{len(image_id_id_pairs)}张')
connection.close()
return image_id_id_pairs
except Exception as e:
time.sleep(30)
print(f'get_stock_images_id 报错:{e}')
# 回滚事务
print(e)
return False
def get_kong_images_id(self, account_id):
    """List detail rows of *account_id* whose size info is still ``'{}'``.

    Args:
        account_id: value bound to the ``account_id`` filters.

    Returns:
        A list of ``"image_id||image_type||image_url"`` strings, or an
        empty list when no row matches.
    """
    with self.db_engine192.connect() as connection:
        table_name = "stock_image_detail_wj"
        # The sub-select drops every image_id that already has a row with
        # non-empty size info for this account.
        query = text(
            f"""SELECT image_id, image_type, image_url
FROM {table_name}
WHERE account_id = :account_id
AND image_id NOT IN (
SELECT image_id
FROM {table_name}
WHERE account_id = :account_id
AND image_size_info != '{{}}'
)
AND image_size_info = '{{}}'"""
        )
        print(query)
        cursor_result = connection.execute(query, {'account_id': account_id})
        frame = pd.DataFrame(cursor_result.fetchall(), columns=cursor_result.keys())
        if frame.empty:
            return []
        joined = frame['image_id'] + '||' + frame['image_type'] + '||' + frame['image_url']
        data_list = list(joined)
        connection.close()
        return data_list
def get_stock_image_detail(self, account_id):
    """Fetch every ``stock_image_detail`` row of *account_id* as joined strings.

    Args:
        account_id: value bound to the ``account_id`` filter.

    Returns:
        A list of strings, one per row, joining account_id, image_id,
        image_size_info, image_title, image_type, image_url and
        created_time (formatted ``%Y-%m-%d %H:%M:%S``) with ``'||-||'``.
        An empty list when the account has no rows.
    """
    separator = '||-||'
    with self.mysql_selection.connect() as connection:
        table_name = "stock_image_detail"
        query = text(
            f"""SELECT account_id, image_id, image_size_info, image_title, image_type, image_url, created_time FROM {table_name} WHERE account_id = :account_id""")
        print(query)
        result = connection.execute(query, {'account_id': account_id})
        # Passing the column names to the constructor keeps them even when
        # no row comes back; assigning ``.columns`` afterwards raises a
        # length-mismatch error on an empty frame.
        df_status = pd.DataFrame(result.fetchall(), columns=list(result.keys()))

    if df_status.empty:
        # ``.dt`` is not available on an empty object column, so stop here.
        print(f'账号:{account_id} 一共 0 条数据')
        return []

    # Render the timestamps as text so they can be concatenated.
    df_status['created_time'] = df_status['created_time'].dt.strftime('%Y-%m-%d %H:%M:%S')
    joined = df_status['account_id']
    for column in ('image_id', 'image_size_info', 'image_title', 'image_type', 'image_url', 'created_time'):
        joined = joined + separator + df_status[column]
    detail_datas = list(joined)
    print(f'账号:{account_id} 一共 {len(detail_datas)} 条数据')
    return detail_datas
def save_stock_detail_move(self, data_list):
    """Append rows to the ``stock_image_detail_wj`` table.

    Args:
        data_list: row data accepted by ``pandas.DataFrame`` for the columns
            account_id, image_id, image_size_info, image_title, image_type,
            image_url, created_time.
    """
    frame = pd.DataFrame(
        data_list,
        columns=[
            'account_id', 'image_id', 'image_size_info', 'image_title',
            'image_type', 'image_url', 'created_time',
        ],
    )
    write_options = {'name': "stock_image_detail_wj", 'if_exists': 'append', 'index': False}
    with self.db_engine192.connect() as connection:
        frame.to_sql(con=connection, **write_options)
        print("保存成功!")
# 11111111111
def update_image_id_to_3(self, item_id):
for i in range(5):
try:
with self.db_engine192.begin() as connection:
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
sql_update = f"UPDATE {table_name} SET state = 3 WHERE id = {item_id}"
connection.execute(sql_update)
break
except Exception as e:
time.sleep(30)
print(f'update_image_id_to_3 报错:{e}')
# 回滚事务
trans = connection.begin()
sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE id = :item_id")
result = connection.execute(sql_update, {"item_id": item_id})
trans.commit()
def update_url_state_to_3(self,image_id):
    """Set ``state = 3`` on the state-1 detail rows of *image_id*.

    Args:
        image_id: value bound to the ``image_id`` filter.
    """
    table_name = "stock_image_detail_wj"
    statement = text(f"UPDATE {table_name} SET state = 3 WHERE image_id = :image_id and state = 1")
    bound_values = {"image_id": image_id}
    with self.db_engine192.connect() as connection:
        # Explicit transaction: begin, run the update, then commit.
        transaction = connection.begin()
        connection.execute(statement, bound_values)
        transaction.commit()
# 11111111111
def update_image_id_to_4(self, item_id):
for i in range(5):
try:
with self.db_engine192.begin() as connection:
with self.db_engine192.connect() as connection:
table_name = "stock_image_id_wj"
sql_update = f"UPDATE {table_name} SET state = 4 WHERE id = {item_id}"
connection.execute(sql_update)
break
except Exception as e:
time.sleep(30)
print(f'update_image_id_to_4 报错:{e}')
trans = connection.begin()
sql_update = text(f"UPDATE {table_name} SET state = 4 WHERE id = :item_id")
result = connection.execute(sql_update, {"item_id": item_id})
connection.close()
trans.commit()
def save_stock_cookie(self,item):
    """Append one cookie record to the ``stock_cookie_wj`` table.

    Args:
        item: a single record (wrapped into a one-row frame) providing
            account_id, cookie, state and created_at.
    """
    # A single record becomes a one-row frame.
    frame = pd.DataFrame([item], columns=['account_id', 'cookie', 'state', 'created_at'])
    write_options = {'name': "stock_cookie_wj", 'if_exists': 'append', 'index': False}
    with self.db_engine192.connect() as connection:
        frame.to_sql(con=connection, **write_options)
        print("保存成功!")
def updata_ck_state(self,ck_id):
with self.db_engine192.connect() as connection:
table_name = "stock_cookie_wj"
# 使用参数化查询防止SQL注入
query = text(
f"""UPDATE {table_name} SET state = :new_state WHERE id = :ck_id;""")
def get_cookie_account(self, item_id):
for i in range(5):
try:
# 执行更新语句
result = connection.execute(query, {'new_state': 3, 'ck_id': ck_id})
# 提交事务以确保更改被保存到数据库中
connection.commit()
# 检查是否有行受到影响
if result.rowcount > 0:
print('修改cookie状态为3')
return True # 更新成功
else:
return False # 没有找到匹配项或没有更新任何行
except Exception as e:
# print(f"An error occurred: {e}")
return False
def get_stock_cookie(self, account):
    """Return one state-1 cookie of *account* as a joined string.

    Args:
        account: value bound to the ``account_id`` filter.

    Returns:
        ``"<id>||-||<cookie>||-||<state>"`` for the first matching row, or
        ``None`` when no row matches.
    """
    table_name = "stock_cookie_wj"
    # Parameterised query: the values are bound, not interpolated.
    query = text(
        f"""SELECT id, cookie, state FROM {table_name} WHERE account_id = :account_id AND state = :state LIMIT 1;""")
    with self.db_engine192.connect() as connection:
        record = connection.execute(query, {'account_id': account, 'state': 1}).mappings().first()
        if record is None:
            return None
        # Mapping rows are addressed by column name.
        return f"{record['id']}||-||{record['cookie']}||-||{record['state']}"
def get_stock_cookie_list(self, account):
    """Return every state-1 cookie stored for *account*.

    Args:
        account: value bound to the ``account_id`` filter.

    Returns:
        A list of cookie values; empty when the account has no state-1
        cookie.
    """
    table_name = "stock_cookie_wj"
    # Parameterised query: the values are bound, not interpolated.
    query = text(f"""SELECT cookie FROM {table_name} WHERE account_id = :account_id AND state = :state ; """)
    with self.db_engine192.connect() as connection:
        result = connection.execute(query, {'account_id': account, 'state': 1})
        # Read the single column straight from the rows.  Going through a
        # DataFrame and assigning ``.columns`` raised a length-mismatch
        # error when no row matched, so the empty-list fallback was never
        # reached.
        return [row[0] for row in result]
def get_cookie_account(self,item_id):
# try:
with self.db_engine192.connect() as connection:
table_name = "stock_image_summary_wj"
# 修改查询语句以匹配你的数据表名称和列名称
query = f"""SELECT account_id,account_secret FROM {table_name} where id = {item_id} and state= 1;"""
query = text(f"""SELECT account_id,account_secret FROM {table_name} where id = :item_id and state= :state_int;""")
print(query)
df_status = self.db_engine192.read_sql(query)
if len(df_status) > 0:
result = connection.execute(query, {'item_id': item_id,'state_int':1})
print(result)
df_status = pd.DataFrame(result.fetchall())
if len(df_status)>0:
df_status.columns = result.keys()
account_id = df_status.account_id.iloc[0]
account_secret = df_status.account_secret.iloc[0]
account_list = [account_id, account_secret]
print(account_list, '232323====32')
print(account_list,'232323====32')
# print(111111111111)
connection.close()
return account_list
else:
return None
except Exception as e:
time.sleep(30)
print(f'get_cookie_account 报错:{e}')
# except Exception as e:
# print(111111111)
def get_stock_test_id(self,username):
    """Return one image_id stored for *username*.

    Args:
        username: value bound to the ``account_id`` filter.

    Returns:
        The ``image_id`` of the first matching row, or ``None`` when the
        account has no rows (previously this case raised while building the
        intermediate DataFrame).
    """
    table_name = "stock_image_id_wj"
    query = text(f"""SELECT image_id FROM {table_name} WHERE account_id = :username LIMIT 1;""")
    with self.db_engine192.connect() as connection:
        result = connection.execute(query, {'username': username})
        # ``scalar()`` yields the first column of the first row, or None
        # when the result is empty.
        return result.scalar()
def upload_data(self, account_id, image_id, upload_time, err_msg):
    """Record an upload attempt and its error payload on a detail row.

    Updates ``upload_time`` and ``err_msg`` on the ``stock_image_detail_wj``
    rows of the given account and image that were created before
    2024-09-02.

    Args:
        account_id: value bound to the ``account_id`` filter.
        image_id: image identifier; converted to ``str`` before binding.
        upload_time: value written to ``upload_time``.
        err_msg: JSON-serialisable object; stored as its ``json.dumps`` text.
    """
    table_name = "stock_image_detail_wj"
    sql_update = text(
        f"UPDATE {table_name} SET upload_time = :upload_time, err_msg = :err_msg WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'")
    params = {
        "upload_time": upload_time,
        "err_msg": json.dumps(err_msg),
        "account_id": account_id,
        # Always bind the id as a string.
        "image_id": str(image_id),
    }
    # ``begin()`` commits when the block exits cleanly and rolls back on
    # error.  The previous ``connect()`` block never committed, so the
    # UPDATE was discarded on close wherever autocommit is not in effect.
    with self.db_engine192.begin() as connection:
        connection.execute(sql_update, params)
def upload_success_data(self, account_id, image_id, upload_time):
    """Record the upload time of a successfully uploaded image.

    Updates ``upload_time`` on the ``stock_image_detail_wj`` rows of the
    given account and image that were created before 2024-09-02.

    Args:
        account_id: value bound to the ``account_id`` filter.
        image_id: image identifier; converted to ``str`` before binding.
        upload_time: value written to ``upload_time``.
    """
    table_name = "stock_image_detail_wj"
    sql_update = text(
        f"UPDATE {table_name} SET upload_time = :upload_time WHERE account_id = :account_id AND image_id = :image_id AND created_time < '2024-09-02 00:00:00'")
    params = {
        "upload_time": upload_time,
        "account_id": account_id,
        # Always bind the id as a string.
        "image_id": str(image_id),
    }
    # ``begin()`` commits when the block exits cleanly and rolls back on
    # error.  The previous ``connect()`` block never committed, so the
    # UPDATE was discarded on close wherever autocommit is not in effect.
    with self.db_engine192.begin() as connection:
        connection.execute(sql_update, params)
def get_all_image_id(self):
for i in range(5):
try:
with self.db_engine192.connect() as connection:
table_name = "stock_image_detail_wj"
sql_query = f"SELECT image_id FROM {table_name} "
df_status = self.db_engine192.read_sql(sql_query)
df_status = pd.read_sql(sql_query, con=connection)
image_id = list(df_status['image_id'].astype(str))
connection.close()
return image_id
except Exception as e:
time.sleep(30)
print(f'get_all_image_id 报错:{e}')
def update_url_state_to_3(self, image_id, max_attempts=5, retry_delay=30):
    """Set ``state = 3`` on the state-1 detail rows of *image_id*, with retries.

    Args:
        image_id: image identifier; bound as ``str``, matching the quoted
            literal the statement used before.
        max_attempts: how many times the update is tried (default 5).
        retry_delay: seconds to wait between failed attempts (default 30).

    Returns:
        None.  After the last failed attempt the error has been printed and
        the method returns without raising, as before.
    """
    table_name = "stock_image_detail_wj"
    # Bound parameter instead of interpolating the id into the SQL text.
    sql_update = text(f"UPDATE {table_name} SET state = 3 WHERE image_id = :image_id and state = 1")
    params = {"image_id": str(image_id)}

    for attempt in range(max_attempts):
        try:
            # ``begin()`` commits on success and rolls back on error.
            with self.db_engine192.begin() as connection:
                connection.execute(sql_update, params)
            break
        except Exception as e:
            # Report first so the failure is visible during the wait.
            print(f'update_url_state_to_3 报错:{e}')
            if attempt < max_attempts - 1:
                time.sleep(retry_delay)
def get_pic_urls(self, account_id):
    """Return the state-1 picture records of *account_id* as joined strings.

    Args:
        account_id: interpolated into the ``account_id`` filter of the query.

    Returns:
        A list of ``"image_url||image_id||image_title"`` strings, or
        ``False`` when there are no rows or the query raises.
    """
    table_name = "stock_image_detail_wj"
    query =f"""select image_url, image_id, image_title from {table_name} where account_id = '{account_id}' and state = 1"""
    try:
        fetched_rows = self.db_engine192.read_sql(query).values.tolist()
        # Each element looks like
        # "https://download.shutterstock.com/.../shutterstock_2466029425.jpg||2466029425||<title>".
        pic_data_list = [
            f"{record[0]}||{record[1]}||{record[2]}"
            for record in fetched_rows
            if record is not None
        ]
        # An empty result is reported as False rather than [].
        return pic_data_list if pic_data_list else False
    except Exception as e:
        print(f"An error occurred: {e}")
        return False
if __name__ == '__main__':
ConnectSpider().get_cookie_account(10)
ConnectSpider().get_cookie_account(1)
# -*- coding: utf-8 -*-
import os
import sys
import os
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from time import sleep
from random import randint
from all_connect import ConnectSpider
import traceback
Con = ConnectSpider()
import imaplib
import email
import os
import time
import requests
import hashlib
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage,ChromiumOptions
from DrissionPage import ChromiumPage
import json
from curl_cffi import requests
import requests
import re
import random
import time
......@@ -29,6 +24,7 @@ import calendar
import sys
class GetStockImgId(object):
def __init__(self):
self.headers = {
......@@ -70,30 +66,17 @@ class GetStockImgId(object):
)
self.headers['user-agent'] = ua
def get_url_month(self, page, cookie, start_date, last_date):
# self.random_ua()
"https://www.shutterstock.com/napi/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=50&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&filter[assetStatus]=comped%2Clicensed&language=zh"
url = f"https://www.shutterstock.com/napi/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=50&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&page[number]={page}&filter[assetStatus]=comped%2Clicensed&language=zh"
print('url:',url)
# url = "https://www.shutterstock.com/napi/s/dam/holdings/search"
# params = {
# "include": "media-item,media-item.track-assets,media-item.cms-entry",
# "sort": "-licensedAt",
# "useMms": "true",
# "channel": "shutterstock",
# "page\\[size\\]": "50",
# "filter\\[licensedSince\\]": "2025-09-01T00:00:00Z",
# "filter\\[licensedUntil\\]": "2025-09-30T23:59:59Z",
# "filter\\[assetStatus\\]": "comped,licensed",
# "language": "zh"
# }
response = requests.get(url, headers=self.headers, cookies=cookie)
def get_url_month(self,page,cookie,start_date,last_date):
self.random_ua()
response = requests.get(
f'https://www.shutterstock.com/api/s/dam/holdings/search?include=media-item%2Cmedia-item.track-assets%2Cmedia-item.cms-entry&sort=-licensedAt&useMms=true&channel=shutterstock&page[size]=200&filter[licensedSince]={start_date}T00%3A00%3A00Z&filter[licensedUntil]={last_date}T23%3A59%3A59Z&page[number]={page}&filter[assetStatus]=comped%2Clicensed&language=zh',
cookies=cookie,
headers=self.headers,
)
print(response)
print(response.url)
return response
def get_img_id(self, response, account_id, page):
def get_img_id(self,response,account_id,page):
try:
# print(response.json())
data = response.json()['included']
......@@ -113,7 +96,6 @@ class GetStockImgId(object):
data_list.append(datas)
# 保存
print('准备保存:')
Con.save_stock_img_id(data_list)
print(f"{account_id}第{page}页保存id成功,")
return True
......@@ -154,13 +136,14 @@ class GetStockImgId(object):
print(f"Start Date: {start_date}")
print(f"Last Date: {last_date}")
while is_continue:
try:
response = self.get_url_month(page, cookie, str(start_date), str(last_date))
if response.status_code == 200:
# 更新是否继续标志位
is_continue = self.get_img_id(response, account_id, page)
print('is_continue:',is_continue)
# 如果不再继续,则更新数据库并将当前账户标记为已完成
if not is_continue:
Con.update_id_to_3(account_id)
......@@ -177,19 +160,11 @@ class GetStockImgId(object):
# 抛出异常以停止外层循环
raise
class GetSS_details():
def __init__(self):
self.account = ''
self.pwd = ''
# self.page = ChromiumPage()
# 配置 Chrome 浏览器 - 端口 9222
chrome_options = ChromiumOptions()
chrome_options.set_browser_path(r'C:\Program Files\Google\Chrome\Application\chrome.exe')
chrome_options.set_local_port(9333) # 设置 Chrome 的调试端口
self.page = ChromiumPage(addr_or_opts=chrome_options)
print(f"Chrome 浏览器运行在端口: {9333}")
self.page = ChromiumPage()
self.headers = {
'accept': 'application/json',
'accept-language': 'zh-CN,zh;q=0.9',
......@@ -217,33 +192,29 @@ class GetSS_details():
'username': 'pengyanbing@yswg.com.cn',
'password': 'Python3.8',
}
self.get_microservice_token()
def get_ck(self):
print('获取登录后的cookie')
try:
self.page.get('https://www.shutterstock.com/zh/catalog/licenses')
self.page.get('https://www.shutterstock.com/zh/catalog/')
sleep(randint(2, 4))
# 获取 cookies 列表
original_cookies_list = self.page.cookies()
# 将 cookies 列表转换为字典
original_cookie_dict = {cookie['name']: cookie['value'] for cookie in original_cookies_list}
print('original_cookie_dict::',original_cookie_dict)
# # 检查 accts_customer_sso1 是否等于 '-undefined'
# if 'accts_customer_sso1' in original_cookie_dict and original_cookie_dict.get(
# 'accts_customer_sso1') == '-undefined':
# # 组合成新的值并更新 accts_customer_sso1
# new_value = f"{original_cookie_dict.get('htjs_user_id', '')}-undefined"
# original_cookie_dict['accts_customer_sso1'] = new_value
#
# keys_of_interest = ['datadome', 'accts_customer_sso1', 'next.sid']
# cookies = {key: original_cookie_dict[key] for key in keys_of_interest if key in original_cookie_dict}
#
# # print('filtered_cookies:', cookies)
return original_cookie_dict
# 检查 accts_customer_sso1 是否等于 '-undefined'
if 'accts_customer_sso1' in original_cookie_dict and original_cookie_dict.get(
'accts_customer_sso1') == '-undefined':
# 组合成新的值并更新 accts_customer_sso1
new_value = f"{original_cookie_dict.get('htjs_user_id', '')}-undefined"
original_cookie_dict['accts_customer_sso1'] = new_value
keys_of_interest = ['datadome', 'accts_customer_sso1', 'next.sid']
cookies = {key: original_cookie_dict[key] for key in keys_of_interest if key in original_cookie_dict}
# print('filtered_cookies:', cookies)
return cookies
except Exception as e:
print('获取cookie出错:', e)
......@@ -254,7 +225,7 @@ class GetSS_details():
login_out.click()
sleep(randint(2, 4))
self.page.ele('@text()=登出').click()
else:
else :
login_out = self.page.ele('.MuiAvatar-root MuiAvatar-circular MuiAvatar-colorDefault mui-1jeofke')
if login_out:
login_out.click()
......@@ -267,7 +238,7 @@ class GetSS_details():
sleep(randint(2, 4))
self.page.ele('@text()=登出').click()
def decode_body(self, body):
def decode_body(self,body):
"""尝试多种编码方式解码邮件内容"""
encodings = ['utf-8', 'gb18030', 'iso-8859-1', 'latin1']
for encoding in encodings:
......@@ -352,140 +323,81 @@ class GetSS_details():
except:
pass
def yxyzm(self):
# Handle the e-mail verification step: fetch a code via
# self.fetch_verification_code(self.email_value_config), type it into the
# login iframe and click the verify control.
# NOTE(review): this text looks like a rendered diff with indentation and +/-
# markers stripped. Two signatures follow each other: this one takes no
# argument, waits randint(62, 140) and looks up '#login-iframe' itself; the
# second one receives the iframe from the caller and waits randint(2, 4).
# Confirm which signature the live file uses (login() calls both forms).
print('需要输入邮箱验证码 等待2分钟')
sleep(randint(62, 140))
iframe = self.page.get_frame('#login-iframe')
def yxyzm(self,iframe):
print('需要输入邮箱验证码')
sleep(randint(2, 4))
# Fetch the code first; the input field is located afterwards.
yzm = self.fetch_verification_code(self.email_value_config)
try:
print(('验证码输入'))
# First attempt: locate the field by its visible text.
yzm_input = iframe.ele('@text()=输入代码')
sleep(randint(2, 4))
yzm_input.input(yzm)
except:
# Fallback when the text-based lookup or the input fails: locate by class list.
# Two selector variants are present (an input class list and a label class list).
# NOTE(review): bare except also catches KeyboardInterrupt/SystemExit.
yzm_input = iframe.ele(
'.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf')
yzm_input = iframe.ele('.MuiFormLabel-root MuiInputLabel-root MuiInputLabel-formControl MuiInputLabel-animated MuiInputLabel-sizeSmall MuiInputLabel-standard MuiFormLabel-colorPrimary css-17839r8')
sleep(randint(2, 4))
yzm_input.input(yzm)
print('点击验证')
# Submit the code by clicking the element whose text is the verify label.
iframe.ele('@text()=验证').click()
def get_microservice_token(self):
# Request an access token from the microservice getToken endpoint and store
# it on self.token. Up to five attempts are made.
# NOTE(review): if all five attempts fail the loop simply ends; nothing is
# raised and self.token is not assigned here.
for i in range(5):
try:
url = "http://wx.yswg.com.cn:8000/microservice-system/system/admin/getToken"
# The request is signed with md5(secret + current Unix timestamp in seconds).
timestamp = str(int(time.time()))
# NOTE(review): shared secret is hard-coded in source.
secret = "dafa17fb-0e97-4246-a6b3-d574e44d212d"
md5_value = hashlib.md5((secret + timestamp).encode("utf-8")).hexdigest()
# NOTE(review): no timeout is passed, so this call can block indefinitely.
response = requests.post(url, json={
"module": "spider",
"weChatId": "pengyanbing",
"secret": md5_value,
"timestamp": timestamp
})
res = response.json()
print(res)
if (res['code'] == 200):
# Success: keep the token; expireTime is only printed, not stored.
userinfo = res['data']
self.token = userinfo['token']
expireTime = userinfo['expireTime']
print(self.token, expireTime)
else:
# Any other code is turned into an exception so the handler below retries.
raise Exception(res['msg'])
# Presumably reached only after a successful response; stops retrying.
break
except Exception as e:
# NOTE(review): `iframe` is not defined anywhere in this function. In this
# diff rendering the next line looks like it belongs to a different
# variant/function (the verification-code handler) - confirm before use.
iframe.ele('.MuiTouchRipple-root css-w0pj6f').click()
print('get_microservice_token, 报错',e)
# Wait 20 seconds before the next attempt.
time.sleep(20)
def login(self):
# Log in to Shutterstock through the browser page and return the cookies
# produced by self.get_ck(). Returns False if the form-filling stage raises.
# NOTE(review): this text looks like a rendered diff with indentation and +/-
# markers stripped; many statements appear twice with different timeouts,
# sleeps or selectors (one variant per side of the diff). Confirm against
# the real file which variant is live before relying on any single line.
try:
# Open the page
self.page.get('https://www.shutterstock.com/zh/catalog/')
sleep(randint(12, 24))
sleep(randint(2, 4))
try:
# Dismiss the element with id="continue" if it is present; failures here
# are only printed and the login continues.
# print('No thanks')
print('click No thanks')
login_button = self.page.ele('xpath://a[@id="continue"]', timeout=15)
login_button = self.page.ele('xpath://a[@id="continue"]', timeout=10)
login_button.click()
except:
print('No thanks 错误')
# NOTE(review): this prints the account password in clear text.
print('开始登录。', self.account, self.pwd)
# Check whether a session is already logged in
# self.login_out()
self.login_out()
# Find and click the login button
login_button = self.page.ele('xpath://a[@data-automation="loginButton"]', timeout=15)
login_button = self.page.ele('xpath://a[@data-automation="loginButton"]', timeout=10)
login_button.click()
sleep(randint(12,24))
sleep(randint(2, 4))
# Wait for the page to load, then switch to the iframe
iframe = self.page.get_frame('#login-iframe')
print('已切换到 login-iframe')
# Find the e-mail field and fill it in
print("正在等待邮箱输入框...")
sleep(15)
# email_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf')
email_input = iframe.ele('xpath://input[@name="username"]')
sleep(10)
email_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall css-186x7cf')
email_input.clear() # clear any pre-filled content
email_input.input(self.account) # type the text
print("已输入账号到邮箱输入框")
sleep(randint(2, 4))
# Find the password field and fill it in (the variable name email_input is reused)
print("正在等待密码输入框...")
email_input = iframe.ele(
'.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall MuiInputBase-inputAdornedEnd css-186x7cf')
email_input = iframe.ele('.MuiInputBase-input MuiInput-input MuiInputBase-inputSizeSmall MuiInputBase-inputAdornedEnd css-186x7cf')
email_input.clear() # clear any pre-filled content
email_input.input(self.pwd)
print("已输入密码到密码输入框")
sleep(randint(3, 5))
sleep(randint(2, 4))
# Find and click the submit button
print('查找并点击登录按钮')
# submit_button = iframe.ele(
# '.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-1w8itp0')
try:
submit_button = iframe.ele('.LoginForm_bottomSpacingMd__e2Mnm')
submit_button = iframe.ele('.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-df622d')
submit_button.click()
except:
# Fallback: retry the click with a different class-list selector.
print('切换点击')
sleep(randint(3, 4))
iframe.ele('.MuiButtonBase-root MuiButton-root MuiButton-contained MuiButton-containedPrimary MuiButton-sizeMedium MuiButton-containedSizeMedium MuiButton-disableElevation MuiButton-fullWidth css-1is1osn').click()
print('已点击登录...')
sleep(randint(8, 15))
except Exception as e:
# Any failure while filling in the form aborts the login with False.
# Two print variants are present; one also includes the traceback.
print(f"出现错误: {e}", f"\n{traceback.format_exc()}")
print(f"出现错误: {e}")
return False
# Second stage: detect whether an e-mail verification code is requested.
try:
print(33333333333)
iframe = self.page.get_frame('#login-iframe')
sleep(randint(4, 8))
sleep(randint(5, 8))
h3_element = iframe.ele(
'.FormHeader_root__fHtRy wrapper-component_center__zG6GW')
h3_ = iframe.ele('@text()=输入验证代码')# "To continue, enter the code sent to your e-mail"
h3_1 = iframe.ele('xpath://h3[contains(text(),"输入验证代码")]')# "To continue, enter the code sent to your e-mail"
P_text1 = iframe.ele('xpath://p[contains(text(),"未收到代码?单击")]', timeout=15)
P_text2 = iframe.ele('xpath://p[contains(text(),"电子邮件中")]', timeout=15)
# One variant treats any of several markers (elements or page text) as a
# verification prompt and calls yxyzm() without arguments; the other checks
# a single element and passes the iframe. The lone string line below is
# presumably the selector argument of the second variant's iframe.ele( call.
if h3_element or h3_ or h3_1 or P_text1 or P_text2 or '输入验证代码' in iframe.html or '输入验证代码' in self.page.html:
self.yxyzm()
'.MuiFormLabel-root MuiInputLabel-root MuiInputLabel-formControl MuiInputLabel-animated MuiInputLabel-sizeSmall MuiInputLabel-standard MuiFormLabel-colorPrimary css-17839r8')
if h3_element:
self.yxyzm(iframe)
else:
print('不需要验证码')
sleep(10)
# Reload, open the licenses page, then collect the session cookies.
self.page.refresh()
sleep(randint(5, 10))
self.page.get('https://www.shutterstock.com/zh/catalog/licenses')
sleep(randint(4, 8))
ck = self.get_ck()
return ck
except Exception as e:
# A failure in the verification stage is treated as "no code needed":
# the error is printed and the cookies are still collected and returned.
print(e)
print('不需要验证码11111111111')
print('不需要验证码')
sleep(randint(5, 8))
ck = self.get_ck()
return ck
......@@ -504,22 +416,16 @@ class GetSS_details():
max_retries = 3
retries = 0
while retries <= max_retries:
headers = {
"authorization": self.token
}
try:
response = requests.post(url, data=data_json,headers=headers)
response = requests.post(url, data=data_json)
if response.status_code == 200:
return response.json()
else:
print(url,'2323')
print(f'请求失败,状态码: {response.status_code},重试 ({retries}/{max_retries})')
retries += 1
except requests.exceptions.RequestException as e:
print(f'请求异常: {e},重试 ({retries}/{max_retries})')
retries += 1
self.get_microservice_token()
raise Exception(f'请求失败,已达到最大重试次数:{max_retries} 次')
def get_jpg(self, cookies, image_id):
......@@ -638,12 +544,12 @@ class GetSS_details():
if retry > max_retries:
logging.warning("超过重试次数,跳过该图片")
return False
sleep_time = [random.randint(60, 180), random.randint(180, 240), random.randint(1800, 1900)][
retry - 1]
sleep_time = [random.randint(60, 180), random.randint(180, 240), random.randint(1800, 1900)][retry - 1]
logging.warning(f"未知错误,等待{sleep_time}s 第{retry}次重试...")
time.sleep(sleep_time)
continue # 继续下一次重试
def run_get_stock_img_id(self, account, cookie):
"""封装GetStockImgId.run()调用"""
try:
......@@ -660,7 +566,7 @@ class GetSS_details():
print(f"开始抓取 item_id: {item_id}")
self.page.clear_cache() # 清除浏览器缓存和session信息。下一个账号直接登录。优化上一个账号没有退出导致新账号登录失败
if item_id == 1 and int(day) < 2:
if item_id == 1 and int(day)<2:
Con.update_all_states_to_1(state=2)
wait_time = random.uniform(6, 10)
......@@ -678,7 +584,7 @@ class GetSS_details():
image_id_id_pairs = Con.get_stock_images_id(self.account)
if not image_id_id_pairs:
print(f'{self.account} 已全部爬取完成')
Con.update_all_states_to_1(state=3, item_id=item_id)
Con.update_all_states_to_1(state=3,item_id=item_id)
continue
counts_start = 0
......@@ -688,8 +594,7 @@ class GetSS_details():
image_id, item_id_str, image_title, image_size_info = image_id_id_pairs[count].split('||-||')
print(f'执行 {self.account}: {image_id}, {item_id_str}, 计数: {count}')
try:
chong_shi = self.get_pic(self.account, image_id, item_id_str, image_title, image_size_info,
cookie, wait_time)
chong_shi = self.get_pic(self.account, image_id, item_id_str, image_title, image_size_info, cookie, wait_time)
if not chong_shi:
stop_flag = True
break
......@@ -710,5 +615,11 @@ class GetSS_details():
break
if __name__ == '__main__':
# Script entry point: build a GetSS_details instance and call its run() method.
GetSS_details().run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment