# dwt_aba_year_week.py
# Committed by: chenyuanjie
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType
# UDF / window functions for grouped sorting
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DwtAbaYearWeek(Templates):
    """Pivot per-period search-term metrics of one year into a wide table.

    Rows are read from ``dim_st_detail``, grouped by ``search_term`` and
    pivoted on ``date_info`` so that each period contributes three columns
    (``st_<n>_rank``, ``st_<n>_search_sum``, ``st_<n>_bsr_cate_1_id``).
    The result is unioned onto an empty frame selected from
    ``dwt_aba_year_week``.

    The year to process is taken from ``date_info`` (the text before the first
    ``-``); with the default ``date_info='2022-01'`` this is 2022.
    """

    # Year used when ``date_info`` does not start with a numeric year; keeps the
    # behaviour this job had when the year was hard-coded.
    DEFAULT_YEAR = '2022'

    def __init__(self, site_name='us', date_type="week", date_info='2022-01'):
        """Create the Spark session and the empty target-shaped frame.

        Args:
            site_name: Value matched against the ``site_name`` column.
            date_type: Value matched against the ``date_type`` column.
            date_info: Period identifier such as ``'2022-01'``; only its year
                part selects the data that is read.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwt_aba_year_week'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # self.df_date = self.get_year_week_tuple()  # pandas DataFrame object
        # Placeholder frame; replaced by read_data().
        self.df_st = self.spark.sql("select 1+1;")
        # ``limit 0`` yields an empty frame carrying the target table's columns.
        self.df_save = self.spark.sql(f"select * from {self.db_save} where site_name='{self.site_name}' limit 0")
        self.partitions_by = ['site_name']
        # NOTE(review): reset_partitions comes from Templates; presumably it sets
        # the number of output partitions -- confirm against the base class.
        self.reset_partitions(10)

    def _target_year(self):
        """Return the year part of ``date_info``, or ``DEFAULT_YEAR`` if it is not numeric."""
        year = str(self.date_info).split('-')[0]
        return year if year.isdigit() else self.DEFAULT_YEAR

    @staticmethod
    def _rename_pivot_column(col, year):
        """Map a pivot output column name to its ``st_<n>_<metric>`` name.

        The name is expected to look like ``'<year>-<nn>_max(st_<metric>)'``;
        the year prefix, a leading zero of the period number, and the
        ``max(st_ ... )`` wrapper are stripped, e.g.
        ``'2022-01_max(st_rank)'`` -> ``'st_1_rank'``.
        """
        stripped = col.replace(f'{year}-0', '').replace(f'{year}-', '')
        return "st_" + stripped.replace('max(st_', '').replace(')', '')

    def read_data(self):
        """Load and cache the rows of the target year from ``dim_st_detail``."""
        year = self._target_year()
        # Upper bound stays at period 52, as in the original query.
        sql = f"select search_term, st_search_sum, st_rank, st_bsr_cate_1_id, date_info from dim_st_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and " \
              f"date_info between '{year}-01' and '{year}-52'"
        self.df_st = self.spark.sql(sql).cache()

    def handle_data(self):
        """Pivot the loaded rows by ``date_info`` and merge them into ``df_save``."""
        year = self._target_year()
        df_save = self.df_st.groupby(['search_term']).pivot('date_info').agg(
            F.max('st_rank'),
            F.max('st_search_sum'),
            F.max('st_bsr_cate_1_id'),
        )
        print(df_save.columns)
        renamed_cols = []
        for col in df_save.columns:
            if col == 'search_term':
                # The grouping key keeps its name.
                renamed_cols.append(col)
                continue
            new_col = self._rename_pivot_column(col, year)
            print("new_col, col:", new_col, col)
            renamed_cols.append(new_col)
        # Rename every column in one projection instead of chaining one
        # withColumnRenamed call per pivot column.
        df_save = df_save.toDF(*renamed_cols)
        df_save.show(10, truncate=False)
        print(df_save.columns)
        df_save = df_save.withColumn("site_name", F.lit(self.site_name))
        # allowMissingColumns fills columns absent on either side with nulls.
        self.df_save = self.df_save.unionByName(df_save, allowMissingColumns=True)


if __name__ == '__main__':
    # Script entry: the first command-line argument is the site name.
    DwtAbaYearWeek(site_name=sys.argv[1]).run()