Commit b5fad217 by fangxingjun

no message

parent 66d45910
......@@ -69,7 +69,7 @@ class ImgHdfsIndex(Templates):
with self.engine_doris.begin() as conn:
sql_truncate = f"truncate {self.db_save}"
print(f"sql_truncate: {sql_truncate}")
conn.execut(sql_truncate)
conn.execute(sql_truncate)
self.df_save.to_sql(self.db_save, con=self.engine_doris, if_exists="append", index=False)
def run(self):
......
......@@ -3,10 +3,12 @@ import os
import time
import pandas as pd
sys.path.append(os.path.dirname(sys.path[0])) # parent directory, so utils.* can be imported
# from utils.templates import Templates
# from ..utils.templates import Templates
from utils.db_util import DbTypes, DBUtil
from urllib.parse import quote
class ImportStToPg14(object):
......@@ -23,6 +25,19 @@ class ImportStToPg14(object):
self.df_save = pd.DataFrame()
# self.fetch_year_month_by_week() # if date_type='week' is passed, convert date_info to the corresponding month value
self.year, self.month = self.date_info.split("-")[0], int(self.date_info.split("-")[1])
self.site_name_url_dict = {
"us": 'https://www.amazon.com/',
"uk": 'https://www.amazon.co.uk/',
"de": 'https://www.amazon.de/',
"es": 'https://www.amazon.es/',
"fr": 'https://www.amazon.fr/',
"it": 'https://www.amazon.it/',
}
self.site_name_pri_dict = {
"us": 1,
"uk": 3,
"de": 5,
}
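# site_name_url_dict maps each site code to its Amazon storefront base URL (consumed by build_urls);
# site_name_pri_dict holds the workflow priority that update_workflow_manager writes for the synced sites.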
def fetch_year_month_by_week(self):
if self.date_type == 'week':
......@@ -64,17 +79,128 @@ class ImportStToPg14(object):
df_count_after = self.df_save.shape[0]
print(f"df_count_before:{df_count_before}, df_count_after:{df_count_after}")
def handle_data_sync(self):
result_list = []
for search_term in self.df_save.search_term:
urls = self.build_urls(search_term)
result_list.extend(urls)
# build the initial DataFrame
df_search_term = pd.DataFrame(data=result_list, columns=['search_term', 'url'])
print(df_search_term.shape)
before_count = len(df_search_term)
df_search_term = df_search_term.loc[
df_search_term["search_term"].fillna("").astype(str).str.len() <= 450
].copy()
after_count = len(df_search_term)
print(f"search_term长度过滤:过滤前 {before_count},过滤后 {after_count},过滤掉 {before_count - after_count}")
df_search_term['month'] = self.month
df_search_term['date_info'] = self.date_info
return df_search_term
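# save_data: inside one transaction, delete stale rows (today's re-import of the same date_info, or any older date_info),
# append df_save to {site_name}_search_term_month, append the URL rows from handle_data_sync() to
# {site_name}_search_term_month_syn, upsert the workflow_manager row, then mark the new month rows state=3.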
def save_data(self):
from datetime import datetime
today_str = datetime.now().strftime("%Y-%m-%d")
with self.engine_pg14.begin() as conn:
sql_delete = f"delete from {self.site_name}_search_term_month where date_info='{self.date_info}' and state=1;"
print(f"sql_delete:", sql_delete)
sql_delete = f"delete from {self.site_name}_search_term_month where (date_info='{self.date_info}' and state=1 and updated_time>='{today_str} 00:00:00') or (date_info<'{self.date_info}');"
print(f"sql_delete--{self.site_name}_search_term_month:", sql_delete)
conn.execute(sql_delete)
self.df_save.to_sql(f"{self.site_name}_search_term_month", con=self.engine_pg14, index=False, if_exists="append")
sql_delete = f"delete from {self.site_name}_search_term_month_syn where (date_info='{self.date_info}' and state=1 and updated_time>='{today_str} 00:00:00') or (date_info<'{self.date_info}');"
print(f"sql_delete--{self.site_name}_search_term_month_syn:", sql_delete)
conn.execute(sql_delete)
print(f"存储{self.site_name}_search_term_month: {self.df_save.shape}")
self.df_save.to_sql(f"{self.site_name}_search_term_month", con=self.engine_pg14, index=False,
if_exists="append")
# pass
df_search_term_sync = self.handle_data_sync()
print(f"存储{self.site_name}_search_term_month_syn: {df_search_term_sync.shape}")
df_search_term_sync.to_sql(f"{self.site_name}_search_term_month_syn", con=self.engine_pg14, index=False,
if_exists="append")
# update the workflow_manager progress table
self.update_workflow_manager()
with self.engine_pg14.begin() as conn:
# sql_delete = f"delete from {self.site_name}_search_term_month_syn where (date_info='{self.date_info}' and state=1) or (date_info<'{self.date_info}');"
# print(f"sql_delete--{self.site_name}_search_term_month_syn:", sql_delete)
# conn.execute(sql_delete)
sql_update = f"update {self.site_name}_search_term_month set state=3 where date_info='{self.date_info}' and state=1"
print(f"sql_update--{self.site_name}_search_term_month:", sql_update)
conn.execute(sql_update)
# helper that builds the page 1-3 search URLs for one search term
def build_urls(self, search_term):
    site_name_url = self.site_name_url_dict[self.site_name]
    url_template = f"{site_name_url}s?k={{search_term}}&page={{page_number}}"
    # percent-encode the term, then escape the remaining special characters once
    search_term_encoded = quote(search_term, 'utf-8')
    search_term_encoded = (search_term_encoded.replace("'", '%27').replace("/", '%2F')
                           .replace(' ', '+').replace('&', '%26').replace('#', '%23')
                           .replace('(', '%28').replace(')', '%29'))
    urls = [url_template.format(search_term=search_term_encoded, page_number=page) for page in (1, 2, 3)]
    return [[search_term, url] for url in urls]
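# A worked example, assuming site_name="us" and a hypothetical term "laptop" (unchanged by the escaping above):
# build_urls("laptop") ->
# [["laptop", "https://www.amazon.com/s?k=laptop&page=1"],
#  ["laptop", "https://www.amazon.com/s?k=laptop&page=2"],
#  ["laptop", "https://www.amazon.com/s?k=laptop&page=3"]]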
def update_workflow_manager(self):
with self.engine_mysql.begin() as conn:
priority = self.site_name_pri_dict[self.site_name]
update_sql_workflow = f"""
INSERT INTO workflow_manager
(
workflow_name,
site_name,
date_type,
date_info,
priority,
spider_name,
spider_is_ready,
spider_state,
bg_name,
bg_dol_state
)
VALUES
(
'月全流程',
'{self.site_name}',
'month',
'{self.date_info}',
{priority},
'us_spider_st',
'yes',
1,
'us_asin_export',
1
)
ON DUPLICATE KEY UPDATE
spider_is_ready = VALUES(spider_is_ready),
spider_state = VALUES(spider_state);
"""
print(f"workflow_manager进度表---重置爬虫的搜索词抓取进度: {update_sql_workflow}")
conn.execute(update_sql_workflow)
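# Note: the INSERT ... ON DUPLICATE KEY UPDATE acts as an upsert; it assumes workflow_manager carries a unique key
# (not shown in this diff) over the identifying columns, and on conflict only spider_is_ready and spider_state are refreshed.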
def run(self, num=0):
while num<=3:
while num <= 5:
try:
self.read_data()
self.handle_data()
......@@ -94,4 +220,7 @@ if __name__ == '__main__':
date_type = sys.argv[2] # arg 2: date type, e.g. month
date_info = sys.argv[3] # arg 3: value for the month, e.g. 2024-01
handle_obj = ImportStToPg14(site_name=site_name, date_type=date_type, date_info=date_info)
if site_name in ['us', 'uk', 'de']:
handle_obj.run()
else:
print(f"小站点不需要同步爬虫表, 当前站点: {site_name}")