import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory

from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql.functions import col, first


class StRankToPG(object):
    def __init__(self):
        self.db_save = 'tmp_st_rank_to_pg'
        self.spark = SparkUtil.get_spark_session(self.db_save)

    def run(self):
        # Search terms active in the most recent month (us site, 2024-12).
        sql1 = """
            select id
            from dwt_aba_last365
            where site_name = 'us'
              and date_type = 'month'
              and date_info = '2024-12'
        """
        df_st = self.spark.sql(sql1).repartition(40, "id").cache()

        # Monthly rank history for each search term since 2024-01.
        sql2 = """
            select id, rank, date_info
            from dwt_aba_st_analytics
            where site_name = 'us'
              and date_type = 'month'
              and date_info > '2023-12'
        """
        df_st_rank = self.spark.sql(sql2).repartition(40, "id").cache()

        # Pivot the long rank history into one column per month:
        # each distinct date_info value ('2024-01', '2024-02', ...) becomes a column.
        df_st_rank = df_st_rank.groupBy("id").pivot("date_info").agg(first(col("rank")))

        # Rename the pivoted 'YYYY-MM' columns to 'rank<month>', e.g. '2024-01' -> 'rank1'.
        # Note: only the month part is kept, so the query window must not span
        # more than 12 months or the renamed columns would collide.
        column_names = df_st_rank.columns
        for col_name in column_names:
            if '-' in col_name and len(col_name.split('-')) == 2:
                new_col_name = f"rank{int(col_name.split('-')[1])}"
                df_st_rank = df_st_rank.withColumnRenamed(col_name, new_col_name)

        # Left join so every current search term is kept, even without rank history.
        df_save = df_st.join(df_st_rank, 'id', 'left')

        print(f"Target table: {self.db_save}")
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save)
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append')
        print("success")


if __name__ == '__main__':
    handle_obj = StRankToPG()
    handle_obj.run()