# st_rank_to_pg.py
import os
import sys


sys.path.append(os.path.dirname(sys.path[0]))  # parent directory

from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql.functions import col, first


class StRankToPG(object):

    def __init__(self):
        self.db_save = 'tmp_st_rank_to_pg'
        self.spark = SparkUtil.get_spark_session(self.db_save)

    def run(self):
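        # Universe of search-term ids for the target month; repartitioned on
        # "id" so the later join shuffles both sides consistently.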
        sql1 = """
        select id from dwt_aba_last365 where site_name = 'us' and date_type = 'month' and date_info = '2024-12';
        """
        df_st = self.spark.sql(sql1).repartition(40, "id").cache()

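        # Monthly rank history for every month after 2023-12 (2024-01 onwards).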
        sql2 = """
        select id, rank, date_info from dwt_aba_st_analytics where site_name = 'us' and date_type = 'month' and date_info > '2023-12';
        """
        df_st_rank = self.spark.sql(sql2).repartition(40, "id").cache()
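        # Pivot from long to wide: one row per id, one column per month, e.g.
        #   id | 2024-01 | 2024-02 | ... | 2024-12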
        df_st_rank = df_st_rank.groupBy("id").pivot("date_info").agg(first(col("rank")))

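        # Rename the pivoted "YYYY-MM" columns to rank1, rank2, ... using only
        # the month part, which assumes the pivoted months fall within one year.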
        column_names = df_st_rank.columns
        for col_name in column_names:
            if '-' in col_name and len(col_name.split('-')) == 2:
                new_col_name = f"rank{int(col_name.split('-')[1])}"
                df_st_rank = df_st_rank.withColumnRenamed(col_name, new_col_name)

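        # Left join so every id from the target month is kept, even when it
        # has no rank history in the pivoted frame.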
        df_save = df_st.join(df_st_rank, 'id', 'left')
        print(f"当前存储的表名为:{self.db_save}")
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save)
        print(f"清除hdfs目录中:{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append')
        print("success")


if __name__ == '__main__':
    handle_obj = StRankToPG()
    handle_obj.run()
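
# A typical invocation (submit options are deployment-specific assumptions):
#   spark-submit st_rank_to_pg.py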