Commit 859e586e by Peng

亚马逊代码 修改了数据库连接方式通过调用星钧接口进行增删改查。不直连数据库。

xnj 优化了 当一个账号下载图片完成 切换下一个账号失败问题
tiktok 优化 代码有bug 修复完成。下载视频数据起始时间被写死了,XPATH定位也有问题。修复完成
parent cd1e5c51
......@@ -237,15 +237,15 @@ if __name__ == '__main__':
# week = '04'
print("week 周:", week)
time.sleep(2)
# handle_obj_us = CalculateMean(site_name='us', year=2025, week=week)
# handle_obj_us.run()
# handle_obj_us.sendMessage(week, site_name='us')
handle_obj_uk = CalculateMean(site_name='uk', year=2025, week=week)
handle_obj_uk.run()
handle_obj_uk.sendMessage(week, site_name='uk')
handle_obj_de = CalculateMean(site_name='de', year=2025, week=week)
handle_obj_de.run()
handle_obj_de.sendMessage(week, site_name='de')
handle_obj_us = CalculateMean(site_name='us', year=2025, week=week)
handle_obj_us.run()
handle_obj_us.sendMessage(week, site_name='us')
#handle_obj_uk = CalculateMean(site_name='uk', year=2025, week=week)
#handle_obj_uk.run()
# handle_obj_uk.sendMessage(week, site_name='uk')
#handle_obj_de = CalculateMean(site_name='de', year=2025, week=week)
#handle_obj_de.run()
#handle_obj_de.sendMessage(week, site_name='de')
# handle_obj_fr = CalculateMean(site_name='fr', year=2025, week=week)
# handle_obj_fr.run()
# handle_obj_fr.sendMessage(week, site_name='fr')
......
import sys
import os
import sys
import traceback
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from amazon_params import py_ja3
from utils.requests_param import Requests_param_val
from utils.db_connect import BaseUtils
from amazon_params.params import DB_CONN_DICT
import pymysql
import random
import time
from lxml import etree
......@@ -20,6 +18,7 @@ import datetime
import requests
from amazon_spider.VPS_IP import pppoe_ip
from curl_cffi import requests as requests2
from utils.secure_db_client import get_remote_engine
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
......@@ -57,24 +56,43 @@ class bsr_catgory(BaseUtils):
self.columns = ['cate_current_id', 'asin', 'bsr_rank', 'price', 'rating', 'total_comments', "week",
"year_month", 'category_id']
def db_engine_us(self, site_name, db_type):
engine_mysql = get_remote_engine(
site_name=site_name, # -> database "selection"
db_type=db_type, # -> 服务端 alias "mysql"
)
return engine_mysql
def db_cursor_connect_update(self, sql, site):
for i in range(3):
try:
engine_us_mysql = self.db_engine_us(site, 'mysql')
print('更新sql:', sql)
with engine_us_mysql.begin() as conn:
conn.execute(sql)
break
except:
print(site, 'db_cursor_connect 报错:', sql)
def db_cursor_connect_msyql_read(self, site=None,select_state1_sql=None):
for i in range(3):
try:
if site:
engine_us_mysql = self.db_engine_us(site, 'mysql')
else:
engine_us_mysql = self.db_engine_us(self.site_name, 'mysql')
print(select_state1_sql)
df = engine_us_mysql.read_sql(select_state1_sql)
return df
except Exception as e:
import traceback
traceback.print_exc() # ★ 打印完整栈到终端
print(e, 'db_cursor_connect_msyql_read 报错:', select_state1_sql)
def init_db(self, site_name):
self.engine = self.mysql_connect()
self.engine_pg6 = self.pg_connect_6()
if site_name == 'us':
self.connect = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection", charset="utf8mb4")
else:
self.connect = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection_" + site_name,
charset="utf8mb4")
connect_us = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection", charset="utf8mb4")
cursor_us = connect_us.cursor()
self.cursor = self.connect.cursor()
if site_name == "us":
self.site_url = 'https://www.amazon.com'
self.host = 'www.amazon.com'
......@@ -95,12 +113,11 @@ class bsr_catgory(BaseUtils):
self.host = 'www.amazon.it'
self.reuests_para_val = Requests_param_val(site_name=self.site_name)
self.year_month = f'{self.year}_{self.month}'
sele_sql = f"SELECT week FROM week_20_to_30 WHERE `year_month`='{self.year}_{self.month}'"
sele_sql = f"SELECT `week` FROM week_20_to_30 WHERE `year_month`='{self.year}_{self.month}'"
print(sele_sql)
cursor_us.execute(sele_sql)
year_week = cursor_us.fetchall()
self.year_week = year_week[-1]
print(self.year_week[0])
df_year_week = self.db_cursor_connect_msyql_read(site='us',select_state1_sql=sele_sql)
self.year_week = list(df_year_week['week'])[-1]
print(self.year_week, '====当前周===1232333')
def safeIndex(self, list: list, index: int, default: object = None):
"""
......@@ -178,16 +195,16 @@ class bsr_catgory(BaseUtils):
print('解析失败')
try:
account = 'pengyanbing'
title = site + ' bsr 榜单'
title = self.site_name + ' bsr 榜单'
content = f' bsr 榜单解析 url 失败 节点数:{nodes_num} \n 解析url:{url}'
db_class = connect_db(site)
db_class = connect_db(self.site_name)
db_class.send_mg(account, title, content)
except:
pass
return items
def html_4(self, bum):
print('bum 当前请求的层级:',bum)
print('bum 当前请求的层级:', bum)
while True:
if self.name_path_queue.empty() == False:
querys = self.name_path_queue.get()
......@@ -313,7 +330,7 @@ class bsr_catgory(BaseUtils):
bsr_url_category_id_dict = self.parse_url(bum - 1, bsr_url)
print('bsr_url_category_id_dict', bsr_url_category_id_dict)
redirect_first_id_list = []
print('获取下一级数量:',len(a_2_list))
print('获取下一级数量:', len(a_2_list))
if a_2_list:
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 1, bsr_url_category_id_dict['category_id'],
bsr_url_category_id_dict['category_parent_id']))
......@@ -362,11 +379,11 @@ class bsr_catgory(BaseUtils):
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name}")]/parent::div/parent::div/parent::div//a/@href',
f'//div[@role="treeitem"]/span[contains(text(),"{bsr_class_name[0:5]}")]/parent::div/parent::div/parent::div//a/@href']
print('获取同类节点数量,',len(xpath_herf_list))
print('获取同类节点数量,', len(xpath_herf_list))
for xpath_herf in xpath_herf_list:
a_herf_list = response.xpath(xpath_herf)
if a_herf_list:
print('获取同类节点数量',self.site_url + a_herf_list[0])
print('获取同类节点数量', self.site_url + a_herf_list[0])
a_url_dict = self.parse_url(bum, self.site_url + a_herf_list[0])
print('获取同类节点数量 a_url_dict1111:::', a_url_dict)
redirect_first_id_list.append(
......@@ -378,7 +395,7 @@ class bsr_catgory(BaseUtils):
if redirect_first_id_list:
# 获取当前请求url 分类id。对 first_id 拼接
self.redirect_first_id_list.append([bsr_url, '|-|'.join(redirect_first_id_list[0])])
print(self.year_week[0], '====当前周===', self.week)
print(self.year_week, '====当前周===', self.week)
ele_next = response.xpath(
'//a[contains(text(), "Weiter")]/@href|//a[contains(text(), "Next")]/@href|//a[contains(text(), "Page suivante")]/@href|//a[contains(text(), "Suivant")]/@href|//a[contains(text(), "Avanti")]/@href|//a[contains(text(), "Siguiente")]/@href|//a[contains(text(), "Seite")]/@href|//a[contains(text(), "siguiente")]/@href|//a[contains(text(), "Pagina")]/@href')
data_client_recs_list = response.xpath(
......@@ -551,7 +568,7 @@ class bsr_catgory(BaseUtils):
df['date_info'] = self.time_strftime_
df.drop_duplicates(['asin', 'bsr_rank', 'cate_current_id'], inplace=True) # 去重
print(df.shape)
df.to_sql(f'{self.site_name}_bs_category_top100_asin', con=self.engine, if_exists="append", index=False)
self.engine.to_sql(df, f'{self.site_name}_bs_category_top100_asin', if_exists="append")
self.item_list = []
except Exception as e:
print(e, '报错存储')
......@@ -587,8 +604,8 @@ class bsr_catgory(BaseUtils):
order by {self.site_name}_bs_category.category_id, category_parent_id;
"""
print('path_sql:', path_sql)
self.cursor.execute(path_sql)
exist_rows = self.cursor.fetchall()
df_exist_rows = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=path_sql)
exist_rows = df_exist_rows.values.tolist()
group1_id = []
group2_id = []
for row in exist_rows:
......@@ -605,16 +622,14 @@ class bsr_catgory(BaseUtils):
else:
sql1 = f"update {self.site_name}_bs_category set delete_time = null where id in {tuple(group1_id)}"
print('sql1::', sql1)
self.cursor.execute(sql1)
self.connect.commit()
self.db_cursor_connect_update(sql1, self.site_name)
if group2_id:
if len(group2_id) == 1:
sql2 = f"update {self.site_name}_bs_category set delete_time = '{_strftime_}' where id in ({group2_id[0]})"
else:
sql2 = f"update {self.site_name}_bs_category set delete_time = '{_strftime_}' where id in {tuple(group2_id)}"
print('sql2::', sql2)
self.cursor.execute(sql2)
self.connect.commit()
self.db_cursor_connect_update(sql2, self.site_name)
except Exception as e:
print('更新 delete_time 报错', traceback.format_exc())
......@@ -630,15 +645,14 @@ class bsr_catgory(BaseUtils):
print('重定向url:', category_bsr_url)
update_first_id_sql = f'''update {self.site_name}_bs_category set redirect_first_id="{redirect_first_id_name_id_list[1]}" where `path`="{category_bsr_url}"'''
print('update_first_id_sql::', update_first_id_sql)
self.cursor.execute(update_first_id_sql)
self.connect.commit()
self.db_cursor_connect_update(update_first_id_sql, self.site_name)
else:
if len(len_first_name) == 2:
print('重定向url:', category_bsr_url)
update_first_id_sql = f'''update {self.site_name}_bs_category set redirect_first_id="{redirect_first_id_name_id_list[1]}" where `path`="{category_bsr_url}"'''
print('update_first_id_sql::', update_first_id_sql)
self.cursor.execute(update_first_id_sql)
self.connect.commit()
self.db_cursor_connect_update(update_first_id_sql, self.site_name)
except Exception as e:
print('更新 redirect_first_id_name_id_list 报错', traceback.format_exc())
......@@ -649,30 +663,25 @@ class bsr_catgory(BaseUtils):
self.init_db(self.site_name)
for name_num_path in self.insert_list:
print('name_num_path:', name_num_path)
self.connect.ping(reconnect=True)
save_name_num_list = []
while True:
# 不存在就插入
try:
self.connect.ping(reconnect=True)
select_sql_id = f'''SELECT id FROM {self.site_name}_bs_category WHERE `path`="{name_num_path[3]}"'''
self.cursor.execute(select_sql_id)
name_id = self.cursor.fetchone()
if name_id is None:
df_id = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql_id)
if not df_id.empty:
save_name_num_list.append(name_num_path)
else:
select_sql_name = f'''SELECT en_name FROM {self.site_name}_bs_category WHERE `path`="{name_num_path[3]}" order by id desc '''
self.cursor.execute(select_sql_name)
en_name = self.cursor.fetchone()
print('en_name::', en_name)
if en_name[0] == name_num_path[1]:
df_en_name = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql_name)
print('en_name::', df_en_name.values)
if df_en_name['en_name'][0] == name_num_path[1]:
pass
else:
_strftime_ = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
update_name_sql = f'''update {self.site_name}_bs_category set delete_time = '2023-06-19 00:00:00' WHERE `path`="{name_num_path[3]}" and delete_time is null'''
print('更新 en_name:', update_name_sql)
self.cursor.execute(update_name_sql)
self.connect.commit()
self.db_cursor_connect_update(update_name_sql, self.site_name)
save_name_num_list.append(name_num_path)
break
except Exception as e:
......@@ -682,21 +691,19 @@ class bsr_catgory(BaseUtils):
continue
# # 插入新的数据
print('save_name_num_list::', save_name_num_list)
self.cursor.executemany(
with self.engine.begin() as conn:
conn.execute(
f"insert into {self.site_name}_bs_category (p_id, en_name, nodes_num,path, category_id, category_parent_id) values (%s, %s,%s, %s,%s, %s)",
save_name_num_list)
self.connect.commit()
self.init_db(self.site_name)
for id_en_mane in self.id_and_en_name_list:
self.connect.ping(reconnect=True)
while True:
try:
# self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 2,bsr_url_category_id_dict['category_id'],bsr_url_category_id_dict['category_parent_id']))
# 更新 and_en_name, 根据 id 更新 分类名称
update_sql = f'update {self.site_name}_bs_category set and_en_name="{id_en_mane[1]}",leaf_node={id_en_mane[2]} WHERE category_id="{id_en_mane[3]}" and category_parent_id="{id_en_mane[4]}"'
print("更新 and_en_name:", update_sql)
self.cursor.execute(update_sql)
self.connect.commit()
self.db_cursor_connect_update(update_sql, self.site_name)
break
except Exception as e:
print('更新失败:', e)
......@@ -709,20 +716,18 @@ class bsr_catgory(BaseUtils):
select id, path,nodes_num from {self.site_name}_bs_category where category_first_id is null and category_parent_id != '0'
and delete_time is null;
"""
self.cursor.execute(sql)
id_path_list = self.cursor.fetchall()
print(id_path_list)
if id_path_list:
df_nodes_num = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=sql)
if not df_nodes_num.empty:
id_path_list = df_nodes_num.values.tolist()
for id_path in id_path_list:
data_dict = self.parse_url(id_path[2], id_path[1])
print(data_dict)
updata_sql = f"""UPDATE {self.site_name}_bs_category set category_first_id="{data_dict['category_first_id']}" WHERE id={id_path[0]}"""
print(updata_sql)
self.cursor.execute(updata_sql)
self.connect.commit()
self.db_cursor_connect_update(updata_sql, self.site_name)
updata_sql_none = f"UPDATE {self.site_name}_bs_category set and_en_name=NULL WHERE and_en_name='None'"
self.cursor.execute(updata_sql_none)
self.connect.commit()
self.db_cursor_connect_update(updata_sql_none, self.site_name)
def run(self):
print(" run 函数 是抓取 分类节点,只新增,不删除")
......@@ -801,22 +806,22 @@ class bsr_catgory(BaseUtils):
def select_id_1(self):
# 查询 子节点的顶级父类id
select_sql_1 = f'select id from {self.site_name}_bs_category where nodes_num=2'
self.cursor.execute(select_sql_1)
id_1_list = self.cursor.fetchall()
for id in id_1_list:
df_id = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql_1)
df_id_lsit = df_id.values.tolist()
for id in df_id_lsit:
en_name_id_list = []
select_p_id = f"select t3.id,t4.en_name from (select t1.id,t1.parent_id,if(find_in_set(parent_id, @pids) > 0, @pids := concat(@pids, ',',id), 0) as ischild from (select id,p_id as parent_id from {self.site_name}_bs_category t order by p_id,id) t1,(select @pids := {id[0]}) t2) t3 LEFT JOIN {self.site_name}_bs_category t4 on t3.id = t4.id where ischild != 0;"
self.cursor.execute(select_p_id)
all_id_lsit = self.cursor.fetchall()
if all_id_lsit:
print('select_p_id::',select_p_id)
df_all_id = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_p_id)
if not df_all_id.empty:
all_id_lsit = df_all_id.values.tolist()
for en_name_id in all_id_lsit:
en_name_id_list.append(en_name_id[0])
id_tuple = tuple(en_name_id_list)
print(len(id_tuple))
try:
update_sql = f'update {self.site_name}_bs_category set one_category_id={id[0]} where id in {id_tuple}'
self.cursor.execute(update_sql)
self.connect.commit()
self.db_cursor_connect_update(update_sql, self.site_name)
except Exception as e:
print(e)
time.sleep(10)
......@@ -826,14 +831,13 @@ class bsr_catgory(BaseUtils):
def db_read_data(self, i, state_sql):
while True:
try:
with self.engine.begin() as conn:
sql_read = f'select id,en_name,`path`,category_id from {self.site_name}_bs_category where nodes_num={i} and category_state=1 and delete_time is NULL'
print(sql_read)
a = conn.execute(sql_read)
df_read = pd.DataFrame(a, columns=['id', 'en_name', 'path', 'category_id'])
df_read = self.engine.read_sql(sql_read)
df_read.drop_duplicates(subset='category_id', inplace=True)
if df_read.shape[0] == 0:
return []
with self.engine.begin() as conn:
index_tuple = tuple(df_read['category_id'])
id_tuple = tuple(df_read['id'])
if state_sql == True:
......@@ -848,6 +852,7 @@ class bsr_catgory(BaseUtils):
sql_update = f"""UPDATE {self.site_name}_bs_category a set category_state=2 where a.id in {id_tuple}"""
print('sql_update:', sql_update)
conn.execute(sql_update)
categorys_list = list(df_read.id.astype(
"U") + '|-|' + df_read.en_name + '|-|' + df_read.path + '|-|' + df_read.category_id)
category_url_list = []
......@@ -877,8 +882,8 @@ class bsr_catgory(BaseUtils):
and delete_time is null
order by category_id,category_first_id
"""
self.cursor.execute(id_sql)
id_tuple = self.cursor.fetchall()
df_id_tuple = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=id_sql)
id_tuple = df_id_tuple.values.tolist()
id_list = []
for id in id_tuple:
id_list.append(id[0])
......@@ -888,8 +893,7 @@ class bsr_catgory(BaseUtils):
update_category_state_sql = f"""
update {self.site_name}_bs_category set category_state=1 where id in {tuple(id_list)}"""
print('update_sql::', update_category_state_sql)
self.cursor.execute(update_category_state_sql)
self.connect.commit()
self.db_cursor_connect_update(update_category_state_sql, self.site_name)
except:
print('报错:update_category_state 函数')
self.init_db(self.site_name)
......@@ -919,16 +923,14 @@ class bsr_catgory(BaseUtils):
def updata_category_state(self):
for i in range(3):
try:
self.connect.ping(reconnect=True)
days = ((datetime.datetime.now()) + datetime.timedelta(days=-8)).strftime("%Y-%m-%d")
_day = ((datetime.datetime.now()) + datetime.timedelta()).strftime("%Y-%m-%d")
update_sql = f'UPDATE {self.site_name}_bs_category set category_state=1;'
self.cursor.execute(update_sql)
self.connect.commit()
self.db_cursor_connect_update(update_sql, self.site_name)
delect_sql = f"DELETE FROM {self.site_name}_bs_category_top100_asin WHERE date_info < '{days}';"
print(delect_sql)
self.cursor.execute(delect_sql)
self.connect.commit()
self.db_cursor_connect_update(delect_sql, self.site_name)
inset_sql = f"""INSERT INTO {self.site_name}_bs_category_reprot (
id, p_id, ch_name, en_name, nodes_num, `path`, created_at, updated_at, is_show,
......@@ -942,14 +944,14 @@ class bsr_catgory(BaseUtils):
'{_day}' as date_info
FROM {self.site_name}_bs_category;"""
print(inset_sql)
self.cursor.execute(inset_sql)
self.connect.commit()
self.db_cursor_connect_update(inset_sql, self.site_name)
_0_days = ((datetime.datetime.now()) + datetime.timedelta(days=0)).strftime("%Y-%m-%d")
select_sql = f"select count(id) FROM {self.site_name}_bs_category_top100_asin WHERE date_info = '{_0_days}';"
print(select_sql)
self.cursor.execute(select_sql)
count_data_num = self.cursor.fetchone()
self.send_ms_count_data_num(self.site_name, count_data_num[0], _0_days)
df_count_data_num = self.db_cursor_connect_msyql_read(site=None,select_state1_sql=select_sql)
count_data_num = df_count_data_num['count(id)'][0]
print('count_data_num::',count_data_num)
self.send_ms_count_data_num(self.site_name, count_data_num, _0_days)
break
except Exception as e:
self.init_db(self.site_name)
......@@ -960,7 +962,8 @@ class bsr_catgory(BaseUtils):
def dele_self_real_spider(self):
print('每天晚上定时删除贺哲的抓取表。用户已经取消收藏店铺')
select_sql = 'select data_id from user_collection_syn where data_type =2'
df_data_id = pd.read_sql(select_sql, con=self.engine_pg6)
df_data_id = self.engine_pg6.read_sql(select_sql)
print('df_data_id::', df_data_id)
for i in range(2):
try:
with self.engine.begin() as conn:
......@@ -968,9 +971,11 @@ class bsr_catgory(BaseUtils):
sql_delete = f"delete from us_self_real_spider where account_id not in ('{tuple(df_data_id.data_id)[0]}') and data_type=9;"
else:
sql_delete = f"delete from us_self_real_spider where account_id not in {tuple(set(df_data_id.data_id))} and data_type=9;"
print(sql_delete)
conn.execute(sql_delete)
break
except:
except Exception as e:
print('dele_self_real_spider', e)
time.sleep(10)
self.init_db('us')
......@@ -988,7 +993,6 @@ class bsr_catgory(BaseUtils):
if __name__ == '__main__':
# pppoe_ip()
print("如果运行 run 函数 有个别类目 别名没有抓取到,结束之后 运行 run_start 抓取,获取所有 and_en_name 为空的类目id,path")
for site in ['us', 'de', 'uk']:
spider_us = bsr_catgory(site_name=site)
......@@ -997,6 +1001,6 @@ if __name__ == '__main__':
spider_us.run_update_redirect_flag()
spider_us.updata_category_first_id()
spider_us.send_ms()
for site in ['us', 'de', 'uk']:
for site in ['us','de', 'uk']:
spider_us = bsr_catgory(site_name=site)
spider_us.updata_category_state()
......@@ -354,7 +354,7 @@ def junglescout_spider(db_base):
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8",
"Cache-Control": "no-cache",
'Cookie': 'Hm_lvt_e0dfc78949a2d7c553713cb5c573a486=1754303192; HMACCOUNT=B44A3AA867DA8C05; _gcl_au=1.1.1206959369.1754303192; _ga=GA1.1.747881747.1754303192; MEIQIA_TRACK_ID=30ooGiCl3FRjp9OxyAufmK3iejx; MEIQIA_VISIT_ID=30ooGnIqlsOe6yn29kfqf4EnQuF; ecookie=xzfwf4ZvXd9I4bOT_CN; c57b73d8686537c32dea=36830b46c328d771081e3a79f5c51e04; _fp=65dbbe41a37f8f9fbe702eba96328267; _gaf_fp=9ac0984901990f4b1772551060468cf0; rank-login-user=7970634571pnkdYV6Hfb0IWfvuV/gc88fclJAm6p5JWeQpD30JCgc6kY0X2uN+iF7vjvKtVgBU; rank-login-user-info="eyJuaWNrbmFtZSI6IuW4heWTpSIsImlzQWRtaW4iOmZhbHNlLCJhY2NvdW50IjoiMTgzKioqKjczNDciLCJ0b2tlbiI6Ijc5NzA2MzQ1NzFwbmtkWVY2SGZiMElXZnZ1Vi9nYzg4ZmNsSkFtNnA1SldlUXBEMzBKQ2djNmtZMFgydU4raUY3dmp2S3RWZ0JVIn0="; Sprite-X-Token=eyJhbGciOiJSUzI1NiIsImtpZCI6IjE2Nzk5NjI2YmZlMDQzZTBiYzI5NTEwMTE4ODA3YWExIn0.eyJqdGkiOiJvQjhGc25vZ0ludmp5S3luRmlsSjdnIiwiaWF0IjoxNzU0MzAzMTk3LCJleHAiOjE3NTQzODk1OTcsIm5iZiI6MTc1NDMwMzEzNywic3ViIjoieXVueWEiLCJpc3MiOiJyYW5rIiwiYXVkIjoic2VsbGVyU3BhY2UiLCJpZCI6MTQ2NjIxNSwicGkiOm51bGwsIm5uIjoi5biF5ZOlIiwic3lzIjoiU1NfQ04iLCJlZCI6Ik4iLCJwaG4iOiIxODMwNzk2NzM0NyIsImVtIjoiMzE1OTk4MDg5MkBxcS5jb20iLCJtbCI6IkcifQ.OOEsxsBWHf6J1ta8ueS0i-8fVxuxstNOtoJ2gWSxcJwr6UbRMiHiXqo3fNwkwzYrBjp75oz7xbdaui3LPu90-VZCUyh5lXoiFBjZD-iVJcQNTqkfYbV3siHtjRS27LBBh4UJLRRdSAfxP5iZscz640WHj9PupOXYUDPbljOsWOC4jBYSY3Ek3ikxH70BFluOvrD8kpwfQvbhmue_0fZAqu-rACr3ed5cpDUc3YQiFH7sDRkV0FJv4SLLm1qxLvSo4RmNftfYUBggsLl7qM0tQyBQh2BooUIt8ZBldTmtUdJiz9shLu1kYyv_zzoXtgfMmpdNADM85W0INKp1u5DGAg; ao_lo_to_n="7970634571pnkdYV6Hfb0IWfvuV/gc85dImjms+dJ7IrpjIs0CNJBquIGSx1xPUHU/OAMezoHKbqvLvZZuXrKHmPj6PK6OtV+0hL1+N+4daHAf8FeCzWg="; JSESSIONID=421BD32330EB1F2A12E2571E4D00CE8F; _ga_38NCVF2XST=GS2.1.s1754303191$o1$g1$t1754303203$j48$l0$h314662329; Hm_lpvt_e0dfc78949a2d7c553713cb5c573a486=1754303203; _ga_CN0F80S6GL=GS2.1.s1754303192$o1$g1$t1754303204$j48$l0$h0',
'Cookie': 'Hm_lvt_e0dfc78949a2d7c553713cb5c573a486=1754561346; HMACCOUNT=F022519658636435; _ga=GA1.1.1814436837.1754561346; MEIQIA_TRACK_ID=30xFWMfHnmUko4gRxOqdrJNPOcY; MEIQIA_VISIT_ID=30xFWMuukIH8mg0Y3QtIVUHsOlv; ecookie=6fLTD5dFES0wy5bJ_CN; 5d6b3550f67a0d98a3f2=300e7c0221464bf96a29eee60c456f00; _fp=65dbbe41a37f8f9fbe702eba96328267; _gaf_fp=395de153390ead6f40f8e8ab7a472e28; _gcl_au=1.1.174716114.1754561346.1679700600.1754561355.1754561355; current_guest=qtmXt8RNChOi_250807-180618; rank-login-user=55981645716j2gNzbXWw3NxEgY4QumA2+nJmFK4cRNMQNZD9W4ScvveWtruw9iXoAChaMVh09V; rank-login-user-info="eyJuaWNrbmFtZSI6Iuilv+mXqOWQuembqiIsImlzQWRtaW4iOmZhbHNlLCJhY2NvdW50IjoiMTMzKioqKjU0MDciLCJ0b2tlbiI6IjU1OTgxNjQ1NzE2ajJnTnpiWFd3M054RWdZNFF1bUEyK25KbUZLNGNSTk1RTlpEOVc0U2N2dmVXdHJ1dzlpWG9BQ2hhTVZoMDlWIn0="; Sprite-X-Token=eyJhbGciOiJSUzI1NiIsImtpZCI6IjE2Nzk5NjI2YmZlMDQzZTBiYzI5NTEwMTE4ODA3YWExIn0.eyJqdGkiOiJOQlhkbHU4eTlSZXdwS2doOWpJVzJBIiwiaWF0IjoxNzU0NTYxMzU1LCJleHAiOjE3NTQ2NDc3NTUsIm5iZiI6MTc1NDU2MTI5NSwic3ViIjoieXVueWEiLCJpc3MiOiJyYW5rIiwiYXVkIjoic2VsbGVyU3BhY2UiLCJpZCI6MTMzNDkzLCJwaSI6bnVsbCwibm4iOiLopb_pl6jlkLnpm6oiLCJzeXMiOiJTU19DTiIsImVkIjoiTiIsInBobiI6IjEzMzkyNDE1NDA3IiwiZW0iOiJxcTE2NTMxMjE4NjUzQDE2My5jb20iLCJtbCI6IkcifQ.csgALwYW8BkMpPMNB_LWfTmx9J4lYpLbqZW95ikqbz02AjJLMkoR8SmYfs_l5Y8_kr91FN-mMNG0_uG6LlMZg1_I_OWTX1GIVEqixiM4LnXO31VMK3yPCTEdFAUNosLKmXaLBsAkyovg82onGSOX3Sp8yy3QzCwREZc0TEVAONK7vBp0fFheyZNwejzyBfw_b7NPkFkTfvwzZo25QaHJyfkh0hxYgwtoDPSS_FmKrkpyh_zjbk7QIpJhY98k3ElI2OjdeAcE0ublxLemPI8GCwvj_V26Ob3mJ0WnvwyM5e2XBdCXF3Tn1OjOWvNP_fFr9TKDHguKLfZZzLOIO9gmkQ; ao_lo_to_n="55981645716j2gNzbXWw3NxEgY4QumA0yFbZjZZBIPjXVnHzCoK/gvYEiwOtDSpCKptN3oC6H7pg4af19gw3X2vJfRDGlTzgAJp8Uby054LbQLjTr8OLk="; rank-guest-user=6598164571W4w7830gIdYfyJ4dBpV8rZZnQ5nxne/EL2NJNXxISww1iqfwc5k9B1MBi3+pbbvB; _ga_38NCVF2XST=GS2.1.s1754561346$o1$g1$t1754561361$j45$l0$h2087898121; Hm_lpvt_e0dfc78949a2d7c553713cb5c573a486=1754561362; _ga_CN0F80S6GL=GS2.1.s1754561347$o1$g1$t1754561362$j45$l0$h0; JSESSIONID=012AF629221AF9FF44705008C9CE11D7',
"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}
url = "https://www.sellersprite.com/v2/tools/sales-estimator/bsr.json"
......
......@@ -20,6 +20,7 @@ import threading
import pandas as pd
import datetime
import json
from utils.secure_db_client import get_remote_engine
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
sess = requests.Session()
......@@ -54,34 +55,53 @@ class nsr_catgory(BaseUtils):
"year_month", 'category_id']
self.int_parse()
def db_engine_us(self, site_name, db_type):
engine_mysql = get_remote_engine(
site_name=site_name, # -> database "selection"
db_type=db_type, # -> 服务端 alias "mysql"
)
return engine_mysql
def db_cursor_connect_update(self, sql, site):
for i in range(3):
try:
engine_us_mysql = self.db_engine_us(site, 'mysql')
print('更新sql:', sql)
with engine_us_mysql.begin() as conn:
conn.execute(sql)
break
except:
print(site, 'db_cursor_connect 报错:', sql)
def db_cursor_connect_msyql_read(self, site=None, select_state1_sql=None):
for i in range(3):
try:
if site:
engine_us_mysql = self.db_engine_us(site, 'mysql')
else:
engine_us_mysql = self.db_engine_us(self.site_name, 'mysql')
print(select_state1_sql)
df = engine_us_mysql.read_sql(select_state1_sql)
return df
except Exception as e:
import traceback
traceback.print_exc() # ★ 打印完整栈到终端
print(e, 'db_cursor_connect_msyql_read 报错:', select_state1_sql)
def int_parse(self):
self.reuests_para_val = Requests_param_val(site_name=self.site_name, spider="seller_account_product")
self.year_month = f'{self.year}_{self.month}'
sele_sql = f"SELECT week FROM week_20_to_30 WHERE `year_month`='{self.year}_{self.month}'"
print(sele_sql)
self.cursor_us.execute(sele_sql)
year_week = self.cursor_us.fetchall()
self.year_week = year_week[-1]
print(self.year_week[0])
df_year_week = self.db_cursor_connect_msyql_read(site='us', select_state1_sql=sele_sql)
self.year_week = list(df_year_week['week'])[-1]
print(self.year_week, '====当前周===1232333')
def init_db(self, site_name):
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
if site_name == 'us':
self.connect = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection", charset="utf8mb4")
else:
self.connect = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection_" + site_name,
charset="utf8mb4")
connect_us = pymysql.connect(host=DB_CONN_DICT['mysql_host'], port=DB_CONN_DICT['mysql_port'],
user=DB_CONN_DICT['mysql_user'],
password=DB_CONN_DICT['mysql_pwd'], database="selection", charset="utf8mb4")
self.cursor_us = connect_us.cursor()
self.cursor = self.connect.cursor()
if site_name == "us":
self.site_url = 'https://www.amazon.com'
self.host = 'www.amazon.com'
......@@ -313,7 +333,7 @@ class nsr_catgory(BaseUtils):
self.id_and_en_name_list.append((bsr_id, h1_bsr_name, 2, bsr_url_category_id_list['category_id'],
bsr_url_category_id_list['category_parent_id']))
print(self.year_week[0], '====当前周===', self.week)
print(self.year_week, '====当前周===', self.week)
ele_next = response.xpath(
'//a[contains(text(), "Weiter")]/@href|//a[contains(text(), "Next")]/@href|//a[contains(text(), "Page suivante")]/@href|//a[contains(text(), "Suivant")]/@href|//a[contains(text(), "Avanti")]/@href|//a[contains(text(), "Siguiente")]/@href|//a[contains(text(), "Seite")]/@href|//a[contains(text(), "siguiente")]/@href|//a[contains(text(), "Pagina")]/@href')
data_client_recs_list = response.xpath(
......@@ -481,8 +501,7 @@ class nsr_catgory(BaseUtils):
print(df['date_info'])
df.drop_duplicates(['asin', 'bsr_rank', 'cate_current_id'], inplace=True) # 去重
print(df.shape, '111111111111')
df.to_sql(f'{self.site_name}_new_releases_top100_asin', con=self.engine, if_exists="append",
index=False)
self.engine.to_sql(df, f'{self.site_name}_new_releases_top100_asin', if_exists="append")
self.item_list = []
except Exception as e:
print(e, '报错存储')
......@@ -490,8 +509,7 @@ class nsr_catgory(BaseUtils):
time.sleep(10)
continue
try:
df.to_sql(f'{self.site_name}_new_releases_top100_asin', con=self.engine_pg, if_exists="append",
index=False)
self.engine_pg.to_sql(df, f'{self.site_name}_new_releases_top100_asin', if_exists="append")
except Exception as e:
print(e, '报错存储')
self.engine = self.mysql_connect()
......@@ -511,36 +529,6 @@ class nsr_catgory(BaseUtils):
self.headers_num_int = 0
def update_delete_time(self):
# try:
# print(self.catgory_next_path_list)
# # 更新 分类 url 已经不在亚马逊new_releases榜单下
# if self.catgory_next_path_list:
# id_list = []
# path_list = []
# for category_path in self.catgory_next_path_list:
# category_url_list = category_path[1].split('|-|')
# print('category_url_list::', category_url_list)
# # 根据p_id 查所有的path
# path_sql = f'SELECT `path`,id FROM {self.site_name}_new_releases WHERE p_id={category_path[0]}'
# self.cursor.execute(path_sql)
# path_id_list = self.cursor.fetchall()
# for path_id in path_id_list:
# if path_id[0] not in category_url_list:
# print(path_id[1])
# print(path_id[0])
# id_list.append(path_id[1]) # id
# path_list.append(path_id[0]) # 链接
# _strftime_ = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# if path_list:
# if len(path_list) == 1:
# update_sql = f'update {self.site_name}_new_releases set delete_time="{_strftime_}" where `path` in ("{path_list[0]}")'
# else:
# update_sql = f'update {self.site_name}_new_releases set delete_time="{_strftime_}" where `path` in {tuple(path_list)}'
# print('update_sql::', update_sql)
# self.cursor.execute(update_sql)
# self.connect.commit()
# except Exception as e:
# print('更新 delete_time 报错', traceback.format_exc(e))
try:
if self.catgory_next_path_list:
for category_path in self.catgory_next_path_list:
......@@ -554,8 +542,8 @@ class nsr_catgory(BaseUtils):
order by {self.site_name}_new_releases.category_id, category_parent_id;
"""
print('path_sql:', path_sql)
self.cursor.execute(path_sql)
exist_rows = self.cursor.fetchall()
df_exist_rows = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=path_sql)
exist_rows = df_exist_rows.values.tolist()
group1_id = []
group2_id = []
for row in exist_rows:
......@@ -572,47 +560,40 @@ class nsr_catgory(BaseUtils):
else:
sql1 = f"update {self.site_name}_new_releases set delete_time = null where id in {tuple(group1_id)}"
print('sql1::', sql1)
self.cursor.execute(sql1)
self.connect.commit()
self.db_cursor_connect_update(sql1, self.site_name)
if group2_id:
if len(group2_id) == 1:
sql2 = f"update {self.site_name}_new_releases set delete_time = '{_strftime_}' where id in ({group2_id[0]})"
else:
sql2 = f"update {self.site_name}_new_releases set delete_time = '{_strftime_}' where id in {tuple(group2_id)}"
print('sql2::', sql2)
self.cursor.execute(sql2)
self.connect.commit()
self.db_cursor_connect_update(sql2, self.site_name)
except Exception as e:
print('更新 delete_time 报错', traceback.format_exc())
def save_date(self):
for name_num_path in self.insert_list:
self.connect.ping(reconnect=True)
save_name_num_list = []
while True:
# 不存在就插入
try:
self.connect.ping(reconnect=True)
select_sql_id = f'''SELECT id FROM {self.site_name}_new_releases WHERE `path`="{name_num_path[3]}"'''
print('select_sql_id:', select_sql_id)
self.cursor.execute(select_sql_id)
name_id = self.cursor.fetchone()
if name_id is None:
df_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_id)
if not df_id.empty:
save_name_num_list.append(name_num_path)
else:
select_sql_name = f'''SELECT en_name FROM {self.site_name}_new_releases WHERE `path`="{name_num_path[3]}" order by id desc'''
print('select_sql_name:', select_sql_name)
self.cursor.execute(select_sql_name)
en_name = self.cursor.fetchone()
print(en_name[0], '33333333333333333333333', name_num_path[1])
if en_name[0] == name_num_path[1]:
df_en_name = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_name)
print(df_en_name['en_name'][0], '33333333333333333333333', name_num_path[1])
if df_en_name['en_name'][0] == name_num_path[1]:
pass
else:
_strftime_ = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
update_name_sql = f'''update {self.site_name}_new_releases set delete_time = '2023-06-19 00:00:00' WHERE `path`="{name_num_path[3]}" and delete_time is null'''
print('更新 en_name:', update_name_sql)
self.cursor.execute(update_name_sql)
self.connect.commit()
self.db_cursor_connect_update(update_name_sql, self.site_name)
save_name_num_list.append(name_num_path)
break
except Exception as e:
......@@ -620,11 +601,13 @@ class nsr_catgory(BaseUtils):
time.sleep(10)
self.init_db(self.site_name)
continue
# # 插入新的数据
self.cursor.executemany(
print('save_name_num_list::', save_name_num_list)
with self.engine.begin() as conn:
conn.execute(
f"insert into {self.site_name}_new_releases (p_id, en_name, nodes_num,path, category_id, category_parent_id) values (%s, %s,%s, %s,%s, %s)",
save_name_num_list)
self.connect.commit()
def run(self):
print(" run 函数 是抓取 分类节点,只新增,不删除")
......@@ -672,39 +655,37 @@ class nsr_catgory(BaseUtils):
# 查询 子节点的顶级父类id
print('查询 子节点的顶级父类id')
select_sql_1 = f'select id from {self.site_name}_new_releases where nodes_num=2'
self.cursor.execute(select_sql_1)
id_1_list = self.cursor.fetchall()
print(id_1_list)
for id in id_1_list:
df_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql_1)
df_id_lsit = df_id.values.tolist()
print(df_id_lsit)
for id in df_id_lsit:
try:
en_name_id_list = []
select_p_id = f"select t3.id,t4.en_name from (select t1.id,t1.parent_id,if(find_in_set(parent_id, @pids) > 0, @pids := concat(@pids, ',',id), 0) as ischild from (select id,p_id as parent_id from {self.site_name}_new_releases t order by p_id,id) t1,(select @pids := {id[0]}) t2) t3 LEFT JOIN {self.site_name}_new_releases t4 on t3.id = t4.id where ischild != 0;"
print("select_p_id:", select_p_id)
self.cursor.execute(select_p_id)
all_id_lsit = self.cursor.fetchall()
if all_id_lsit:
print('select_p_id::', select_p_id)
df_all_id = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_p_id)
if not df_all_id.empty:
all_id_lsit = df_all_id.values.tolist()
for en_name_id in all_id_lsit:
en_name_id_list.append(en_name_id[0])
id_tuple = tuple(en_name_id_list)
print(len(id_tuple))
update_sql = f'update {self.site_name}_new_releases set one_category_id={id[0]} where id in {id_tuple}'
self.cursor.execute(update_sql)
self.connect.commit()
self.db_cursor_connect_update(update_sql, self.site_name)
except:
pass
def db_read_data(self, i):
while True:
try:
with self.engine.begin() as conn:
sql_read = f'select id,en_name,`path`,category_id from {self.site_name}_new_releases where nodes_num={i} and category_state=1 and delete_time is NULL'
print(sql_read)
a = conn.execute(sql_read)
df_read = pd.DataFrame(a, columns=['id', 'en_name', 'path', 'category_id'])
df_read = self.engine.read_sql(sql_read)
df_read.drop_duplicates(subset='category_id', inplace=True)
print('df_read::', df_read)
if df_read.shape[0] == 0:
return []
with self.engine.begin() as conn:
index_tuple = tuple(df_read['category_id'])
if len(index_tuple) == 1:
sql_update = f"""UPDATE {self.site_name}_new_releases a set category_state=2 where a.category_id in ("{index_tuple[0]}")"""
......@@ -758,30 +739,27 @@ class nsr_catgory(BaseUtils):
def updata_category_state(self):
for i in range(3):
try:
self.connect.ping(reconnect=True)
update_sql = f'UPDATE {self.site_name}_new_releases set category_state=1;'
print(update_sql)
self.cursor.execute(update_sql)
self.connect.commit()
self.db_cursor_connect_update(update_sql, self.site_name)
days = ((datetime.datetime.now()) + datetime.timedelta(days=-6)).strftime("%Y-%m-%d")
delect_sql = f"DELETE FROM {self.site_name}_new_releases_top100_asin WHERE date_info < '{days}';"
print(delect_sql)
self.cursor.execute(delect_sql)
self.connect.commit()
self.db_cursor_connect_update(delect_sql, self.site_name)
_0_days = ((datetime.datetime.now()) + datetime.timedelta(days=0)).strftime("%Y-%m-%d")
select_sql = f"select count(id) FROM {self.site_name}_new_releases_top100_asin WHERE date_info = '{_0_days}';"
print(select_sql)
self.cursor.execute(select_sql)
count_data_num = self.cursor.fetchone()
self.send_ms_count_data_num(self.site_name,count_data_num[0],_0_days)
df_count_data_num = self.db_cursor_connect_msyql_read(site=None, select_state1_sql=select_sql)
count_data_num = df_count_data_num['count(id)'][0]
print('count_data_num::', count_data_num)
self.send_ms_count_data_num(self.site_name,count_data_num,_0_days)
break
except Exception as e:
print(e, '222222222')
time.sleep(20)
continue
updata_sql_none = f"UPDATE {self.site_name}_new_releases set and_en_name=NULL WHERE and_en_name='None'"
self.cursor.execute(updata_sql_none)
self.connect.commit()
self.db_cursor_connect_update(updata_sql_none, self.site_name)
def update_sql(self):
......
......@@ -76,7 +76,6 @@ class async_account_name_products(BaseUtils):
self.engine_pg6 = self.pg_connect_6()
self.engine = self.mysql_connect()
self.db_user_collection_syn = self.site_name + DB_REQUESTS_ASIN_PARAMS['db_user_collection_syn'][2:]
self.db_self_product_detail = self.site_name + DB_REQUESTS_ASIN_PARAMS['db_self_product_detail'][2:]
def get_product(self, t_num):
while True:
......
category_name,text
"Policies, agreements, and guidelines",Amazon Services Business Solutions AgreementChanges to the Amazon Services Business Solutions AgreementProgram PoliciesChanges to program policiesIntellectual Property for Rights OwnersInternational selling agreementsAdditional GuidelinesAbout seller facial dataAbout Brand Registry facial data verificationUse of business credit reportsDirective on Administrative Cooperation–7th Amendment (DAC7)About the INFORM Consumers ActReport Infringement form: Enter ASINs in bulkUsing the Report Infringement form
......@@ -104,8 +104,8 @@ if __name__ == '__main__':
# 根据 engine 选择那个库。爬虫库 14, 抓取me搜索词是6,爬虫一般使用14,根据情况调整
month = 7
engine_db_num = 14
# for site in ['de','uk']:
for site in ['us']:
for site in ['de']:
# for site in ['us']:
time.sleep(0)
count_all_syn_st_id(site_name=site,month=month).get_minid_maxid()
......@@ -79,34 +79,29 @@ class Save_asin_detail(BaseUtils):
else:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
lock_key = f"{self.db_syn}_{self.month}_lock"
lock = self.redis_client.lock(lock_key, timeout=25) # 10秒超时
with self.engine_pg.begin() as conn:
sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size}"
print(sql_read)
self.df_read = self.engine_pg.read_sql(sql_read)
self.df_read = self.engine_pg.read_then_update(
select_sql=sql_read,
update_table=f"{self.db_syn}_{self.month}",
set_values={"state": 2}, # 把库存清零
where_keys=["id"], # WHERE sku = :sku
)
self.df_read.drop_duplicates(['asin'], inplace=True)
if self.df_read.shape[0] > 0:
self.index_tuple = tuple(self.df_read['id'])
print(self.index_tuple,'self.index_tuplself.index_tuplself.index_tupl')
# 使用默认值填充空值
self.df_read['volume'].fillna('null', inplace=True)
self.df_read['weight_str'].fillna('null', inplace=True)
if len(self.index_tuple) == 1:
sql_update = f"""UPDATE {self.db_syn}_{self.month} a set state=2 where a.id in ({self.index_tuple[0]})"""
else:
sql_update = f"""UPDATE {self.db_syn}_{self.month} a set state=2 where a.id in {self.index_tuple}"""
conn.execute(sql_update)
asin_list = list(
self.df_read.asin + '|' + self.df_read.date_info + '|' + self.df_read.asin_is_variation.astype(
"U") + '|' + self.df_read.data_type.astype("U") + '|' + self.df_read.volume.astype(
"U") + '|' + self.df_read.weight_str.astype("U"))
if lock.locked():
lock.release()
return asin_list
else:
if lock.locked():
lock.release()
return []
except LockError:
print("获取锁失败1111,其他程序正在查询")
......@@ -126,35 +121,30 @@ class Save_asin_detail(BaseUtils):
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
if self.minid_maxid_list:
lock_key = f"{self.db_syn}_{self.month}_lock"
lock = self.redis_client.lock(lock_key, timeout=25) # 10秒超时
minid, maxid = self.minid_maxid_list[0].split('-')
with self.engine_pg.begin() as conn:
# sql_read = f"-- SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 and id BETWEEN {minid} AND {maxid} limit {self.read_size} for update"
sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE state = 1 AND id BETWEEN {minid} AND {maxid} ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size};"
print(sql_read)
self.df_read = self.engine_pg.read_sql(sql_read)
# self.df_read = self.engine_pg.read_sql(sql_read)
self.df_read = self.engine_pg.read_then_update(
select_sql=sql_read,
update_table=f"{self.db_syn}_{self.month}",
set_values={"state": 2}, # 把库存清零
where_keys=["id"], # WHERE sku = :sku
)
self.df_read.drop_duplicates(['asin'], inplace=True)
if self.df_read.shape[0] > 0:
# 使用默认值填充空值
self.df_read['volume'].fillna('null', inplace=True)
self.df_read['weight_str'].fillna('null', inplace=True)
self.index_tuple = tuple(self.df_read['id'])
if len(self.index_tuple) == 1:
sql_update = f"""UPDATE {self.db_syn}_{self.month} a set state=2 where a.id in ({self.index_tuple[0]})"""
else:
sql_update = f"""UPDATE {self.db_syn}_{self.month} a set state=2 where a.id in {self.index_tuple}"""
conn.execute(sql_update)
asin_list = list(
self.df_read.asin + '|' + self.df_read.date_info + '|' + self.df_read.asin_is_variation.astype(
"U") + '|' + self.df_read.data_type.astype("U") + '|' + self.df_read.volume.astype(
"U") + '|' + self.df_read.weight_str.astype("U"))
if lock.locked():
lock.release()
return asin_list
else:
if lock.locked():
lock.release()
print('重新获取', self.minid_maxid_list[0], '无数据')
self.minid_maxid_list = self.reuests_para_val.get_minid_maxid(site_name=self.site_name,
state=3,
......
......@@ -473,15 +473,13 @@ class async_account_name_products(BaseUtils):
while True:
try:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
with self.engine.begin() as conn:
sql_read = f'SELECT account_name, id, seller_id FROM {self.db_seller_account_syn} WHERE product_state=1 LIMIT {self.read_size} for update;'
print(sql_read)
a = conn.execute(sql_read)
self.df_read = pd.DataFrame(a, columns=['account_name', 'id', 'seller_id'])
self.df_read = self.engine.read_sql(sql_read)
if self.df_read.shape[0] == 0:
self.stop_item_queue = False
return []
with self.engine.begin() as conn:
self.index_tuple = tuple(self.df_read['id'])
if len(self.index_tuple) == 1:
sql_update = f"""UPDATE {self.db_seller_account_syn} a set product_state=2 where a.id in ({self.index_tuple[0]})"""
......@@ -505,7 +503,6 @@ class async_account_name_products(BaseUtils):
df_asin_variation.drop_duplicates(['seller_id', 'asin'], inplace=True) # 去重
self.account_name_list_update = list(df_asin_variation.seller_id) # 获取状态3的店铺名称
if self.asin_detail_list:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
with self.engine_pg.begin() as conn:
if len(set(df_asin_variation.seller_id)) == 1:
......@@ -514,9 +511,8 @@ class async_account_name_products(BaseUtils):
sql_delete = f"delete from {self.db_asin_detail_product}_{self.year_month} where seller_id in {tuple(set(df_asin_variation.seller_id))};"
conn.execute(sql_delete)
print(f"存储店铺asin信息 {self.db_asin_detail_product}_{self.year_month}")
df_asin_variation.to_sql(self.db_asin_detail_product + f'_{self.year_month}', con=self.engine_pg,
if_exists='append',
index=False)
self.engine_pg.to_sql(df_asin_variation,self.db_asin_detail_product + f'_{self.year_month}',
if_exists='append')
self.asin_detail_list = []
break
except Exception as e:
......@@ -544,7 +540,6 @@ class async_account_name_products(BaseUtils):
while True:
try:
if self.seller_account_num_list:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
with self.engine_pg.begin() as conn:
buyBox_list_ = list(set(self.seller_account_num_list))
......@@ -577,7 +572,6 @@ class async_account_name_products(BaseUtils):
while True:
try:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
with self.engine.begin() as conn:
# 1,3:1--回滚;3--成功
if id_tuple:
......
......@@ -305,16 +305,14 @@ class search_temp_pg(BaseUtils):
while True:
try:
self.engine_pg = self.pg_connect()
with self.engine_pg.begin() as conn:
sql_read = f"""SELECT id, search_term, url FROM {self.db_search_term} where state=1 and month={self.month} LIMIT {self.read_size} for update;"""
# sql_read = f"""SELECT id, search_term, url FROM {self.db_search_term} where STATE = 1 and month={self.month} ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size};"""
print(sql_read)
a = conn.execute(sql_read)
self.df_read = pd.DataFrame(a, columns=['id', 'search_term', 'url'])
self.df_read = self.engine.read_sql(sql_read)
if self.df_read.shape[0] > 0:
self.id_tuple = tuple(self.df_read.id)
self.date_info = f'2025-{self.month}'
print('date_info::', self.date_info, ' 月:', self.month)
with self.engine_pg.begin() as conn:
if len(self.id_tuple) == 1:
sql_update = f'UPDATE {self.db_search_term} set state=2 where id in ({self.id_tuple[0]});'
else:
......@@ -427,7 +425,7 @@ class search_temp_pg(BaseUtils):
if df.shape[0] > 0:
print("db_name:", db_name)
df['asin'] = df['asin'].str.replace('/', '')
df.to_sql(db_name, con=self.engine_pg, if_exists="append", index=False)
self.engine_pg.to_sql(df, db_name, if_exists="append")
break
except Exception as e:
print(e, f"\n{traceback.format_exc()}")
......@@ -451,12 +449,10 @@ class search_temp_pg(BaseUtils):
print(len(self.sort_all_list))
df_being_sold.drop_duplicates(['search_term', 'quantity_being_sold'], inplace=True) # 去重
if df_being_sold.shape[0] > 0:
df_being_sold.to_sql(f'{self.db_brand_analytics}_{year_moth_list[0]}', con=self.engine_pg,
if_exists='append',
index=False)
self.engine_pg.to_sql(df_being_sold,f'{self.db_brand_analytics}_{year_moth_list[0]}',
if_exists='append')
break
except Exception as e:
print(df_being_sold.values.tolist())
print('db_update_brand::', e, f"\n{traceback.format_exc()}")
time.sleep(5)
continue
......
......@@ -331,16 +331,13 @@ class async_account_feedback(BaseUtils):
while True:
try:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
with self.engine.begin() as conn:
sql_read = f'SELECT url, account_name, id,seller_id FROM {self.db_seller_account_syn} WHERE STATE=1 LIMIT {self.read_size} for update;'
print(sql_read)
a = conn.execute(sql_read)
self.df_read = pd.DataFrame(a, columns=['url', 'account_name', 'id', 'seller_id'])
self.df_read = self.engine.read_sql(sql_read)
if self.df_read.shape[0] == 0:
self.stop_item_queue = False
print(f"**************** {self.site_name} feedback 抓取完毕 **********************")
return []
with self.engine.begin() as conn:
self.index_tuple = tuple(self.df_read['id'])
if len(self.index_tuple) == 1:
sql_update = f"""UPDATE {self.db_seller_account_syn} a set state=2 where a.id in ({self.index_tuple[0]})"""
......@@ -358,7 +355,6 @@ class async_account_feedback(BaseUtils):
def save_data(self):
while True:
try:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
df_account_detail = pd.DataFrame(data=self.account_detail_list, columns=self.cols_db)
df_account_detail.drop_duplicates(['seller_id'], inplace=True) # 去重
......@@ -371,9 +367,8 @@ class async_account_feedback(BaseUtils):
sql_delete = f"delete from {self.db_seller_account_feedback + '_' + self.year_month} where seller_id in {tuple(set(df_account_detail.seller_id))};"
conn.execute(sql_delete)
print(f"feedback 信息 {self.db_seller_account_feedback + '_' + self.year_month}")
df_account_detail.to_sql(self.db_seller_account_feedback + f'_{self.year_month}',
con=self.engine_pg, if_exists='append',
index=False)
self.engine_pg.to_sql(df_account_detail, self.db_seller_account_feedback + f'_{self.year_month}',
if_exists='append')
self.account_detail_list = []
break
except Exception as e:
......@@ -405,7 +400,6 @@ class async_account_feedback(BaseUtils):
while True:
try:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
with self.engine.begin() as conn:
# 1,3:1--回滚;3--成功
if id_tuple:
......
import sys
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
......@@ -7,60 +7,85 @@ from multiprocessing import Pool
from amazon_spider.search_term_pg import search_temp_pg
from threading_spider.db_connectivity import connect_db
import time
import pandas as pd
import random
from amazon_spider.VPS_IP import pppoe_ip
from threading_spider.post_to_dolphin import DolphinschedulerHelper
from utils.secure_db_client import get_remote_engine
from utils.db_connect import BaseUtils
def db_engine(site_name, db_type):
"""
"mysql": "mysql", # 阿里云mysql
"postgresql_14": "postgresql_14", # pg14爬虫库-内网
"postgresql_14_outer": "postgresql_14_outer", # pg14爬虫库-外网
"postgresql_15": "postgresql_15", # pg15正式库-内网
"postgresql_15_outer": "postgresql_15_outer", # pg15正式库-外网
"postgresql_cluster": "postgresql_cluster", # pg集群-内网
"postgresql_cluster_outer": "postgresql_cluster_outer", # pg集群-外网
"doris": "doris", # doris集群-内网
"""
engine_mysql = get_remote_engine(
site_name=site_name, # -> database "selection"
db_type=db_type, # -> 服务端 alias "mysql"
)
return engine_mysql
def select_search_term_state(engine_pg, month, site):
def select_search_term_state(month, site):
for i in range(5):
try:
sql_read = f"SELECT id FROM {site}_search_term_month_syn where state in (1,2) and date_info='2025-{month}' LIMIT 1"
print(sql_read)
df = pd.read_sql(sql_read, con=engine_pg)
id_tuple = list(df.id)
if id_tuple:
engine_pg = db_engine(site, 'postgresql_14_outer')
df = engine_pg.read_sql(sql_read)
if not df.empty:
id_tuple = [1]
else:
id_tuple = None
return id_tuple
except:
db_class = connect_db(site)
engine_pg = db_class.pg_db() # pg
except Exception as e:
print(e, '报错11。', sql_read)
return 1
def db_cursor_connect_update(sql, site):
for i in range(3):
try:
engine_us_mysql = db_engine('us', 'mysql')
print('更新sql:', sql)
with engine_us_mysql.begin() as conn:
conn.execute(sql)
break
except:
print(site, 'db_cursor_connect 报错:', sql)
def select_sate_mysql(site, num=None, month=None, week=None):
db_class = connect_db(site)
engine_pg = db_class.pg_db() # pg
cursor_us, connect_us = db_class.us_mysql_db() # us站点
print('month::', month)
if num == 1:
sql_select_ = f"select status_val from workflow_progress where date_info='2025-{week}' and date_type='week' and page='反查搜索词' and site_name='{site}'"
print(sql_select_)
cursor_us.execute(sql_select_)
# 获取结果
status_dict = cursor_us.fetchone()
print(status_dict)
if int(status_dict[0]) in (1, 2):
engine_us_mysql = db_engine('us', 'mysql')
df = engine_us_mysql.read_sql(sql_select_)
if int(df.status_val[0]) in (1, 2):
update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='反查搜索词' and date_info='2025-{week}' and site_name='{site}' and date_type='week'"
print('update_workflow_progress: 修改状态3 ', update_workflow_progress)
cursor_us.execute(update_workflow_progress)
connect_us.commit()
db_cursor_connect_update(update_workflow_progress, site)
account = 'pengyanbing'
title = site + '站点 搜索词'
content = f'{month} 月 搜索词 已结束,请确认下一步流程!时间:'
db_class.send_mg(account, title, content)
ii = 0
for i in range(11):
id_tuple = select_search_term_state(engine_pg, month, site)
id_tuple = select_search_term_state(month, site)
time.sleep(180)
if id_tuple is None:
ii += 1
if ii > 8:
break
redis_client = BaseUtils().redis_db()
lock_key = "ALL站点-asin同步-pg-api_lock"
lock = redis_client.lock(lock_key, timeout=5) # 10秒超时
if id_tuple is None:
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
......@@ -75,14 +100,18 @@ def select_sate_mysql(site, num=None, month=None, week=None):
title = site + '站点 搜索词'
content = f'{month} 月 搜索词 已结束,成功调度 ALL站点-asin同步-pg-api'
db_class.send_mg(account, title, content)
if lock.locked():
lock.release()
return True
if lock.locked():
lock.release()
else:
print('5555555555555555555555555555555555')
return False
if num == 3:
# 搜索词多进程已经抓完。最后执行单进程抓取。
id_tuple = select_search_term_state(engine_pg, month, site)
id_tuple = select_search_term_state(month, site)
if id_tuple is None:
select_sate_mysql(site, num=1, month=month, week=week)
return False
......@@ -98,7 +127,7 @@ def long_time_task(site, proxy_name, month):
if __name__ == '__main__':
pppoe_ip()
site_list = ['us','de','uk']
site_list = ['us', 'de', 'uk']
month = int(sys.argv[1])
week = int(sys.argv[2])
proxy_name = None
......
import sys
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import time, random
import multiprocessing
from amazon_spider.seller_account_feedback import async_account_feedback
from amazon_spider.products_asin import async_account_name_products
from threading_spider.db_connectivity import connect_db
from amazon_spider.VPS_IP import pppoe_ip
from utils.secure_db_client import get_remote_engine
def db_engine(site_name, db_type):
"""
"mysql": "mysql", # 阿里云mysql
"postgresql_14": "postgresql_14", # pg14爬虫库-内网
"postgresql_14_outer": "postgresql_14_outer", # pg14爬虫库-外网
"postgresql_15": "postgresql_15", # pg15正式库-内网
"postgresql_15_outer": "postgresql_15_outer", # pg15正式库-外网
"postgresql_cluster": "postgresql_cluster", # pg集群-内网
"postgresql_cluster_outer": "postgresql_cluster_outer", # pg集群-外网
"doris": "doris", # doris集群-内网
"""
engine_mysql = get_remote_engine(
site_name=site_name, # -> database "selection"
db_type=db_type, # -> 服务端 alias "mysql"
)
return engine_mysql
def select_sate_mysql(site, num=None, page=None, week=None,table_name=None):
db_class = connect_db(site)
cursor_mysql_db, connect_mysql_db = db_class.mysql_db() # mysql
cursor_us, connect_us = db_class.us_mysql_db() # us站点
def select_sate_mysql(site, num=None, page=None, week=None, table_name=None):
print('当前抓取周::', week)
if int(week) < 10:
week = f'0{int(week)}'
......@@ -23,19 +39,16 @@ def select_sate_mysql(site, num=None, page=None, week=None,table_name=None):
print('查询店铺产品。 feedback 抓取状态')
sql_read = f"SELECT status_val FROM workflow_progress where page='{page}' and date_info='2025-{week}' and site_name='{site}' and date_type='week'"
print(sql_read)
cursor_us.execute(sql_read)
status_val_tuple = cursor_us.fetchone()
print(status_val_tuple)
if status_val_tuple[0] == 3:
engine_pg = db_engine('us', 'mysql')
df = engine_pg.read_sql(sql_read)
print(df.status_val)
if df.status_val[0] == 3:
print(site, page, '已完成')
return False
else:
print(f'开始 {page}')
return True
cursor_mysql_db.close()
connect_mysql_db.close()
cursor_us.close()
connect_us.close()
def feedback_products(site, proxy_name, week):
print('抓取店铺')
......@@ -49,12 +62,14 @@ def feedback_products(site, proxy_name, week):
products_asin_us = async_account_name_products(site, read_size=200, proxy_name=proxy_name, week=week)
products_asin_us.run()
def long_time_task(site, proxy_name, week):
feedback_products(site, proxy_name, week)
if __name__ == '__main__':
pppoe_ip()
site_list = ['us','de','uk','fr','es','it']
site_list = ['us', 'de', 'uk', 'fr', 'es', 'it']
week = int(sys.argv[1])
proxy_name = None
for site in site_list:
......
......@@ -86,6 +86,7 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
lock_key = f"{year_week}_{site}_lock"
lock = redis_client.lock(lock_key, timeout=5) # 10秒超时
select_sql = f"select status_val from workflow_progress WHERE date_info='{year_week}' and date_type='week' and site_name='{site}' and page='ASIN详情'"
print(select_sql)
df_state = db_cursor_connect_msyql_read(select_sql)
print('df_state.status_val::', df_state.status_val)
if int(df_state.status_val[0]) != 3:
......
......@@ -80,6 +80,13 @@ class BaseUtils(object):
# time.sleep(3)
# continue
def pg_connect_6(self):
engine_pg15 = get_remote_engine(
site_name=self.site_name, # -> database "selection"
db_type="postgresql_15_outer", # -> 服务端 alias "mysql"
)
print('engine_pg15::', engine_pg15)
return engine_pg15
def pg_reconnect(self, table_name=None, e=None):
self.engine_pg = self.pg_connect()
......
......@@ -327,10 +327,10 @@ class Requests_param_val(BaseUtils):
print(sql_update)
conn.execute(sql_update)
sql_read = f"""SELECT id, minid_maxid FROM {self.site_name}_syn_asin_all_minid_maxid WHERE STATE = 1 and yaer_week = '{year_week}' LIMIT 1;"""
a = conn.execute(sql_read)
df_read = pd.DataFrame(a, columns=['id', 'minid_maxid'])
df_read = self.engine.read_sql(sql_read)
if df_read.shape[0] > 0:
minid_maxid_list = list(df_read.minid_maxid)
print(minid_maxid_list)
else:
minid_maxid_list = []
print('获取id 区间 self.minid_maxid_list:::', minid_maxid_list)
......@@ -349,4 +349,4 @@ class Requests_param_val(BaseUtils):
md5_hex_digest = md5_hash.hexdigest()
return md5_hex_digest
if __name__ == '__main__':
Requests_param_val(site_name='us').get_minid_maxid()
Requests_param_val(site_name='uk').get_minid_maxid(month='07',state=1)
......@@ -29,8 +29,9 @@ db_type_alias_map = {
DEFAULT_SERVERS = [
# "http://192.168.200.210:7777", # 内网
# "http://192.168.10.217:7777", # 内网-h7
# "http://61.145.136.61:7777", # 外网
"http://61.145.136.61:7777", # 外网
"http://61.145.136.61:7779", # 外网
]
# ---------------------------
......@@ -200,6 +201,32 @@ class RemoteEngine:
# "data": df_to_json_records(df), # ← 清洗后的 records
"site_name": self.database})
def read_then_update(
self,
select_sql: str,
update_table: str,
set_values: dict,
where_keys: List[str],
error_if_empty: bool = False,
):
"""
动态生成 UPDATE:把 select_sql 读到的行,按 where_keys 精准更新 set_values
返回 (DataFrame, rows_updated)
"""
payload = {
"db": self.db,
"site_name": self.database,
"select_sql": select_sql,
"update_table": update_table,
"set_values": set_values,
"where_keys": where_keys,
"error_if_empty": error_if_empty,
}
resp = self._request("read_then_update", payload)
df = pd.DataFrame(resp["read_result"])
rows_updated = resp.get("rows_updated", 0)
return df
def begin(self):
return RemoteTransaction(self.db, self.database,
self.session, self.urls)
......
......@@ -564,6 +564,8 @@ class GetSS_details():
day = time.strftime("%d")
for item_id in range(1, 33):
print(f"开始抓取 item_id: {item_id}")
self.page.clear_cache() # 清除浏览器缓存和session信息。下一个账号直接登录。优化上一个账号没有退出导致新账号登录失败
if item_id == 1 and int(day)<2:
Con.update_all_states_to_1(state=2)
......
......@@ -64,20 +64,19 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
def get_day(self):
try:
......@@ -93,50 +92,37 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_chrome.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_chrome.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -268,8 +254,26 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month < 10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month < 10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -295,7 +299,7 @@ class TkVideo():
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -323,10 +327,10 @@ class TkVideo():
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(3)
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
# self.page_chrome.quit()
self.page_chrome.quit()
if __name__ == '__main__':
......
......@@ -64,20 +64,18 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
def get_day(self):
try:
......@@ -93,51 +91,34 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_chrome.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_chrome.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -268,8 +249,26 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month < 10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month < 10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -295,7 +294,7 @@ class TkVideo():
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -323,10 +322,10 @@ class TkVideo():
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(3)
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
# self.page_chrome.quit()
self.page_chrome.quit()
if __name__ == '__main__':
......
......@@ -64,21 +64,18 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
def get_day(self):
try:
self.page_chrome.get("https://www.tiktok.com/business-suite/insight/video")
......@@ -93,50 +90,35 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_chrome.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_chrome.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -268,8 +250,26 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month < 10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month < 10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -295,7 +295,7 @@ class TkVideo():
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -323,10 +323,10 @@ class TkVideo():
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(3)
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
# self.page_chrome.quit()
self.page_chrome.quit()
if __name__ == '__main__':
......
......@@ -64,20 +64,18 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
def get_day(self):
try:
......@@ -93,50 +91,35 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_chrome.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_chrome.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -268,9 +251,26 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month < 10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month < 10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -296,7 +296,7 @@ class TkVideo():
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -324,10 +324,10 @@ class TkVideo():
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(3)
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
# self.page_chrome.quit()
self.page_chrome.quit()
if __name__ == '__main__':
......
......@@ -70,20 +70,18 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
def get_day(self):
try:
......@@ -99,50 +97,36 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_edge.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_edge.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -275,8 +259,26 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month < 10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month < 10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -302,7 +304,7 @@ class TkVideo():
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -330,10 +332,10 @@ class TkVideo():
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(3)
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
# self.page_edge.quit()
self.page_edge.quit()
if __name__ == '__main__':
TkVideo().run()
......
......@@ -68,20 +68,19 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
def get_day(self):
try:
......@@ -97,50 +96,37 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_edge.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_edge.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -273,8 +259,26 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month < 10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month < 10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -300,7 +304,7 @@ class TkVideo():
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -327,10 +331,10 @@ class TkVideo():
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(3)
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
# self.page_edge.quit()
self.page_edge.quit()
if __name__ == '__main__':
TkVideo().run()
......
......@@ -66,20 +66,19 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
def get_day(self):
try:
......@@ -95,50 +94,36 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_chrome.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_chrome.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -270,8 +255,26 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month < 10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month < 10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -297,7 +300,7 @@ class TkVideo():
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -324,7 +327,11 @@ class TkVideo():
self.connect_redis()
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
self.page_chrome.quit()
if __name__ == '__main__':
......
# -*- coding: utf-8 -*-
import os
os.environ['NO_PROXY'] = 'stackoverflow.com'
import logging
logging.captureWarnings(True)
from DrissionPage import ChromiumPage, ChromiumOptions
import time
......@@ -11,17 +11,15 @@ from datetime import datetime, timedelta
from time import sleep
from random import randint
import requests
import math
import pandas as pd
import redis
import json
from pathlib import Path
import re
from sqlalchemy import create_engine
import random
class TkVideo():
class TkVideo():
def __init__(self):
# 修改请求头
......@@ -81,20 +79,20 @@ class TkVideo():
def get_datetime(self):
"""获取当前日期,并计算前2天的完整日期(年-月-日),并按照指定格式输出"""
today = datetime.now()
self.deadline = today - timedelta(days=2)
# 提取年、月、日
self.deadline_year = self.deadline.year
self.deadline_month = self.deadline.month # 自动是整数,不带前导零
self.deadline_day = self.deadline.day
# 输出年-月-日格式
print(f'{self.deadline_year}-{self.deadline_month}-{self.deadline_day}')
# 获取今天的日期
today = datetime.today().date()
# 开始日期:去年的今天
self.start_date = today.replace(year=today.year - 1)
self.start_year = self.start_date.year
self.start_month = self.start_date.month
self.start_day = self.start_date.day
# 结束日期:今天的前一天
self.end_date = today - timedelta(days=3)
self.end_year = self.end_date.year
self.end_month = self.end_date.month
self.end_day = self.end_date.day
# 新增:输出 年_月 格式,月份带前导零(例如 06)
self.deadline_year_month = self.deadline.strftime("%Y_%m")
print(self.deadline_year_month) # 输出示例:2025_06
def get_day(self):
try:
......@@ -110,50 +108,35 @@ class TkVideo():
# 先点击开始时间:2024年7月1日
self.page_edge.ele(
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '2024 7 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper'][1]"
f"xpath=//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.start_year} {self.start_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='1']/parent::div"
f"//span[text()='{self.start_day}']/parent::div"
).click()
print('已输入开始时间2024 7 月 1 日')
print(f'已输入开始时间{self.start_year} {self.start_month} 月 {self.start_day} 日')
time.sleep(random.randint(3, 5))
# 初始目标日期为 deadline(可能已经是上个月的某一天)
current_date = self.deadline
max_attempts = 31
while max_attempts > 0:
year = current_date.year
month = current_date.month
day = current_date.day
try:
xpath = (
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{year} {month} 月')]"
f"//div[@class='tiktok-datepicker-month-title' and contains(text(), '{self.end_year} {self.end_month} 月')]"
f"/following-sibling::div[@class='tiktok-datepicker-day-wrapper']"
f"//div[@class='tiktok-datepicker-day valid in-this-month']"
f"//span[text()='{day}']/parent::div"
f"//span[text()='{self.end_day}']/parent::div"
)
print('结束日期 xpath::', xpath)
ele = self.page_edge.ele(f"xpath={xpath}", timeout=5)
ele.click()
print(f'✅ 成功点击日期:{year}-{month}-{day}')
# self.day = str(day)
self.day = f"{day:02d}"
# 更新 self.deadline_year_month 为最新选中日期
self.deadline_year_month = current_date.strftime("%Y_%m")
print(f'✅ 成功点击日期:{self.end_year}-{self.end_month}-{self.end_day}')
self.get_data()
time.sleep(random.randint(3, 5))
return True # 成功返回
except Exception as e:
print(f'❌ 无法点击 {year}-{month}-{day},错误:{e}')
print(f'❌ 无法点击 {self.end_year}-{self.end_month}-{self.end_day},错误:{e}')
# 往前推一天
current_date -= timedelta(days=1)
max_attempts -= 1
time.sleep(random.randint(3, 5))
time.sleep(random.randint(5, 15))
print('⛔ 连续尝试失败,未找到可点击的日期,请检查页面状态或网络连接。')
return False
......@@ -198,7 +181,6 @@ class TkVideo():
print(f"get_data出现错误: {e}")
self.send_error_notification_via_wechat(e)
def connect_redis(self):
"""建立 Redis 连接"""
self.r = redis.StrictRedis(**self.REDIS_CONFIG)
......@@ -285,9 +267,28 @@ class TkVideo():
def find_specific_file(self):
download_path = Path(self.download_folder)
if self.start_month<10:
start_month = f'0{self.start_month}'
else:
start_month = self.start_month
if self.start_day < 10:
start_day = f'0{self.start_day}'
else:
start_day = self.start_day
if self.end_month<10:
end_month = f'0{self.end_month}'
else:
end_month = self.end_month
if self.end_day < 10:
end_day = f'0{self.end_day}'
else:
end_day = self.end_day
# 构建基础前缀(使用真正的括号)
base_prefix = f"视频(2024_07_01-{self.deadline_year_month}_{self.day})"
base_prefix = f"视频({self.start_year}_{start_month}_{start_day}-{self.end_year}_{end_month}_{end_day})"
# 构建正则表达式:以 base_prefix 开头,后面可以跟任意内容
pattern = re.escape(base_prefix) + r'.*$'
......@@ -301,6 +302,7 @@ class TkVideo():
def save_to_redis(self):
EXCEL_FILE = self.find_specific_file()
print(f'保存文件:{EXCEL_FILE}')
# 读取 Excel 数据
......@@ -308,12 +310,13 @@ class TkVideo():
processed_data = self.process_data(data, self.shop_name)
self.store_data_in_redis(self.r, processed_data)
def send_success_message_via_wechat(self):
webhook_url = 'http://47.112.96.71:8082/selection/sendMessage' # 替换为你的企业微信机器人的Webhook URL
data = {
"account": self.receiver_name,
"title": '【TK视频数据下载成功提醒】',
"content": f'账号: {self.key}, 文件:视频(2024_07_01-{self.deadline_year_month}_{self.day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
"content": f'账号: {self.key}, 文件:视频({self.start_year}_{self.start_month}_{self.start_day}-{self.end_year}_{self.end_month}_{self.end_day}), 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data, timeout=15)
......@@ -322,15 +325,15 @@ class TkVideo():
else:
print(f"发送通知失败: {response.text}")
def send_error_notification_via_wechat(self,error_message):
def send_error_notification_via_wechat(self, error_message):
webhook_url = 'http://47.112.96.71:8082/selection/sendMessage' # 替换为你的企业微信机器人的Webhook URL
data = {
"account": self.receiver_name,
'title':'【TK视频数据下载异常提醒】',
'content':f'账号:{self.key},错误信息:{error_message}, 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
'title': '【TK视频数据下载异常提醒】',
'content': f'账号:{self.key},错误信息:{error_message}, 时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}'
}
response = requests.post(url=webhook_url, data=data,timeout=15)
response = requests.post(url=webhook_url, data=data, timeout=15)
if response.status_code == 200:
print("已成功发送错误通知到企业微信")
else:
......@@ -341,21 +344,11 @@ class TkVideo():
self.get_datetime()
self.get_day()
print('完成关闭浏览器')
time.sleep(3)
time.sleep(5)
# 如果 ChromiumPage 底层保存了 browser 对象
# 或者如果它是基于 Selenium WebDriver
# self.page_edge.quit()
self.page_edge.quit()
if __name__ == '__main__':
TkVideo().run()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment