# NOTE(review): removed 91 stray lines that each contained only a bare
# integer literal (1 .. 91).  They were no-op expression statements --
# apparently line-number residue from a copy/paste -- with no runtime effect.
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import lit, col
from pyspark.sql.types import StringType, ArrayType
from urllib.parse import quote
def build_urls(search_term):
    """Return the Amazon.de search-result URLs for pages 1-3 of *search_term*.

    The keyword is percent-encoded for the ``k=`` query parameter; spaces are
    rendered as ``+``, the usual convention for query strings.

    FIX: the original called ``quote(search_term, 'utf-8')``, which binds
    ``'utf-8'`` to quote()'s *safe* parameter, not its encoding.  quote()
    already percent-encodes ``'``, ``/``, ``&``, ``#``, ``(`` and ``)``, so
    the three copy-pasted ``.replace()`` chains were dead code, and the
    intended ``' ' -> '+'`` substitution never fired (spaces were already
    ``%20`` by the time it ran).  Encode once with an explicit empty *safe*
    set instead, then apply the intended ``+`` convention.

    :param search_term: raw keyword string
    :return: list of three page URLs (pages 1, 2, 3)
    """
    encoded = quote(search_term, safe='', encoding='utf-8').replace('%20', '+')
    return [f"https://www.amazon.de/s?k={encoded}&page={page}" for page in (1, 2, 3)]


def _keywords_to_recrawl(spark):
    """Return a DataFrame of distinct keywords whose crawl data looks incomplete.

    Three symptom groups are unioned together:
      1. more than 80 products but no monthly sales,
      2. non-zero search volume but no SP ad words,
      3. no organic (zr) ranking words at all.
    """
    df = spark.sql("""
        select
            keyword,
            volume,
            st_monthly_sales,
            greatest(results_count, asin_total_num) as asin_total_num,
            st_sp_counts,
            st_zr_counts
        from dwt_merchantwords_merge
        where site_name = 'de'
          and batch = '2024-07-01'
    """)
    # More than 80 products but no monthly sales.
    df1 = df.filter('asin_total_num > 80 and st_monthly_sales <= 0').select('keyword')
    print("产品总数大于80且没有月销:" + str(df1.count()))
    # Has search volume but no SP ad words.
    df2 = df.filter('volume >= 1 and st_sp_counts <= 0').select('keyword')
    print("搜索量较大且没有sp广告词:" + str(df2.count()))
    # No organic (zr) ranking words at all.
    df3 = df.filter('st_zr_counts <= 0').select('keyword')
    print("自然词总数 <= 0的部分:" + str(df3.count()))
    return df1.union(df2).union(df3).drop_duplicates(['keyword'])


def main():
    """Build the supplement URL list and append it to PostgreSQL."""
    export_tb = "de_merchantwords_search_term_month_syn_2024"
    spark = SparkUtil.get_spark_session("MerchantwordsSupplement")
    try:
        # Expose build_urls to Spark SQL as an array<string> UDF.
        spark.udf.register("build_urls", build_urls, ArrayType(StringType()))

        df_save = _keywords_to_recrawl(spark).selectExpr("keyword AS search_term")
        # One output row per (keyword, page) URL.
        df_save = df_save.selectExpr("search_term", "explode(build_urls(search_term)) AS url")
        df_save = df_save.withColumn("date_info", lit('2024-06-26'))

        # Export the rows to PostgreSQL (append mode).
        df_save.write.format("jdbc") \
            .option("url", "jdbc:postgresql://192.168.10.225:5433/selection_de") \
            .option("dbtable", export_tb) \
            .option("user", "yswg_postgres") \
            .option("password", "yswg_postgres") \
            .mode("append") \
            .save()
    finally:
        # FIX: the original only stopped the session on success; always
        # release the Spark session, even when the JDBC write raises.
        spark.stop()


if __name__ == '__main__':
    main()