site_search_term_to_syn.py
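"""
Builds Amazon search-result URLs (pages 1-3) for every search term collected
in a given week from {site_name}_search_term, then appends the resulting
[search_term, url] pairs to {site_name}_search_term_syn after purging rows
older than the previous week. The us site is skipped by this weekly flow.
"""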
import pandas as pd

from sqlalchemy import text
from urllib.parse import quote
from utils.common_util import CommonUtil
from utils.db_util import DBUtil


class Integrate_search_term:

    # Maps each supported site to its Amazon marketplace base URL
    SITE_URLS = {
        'us': 'https://www.amazon.com/',
        'uk': 'https://www.amazon.co.uk/',
        'de': 'https://www.amazon.de/',
        'fr': 'https://www.amazon.fr/',
        'es': 'https://www.amazon.es/',
        'it': 'https://www.amazon.it/',
    }

    def __init__(self, site_name=None, week=None, month=None, date_info=None):
        # Zero-pad single-digit month/week values so they match the stored format
        self.month = str(month).zfill(2)
        self.week = str(week).zfill(2)
        self.site_name = site_name  # site code, e.g. 'us', 'uk'
        self.date_info = date_info
        self.site_url = self.SITE_URLS.get(site_name)
        self.engine_pg = DBUtil.get_db_engine("postgresql_14", site_name)
        self.date_info_pre = self.get_pre_week(date_info)

    def search_term_syn(self):
        # Collect [search_term, url] pairs for every term scraped this week
        result_list = []
        # Fetch data according to site_name (the us branch is intentionally a no-op here)
        if self.site_name == 'us':
            pass
        else:
            query = f"SELECT search_term FROM {self.site_name}_search_term WHERE week={self.week} and state in (1,2)"
            print(query)
            result_df = self.get_data_from_database(self.engine_pg, query)
            result_df.drop_duplicates(['search_term'], inplace=True)
            print(result_df.shape)
            # Build the page-1..3 search URLs for each term and collect them
            for search_term in result_df['search_term']:
                urls = self.build_urls(search_term)
                result_list.extend(urls)

        # Build the initial DataFrame
        df_search_term = pd.DataFrame(data=result_list, columns=['search_term', 'url'])
        print(df_search_term.shape)
        # Keep only rows whose URL is at most 450 characters long
        short_url_rows = df_search_term['url'].str.len() <= 450
        data_df = df_search_term[short_url_rows].copy()
        if self.site_name == 'us':
            data_df['month'] = f'{self.month}'
        else:
            data_df['week'] = f'{self.week}'
        data_df['date_info'] = self.date_info

        print(data_df)
        print(data_df.shape)

        if self.site_name == 'us':
            pass
        else:
            # Purge stale rows from before the previous week, then append this week's batch
            with self.engine_pg.begin() as conn:
                delete_sql = f"DELETE from {self.site_name}_search_term_syn where date_info<'{self.date_info_pre}' and state not in (1,2)"
                print(delete_sql)
                conn.execute(text(delete_sql))
            data_df.to_sql(f'{self.site_name}_search_term_syn', con=self.engine_pg, if_exists="append",
                           index=False)

    # Run a query against the given connection/engine and return a DataFrame
    def get_data_from_database(self, connection, query):
        return pd.read_sql(query, connection)

    # Build the page-1..3 search-result URLs for a single search term
    def build_urls(self, search_term):
        # quote()'s second positional argument is `safe`, not an encoding; with
        # safe='' every reserved character (space, '/', '&', '#', quotes,
        # parentheses, ...) is percent-encoded, which makes the original
        # chain of manual .replace() calls redundant
        encoded_term = quote(search_term, safe='')
        return [
            [search_term, f"{self.site_url}s?k={encoded_term}&page={page_number}"]
            for page_number in (1, 2, 3)
        ]
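    # For example (hypothetical term), build_urls("red car") on the uk site yields:
    #   ["red car", "https://www.amazon.co.uk/s?k=red%20car&page=1"]
    #   ["red car", "https://www.amazon.co.uk/s?k=red%20car&page=2"]
    #   ["red car", "https://www.amazon.co.uk/s?k=red%20car&page=3"]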

    # Look up the most recent year_week strictly before date_info
    def get_pre_week(self, date_info):
        engine = DBUtil.get_db_engine("mysql", "us")
        with engine.connect() as connection:
            sql = f"""
            select year_week
            from date_20_to_30
            where year_week < '{date_info}'
            order by year_week desc
            limit 1"""
            result = connection.execute(text(sql))
            pre_week = result.fetchone()[0]
        return pre_week
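    # Assuming year_week values are zero-padded strings like '2023-18' (an
    # assumption; the format depends on the date_20_to_30 table), passing
    # date_info='2023-18' would return '2023-17'.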


if __name__ == '__main__':
    date_info = CommonUtil.get_sys_arg(1, None)
    site_name = CommonUtil.get_sys_arg(2, None)
    assert date_info is not None, "date_info must not be empty!"
    assert site_name is not None, "site_name must not be empty!"
    year, week = CommonUtil.split_month_week_date("week", date_info)
    # month=99 is a placeholder: the weekly flow only uses the week value
    obj = Integrate_search_term(site_name=site_name, week=week, month=99, date_info=date_info)
    obj.search_term_syn()
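# Example invocation (date_info format assumed to be the year-week string
# expected by CommonUtil.split_month_week_date, e.g. '2023-18'):
#   python site_search_term_to_syn.py 2023-18 uk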