Commit cd1e5c51 by Peng

no message

parent 814f1b7b
......@@ -354,12 +354,12 @@ def junglescout_spider(db_base):
"Accept-Encoding": "gzip, deflate, br, zstd",
"Accept-Language": "zh-CN,zh-TW;q=0.9,zh;q=0.8",
"Cache-Control": "no-cache",
'Cookie': 'Hm_lvt_e0dfc78949a2d7c553713cb5c573a486=1754014538; HMACCOUNT=9F9252C9CBCC28DF; _gcl_au=1.1.1089500616.1754014538; _ga=GA1.1.420464702.1754014538; MEIQIA_TRACK_ID=30fNCDIKt41VFprESpAsdxA93ss; MEIQIA_VISIT_ID=30fNCEDPMElClEdkflJq4o3vq1u; ecookie=vR8uQMtRUHf2GPuw_CN; e6d47b7933e377ecd062=54dab66cb6737167d2467f75c45d482f; _fp=65dbbe41a37f8f9fbe702eba96328267; _gaf_fp=909522d879232499acd9b4ddab672d19; current_guest=hsba8eOK1Dg5_250801-107899; rank-login-user=4412704571jC8vqVc/Rw3YJBDFuUDJtYCCpovzEIJtbd/qlmC8t917Mll118BEKfWZetMkVyfW; rank-login-user-info="eyJuaWNrbmFtZSI6IuWViuWTiOWTiOWTiCIsImlzQWRtaW4iOmZhbHNlLCJhY2NvdW50IjoiMTUzKioqKjEyNzAiLCJ0b2tlbiI6IjQ0MTI3MDQ1NzFqQzh2cVZjL1J3M1lKQkRGdVVESnRZQ0Nwb3Z6RUlKdGJkL3FsbUM4dDkxN01sbDExOEJFS2ZXWmV0TWtWeWZXIn0="; Sprite-X-Token=eyJhbGciOiJSUzI1NiIsImtpZCI6IjE2Nzk5NjI2YmZlMDQzZTBiYzI5NTEwMTE4ODA3YWExIn0.eyJqdGkiOiJHOWQwOEtTcnVjdEgwcXZWZm1XNnlBIiwiaWF0IjoxNzU0MDE0NTQ0LCJleHAiOjE3NTQxMDA5NDQsIm5iZiI6MTc1NDAxNDQ4NCwic3ViIjoieXVueWEiLCJpc3MiOiJyYW5rIiwiYXVkIjoic2VsbGVyU3BhY2UiLCJpZCI6MTQ2NjIwMSwicGkiOm51bGwsIm5uIjoi5ZWK5ZOI5ZOI5ZOIIiwic3lzIjoiU1NfQ04iLCJlZCI6Ik4iLCJwaG4iOiIxNTM2ODA1MTI3MCIsImVtIjoibWVpeW91bGFAbWVpeW91bGEuY29tIiwibWwiOiJHIn0.Cwkc0tf7KniQbUgRyiZw8UED5dm3y8dOrK04ejg4a45H-W3FEBpQ6ERU8V7TTy2qKOJf8j1swyVxRIqJDrGRSwe4FBr8EKLsoZtxRe6DR0LYGx8xMmWmfUmVmwcBHR2M62RZlDO-fjvVPBuZwcLyUuslq2PZen2ugOUzdfQDHQJV8UMmWUvt1zHjjQZrRlda1tK0_TuHt8dBCZ-sC_CIooCAvXYYfMUSMeT2w_QmgFPc_EIozNKvv7EDzqisT4pR5AWKDdfoVUSWFBIVNwoulIMdtKLsVrlL8Xiq_2l3mG9NCfE0recVIGCRhV52lwWD3vT1O3bpCT-usWv0hXVgZA; ao_lo_to_n="4412704571jC8vqVc/Rw3YJBDFuUDJtakIKSNY+NxiJnARSLNieFEjr7klDXJb6hxls+GbtooUDUaltNvx27xhoy1Atnktnv2cJc/ZHsk63L1rTYoE0Cs="; rank-guest-user=5412704571Cjn566YjkeJT1oMWIEZzHnz3datXkhqNVFcNKbPORkfvRQdV46mvORTmyl8ul2JV; _ga_38NCVF2XST=GS2.1.s1754014538$o1$g1$t1754014551$j47$l0$h1907321182; Hm_lpvt_e0dfc78949a2d7c553713cb5c573a486=1754014552; _ga_CN0F80S6GL=GS2.1.s1754014539$o1$g1$t1754014552$j47$l0$h0; JSESSIONID=A5AEB0B286BCE5A27600AD3BD1DD6445',
'Cookie': 'Hm_lvt_e0dfc78949a2d7c553713cb5c573a486=1754303192; HMACCOUNT=B44A3AA867DA8C05; _gcl_au=1.1.1206959369.1754303192; _ga=GA1.1.747881747.1754303192; MEIQIA_TRACK_ID=30ooGiCl3FRjp9OxyAufmK3iejx; MEIQIA_VISIT_ID=30ooGnIqlsOe6yn29kfqf4EnQuF; ecookie=xzfwf4ZvXd9I4bOT_CN; c57b73d8686537c32dea=36830b46c328d771081e3a79f5c51e04; _fp=65dbbe41a37f8f9fbe702eba96328267; _gaf_fp=9ac0984901990f4b1772551060468cf0; rank-login-user=7970634571pnkdYV6Hfb0IWfvuV/gc88fclJAm6p5JWeQpD30JCgc6kY0X2uN+iF7vjvKtVgBU; rank-login-user-info="eyJuaWNrbmFtZSI6IuW4heWTpSIsImlzQWRtaW4iOmZhbHNlLCJhY2NvdW50IjoiMTgzKioqKjczNDciLCJ0b2tlbiI6Ijc5NzA2MzQ1NzFwbmtkWVY2SGZiMElXZnZ1Vi9nYzg4ZmNsSkFtNnA1SldlUXBEMzBKQ2djNmtZMFgydU4raUY3dmp2S3RWZ0JVIn0="; Sprite-X-Token=eyJhbGciOiJSUzI1NiIsImtpZCI6IjE2Nzk5NjI2YmZlMDQzZTBiYzI5NTEwMTE4ODA3YWExIn0.eyJqdGkiOiJvQjhGc25vZ0ludmp5S3luRmlsSjdnIiwiaWF0IjoxNzU0MzAzMTk3LCJleHAiOjE3NTQzODk1OTcsIm5iZiI6MTc1NDMwMzEzNywic3ViIjoieXVueWEiLCJpc3MiOiJyYW5rIiwiYXVkIjoic2VsbGVyU3BhY2UiLCJpZCI6MTQ2NjIxNSwicGkiOm51bGwsIm5uIjoi5biF5ZOlIiwic3lzIjoiU1NfQ04iLCJlZCI6Ik4iLCJwaG4iOiIxODMwNzk2NzM0NyIsImVtIjoiMzE1OTk4MDg5MkBxcS5jb20iLCJtbCI6IkcifQ.OOEsxsBWHf6J1ta8ueS0i-8fVxuxstNOtoJ2gWSxcJwr6UbRMiHiXqo3fNwkwzYrBjp75oz7xbdaui3LPu90-VZCUyh5lXoiFBjZD-iVJcQNTqkfYbV3siHtjRS27LBBh4UJLRRdSAfxP5iZscz640WHj9PupOXYUDPbljOsWOC4jBYSY3Ek3ikxH70BFluOvrD8kpwfQvbhmue_0fZAqu-rACr3ed5cpDUc3YQiFH7sDRkV0FJv4SLLm1qxLvSo4RmNftfYUBggsLl7qM0tQyBQh2BooUIt8ZBldTmtUdJiz9shLu1kYyv_zzoXtgfMmpdNADM85W0INKp1u5DGAg; ao_lo_to_n="7970634571pnkdYV6Hfb0IWfvuV/gc85dImjms+dJ7IrpjIs0CNJBquIGSx1xPUHU/OAMezoHKbqvLvZZuXrKHmPj6PK6OtV+0hL1+N+4daHAf8FeCzWg="; JSESSIONID=421BD32330EB1F2A12E2571E4D00CE8F; _ga_38NCVF2XST=GS2.1.s1754303191$o1$g1$t1754303203$j48$l0$h314662329; Hm_lpvt_e0dfc78949a2d7c553713cb5c573a486=1754303203; _ga_CN0F80S6GL=GS2.1.s1754303192$o1$g1$t1754303204$j48$l0$h0',
"User-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}
url = "https://www.sellersprite.com/v2/tools/sales-estimator/bsr.json"
data = {
"station": "UK",
"station": "US",
"cid": category_name['c_id'], # 分类id
"bsr": f"{i}" # 排名
}
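A hedged sketch of how the headers and form data above might be dispatched, assuming the requests library (whether the endpoint expects a GET query string or a POST body is elided in this hunk, so the POST form is an assumption):

import requests

# Illustrative only: send the BSR sales-estimator query built above.
# The Cookie value should come from configuration rather than a hard-coded literal.
resp = requests.post(url, headers=headers, data=data, timeout=30)
resp.raise_for_status()
result = resp.json()  # estimated sales for this category id (cid) and rank (bsr)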
......@@ -399,7 +399,7 @@ def junglescout_spider(db_base):
print(inset_sql)
cursor_mysql_db.executemany(inset_sql, name_rnak_list)
db.commit()
up_sql = f"UPDATE all_site_category set state=3 WHERE site='{db_base}' and state=1 and c_id='{category_name['c_id']}'"
up_sql = f"UPDATE all_site_category set state=3 WHERE site='{db_base}' and state=2 and c_id='{category_name['c_id']}'"
print('Update state:', up_sql)
cursor_us_mysql_db.execute(up_sql)
db_us.commit()
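The write above relies on cursor.executemany with a parameterized INSERT; the actual statement and columns are elided in this hunk, so the shapes below are hypothetical:

# Hypothetical shapes of inset_sql and name_rnak_list:
inset_sql = "INSERT INTO us_bsr_rank_sales (c_id, bsr, sales) VALUES (%s, %s, %s)"
name_rnak_list = [('123', 1, 5000), ('123', 2, 4100)]  # one tuple per row
cursor_mysql_db.executemany(inset_sql, name_rnak_list)
db.commit()  # commit once after the whole batch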
......@@ -435,7 +435,7 @@ def save_site_category(site_bsr_dict=None):
def run():
# get_cid()
junglescout_spider('uk')
junglescout_spider('us')
if __name__ == '__main__':
......
......@@ -18,8 +18,6 @@ class count_all_syn_st_id(BaseUtils):
if engine_db_num == 14:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
else:
self.engine_pg = self.pg_connect_6()
self.engine_db_num = engine_db_num
if site_name == "us":
self.site_url = 'https://www.amazon.com/'
......@@ -48,8 +46,8 @@ class count_all_syn_st_id(BaseUtils):
print(self.site_name, ' querying min and max id')
query = f"SELECT MIN(id) AS min_id, MAX(id) AS max_id FROM {self.site_name}_all_syn_st_month_{self.table_data_info} where state in (1,2)"
print(query)
result = pd.read_sql(query, self.engine_pg)
# result = pd.read_sql(query, self.engine_pg)
result = self.engine_pg.read_sql(query)
if result.shape[0] > 0:
min_id = result['min_id'].values[0]
max_id = result['max_id'].values[0]
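The replacement line assumes the pg engine object now exposes its own read_sql returning a pandas DataFrame; a minimal sketch of such a wrapper, under that assumption:

import pandas as pd

class PgEngine:  # hypothetical stand-in for the object behind self.engine_pg
    def __init__(self, sqlalchemy_engine):
        self._engine = sqlalchemy_engine

    def read_sql(self, query):
        # Equivalent to the old pd.read_sql(query, engine) call, just wrapped.
        return pd.read_sql(query, con=self._engine)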
......@@ -72,58 +70,7 @@ class count_all_syn_st_id(BaseUtils):
delete_sql = f'DELETE from {self.site_name}_syn_asin_all_minid_maxid where state <4'
print('delete_sql::',delete_sql)
conn.execute(delete_sql)
df_asin_img_video.to_sql(f'{self.site_name}_syn_asin_all_minid_maxid', con=self.engine,
if_exists='append',
index=False)
def search_term_syn(self):
# Collect rows in a list first; a DataFrame is built from it below
result_list = []
if self.engine_db_num == 6:
query = f"SELECT search_term FROM {self.site_name}_search_term_month_merchantwords WHERE state=1 and id <5000001"
elif self.engine_db_num == 14:
query = f"SELECT search_term FROM {self.site_name}_search_term_month WHERE month={self.month} and state in (1,2)"
print(query)
result_df = self.get_data_from_database(self.engine_pg, query)
result_df.drop_duplicates(['search_term'], inplace=True)
print('_search_term_month::',result_df.shape)
# Generate URLs for each search term and append them to the result list
for search_term in result_df['search_term']:
urls = self.build_urls(search_term)
result_list.extend(urls)
# Build the initial DataFrame
df_search_term = pd.DataFrame(data=result_list, columns=['search_term', 'url'])
print(df_search_term.shape)
# Boolean mask: True for rows whose URL is at most 450 characters long
long_url_rows = df_search_term['url'].str.len() <= 450
# Keep only rows whose URL does not exceed 450 characters
data_df = df_search_term[long_url_rows]
data_df['month'] = f'{self.month}'
data_df['date_info'] = self.data_info
print(data_df)
print(data_df.shape)
with self.engine_pg.begin() as conn:
if self.engine_db_num == 14:
data_df.to_sql(f'{self.site_name}_search_term_month_syn',con=self.engine_pg, if_exists="append",
index=False)
update_sql = f"update {self.site_name}_search_term_month set state =3 where date_info='2025-{self.month}' and state=1"
print(update_sql)
conn.execute(update_sql)
deletesql = f"DELETE from {self.site_name}_search_term_month_syn where date_info < '{self.data_info}'"
print(deletesql)
conn.execute(deletesql)
elif self.engine_db_num == 6:
print('pg6 writing merchantwords data')
data_df.to_sql(f'{self.site_name}_search_term_month_syn_merchantwords', con=self.engine_pg, if_exists="append",
index=False)
update_sql = f"update us_search_term_month_merchantwords set state =3 where state=1"
print(update_sql)
conn.execute(update_sql)
deletesql = f"DELETE from {self.site_name}_search_term_month_syn_merchantwords where state =3"
print(deletesql)
conn.execute(deletesql)
self.engine.to_sql(df_asin_img_video,f'{self.site_name}_syn_asin_all_minid_maxid',if_exists='append')
# Helper: fetch data from the database
def get_data_from_database(self, connection, query):
......@@ -161,5 +108,4 @@ if __name__ == '__main__':
for site in ['us']:
time.sleep(0)
count_all_syn_st_id(site_name=site,month=month).get_minid_maxid()
# count_all_syn_st_id(site_name=site,month=month,engine_db_num=engine_db_num).search_term_syn()
......@@ -11,6 +11,7 @@ import traceback
import time
import random
from amazon_spider.VPS_IP import is_internet_available
from redis.exceptions import LockError
print('Saving asin into the pg database')
......@@ -27,7 +28,7 @@ class Save_asin_detail(BaseUtils):
self.reuests_para_val = Requests_param_val(site_name=self.site_name, proxy_name=proxy_name)
self.init_db_names()
self.cols = self.reuests_para_val.db_column(site_name)
self.redis_client = self.redis_db()
def init_db_names(self):
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect() # when changing variants, the variant table is written via self.engine
......@@ -78,6 +79,9 @@ class Save_asin_detail(BaseUtils):
else:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
lock_key = f"{self.db_syn}_{self.month}_lock"
lock = self.redis_client.lock(lock_key, timeout=25) # auto-expires after 25 seconds
with self.engine_pg.begin() as conn:
sql_read = f"SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 ORDER BY id FOR UPDATE SKIP LOCKED LIMIT {self.read_size}"
print(sql_read)
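SELECT ... FOR UPDATE SKIP LOCKED lets several workers claim disjoint batches: each transaction locks the rows it reads and silently skips rows another worker has already locked. A sketch of the claim-then-mark pattern; the follow-up UPDATE is an assumption, since the hunk elides what happens after the read:

with self.engine_pg.begin() as conn:  # lock scope = this transaction
    rows = conn.execute(sql_read).fetchall()
    ids = [r[1] for r in rows]  # id is the second selected column
    if ids:
        # Hypothetical follow-up: mark claimed rows so they are not re-read.
        conn.execute(f"UPDATE {self.db_syn}_{self.month} SET state = 2 "
                     f"WHERE id IN ({','.join(str(i) for i in ids)})")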
......@@ -97,9 +101,15 @@ class Save_asin_detail(BaseUtils):
self.df_read.asin + '|' + self.df_read.date_info + '|' + self.df_read.asin_is_variation.astype(
"U") + '|' + self.df_read.data_type.astype("U") + '|' + self.df_read.volume.astype(
"U") + '|' + self.df_read.weight_str.astype("U"))
if lock.locked():
lock.release()
return asin_list
else:
if lock.locked():
lock.release()
return []
except LockError:
print("获取锁失败1111,其他程序正在查询")
except Exception as e:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
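The hunks above create the Redis lock, but the acquire call falls outside the shown context; for reference, the usual redis-py pattern pairs a blocking acquire with a guarded release (a sketch, not the author's exact flow):

from redis.exceptions import LockError

lock = self.redis_client.lock(lock_key, timeout=25)  # auto-expires after 25 s
try:
    if lock.acquire(blocking=True, blocking_timeout=10):
        asin_list = self.read_locked_batch()  # hypothetical: the SELECT above
except LockError:
    print("Failed to acquire lock; another process is querying")
finally:
    if lock.locked():
        lock.release()  # release only if the lock is still held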
......@@ -116,6 +126,8 @@ class Save_asin_detail(BaseUtils):
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
if self.minid_maxid_list:
lock_key = f"{self.db_syn}_{self.month}_lock"
lock = self.redis_client.lock(lock_key, timeout=25) # auto-expires after 25 seconds
minid, maxid = self.minid_maxid_list[0].split('-')
with self.engine_pg.begin() as conn:
# sql_read = f"-- SELECT asin, id, date_info, asin_is_variation,data_type,volume,weight_str FROM {self.db_syn}_{self.month} WHERE STATE = 1 and id BETWEEN {minid} AND {maxid} limit {self.read_size} for update"
......@@ -137,20 +149,27 @@ class Save_asin_detail(BaseUtils):
self.df_read.asin + '|' + self.df_read.date_info + '|' + self.df_read.asin_is_variation.astype(
"U") + '|' + self.df_read.data_type.astype("U") + '|' + self.df_read.volume.astype(
"U") + '|' + self.df_read.weight_str.astype("U"))
if lock.locked():
lock.release()
return asin_list
else:
if lock.locked():
lock.release()
print('Re-fetching', self.minid_maxid_list[0], 'no data')
self.minid_maxid_list = self.reuests_para_val.get_minid_maxid(site_name=self.site_name,
state=3,
minid_maxid=
self.minid_maxid_list[0],
month=self.month)
else:
asin_list = self.read_db_data2()
if asin_list:
return asin_list
else:
return []
except LockError:
print("获取锁失败,其他程序正在查询")
except Exception as e:
self.engine = self.mysql_connect()
self.engine_pg = self.pg_connect()
......
import time
import pandas as pd
from redis.exceptions import LockError
from threading_spider.db_connectivity import connect_db
from threading_spider.post_to_dolphin import DolphinschedulerHelper
import random
from utils.db_connect import BaseUtils
from utils.secure_db_client import get_remote_engine
def db_engine_us(site_name,db_type):
engine_mysql = get_remote_engine(
site_name=site_name, # -> database "selection"
db_type=db_type, # -> server-side alias "mysql"
)
return engine_mysql
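A usage sketch of this helper; the read_sql and begin methods are assumed from how the returned engine is used later in this file:

engine_us_mysql = db_engine_us('us', 'mysql')
df = engine_us_mysql.read_sql('SELECT 1 AS ok')   # returns a DataFrame
with engine_us_mysql.begin() as conn:             # transactional scope
    conn.execute("UPDATE workflow_progress SET status_val=2 WHERE id=1")  # hypothetical row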
def db_cursor_connect(cursor, connect, sql):
def db_cursor_connect_update(sql, site):
for i in range(3):
try:
cursor.execute(sql)
connect.commit()
engine_us_mysql = db_engine_us('us','mysql')
print('Update sql:', sql)
with engine_us_mysql.begin() as conn:
conn.execute(sql)
break
except:
time.sleep(10)
db_class = connect_db('us')
cursor, connect = db_class.us_mysql_db() # us site
print(site, 'db_cursor_connect error:', sql)
def db_cursor_connect_msyql_read(select_state1_sql):
for i in range(3):
try:
engine_us_mysql = db_engine_us('us','mysql')
print(select_state1_sql)
df = engine_us_mysql.read_sql(select_state1_sql)
return df
except Exception as e:
import traceback
traceback.print_exc() # print the full stack trace to the terminal
print(e, 'db_cursor_connect_msyql_read error:', select_state1_sql)
def db_cursor_connect_pg_read(site,select_state1_sql):
for i in range(3):
try:
engine_pg = db_engine_us(site,'postgresql_14_outer')
df = engine_pg.read_sql(select_state1_sql)
return df
except:
print('db_cursor_connect_pg_read error:', select_state1_sql)
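Both read helpers fall through after three failed attempts and implicitly return None, so callers should guard for that as well as for an empty frame; an illustrative guard:

df = db_cursor_connect_msyql_read(select_state1_sql)
if df is None or df.empty:
    # Either all three attempts failed or the query matched no rows.
    return False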
def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spider_int=None):
db_class = connect_db(site)
engine_pg = db_class.pg_db() # pg
cursor_mysql_db, connect_mysql_db = db_class.mysql_db() # mysql
cursor_us, connect_us = db_class.us_mysql_db() # us site
print('Current crawl month::', month)
if int(week) < 10:
week = f'0{int(week)}'
year_week = f'2025-{week}'
year_month = f'2025-{month}'
time_strftime = time.strftime("%Y-%m-%d %X", time.localtime())
if num is None:
if site in ['fr', 'es', 'it']:
print('Querying weekly workflow progress status')
select_state1_sql = f"select status_val from workflow_progress where site_name='{site}' and date_info='{year_week}' and date_type='week' and page='反查搜索词'"
print(select_state1_sql)
cursor_us.execute(select_state1_sql)
site_sate1_list = cursor_us.fetchone()
print(site_sate1_list, 'query search-term state')
if site_sate1_list:
if site_sate1_list[0] == 3:
df = db_cursor_connect_msyql_read(select_state1_sql)
if not df.empty:
if df.status_val[0] == 3:
print(f"{site} 站点 {week} 周 搜索词 已完成 执行下一步")
select_sate_sql = f"select status_val from workflow_progress where site_name='{site}' and date_info='{year_week}' and date_type='week' and page='ASIN详情'"
print(select_sate_sql)
cursor_us.execute(select_sate_sql)
site_all_sate_list = cursor_us.fetchone()
print(site_all_sate_list)
if site_all_sate_list[0] != 3:
df1 = db_cursor_connect_msyql_read(select_state1_sql)
if df1.status_val[0] != 3:
update_workflow_progress = f"update workflow_progress set status_val=2,status='抓取中' where page='ASIN详情' and date_info='2025-{week}' and site_name='{site}' and date_type='week' and status_val in(1,0)"
db_cursor_connect(cursor_us, connect_us, update_workflow_progress)
db_cursor_connect_update(update_workflow_progress, site)
print(f"{site} 站点 未全部完成 数据 抓取")
return True
else:
......@@ -61,21 +86,20 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
lock_key = f"{year_week}_{site}_lock"
lock = redis_client.lock(lock_key, timeout=5) # auto-expires after 5 seconds
select_sql = f"select status_val from workflow_progress WHERE date_info='{year_week}' and date_type='week' and site_name='{site}' and page='ASIN详情'"
cursor_us.execute(select_sql)
_state_list = cursor_us.fetchone()
if int(_state_list[0]) != 3:
df_state = db_cursor_connect_msyql_read(select_sql)
print('df_state.status_val::', df_state.status_val)
if int(df_state.status_val[0]) != 3:
try:
if lock.acquire(blocking=True):
spider_state_sql = f"select id from workflow_progress where date_info='{year_week}' and date_type='week' and site_name='{site}' and page='ASIN详情' and up_spider_state=1"
print(spider_state_sql)
cursor_us.execute(spider_state_sql)
up_spider_state_list = cursor_us.fetchone()
if up_spider_state_list:
df_id = db_cursor_connect_msyql_read(spider_state_sql)
if not df_id.empty:
update_up_spider_state_list = f"update workflow_progress set up_spider_state=3 where date_info='{year_week}' and date_type='week' and site_name='{site}' and page='ASIN详情' and up_spider_state=1"
db_cursor_connect(cursor_us, connect_us, update_up_spider_state_list)
db_cursor_connect_update(update_up_spider_state_list, site)
update_month_spider_state = f"update workflow_progress set spider_state=2,spider_int={spider_int} WHERE site_name='{site}' and date_type='month' and date_info='{year_month}' and page='ASIN详情'"
print(update_month_spider_state)
db_cursor_connect(cursor_us, connect_us, update_month_spider_state)
db_cursor_connect_update(update_month_spider_state, site)
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
process_df_name='ALL-实时-插件+流量选品',
......@@ -95,7 +119,6 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
except LockError:
print("获取锁失败")
except Exception as e:
connect_us.rollback()
print(f"错误: {e}")
finally:
if lock.locked():
......@@ -104,60 +127,32 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
while True:
try:
sql_read = f"select id from workflow_progress where date_info='{year_month}' and date_type='month' and page='ASIN详情' and site_name='{site}' and kafka_flow_state=3 and spider_state=2"
print(sql_read)
cursor_us.execute(sql_read)
report_date_list = cursor_us.fetchone()
print(sql_read)
df_report_date = db_cursor_connect_msyql_read(sql_read)
if n > 20:
up_state_sql = f"update workflow_progress set up_spider_state=1 where date_info='{year_week}' and date_type='week' and site_name='{site}' and page='ASIN详情'"
db_cursor_connect(cursor_us, connect_us, up_state_sql)
print('up_state_sql::', up_state_sql)
db_cursor_connect_update(up_state_sql, site)
return False
if report_date_list:
if df_report_date.id[0]:
return True
else:
n += 1
time.sleep(120)
except:
time.sleep(10)
db_class = connect_db(site)
cursor_us, connect_us = db_class.us_mysql_db() # us站点
if num == 1:
if site in ['fr', 'es', 'it']:
print(f' {site} _all_syn_st finished')
sql_select_ = f"select status_val from workflow_progress where date_info='{year_week}' and date_type='week' and page='ASIN详情' and site_name='{site}'"
print(sql_select_)
cursor_us.execute(sql_select_)
# fetch the result
status_dict = cursor_us.fetchone()
print(status_dict)
if int(status_dict[0]) in (1, 2):
update_workflow_progress = f"update workflow_progress set status_val=3,status='ASIN爬取完成' where page='ASIN详情' and date_info='{year_week}' and site_name='{site}' and date_type='week'"
db_cursor_connect(cursor_us, connect_us, update_workflow_progress)
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
process_df_name='ALL-asin历史详情消费',
startParams={
"site_name": f"{site}",
"date_type": "week",
"date_info": f'2025-{week}'
}
)
print('Sending notification message')
account = 'pengyanbing,chenyuanjie,hezhe,wangrui4,fangxingjun,zhouyuchen'
title = site + ' site asin details'
content = str(week) + ' week asin details finished, please confirm the next step! Time: ' + time_strftime
db_class.send_mg(account, title, content)
else:
sql_select_ = f"select status_val from workflow_progress where date_info='{year_week}' and date_type='week' and page='ASIN详情' and site_name='{site}'"
print(sql_select_)
cursor_us.execute(sql_select_)
# fetch the result
status_dict = cursor_us.fetchone()
print('Check whether kafka is enabled', status_dict)
if int(status_dict[0]) in (1, 2):
df_status_dict = db_cursor_connect_msyql_read(sql_select_)
print('Check whether kafka is enabled', df_status_dict)
if int(df_status_dict.status_val[0]) in (1, 2):
update_workflow_progress = f"update workflow_progress set status_val=3,status='ASIN爬取完成',up_spider_state=3 where page='ASIN详情' and date_info='{year_week}' and site_name='{site}' and date_type='week'"
print(update_workflow_progress)
db_cursor_connect(cursor_us, connect_us, update_workflow_progress)
db_cursor_connect_update(update_workflow_progress, site)
ii = 0
for i in range(10):
time.sleep(180)
......@@ -167,9 +162,9 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
# SELECT * from workflow_progress WHERE site_name='us' and page='asin详情' and date_type='month' and status_val=1 and status='月ASIN导出完成 and date_info='
update_month_asin_state = f"update workflow_progress set status_val=3,status='月ASIN抓取完成' WHERE site_name='{site}' and page='asin详情' and date_type='month' and status_val=1 and status='月ASIN导出完成' and date_info='{year_month}'"
print(update_month_asin_state)
db_cursor_connect(cursor_us, connect_us, update_month_asin_state)
db_cursor_connect_update(update_month_asin_state, site)
update_month_spider_state = f"update workflow_progress set kafka_flow_state=1,spider_state=3,spider_int={spider_int} WHERE site_name='{site}' and date_type='month' and date_info='{year_month}' and page='ASIN详情'"
db_cursor_connect(cursor_us, connect_us, update_month_spider_state)
db_cursor_connect_update(update_month_spider_state, site)
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
process_df_name='ALL站点-启动30day/月流程',
......@@ -186,29 +181,22 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
content = str(month) + ' month asin details finished, please confirm the next step! Time: ' + time_strftime
db_class.send_mg(account, title, content)
if num == 2:
if site in ['fr', 'es', 'it']:
sql_read = f"SELECT status_val FROM workflow_progress where page='{page}' and date_info='{year_week}' and site_name='{site}' and date_type='week'"
else:
sql_read = f"SELECT status_val FROM workflow_progress where page='{page}' and date_info='{year_month}' and site_name='{site}' and date_type='month'"
print(sql_read)
cursor_us.execute(sql_read)
status_val_tuple = cursor_us.fetchone()
print(status_val_tuple)
if status_val_tuple[0] == 3:
df_status_val_tuple = db_cursor_connect_msyql_read(sql_read)
if df_status_val_tuple.status_val[0] == 3:
print(site, page, 'finished')
return False
else:
print(f'Starting {page}')
return True
if num == 3:
if site in ['fr', 'es', 'it']:
sql_read = f'SELECT id FROM {site}_all_syn_st_2025_{week} where state in (1,2) LIMIT 1'
else:
for i in range(5):
try:
sql_read = f'SELECT id FROM {site}_all_syn_st_month_2025_{month} where state in (1,2) LIMIT 1'
print("sql_read:", sql_read)
df = pd.read_sql(sql_read, con=engine_pg)
id_ = list(df.id)
if len(id_):
df = db_cursor_connect_pg_read(site,sql_read)
if not df.empty:
id_tuple = [1]
else:
id_tuple = None
......@@ -220,17 +208,15 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
return False
else:
return True
cursor_mysql_db.close()
connect_mysql_db.close()
cursor_us.close()
connect_us.close()
if __name__ == '__main__':
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
process_df_name='ALL站点-启动30day/月流程',
startParams={
"site_name": "us",
"date_type": "month",
"date_info": '2025-06'
}
)
\ No newline at end of file
except Exception as e:
print('Error querying state 1/2:', e)
# if __name__ == '__main__':
# DolphinschedulerHelper.start_process_instance_common(
# project_name="big_data_selection",
# process_df_name='ALL站点-启动30day/月流程',
# startParams={
# "site_name": "us",
# "date_type": "month",
# "date_info": '2025-07'
# }
# )
\ No newline at end of file
......@@ -27,7 +27,9 @@ db_type_alias_map = {
}
DEFAULT_SERVERS = [
# "http://192.168.10.217:7777", # 内网
# "http://192.168.200.210:7777", # 内网
# "http://192.168.10.217:7777", # 内网-h7
# "http://61.145.136.61:7777", # 外网
"http://61.145.136.61:7779", # 外网
]
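Only the external :7779 endpoint is left enabled. If fallback across several entries were wanted, a simple probe loop could pick the first reachable server (hypothetical; assumes the root path answers a plain GET):

import requests

def pick_server(servers=DEFAULT_SERVERS):
    for url in servers:
        try:
            requests.get(url, timeout=3).raise_for_status()
            return url  # first server that answered the health probe
        except requests.RequestException:
            continue
    raise RuntimeError('no remote engine server reachable')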
# ---------------------------
......@@ -121,7 +123,7 @@ class RemoteTransaction:
json={"db": self.db,
"sql_list": self.sql_queue,
"site_name": self.database}, # site_name not needed on server, kept for clarity
timeout=15,
timeout=3000,
).raise_for_status()
return
except Exception as e:
......@@ -146,7 +148,7 @@ class RemoteEngine:
r = self.session.post(f"{url}/{endpoint}",
data=json_bytes,
headers={"Content-Type": "application/json"},
timeout=60)
timeout=3000)
# r = self.session.post(f"{url}/{endpoint}",
# json=payload, timeout=10)
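Raising the flat timeout to 3000 s keeps long statements from being cut off, but a dead host would then also block for 3000 s. requests accepts a (connect, read) tuple that separates the two; a possible refinement, not what this diff does:

r = self.session.post(f"{url}/{endpoint}",
                      data=json_bytes,
                      headers={"Content-Type": "application/json"},
                      timeout=(10, 3000))  # 10 s to connect, 3000 s to read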
......
......@@ -657,7 +657,7 @@ class ConnectSpider:
account_id = df_status.account_id.iloc[0]
account_secret = df_status.account_secret.iloc[0]
account_list = [account_id, account_secret]
# print(account_list)
print(account_list)
# print(111111111111)
connection.close()
return account_list
......
......@@ -563,9 +563,9 @@ class GetSS_details():
def run(self):
day = time.strftime("%d")
for item_id in range(1, 33):
print(f"item_id: {item_id}")
print(f"开始抓取 item_id: {item_id}")
if item_id == 1 and int(day)<2:
Con.update_all_states_to_1(state=1)
Con.update_all_states_to_1(state=2)
wait_time = random.uniform(6, 10)
account_list = Con.get_cookie_account(item_id)
......@@ -604,7 +604,8 @@ class GetSS_details():
logging.error(f'Error occurred: {e}, stopping loop')
break
if count == counts_last - 1:
print(f'{self.account} all crawling finished')
print(f'{self.account} all crawling finished')
Con.update_all_states_to_1(state=3, item_id=item_id)
if stop_flag:
print('Exceeded retry limit, pausing')
......