1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
class DwtAbaYearWeek(Templates):
    """Pivot one year of weekly search-term metrics into a single wide row per term.

    Reads `dim_st_detail` for every week of the year taken from ``date_info``,
    pivots rank / search volume / BSR category per week into columns named
    ``st_<week>_<metric>``, and appends the result to `dwt_aba_year_week`
    partitioned by ``site_name``.
    """

    def __init__(self, site_name='us', date_type="week", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info  # expected format 'YYYY-WW'
        # Year derived from date_info; previously hard-coded to '2022', which
        # made the job silently wrong for any other year.
        self.year = self.date_info.split('-')[0]
        self.db_save = 'dwt_aba_year_week'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # Placeholder DataFrame; replaced by read_data().
        self.df_st = self.spark.sql(f"select 1+1;")
        # Empty frame with the target table's schema, used as the union base.
        self.df_save = self.spark.sql(f"select * from {self.db_save} where site_name='{self.site_name}' limit 0")
        self.partitions_by = ['site_name']
        self.reset_partitions(10)

    def read_data(self):
        """Load one year of weekly search-term detail rows into self.df_st (cached)."""
        # NOTE(review): weeks are capped at '-52'; ISO years with 53 weeks would
        # drop week 53 — confirm whether upstream data ever contains it.
        sql = f"select search_term, st_search_sum, st_rank, st_bsr_cate_1_id, date_info " \
              f"from dim_st_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and " \
              f"date_info between '{self.year}-01' and '{self.year}-52'"
        self.df_st = self.spark.sql(sql).cache()

    def handle_data(self):
        """Pivot weekly metrics into per-week columns and union into self.df_save."""
        df_save = self.df_st.groupby(['search_term']).pivot('date_info').agg(
            F.max('st_rank'),
            F.max('st_search_sum'),
            F.max('st_bsr_cate_1_id'),
        )
        print(df_save.columns)
        # Pivoted columns look like 'YYYY-WW_max(st_rank)'; rename them to
        # 'st_<week>_rank' (leading zero stripped for weeks 1-9). The
        # '<year>-0' replace must run before '<year>-' so single-digit weeks
        # lose their zero padding.
        for col in df_save.columns:
            if col != 'search_term':
                new_col = "st_" + col.replace(f'{self.year}-0', '').replace(f'{self.year}-', '') \
                    .replace('max(st_', '').replace(')', '')
                print("new_col, col:", new_col, col)
                df_save = df_save.withColumnRenamed(col, new_col)
        df_save.show(10, truncate=False)
        print(df_save.columns)
        df_save = df_save.withColumn("site_name", F.lit(self.site_name))
        # allowMissingColumns: a year with fewer observed weeks than the target
        # table's schema still unions cleanly (missing weeks become null).
        self.df_save = self.df_save.unionByName(df_save, allowMissingColumns=True)
if __name__ == '__main__':
    # CLI argument 1: site code (e.g. 'us').
    arg_site = sys.argv[1]
    job = DwtAbaYearWeek(site_name=arg_site)
    job.run()