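"""
Ad-hoc PySpark export job: reads search-term/asin measure data from the dwd
layer for a few sample search terms, joins search-term-level and asin-level
metrics onto each pair, and dumps the result to a local CSV for inspection.
"""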
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates


class OdsBrandAnalytics(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.date_info2 = date_info
        # db_save is referenced in the app name below, so it must be set here
        self.db_save = 'ods_brand_analytics'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
    def read_data(self):
        # search-term/asin pairs for three sample search terms
        sql = f"select search_term, asin from dwd_st_asin_measure where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}' " \
              f"and search_term in ('christmas decorations', 'my orders', 'ugly christmas sweater men')"
        print("sql:", sql)
        df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        df_st_asin.show(20, truncate=False)
        # search-term-level AO value for the same sample search terms
        sql = f"select search_term, st_ao_val from dwd_st_measure where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}' " \
              f"and search_term in ('christmas decorations', 'my orders', 'ugly christmas sweater men')"
        print("sql:", sql)
        df_st = self.spark.sql(sqlQuery=sql).cache()
        df_st.show(20, truncate=False)
        # asin-level AO value and sp/sb/zr placement counts
        sql = f"select asin, asin_ao_val, asin_sp_counts, asin_sb_counts, asin_zr_counts " \
              f"from dwd_asin_measure where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
        print("sql:", sql)
        df_asin = self.spark.sql(sqlQuery=sql).cache()
        df_asin.show(20, truncate=False)
        df_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)  # spot-check a single asin
        # enrich each search-term/asin pair with its search-term and asin metrics
        df_save = df_st_asin.join(
            df_st, on='search_term', how='left'
        ).join(
            df_asin, on='asin', how='left'
        )
        # df_save.show(100, truncate=False)
        print(df_save.count())
        df_save = df_save.toPandas()
        df_save.to_csv("/opt/module/spark/demo/py_demo/temp/st_1_10_100_new.csv", index=False)
        quit()  # end the job here; the block below is an older variant kept for reference
sql = f"select search_term, st_rank, asin, st_ao_val from dwt_st_asin_info where site_name='{self.site_name}' " \
f"and date_type='{self.date_type}' and date_info='{self.date_info}' and st_rank in(1, 10, 100)"
print("sql:", sql)
df_st_asin = self.spark.sql(sqlQuery=sql).cache()
df_st_asin.show(20, truncate=False)
df_st_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)
sql = f"select asin, asin_ao_val, asin_sp_counts, asin_sb_counts, asin_zr_counts from dwd_asin_counts where site_name='{self.site_name}' " \
f"and date_type='{self.date_type}' and date_info='{self.date_info}'"
print("sql:", sql)
df_asin = self.spark.sql(sqlQuery=sql).cache()
df_asin.show(20, truncate=False)
df_asin.filter("asin='B07G9ZZJSV'").show(20, truncate=False)
df_save = df_st_asin.join(
df_asin, on='asin', how='left'
)
# df_save.show(100, truncate=False)
print(df_save.count())
df_save = df_save.toPandas()
df_save.to_csv("/opt/module/spark/demo/py_demo/temp/st_1_10_100.csv", index=False)
quit()
    def handle_data(self):
        # this ad-hoc export does all its work in read_data, so nothing to do here
        pass


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: date granularity: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: yyyy-mm-dd / yyyy-ww / yyyy-mm / yyyy-Q, e.g. 2022-1
    handle_obj = OdsBrandAnalytics(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
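
# Example invocation (hypothetical; the script filename and submit options
# depend on your deployment):
#   spark-submit ods_brand_analytics.py us month 2022-01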