# supplement_merchantwords_de.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote


if __name__ == '__main__':
    # Target PostgreSQL table for the supplementary crawl URLs.
    export_tb = "de_merchantwords_search_term_month_syn_2024"
    # Project helper that builds/configures the SparkSession (Hive-enabled,
    # presumably — defined in utils.spark_util, not visible here).
    spark = SparkUtil.get_spark_session("MerchantwordsSupplement")

    # Build the crawl URLs for one search term.
    def build_urls(search_term):
        """Return the Amazon.de search URLs for pages 1-3 of *search_term*.

        The keyword is fully percent-encoded (``safe=''``) so that ``/``,
        ``&``, ``#``, ``(``, ``)``, ``'`` and spaces are all escaped.

        NOTE: the original code called ``quote(search_term, 'utf-8')``, which
        passes ``'utf-8'`` as quote's *safe* parameter (not *encoding*); every
        chained ``.replace(...)`` after it was therefore a no-op, because
        quote had already encoded those characters. ``quote(s, safe='')``
        produces byte-identical output (the characters u/t/f/8/- are in
        quote's always-safe set anyway).

        :param search_term: raw keyword string (UTF-8 text)
        :return: list of three URL strings, pages 1..3
        """
        encoded = quote(search_term, safe='')
        return [
            f"https://www.amazon.de/s?k={encoded}&page={page}"
            for page in (1, 2, 3)
        ]
    # Register the Python function as a Spark SQL UDF so it can be used in
    # selectExpr/SQL below; it returns an array<string> (one URL per page).
    spark.udf.register("build_urls", build_urls, ArrayType(StringType()))

    # Pull the 2024-07-01 'de' batch of the merged MerchantWords table.
    # greatest(results_count, asin_total_num) papers over the two sources
    # disagreeing on the product count for a keyword.
    sql1 = """
        select 
            keyword,
            volume,
            st_monthly_sales,
            greatest(results_count, asin_total_num) as asin_total_num,
            st_sp_counts,
            st_zr_counts
        from dwt_merchantwords_merge 
        where site_name = 'de'
        and batch = '2024-07-01'
    """
    df_dwt_merchantwords_merge = spark.sql(sql1)

    # (kept for reference) an earlier variant pulled keywords from the
    # st_detail table instead of the merge table:
    # sql2 = """
    #     select
    #         keyword
    #     from dwt_merchantwords_st_detail
    #     where site_name = 'de'
    #     and batch = '2024-1'
    # """
    # df_dwt_merchantwords_st_detail = spark.sql(sql2)

    # Gate 1: more than 80 products on the results page, yet no monthly sales.
    df_missing_sales = df_dwt_merchantwords_merge \
        .filter('asin_total_num > 80 and st_monthly_sales <= 0') \
        .select('keyword')
    print("产品总数大于80且没有月销:" + str(df_missing_sales.count()))

    # Gate 2: keyword has search volume but no sponsored-product ad words.
    df_missing_sp = df_dwt_merchantwords_merge \
        .filter('volume >= 1 and st_sp_counts <= 0') \
        .select('keyword')
    print("搜索量较大且没有sp广告词:" + str(df_missing_sp.count()))

    # Gate 3: no organic (zr) ranking words recorded at all.
    df_missing_zr = df_dwt_merchantwords_merge \
        .filter('st_zr_counts <= 0') \
        .select('keyword')
    print("自然词总数 <= 0的部分:" + str(df_missing_zr.count()))

    # # Optionally drop rows whose keyword contains Chinese characters:
    # # df = df.filter(~df["keyword"].rlike("[\u4e00-\u9fff]"))

    # Union the three candidate sets, dedupe by keyword, expand each keyword
    # into its three page URLs via the registered UDF, and stamp the batch date.
    df_save = (
        df_missing_sales
        .union(df_missing_sp)
        .union(df_missing_zr)
        .drop_duplicates(['keyword'])
        .selectExpr("keyword AS search_term")
        .selectExpr("search_term", "explode(build_urls(search_term)) AS url")
        .withColumn("date_info", lit('2024-06-26'))
    )

    # Export the result to the PostgreSQL table (append mode: reruns add
    # rows rather than overwrite).
    # NOTE(review): credentials and host are hardcoded here — consider moving
    # them to configuration/secrets; verify with the project's conventions.
    df_save.write.format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.10.225:5433/selection_de") \
        .option("dbtable", export_tb) \
        .option("user", "yswg_postgres") \
        .option("password", "yswg_postgres") \
        .mode("append") \
        .save()

    # Release the Spark session.
    spark.stop()