import os
import sys

from pyspark.sql.window import Window

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so that utils can be imported
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F


class DwdStMeasure(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_st_rank'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # self.df_date = self.get_year_week_tuple()  # pandas DataFrame object
        # Placeholder DataFrames, overwritten in read_data()/handle_data()
        self.df_st_month = self.spark.sql("select 1+1;")
        self.df_st_week = self.spark.sql("select 1+1;")
        self.df_combined_unique = self.spark.sql("select 1+1;")

    def read_data(self):
        # Monthly search terms (filters are currently hard-coded rather than built from
        # self.site_name / self.date_type / self.date_info)
        sql_month = "SELECT search_term, st_brand_label FROM dwt_aba_st_analytics " \
                    "WHERE site_name = 'us' AND date_type = 'month' AND date_info BETWEEN '2023-04' AND '2023-09';"
        self.df_st_month = self.spark.sql(sql_month).cache()
        self.df_st_month.show(20, truncate=False)
        print("month:", self.df_st_month.count())
        # Weekly search terms
        sql_week = "SELECT search_term, st_brand_label FROM dwt_aba_st_analytics " \
                   "WHERE site_name = 'us' AND date_type = 'week' AND date_info BETWEEN '2023-14' AND '2023-39';"
        self.df_st_week = self.spark.sql(sql_week).cache()
        self.df_st_week.show(20, truncate=False)
        print("week:", self.df_st_week.count())

    def handle_data(self):
        # Merge the monthly and weekly DataFrames
        df_combined = self.df_st_month.union(self.df_st_week)
        # Define the window: partition by search_term, order by st_brand_label descending
        window_spec = Window.partitionBy("search_term").orderBy(F.desc("st_brand_label"))
        # Use row_number to number the rows within each partition
        df_combined = df_combined.withColumn("row_number", F.row_number().over(window_spec))
        # Keep only the first row of each partition, i.e. deduplicate by search_term
        df_unique = df_combined.filter(df_combined.row_number == 1).drop("row_number")
        self.df_combined_unique = df_unique
        self.df_combined_unique.show(20, truncate=False)
        print("combined:", self.df_combined_unique.count())

    def save_data(self):
        # Collect the result to a pandas DataFrame on the driver
        pdf = self.df_combined_unique.toPandas()
        # Write the data out in chunks of at most 1,000,000 rows per CSV file
        num_rows_per_file = 1000000
        num_files = (len(pdf) // num_rows_per_file) + (1 if len(pdf) % num_rows_per_file != 0 else 0)
        for i in range(num_files):
            start_idx = i * num_rows_per_file
            end_idx = start_idx + num_rows_per_file
            output_path = os.path.join("/root", f"output_{i + 1}.csv")
            # Save the slice as CSV
            pdf.iloc[start_idx:end_idx].to_csv(output_path, index=False)
        print(f"Data saved into {num_files} CSV files.")


if __name__ == '__main__':
    handle_obj = DwdStMeasure()
    handle_obj.run()