"""
author: 方星钧(ffman)
description: 清洗6大站点对应的 单周的zr,sp,sb,ac,bs,er,tr等7大类型数据表(计算zr,sp类型表的page_rank+合并7张表)
table_read_name: ods_st_rank_zr/sp/sb/ac/bs/er/tr
table_save_name: dim_st_asin_info
table_save_level: dim
version: 3.0
created_date: 2022-05-10
updated_date: 2022-11-07
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the module search path
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import IntegerType


class DimStAsinInfo(Templates):

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_st_asin_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        # empty zr frame used as a schema template for the unions in read_data
        self.df_save = self.spark.sql(
            "select search_term, asin, page, page_row, 'zr' as data_type, updated_time, "
            "site_name, date_type, date_info from ods_search_term_zr limit 0;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=60)
        # per-type small tables that get merged into one big table
        self.data_type_list = ['tr', 'er', 'bs', 'ac', 'sb1', 'sb2', 'sb3', 'sp', 'zr']

    @staticmethod
    def udf_page_rank(page, page_1_count, page_2_count, page_row):
        """
        Compute the page_rank field for zr and sp records.
        :param page: page number the record appears on
        :param page_1_count: max page_row observed on page 1 for the search term
        :param page_2_count: cumulative row count through page 2 (page 1 + page 2)
        :param page_row: row position within the page
        :return: page_rank, the overall rank across pages
        """
        if page == 1:
            return page_row
        elif page == 2:
            return page_1_count + page_row
        else:
            return page_2_count + page_row
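
    # Worked example (illustrative numbers, not real data): for a search term
    # with page_1_count=48 and page_2_count=48+50=98, an asin at page=2,
    # page_row=5 gets page_rank=48+5=53, and one at page=3, page_row=7 gets
    # page_rank=98+7=105. The else branch assumes results stop at page 3; rows
    # on page>=4 would still only be offset by the page-2 cumulative count.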

    def handle_data_page_rank(self, df, data_type):
        print(f"{data_type} -- computing page_rank")
        u_page_rank = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType())
        # zr/sp contain duplicate rows, so take max(page_row) instead of count
        df_page_1 = df.filter("page=1").groupBy(['search_term']).agg({"page_row": "max"})
        df_page_2 = df.filter(df['page'] == 2).groupBy(['search_term']).agg({"page_row": "max"})
        df_page_1 = df_page_1.withColumnRenamed('max(page_row)', 'page_1_count')
        df_page_2 = df_page_2.withColumnRenamed('max(page_row)', 'page_2_count_old')
        df = df.join(df_page_1, on='search_term', how='left'). \
            join(df_page_2, on='search_term', how='left')
        df = df.fillna(0)
        # cumulative rows through page 2 = page-1 count + page-2 count
        df = df.withColumn("page_2_count", df.page_1_count + df.page_2_count_old)
        df = df.withColumn("page_rank", u_page_rank(
            df['page'], df.page_1_count, df.page_2_count, df['page_row']))
        # df.show(n=10, truncate=False)
        return df
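
    # Minimal local sanity check for handle_data_page_rank (a sketch; assumes an
    # instance `job = DimStAsinInfo(...)` exists -- the rows below are made up):
    #   df = job.spark.createDataFrame(
    #       [("kw", "B0A", 1, 1), ("kw", "B0B", 1, 2), ("kw", "B0C", 2, 1)],
    #       ["search_term", "asin", "page", "page_row"])
    #   job.handle_data_page_rank(df, "zr").select("asin", "page_rank").show()
    #   # expected: B0A -> 1, B0B -> 2, B0C -> 2 + 1 = 3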

    def read_data(self):
        for data_type in self.data_type_list:
            print(f"site_name: {self.site_name}, data_type: {data_type}")
            if data_type in ['zr', 'sp']:
                sql = f"select search_term, asin, page, page_row, '{data_type}' as data_type, updated_time, " \
                      f"site_name, date_type, date_info from ods_search_term_{data_type} " \
                      f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                      f"and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
                # compute page_rank, then drop the helper columns
                df = self.handle_data_page_rank(df=df, data_type=data_type)
                df = df.drop('page_1_count', 'page_2_count', 'page_2_count_old')
            else:
                if data_type in ['sb1', 'sb2', 'sb3']:
                    # sb1/sb2/sb3 all live in ods_search_term_sb, split by its data_type column
                    sql = f"select search_term, asin, page, '{data_type}' as data_type, updated_time, " \
                          f"site_name, date_type, date_info from ods_search_term_sb " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                          f"and date_info='{self.date_info}' and data_type={int(data_type[-1])};"
                else:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type, updated_time, " \
                          f"site_name, date_type, date_info from ods_search_term_{data_type} " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                          f"and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
            # print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
            self.df_save = self.df_save.unionByName(df, allowMissingColumns=True)
        # self.df_save.show(n=10, truncate=False)
        # print("self.df_save.count():", self.df_save.count())
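
    # Shape of the merged frame after read_data (a sketch; values invented).
    # Non-zr/sp sources select no page_row column, so unionByName with
    # allowMissingColumns=True fills it with NULL:
    #   search_term | asin | page | page_row | data_type
    #   kw          | B0A  | 1    | 5        | zr
    #   kw          | B0B  | 1    | NULL     | tr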


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date granularity: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
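
# Example invocation (hypothetical script file name; the three arguments are
# positional, matching sys.argv above):
#   spark-submit dim_st_asin_info.py us week 2022-45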