import pandas as pd
from sqlalchemy import text
from urllib.parse import quote_plus

from utils.common_util import CommonUtil
from utils.db_util import DBUtil

# Amazon storefront base URL per supported site.
SITE_URLS = {
    'us': 'https://www.amazon.com/',
    'uk': 'https://www.amazon.co.uk/',
    'de': 'https://www.amazon.de/',
    'fr': 'https://www.amazon.fr/',
    'es': 'https://www.amazon.es/',
    'it': 'https://www.amazon.it/',
}


class Integrate_search_term:
    def __init__(self, site_name=None, week=None, month=None, date_info=None):
        # Zero-pad single-digit week/month so they sort and compare correctly as strings.
        self.week = str(week).zfill(2)
        self.month = str(month).zfill(2)
        self.site_name = site_name  # site code, e.g. 'us', 'uk'
        self.date_info = date_info
        self.site_url = SITE_URLS[site_name]  # raises KeyError early for unsupported sites
        self.engine_pg = DBUtil.get_db_engine("postgresql_14", site_name)
        self.date_info_pre = self.get_pre_week(date_info)

    def search_term_syn(self):
        # The US site runs on a monthly schedule and would stamp a 'month' column
        # instead of 'week'; that flow is not implemented here, so bail out early
        # (the original code fell through and hit an undefined result_df).
        if self.site_name == 'us':
            return
        # Fetch this week's search terms (states 1 and 2 only) and de-duplicate.
        query = (f"SELECT search_term FROM {self.site_name}_search_term "
                 f"WHERE week={self.week} and state in (1,2)")
        print(query)
        result_df = self.get_data_from_database(self.engine_pg, query)
        result_df.drop_duplicates(subset=['search_term'], inplace=True)
        print(result_df.shape)
        # Build the page-1..3 URLs for every search term.
        result_list = []
        for search_term in result_df['search_term']:
            result_list.extend(self.build_urls(search_term))
        df_search_term = pd.DataFrame(data=result_list, columns=['search_term', 'url'])
        print(df_search_term.shape)
        # Keep only rows whose URL is at most 450 characters long.
        short_url_rows = df_search_term['url'].str.len() <= 450
        data_df = df_search_term[short_url_rows].copy()
        data_df['week'] = self.week
        data_df['date_info'] = self.date_info
        print(data_df)
        print(data_df.shape)
        # Drop finished rows from before the previous week, then append this week's batch.
        with self.engine_pg.begin() as conn:
            delete_sql = (f"DELETE from {self.site_name}_search_term_syn "
                          f"where date_info<'{self.date_info_pre}' and state not in (1,2)")
            print(delete_sql)
            conn.execute(text(delete_sql))
        data_df.to_sql(f'{self.site_name}_search_term_syn', con=self.engine_pg,
                       if_exists="append", index=False)

    # Run a query and return the result as a DataFrame.
    def get_data_from_database(self, connection, query):
        return pd.read_sql(query, connection)

    # Build the page-1..3 search URLs for a single search term.
    def build_urls(self, search_term):
        # quote_plus percent-encodes the term in one call (space -> '+',
        # '&' -> '%26', '#' -> '%23', quotes, slashes, parentheses, ...),
        # replacing the original manual .replace() chains; the original
        # quote(search_term, 'utf-8') passed 'utf-8' as the `safe` argument,
        # which was not the intended behavior.
        encoded_term = quote_plus(search_term)
        urls = [f"{self.site_url}s?k={encoded_term}&page={page}" for page in range(1, 4)]
        return [[search_term, url] for url in urls]

    # Look up the most recent year_week strictly before date_info in the calendar table.
    def get_pre_week(self, date_info):
        engine = DBUtil.get_db_engine("mysql", "us")
        with engine.connect() as connection:
            sql = f"""
            select year_week from date_20_to_30
            where year_week < '{date_info}'
            order by year_week desc limit 1
            """
            result = connection.execute(text(sql))
            pre_week = result.fetchone()[0]
        return pre_week


if __name__ == '__main__':
    date_info = CommonUtil.get_sys_arg(1, None)
    site_name = CommonUtil.get_sys_arg(2, None)
    assert date_info is not None, "date_info must not be empty!"
    assert site_name is not None, "site_name must not be empty!"
    year, week = CommonUtil.split_month_week_date("week", date_info)
    # month=99 is a placeholder; month only matters for the (unimplemented) US monthly flow.
    obj = Integrate_search_term(site_name=site_name, week=week, month=99, date_info=date_info)
    obj.search_term_syn()
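
# Example invocation, as a sketch: the CLI shape is inferred from the
# get_sys_arg calls above, the script name is hypothetical, and the year-week
# format of date_info is a guess based on the date_20_to_30 lookup:
#
#   python integrate_search_term.py 2023-25 uk
#
# This would build page-1..3 Amazon search URLs for every week-25 UK search
# term in states 1/2 and append them to uk_search_term_syn.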