Commit 68e0f97e by Peng

no message

parent 85922441
...@@ -3,7 +3,7 @@ import sys ...@@ -3,7 +3,7 @@ import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from multiprocessing import Pool import multiprocessing
from amazon_spider.search_term_pg import search_temp_pg from amazon_spider.search_term_pg import search_temp_pg
from threading_spider.db_connectivity import connect_db from threading_spider.db_connectivity import connect_db
import time import time
...@@ -12,6 +12,7 @@ from amazon_spider.VPS_IP import pppoe_ip ...@@ -12,6 +12,7 @@ from amazon_spider.VPS_IP import pppoe_ip
from threading_spider.post_to_dolphin import DolphinschedulerHelper from threading_spider.post_to_dolphin import DolphinschedulerHelper
from utils.secure_db_client import get_remote_engine from utils.secure_db_client import get_remote_engine
from utils.db_connect import BaseUtils from utils.db_connect import BaseUtils
from amazon_params.params import DB_SEARCH_TERM_PARAMS_SPIDER
def db_engine(site_name, db_type): def db_engine(site_name, db_type):
""" """
...@@ -34,7 +35,7 @@ def db_engine(site_name, db_type): ...@@ -34,7 +35,7 @@ def db_engine(site_name, db_type):
def select_search_term_state(month, site): def select_search_term_state(month, site):
for i in range(5): for i in range(5):
try: try:
sql_read = f"SELECT id FROM {site}_search_term_month_syn where state in (1,2) and date_info='2025-{month}' LIMIT 1" sql_read = f"SELECT id FROM {site}_search_term_month_syn where state in (1,2) and date_info='2026-{month}' LIMIT 1"
engine_pg = db_engine(site, 'postgresql_14_outer') engine_pg = db_engine(site, 'postgresql_14_outer')
df = engine_pg.read_sql(sql_read) df = engine_pg.read_sql(sql_read)
if not df.empty: if not df.empty:
...@@ -63,7 +64,7 @@ def select_sate_mysql(site, num=None, month=None, week=None): ...@@ -63,7 +64,7 @@ def select_sate_mysql(site, num=None, month=None, week=None):
db_class = connect_db(site) db_class = connect_db(site)
print('month::', month) print('month::', month)
if num == 1: if num == 1:
sql_select_ = f"select status_val from workflow_progress where date_info='2025-{week}' and date_type='week' and page='反查搜索词' and site_name='{site}'" sql_select_ = f"select status_val from workflow_progress where date_info='2026-{week}' and date_type='week' and page='反查搜索词' and site_name='{site}'"
print(sql_select_) print(sql_select_)
engine_us_mysql = db_engine('us', 'mysql') engine_us_mysql = db_engine('us', 'mysql')
df = engine_us_mysql.read_sql(sql_select_) df = engine_us_mysql.read_sql(sql_select_)
...@@ -71,7 +72,7 @@ def select_sate_mysql(site, num=None, month=None, week=None): ...@@ -71,7 +72,7 @@ def select_sate_mysql(site, num=None, month=None, week=None):
redis_client = BaseUtils().redis_db() redis_client = BaseUtils().redis_db()
lock_key = "ALL站点-asin同步-pg-api_lock" lock_key = "ALL站点-asin同步-pg-api_lock"
lock = redis_client.lock(lock_key, timeout=15) # 10秒超时 lock = redis_client.lock(lock_key, timeout=15) # 10秒超时
update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='反查搜索词' and date_info='2025-{week}' and site_name='{site}' and date_type='week'" update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='反查搜索词' and date_info='2026-{week}' and site_name='{site}' and date_type='week'"
print('update_workflow_progress: 修改状态3 ', update_workflow_progress) print('update_workflow_progress: 修改状态3 ', update_workflow_progress)
db_cursor_connect_update(update_workflow_progress, site) db_cursor_connect_update(update_workflow_progress, site)
account = 'pengyanbing' account = 'pengyanbing'
...@@ -81,7 +82,7 @@ def select_sate_mysql(site, num=None, month=None, week=None): ...@@ -81,7 +82,7 @@ def select_sate_mysql(site, num=None, month=None, week=None):
ii = 0 ii = 0
for i in range(11): for i in range(11):
id_tuple = select_search_term_state(month, site) id_tuple = select_search_term_state(month, site)
time.sleep(180) time.sleep(200)
if id_tuple is None: if id_tuple is None:
ii += 1 ii += 1
if ii > 8: if ii > 8:
...@@ -94,7 +95,7 @@ def select_sate_mysql(site, num=None, month=None, week=None): ...@@ -94,7 +95,7 @@ def select_sate_mysql(site, num=None, month=None, week=None):
startParams={ startParams={
"site_name": f"{site}", "site_name": f"{site}",
"date_type": "month", "date_type": "month",
"date_info": f'2025-{month}' "date_info": f'2026-{month}'
} }
) )
account = 'pengyanbing,chenyuanjie,hezhe,wangrui4,fangxingjun,chenjianyun,zhouyuchen' account = 'pengyanbing,chenyuanjie,hezhe,wangrui4,fangxingjun,chenjianyun,zhouyuchen'
...@@ -120,15 +121,45 @@ def select_sate_mysql(site, num=None, month=None, week=None): ...@@ -120,15 +121,45 @@ def select_sate_mysql(site, num=None, month=None, week=None):
return True return True
def read_data_from_pg(site, month, read_size=1500):
    """Main process: claim a batch of pending search-term rows from the outer
    PostgreSQL 14 instance.

    Uses the engine's read_then_update helper, whose SELECT ... FOR UPDATE plus
    the state=2 update claims the rows in one step so that concurrent readers
    cannot grab the same batch.

    :param site: site code ('us', 'de', 'uk', ...); selects the engine and is
        the prefix of the per-site syn table name
    :param month: month number, combined into date_info '2026-{month}'
    :param read_size: maximum number of rows to claim per call
    :return: list of 'id|-|search_term|-|url' strings; [] when no state=1 rows
        remain OR when the read failed (the two cases are not distinguished)
    """
    try:
        engine_pg = db_engine(site, 'postgresql_14_outer')
        date_info = f'2026-{month}'
        # Build '<site>..._month_syn'. NOTE(review): the [2:] slice assumes the
        # configured db_search_term value begins with a 2-character site
        # placeholder that must be replaced — confirm against
        # amazon_params.params.DB_SEARCH_TERM_PARAMS_SPIDER.
        db_search_term = site + DB_SEARCH_TERM_PARAMS_SPIDER["db_search_term"][2:] + '_month_syn'
        # FOR UPDATE row-locks the selected batch until read_then_update commits.
        sql_read = f"SELECT id, search_term, url FROM {db_search_term} where state=1 and date_info='{date_info}' LIMIT {read_size} for update"
        print(sql_read)
        # Atomically read the batch and mark each selected row state=2
        # ("claimed"), matching rows back by id.
        df_read = engine_pg.read_then_update(
            select_sql=sql_read,
            update_table=db_search_term,
            set_values={"state": 2},
            where_keys=["id"],
        )
        if df_read.shape[0] > 0:
            # Pack each row into one 'id|-|search_term|-|url' string for the
            # worker processes; astype("U") casts the id column to str.
            search_term_list = list(
                df_read.id.astype("U") + '|-|' + df_read.search_term + '|-|' + df_read.url)
            print(f'读取 {len(search_term_list)} 条,已更新state=2')
            return search_term_list
        else:
            print('没有 state=1 的数据了')
            return []
    except Exception as e:
        # Best-effort: any DB failure is logged and reported as "no data";
        # the caller decides whether to retry.
        print(f'读取数据失败: {e}')
        return []
