import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import row_number, lit, length
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote_plus

if __name__ == '__main__':
    date_info = CommonUtil.get_sys_arg(1, None)
    # 1-based batch number: batch k covers rows (k-1)*4,000,000+1 .. k*4,000,000
    n = CommonUtil.get_sys_arg(2, 0)
    import_tb = "search_term_result_year"
    export_tb = "us_merchantwords_search_term_month_syn_2024"
    spark = SparkUtil.get_spark_session("MerchantwordsSRToPG16")

    # Export 4,000,000 rows per run
    batch_size = (int(n) - 1) * 4000000
    start_index = 1 + batch_size
    end_index = 4000000 + batch_size

    # Build the Amazon search URLs (result pages 1-3) for a keyword
    def build_urls(search_term):
        # quote_plus percent-encodes reserved characters (', /, &, #,
        # parentheses, ...) and encodes spaces as '+'
        encoded_term = quote_plus(search_term, safe='')
        url_template = "https://www.amazon.com/s?k={search_term}&page={page_number}"
        return [
            url_template.format(search_term=encoded_term, page_number=page)
            for page in (1, 2, 3)
        ]

    # Register the Python function as a Spark SQL UDF
    spark.udf.register("build_urls", build_urls, ArrayType(StringType()))

    # Read the existing data from the SR database
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://192.168.10.151:19030/test") \
        .option("dbtable", import_tb) \
        .option("user", "chenyuanjie") \
        .option("password", "chenyuanjie12345") \
        .load()

    # Number the rows globally by search_term and keep only this batch's slice
    df = df.withColumn(
        "row_num", row_number().over(Window.orderBy("search_term"))
    ).filter(f"row_num BETWEEN {start_index} AND {end_index}").repartition(20).cache()

    # Drop keywords that contain Chinese characters
    df = df.filter(~df["search_term"].rlike("[\u4e00-\u9fff]"))

    # If there is no data left to export, stop
    if df.count() == 0:
        print("------- All data has been exported! -------")
        sys.exit(0)

    # Expand each keyword into one row per generated URL
    df = df.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
    df = df.filter(length(df['url']) <= 450)
    df = df.withColumn("date_info", lit(date_info))

    # Export the data to the PostgreSQL database
    df.write.format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.10.225:5432/selection") \
        .option("dbtable", export_tb) \
        .option("user", "yswg_postgres") \
        .option("password", "yswg_postgres") \
        .mode("append") \
        .save()

    spark.stop()
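
# Usage sketch (hedged assumptions: the script filename and the date_info
# format below are hypothetical, and CommonUtil.get_sys_arg is assumed to read
# positional CLI arguments as the code above implies). Batch 1 exports rows
# 1-4,000,000, batch 2 exports rows 4,000,001-8,000,000, and so on:
#
#   spark-submit merchantwords_sr_to_pg16.py 2024-06 2
#
# For a keyword such as "kids' toys & games", build_urls yields:
#   https://www.amazon.com/s?k=kids%27+toys+%26+games&page=1
#   https://www.amazon.com/s?k=kids%27+toys+%26+games&page=2
#   https://www.amazon.com/s?k=kids%27+toys+%26+games&page=3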