import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote_plus

if __name__ == '__main__':
    export_tb = "us_merchantwords_search_term_month_syn_2024"
    spark = SparkUtil.get_spark_session("MerchantwordsSupplement")

    # Build the Amazon search URLs (pages 1-3) for a search term.
    # quote_plus percent-encodes the UTF-8 term and maps spaces to '+',
    # which already covers the characters (' / & # ( )) that the previous
    # version replaced by hand. Note that the second positional argument of
    # urllib.parse.quote is `safe`, not an encoding, so the earlier call
    # quote(search_term, 'utf-8') did not select an encoding at all.
    def build_urls(search_term):
        url_template = "https://www.amazon.com/s?k={search_term}&page={page_number}"
        encoded_term = quote_plus(search_term, encoding='utf-8')
        return [url_template.format(search_term=encoded_term, page_number=page)
                for page in (1, 2, 3)]

    # Register the Python function as a Spark SQL UDF so it can be called
    # inside selectExpr below.
    spark.udf.register("build_urls", build_urls, ArrayType(StringType()))

    sql1 = """
        select keyword, volume, st_zr_counts, st_sp_counts
        from dwt_merchantwords_merge
        where site_name = 'us'
          and batch = '2024-07-01'
    """
    df_dwt_merchantwords_merge = spark.sql(sql1)

    # Keywords with search volume but no SP (sponsored-product) ad words
    df1 = df_dwt_merchantwords_merge.filter('volume >= 1 and st_sp_counts <= 0').select('keyword')
    print("keywords with search volume but no SP ad words: " + str(df1.count()))

    # Keywords whose organic word count is <= 0
    df2 = df_dwt_merchantwords_merge.filter('st_zr_counts <= 0').select('keyword')
    print("keywords with organic word count <= 0: " + str(df2.count()))

    # # Filter out keywords that contain Chinese characters
    # df_hive = df_hive.filter(~df_hive["keyword"].rlike("[\u4e00-\u9fff]"))

    # Union the two keyword sets, deduplicate, then explode each search term
    # into its three page URLs (one output row per URL).
    df_save = df1.union(df2).drop_duplicates(['keyword'])
    df_save = df_save.selectExpr("keyword AS search_term")
    df_save = df_save.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
    df_save = df_save.withColumn("date_info", lit('2024-06-26'))

    # Export the result to the PostgreSQL database
    df_save.write.format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.10.225:5433/selection") \
        .option("dbtable", export_tb) \
        .option("user", "yswg_postgres") \
        .option("password", "yswg_postgres") \
        .mode("append") \
        .save()

    spark.stop()