merchantwords_sr_to_pg16.py
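Reads MerchantWords search terms from the SR table `search_term_result_year` in 4,000,000-row batches, builds Amazon search URLs for the first three result pages of each term, and appends the rows to `us_merchantwords_search_term_month_syn_2024` in PostgreSQL.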
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql.functions import row_number, lit, length
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
if __name__ == '__main__':
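    # Expected arguments (assumed invocation): spark-submit merchantwords_sr_to_pg16.py <date_info> <n>
    #   date_info: date string stamped onto every exported row (date_info column)
    #   n:         1-based batch number; each batch covers 4,000,000 search terms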
    date_info = CommonUtil.get_sys_arg(1, None)
    n = CommonUtil.get_sys_arg(2, 0)
    import_tb = "search_term_result_year"
    export_tb = "us_merchantwords_search_term_month_syn_2024"
    spark = SparkUtil.get_spark_session("MerchantwordsSRToPG16")
    # Export 4,000,000 rows per run; n selects which slice of the ordered data to export
    offset = (int(n) - 1) * 4000000
    start_index = 1 + offset
    end_index = 4000000 + offset
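    # Example: n=2 selects rows 4,000,001 through 8,000,000 of the ordered result set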
    # Build the Amazon search URLs (result pages 1-3) for a search term
    def build_urls(search_term):
        url_template = "https://www.amazon.com/s?k={search_term}&page={page_number}"
        # Percent-encode the term, then apply the explicit escapes Amazon URLs expect
        encoded_term = quote(search_term, safe='')
        encoded_term = encoded_term.replace("'", '%27').replace("/", '%2F')
        encoded_term = (encoded_term
                        .replace(' ', '+')
                        .replace('&', '%26')
                        .replace('#', '%23')
                        .replace('(', '%28')
                        .replace(')', '%29'))
        return [
            url_template.format(search_term=encoded_term, page_number=page)
            for page in (1, 2, 3)
        ]
    # Register the Python function as a Spark SQL UDF so it can be used in selectExpr
    spark.udf.register("build_urls", build_urls, ArrayType(StringType()))
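    # Illustrative example: build_urls("water bottle") yields URLs of the form
    #   https://www.amazon.com/s?k=water%20bottle&page=1  (and pages 2 and 3)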
    # Read the source data from the SR database (MySQL protocol)
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://192.168.10.151:19030/test") \
        .option("dbtable", import_tb) \
        .option("user", "chenyuanjie") \
        .option("password", "chenyuanjie12345") \
        .load()
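    # Note: row_number over a window with no partitionBy pulls all rows into a single
    # partition to assign a global ordering before the batch range filter is applied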
    df = df.withColumn(
        "row_num",
        row_number().over(Window.orderBy("search_term"))
    ).filter(f"row_num BETWEEN {start_index} AND {end_index}").repartition(20).cache()
    # Drop rows whose search term contains Chinese characters
    df = df.filter(~df["search_term"].rlike("[\u4e00-\u9fff]"))
    # If there is no data left to export, stop here
    if df.count() == 0:
        print("------- All data has been exported! -------")
        spark.stop()
        sys.exit(0)
    df = df.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
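    # Keep only URLs of 450 characters or fewer (presumably the width limit of the url column in the target table)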
    df = df.filter(length(df['url']) <= 450)
    df = df.withColumn("date_info", lit(date_info))
    # Write the data to the PostgreSQL database
    df.write.format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.10.225:5432/selection") \
        .option("dbtable", export_tb) \
        .option("user", "yswg_postgres") \
        .option("password", "yswg_postgres") \
        .mode("append") \
        .save()
    spark.stop()