# site_search_term_to_syn.py
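"""Stage Amazon search-result URLs for pending search terms.

For a given site and week, read the pending search terms (state 1 or 2)
from the `{site}_search_term` table in PostgreSQL, expand each term into
its page-1/2/3 Amazon search URLs, drop URLs longer than 450 characters,
prune stale rows from `{site}_search_term_syn`, and append the new batch.
"""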
import pandas as pd

from urllib.parse import quote_plus

from sqlalchemy import text
from utils.common_util import CommonUtil
from utils.db_util import DBUtil


class Integrate_search_term():

    def __init__(self, site_name=None, week=None, month=None, date_info=None):
        # Zero-pad week/month so string comparisons sort correctly (5 -> '05');
        # zfill also avoids double-padding values that already look like '05'
        self.site_name = site_name  # site code, e.g. 'us'
        self.week = str(week).zfill(2)
        self.month = str(month).zfill(2)
        self.date_info = date_info
        if site_name == "us":
            self.site_url = 'https://www.amazon.com/'
        elif site_name == 'uk':
            self.site_url = 'https://www.amazon.co.uk/'
        elif site_name == 'de':
            self.site_url = 'https://www.amazon.de/'
        elif site_name == 'fr':
            self.site_url = 'https://www.amazon.fr/'
        elif site_name == 'es':
            self.site_url = 'https://www.amazon.es/'
        elif site_name == 'it':
            self.site_url = 'https://www.amazon.it/'
        self.engine_pg = DBUtil.get_db_engine("postgresql_14", site_name)
        self.date_info_pre = self.get_pre_week(date_info)

    def search_term_syn(self):
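        """Fetch this week's pending search terms, expand each into its
        page-1/2/3 search URLs, filter over-long URLs, and append the batch
        to `{site}_search_term_syn`. The 'us' branches are currently no-ops.
        """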
        # Accumulate [search_term, url] rows
        result_list = []
        # Fetch the pending terms for this site (the 'us' site is skipped here)
        if self.site_name == 'us':
            pass
        else:
            query = f"SELECT search_term FROM {self.site_name}_search_term WHERE week={self.week} and state in (1,2)"
            print(query)
            result_df = self.get_data_from_database(self.engine_pg, query)
            result_df.drop_duplicates(subset=['search_term'], inplace=True)
            print(result_df.shape)
            # Expand each search term into its page-1/2/3 URLs
            for search_term in result_df['search_term']:
                urls = self.build_urls(search_term)
                result_list.extend(urls)

        # Build the staging DataFrame
        df_search_term = pd.DataFrame(data=result_list, columns=['search_term', 'url'])
        print(df_search_term.shape)
        # Keep only rows whose URL is at most 450 characters long
        url_ok_mask = df_search_term['url'].str.len() <= 450
        # .copy() so the column assignments below don't write into a view
        # (avoids pandas' SettingWithCopyWarning)
        data_df = df_search_term[url_ok_mask].copy()
        if self.site_name == 'us':
            data_df['month'] = self.month
            data_df['date_info'] = self.date_info
        else:
            data_df['week'] = self.week
            data_df['date_info'] = self.date_info

        print(data_df)
        print(data_df.shape)

        if self.site_name == 'us':
            pass
        else:
            with self.engine_pg.begin() as conn:
                # Prune stale rows from before the previous week, then append
                delete_sql = f"DELETE from {self.site_name}_search_term_syn where date_info<'{self.date_info_pre}' and state not in (1,2)"
                print(delete_sql)
                conn.execute(text(delete_sql))
            data_df.to_sql(f'{self.site_name}_search_term_syn', con=self.engine_pg, if_exists="append",
                           index=False)

    # Run a query and return the result set as a DataFrame
    def get_data_from_database(self, connection, query):
        return pd.read_sql(query, connection)

    # Build the page-1/2/3 Amazon search URLs for a search term
    def build_urls(self, search_term):
        # quote_plus percent-encodes the term in one call (' -> %27, / -> %2F,
        # & -> %26, # -> %23, ( -> %28, ) -> %29) and turns spaces into '+'
        encoded_term = quote_plus(search_term)
        urls = [f"{self.site_url}s?k={encoded_term}&page={page}" for page in (1, 2, 3)]
        return [[search_term, url] for url in urls]
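    # Example (illustrative values): on the 'us' site,
    #   build_urls("kids' bike")
    # returns
    #   [["kids' bike", 'https://www.amazon.com/s?k=kids%27+bike&page=1'],
    #    ["kids' bike", 'https://www.amazon.com/s?k=kids%27+bike&page=2'],
    #    ["kids' bike", 'https://www.amazon.com/s?k=kids%27+bike&page=3']]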

    def get_pre_week(self, date_info):
        # Look up the most recent year_week before date_info in the shared
        # MySQL calendar table
        engine = DBUtil.get_db_engine("mysql", "us")
        with engine.connect() as connection:
            sql = f"""
            select year_week
            from date_20_to_30
            where year_week < '{date_info}'
            order by year_week desc
            limit 1  """
            result = connection.execute(text(sql))
            pre_week = result.scalar()
        return pre_week
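    # Example (hypothetical year_week values): if date_20_to_30 contains
    # '2023-24' and '2023-25', get_pre_week('2023-25') returns '2023-24'.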


if __name__ == '__main__':
    date_info = CommonUtil.get_sys_arg(1, None)
    site_name = CommonUtil.get_sys_arg(2, None)
    assert date_info is not None, "date_info must not be empty!"
    assert site_name is not None, "site_name must not be empty!"
    year, week = CommonUtil.split_month_week_date("week", date_info)
    # month is unused on the weekly path; 99 is a placeholder value
    obj = Integrate_search_term(site_name=site_name, week=week, month=99, date_info=date_info)
    obj.search_term_syn()
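# Typical invocation (the exact date_info format is whatever
# CommonUtil.split_month_week_date("week", ...) expects):
#   python site_search_term_to_syn.py <date_info> <site_name>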