1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql.functions import col, first
class StRankToPG(object):
    """Build a wide per-id table of monthly ranks and store it in Hive.

    The ids come from ``dwt_aba_last365`` for the target month; the ranks come
    from ``dwt_aba_st_analytics`` for the months after ``RANK_WINDOW_START``
    up to and including ``TARGET_MONTH``. Each month becomes one column named
    ``rank<month number>`` (``rank1`` .. ``rank12``).
    """

    # Site filter applied to both source tables.
    SITE_NAME = 'us'
    # Month whose dwt_aba_last365 rows supply the ids; also the last month of
    # the rank window.
    TARGET_MONTH = '2024-12'
    # Exclusive lower bound of the rank window. Together with TARGET_MONTH it
    # spans 12 months, so every month number appears at most once.
    RANK_WINDOW_START = '2023-12'

    def __init__(self):
        # Name of the Hive table the result is written to.
        self.db_save = 'tmp_st_rank_to_pg'
        # The table name is passed on to the project helper
        # (presumably used as the Spark application name -- confirm in SparkUtil).
        self.spark = SparkUtil.get_spark_session(self.db_save)

    @staticmethod
    def _rank_column_name(col_name):
        """Map a pivoted ``YYYY-MM`` column name to ``rank<month>``.

        Names that do not consist of exactly two dash-separated parts (for
        example ``id``) are returned unchanged.

        Raises:
            ValueError: if the part after the dash is not an integer.
        """
        parts = col_name.split('-')
        if len(parts) != 2:
            return col_name
        return f"rank{int(parts[1])}"

    def run(self):
        """Query ids and monthly ranks, pivot the ranks wide, and save the join.

        Raises:
            ValueError: if two pivoted months would map to the same ``rankN``
                column name.
        """
        sql_ids = f"""
        select id from dwt_aba_last365 where site_name = '{self.SITE_NAME}' and date_type = 'month' and date_info = '{self.TARGET_MONTH}';
        """
        df_st = self.spark.sql(sql_ids).repartition(40, "id").cache()

        # The upper bound keeps months after TARGET_MONTH out of the pivot;
        # without it '2024-01' and '2025-01' would both be renamed to 'rank1'.
        sql_rank = f"""
        select id, rank, date_info from dwt_aba_st_analytics where site_name = '{self.SITE_NAME}' and date_type = 'month' and date_info > '{self.RANK_WINDOW_START}' and date_info <= '{self.TARGET_MONTH}';
        """
        df_st_rank = self.spark.sql(sql_rank).repartition(40, "id").cache()

        # One row per id, one column per month holding that month's rank.
        df_st_rank = df_st_rank.groupBy("id").pivot("date_info").agg(first(col("rank")))

        renamed_columns = [self._rank_column_name(name) for name in df_st_rank.columns]
        if len(set(renamed_columns)) != len(renamed_columns):
            # Fail before touching HDFS rather than after the old data is gone.
            raise ValueError(f"duplicate rank columns after renaming: {renamed_columns}")
        df_st_rank = df_st_rank.toDF(*renamed_columns)

        # Left join keeps every id of the target month, even without ranks.
        df_save = df_st.join(df_st_rank, 'id', 'left')

        print(f"当前存储的表名为:{self.db_save}")
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save)
        print(f"清除hdfs目录中:{hdfs_path}")
        # Existing files are removed first, so the 'append' write below
        # effectively replaces the table contents.
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append')
        print("success")
if __name__ == '__main__':
    # Script entry point: build the job object and run it once.
    StRankToPG().run()