def long_time_task(site, proxy_name, month, data_chunk):
    """Worker-process entry point: crawl the search-term rows assigned to it.

    Each element of data_chunk is an 'id|-|search_term|-|url' string produced
    by the main process; the spider consumes them via run_pol_with_data.
    """
    # Report which worker got how many rows before starting.
    print(f"当前进程 {os.getpid()} 分配到 {len(data_chunk)} 条数据")
    crawler = search_temp_pg(site_name=site, read_size=100, proxy_name=proxy_name, month=month)
    crawler.run_pol_with_data(data_chunk)
if __name__ == '__main__': if __name__ == '__main__':
pppoe_ip() pppoe_ip()
site_list = ['us','de','uk'] site_list = ['us', 'de', 'uk']
#site_list = ['us']
num_processes = 5
month = int(sys.argv[1]) month = int(sys.argv[1])
week = int(sys.argv[2]) week = int(sys.argv[2])
proxy_name = None proxy_name = None
...@@ -140,27 +171,36 @@ if __name__ == '__main__': ...@@ -140,27 +171,36 @@ if __name__ == '__main__':
for site in site_list: for site in site_list:
while True: while True:
current_time = datetime.now() current_time = datetime.now()
# 计算后五分钟的时间
five_minutes_later = current_time + timedelta(minutes=1) five_minutes_later = current_time + timedelta(minutes=1)
print('后五分钟时间', five_minutes_later) print('后五分钟时间', five_minutes_later)
p = Pool(3)
for i in range(3): # 主进程统一读取数据
p.apply_async(long_time_task, args=(site, proxy_name, month)) all_data = read_data_from_pg(site, month, read_size=1500)
print('等待所有子进程运行完成') if not all_data:
# 执行该方法后不能继续添加新的Process print('没有数据,检查是否全部完成')
p.close() if select_sate_mysql(site, num=3, month=month, week=week) == False:
# 等待所有子进程执行完毕 break
p.join() else:
print('还有数据但读取失败,等待重试')
time.sleep(30)
continue
# 平均分配给子进程
chunks = [all_data[i::num_processes] for i in range(num_processes)]
processes = []
for idx in range(num_processes):
if chunks[idx]:
process = multiprocessing.Process(target=long_time_task, args=(site, proxy_name, month, chunks[idx]))
processes.append(process)
process.start()
for process in processes:
process.join()
print('所有进程运行完毕!') print('所有进程运行完毕!')
print('所有进程运行完毕!开始切换ip')
print('所有进程运行完毕!开始切换ip')
print('所有进程运行完毕!开始切换ip')
print('所有进程运行完毕!开始切换ip')
current_time = datetime.now() current_time = datetime.now()
if current_time > five_minutes_later: if current_time > five_minutes_later:
pppoe_ip() pppoe_ip()
else: else:
time.sleep(random.uniform(150, 220)) time.sleep(random.uniform(150, 220))
pppoe_ip() pppoe_ip()
if select_sate_mysql(site, num=3, month=month, week=week) == False:
break
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment