Commit e521edcf by fangxingjun

no message

parent cc57a012
...@@ -10,6 +10,7 @@ sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 ...@@ -10,6 +10,7 @@ sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
# from ..utils.templates import Templates # from ..utils.templates import Templates
from utils.db_util import DbTypes, DBUtil from utils.db_util import DbTypes, DBUtil
from urllib.parse import quote from urllib.parse import quote
from listen_program.wf_month_control import wf_month_control
class ImportStToPg14(object): class ImportStToPg14(object):
...@@ -46,6 +47,7 @@ class ImportStToPg14(object): ...@@ -46,6 +47,7 @@ class ImportStToPg14(object):
sql = f"select `year_month` from selection.date_20_to_30 WHERE year_week='{self.date_info}' and week_day=1;" sql = f"select `year_month` from selection.date_20_to_30 WHERE year_week='{self.date_info}' and week_day=1;"
df = pd.read_sql(sql, con=self.engine_mysql) df = pd.read_sql(sql, con=self.engine_mysql)
self.date_info = list(df.year_month)[0] self.date_info = list(df.year_month)[0]
self.date_type = "month"
def delete_dirty_data(self): def delete_dirty_data(self):
print(f"删除脏数据, 防止失败执行时报错") print(f"删除脏数据, 防止失败执行时报错")
...@@ -118,8 +120,6 @@ class ImportStToPg14(object): ...@@ -118,8 +120,6 @@ class ImportStToPg14(object):
return df_search_term return df_search_term
def save_data(self): def save_data(self):
print(f"存储{self.site_name}_search_term_month: {self.df_save.shape}") print(f"存储{self.site_name}_search_term_month: {self.df_save.shape}")
self.df_save.to_sql(f"{self.site_name}_search_term_month", con=self.engine_pg14, index=False, self.df_save.to_sql(f"{self.site_name}_search_term_month", con=self.engine_pg14, index=False,
if_exists="append") if_exists="append")
...@@ -134,7 +134,9 @@ class ImportStToPg14(object): ...@@ -134,7 +134,9 @@ class ImportStToPg14(object):
print(f"当前没有新增的搜索词同步, 不需要更改进度表, 退出程序") print(f"当前没有新增的搜索词同步, 不需要更改进度表, 退出程序")
quit() quit()
# 更改workflow_manager进度表 # 更改workflow_manager进度表
self.update_workflow_manager() # self.update_workflow_manager() # 更改到单独的命令执行
wf_month_control(site_name=self.site_name, date_type=self.date_type, date_info=self.date_info,
spider_name=f'{self.site_name}_spider_st', wf_type="spider")
with self.engine_pg14.begin() as conn: with self.engine_pg14.begin() as conn:
# sql_delete = f"delete from {self.site_name}_search_term_month_syn where (date_info='{self.date_info}' and state=1) or (date_info<'{self.date_info}');" # sql_delete = f"delete from {self.site_name}_search_term_month_syn where (date_info='{self.date_info}' and state=1) or (date_info<'{self.date_info}');"
...@@ -170,49 +172,49 @@ class ImportStToPg14(object): ...@@ -170,49 +172,49 @@ class ImportStToPg14(object):
] ]
return [[search_term, url] for url in urls] return [[search_term, url] for url in urls]
def update_workflow_manager(self): # def update_workflow_manager(self):
with self.engine_mysql.begin() as conn: # with self.engine_mysql.begin() as conn:
priority = self.site_name_pri_dict[self.site_name] # priority = self.site_name_pri_dict[self.site_name]
spider_script = f'ansible dabing_all -f 10 -m shell -a "nohup /usr/local/bin/python3 /mnt/py_spider/threading_spider/Poll_site_search_term_month.py {self.site_name} {self.date_info} >/dev/null 2>&1 &";' # spider_script = f'ansible dabing_all -f 10 -m shell -a "nohup /usr/local/bin/python3 /mnt/py_spider/threading_spider/Poll_site_search_term_month.py {self.site_name} {self.date_info} >/dev/null 2>&1 &";'
update_sql_workflow = f""" # update_sql_workflow = f"""
INSERT INTO selection.workflow_manager # INSERT INTO selection.workflow_manager
( # (
workflow_name, # workflow_name,
site_name, # site_name,
date_type, # date_type,
date_info, # date_info,
priority, # priority,
spider_name, # spider_name,
spider_script, # spider_script,
spider_is_ready, # spider_is_ready,
spider_state, # spider_state,
bg_name, # bg_name,
bg_dol_name, # bg_dol_name,
bg_dol_state # bg_dol_state
) # )
VALUES # VALUES
( # (
'月全流程', # '月全流程',
'{self.site_name}', # '{self.site_name}',
'month', # 'month',
'{self.date_info}', # '{self.date_info}',
{priority}, # {priority},
'{self.site_name}_spider_st', # '{self.site_name}_spider_st',
'{spider_script}', # '{spider_script}',
'yes', # 'yes',
1, # 1,
'{self.site_name}_asin_export', # '{self.site_name}_asin_export',
'ALL站点-asin同步-pg-api', # 'ALL站点-asin同步-pg-api',
1 # 1
) # )
ON DUPLICATE KEY UPDATE # ON DUPLICATE KEY UPDATE
spider_is_ready = VALUES(spider_is_ready), # spider_is_ready = VALUES(spider_is_ready),
spider_script = VALUES(spider_script), # spider_script = VALUES(spider_script),
bg_dol_state = VALUES(bg_dol_state), # bg_dol_state = VALUES(bg_dol_state),
spider_state = VALUES(spider_state); # spider_state = VALUES(spider_state);
""" # """
print(f"workflow_manager进度表---重置爬虫的搜索词抓取进度: {update_sql_workflow}") # print(f"workflow_manager进度表---重置爬虫的搜索词抓取进度: {update_sql_workflow}")
conn.execute(update_sql_workflow) # conn.execute(update_sql_workflow)
def run(self, num=0): def run(self, num=0):
while num <= 5: while num <= 5:
......
...@@ -10,74 +10,74 @@ sys.path.append(os.path.dirname(sys.path[0])) ...@@ -10,74 +10,74 @@ sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine from utils.secure_db_client import get_remote_engine
def update_workflow_manager(site_name, date_type, date_info): # def update_workflow_manager(site_name, date_type, date_info):
print(f"当前执行的参数: {site_name, date_type, date_info}") # print(f"当前执行的参数: {site_name, date_type, date_info}")
if date_type == "month": # if date_type == "month":
while True: # while True:
try: # try:
site_name_pri_dict = { # site_name_pri_dict = {
"us": 2, # "us": 2,
"uk": 4, # "uk": 4,
"de": 6, # "de": 6,
} # }
engine_mysql = get_remote_engine( # engine_mysql = get_remote_engine(
site_name='us', # site_name='us',
db_type='mysql' # db_type='mysql'
) # )
spider_script = f'ansible dabing_all -f 10 -m shell -a "nohup /usr/local/bin/python3 /mnt/py_spider/threading_spider/Poll_site_spider_month.py {site_name} {date_type} {date_info} >/dev/null 2>&1 &";' # spider_script = f'ansible dabing_all -f 10 -m shell -a "nohup /usr/local/bin/python3 /mnt/py_spider/threading_spider/Poll_site_spider_month.py {site_name} {date_type} {date_info} >/dev/null 2>&1 &";'
priority = site_name_pri_dict[site_name] # priority = site_name_pri_dict[site_name]
update_sql_workflow_spider = f""" # update_sql_workflow_spider = f"""
INSERT INTO selection.workflow_manager # INSERT INTO selection.workflow_manager
( # (
workflow_name, # workflow_name,
site_name, # site_name,
date_type, # date_type,
date_info, # date_info,
priority, # priority,
spider_name, # spider_name,
spider_script, # spider_script,
spider_is_ready, # spider_is_ready,
spider_state, # spider_state,
bg_name, # bg_name,
bg_dol_name, # bg_dol_name,
bg_dol_state, # bg_dol_state,
finished_count # finished_count
) # )
VALUES # VALUES
( # (
'月全流程', # '月全流程',
'{site_name}', # '{site_name}',
'month', # 'month',
'{date_info}', # '{date_info}',
{priority}, # {priority},
'{site_name}_spider_asin', # '{site_name}_spider_asin',
'{spider_script}', # '{spider_script}',
'yes', # 'yes',
1, # 1,
'{site_name}_all_cal', # '{site_name}_all_cal',
'ALL-月流程-ABA+反查+流量选品', # 'ALL-月流程-ABA+反查+流量选品',
1, # 1,
0 # 0
) # )
ON DUPLICATE KEY UPDATE # ON DUPLICATE KEY UPDATE
spider_is_ready = VALUES(spider_is_ready), # spider_is_ready = VALUES(spider_is_ready),
spider_script = VALUES(spider_script), # spider_script = VALUES(spider_script),
spider_state = VALUES(spider_state), # spider_state = VALUES(spider_state),
bg_dol_name = VALUES(bg_dol_name); # bg_dol_name = VALUES(bg_dol_name);
""" # """
print(f"workflow_manager进度表---重置爬虫的asin抓取进度: {update_sql_workflow_spider}") # print(f"workflow_manager进度表---重置爬虫的asin抓取进度: {update_sql_workflow_spider}")
engine_mysql.execute(update_sql_workflow_spider) # engine_mysql.execute(update_sql_workflow_spider)
#
update_sql_workflow_bg = f"""update selection.workflow_manager # update_sql_workflow_bg = f"""update selection.workflow_manager
set bg_dol_state=3, finished_count=COALESCE(finished_count, 0) + 1 # set bg_dol_state=3, finished_count=COALESCE(finished_count, 0) + 1
WHERE workflow_name='月全流程' and site_name='{site_name}' and date_type='{date_type}' and date_info='{date_info}' and priority={priority}""" # WHERE workflow_name='月全流程' and site_name='{site_name}' and date_type='{date_type}' and date_info='{date_info}' and priority={priority}"""
print(f"workflow_manager进度表---更新asin导出进度和完成次数: {update_sql_workflow_bg}") # print(f"workflow_manager进度表---更新asin导出进度和完成次数: {update_sql_workflow_bg}")
engine_mysql.execute(update_sql_workflow_bg) # engine_mysql.execute(update_sql_workflow_bg)
break # break
except Exception as e: # except Exception as e:
print(f"失败:workflow_manager进度表---重置爬虫的asin抓取进度: {update_sql_workflow_spider}, {e}, 报错信息: {traceback.format_exc()}") # print(f"失败:workflow_manager进度表---重置爬虫的asin抓取进度: {update_sql_workflow_spider}, {e}, 报错信息: {traceback.format_exc()}")
time.sleep(300) # time.sleep(300)
continue # continue
def export_data(site_name, date_type, date_info): def export_data(site_name, date_type, date_info):
...@@ -163,4 +163,4 @@ if __name__ == '__main__': ...@@ -163,4 +163,4 @@ if __name__ == '__main__':
export_data(site_name, date_type, date_info) export_data(site_name, date_type, date_info)
if site_name in ["us", "uk", "de"] and date_type == "month": if site_name in ["us", "uk", "de"] and date_type == "month":
get_minid_maxid(site_name, date_type, date_info) get_minid_maxid(site_name, date_type, date_info)
update_workflow_manager(site_name, date_type, date_info) # update_workflow_manager(site_name, date_type, date_info)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment