import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates


class DwdStMeasure(Templates):

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dwd_st_rank'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        # self.df_date = self.get_year_week_tuple()  # pandas DataFrame
        # Placeholder DataFrames, replaced in read_data()/handle_data()
        self.df_st_month = self.spark.sql("select 1+1;")
        self.df_st_week = self.spark.sql("select 1+1;")
        self.df_combined_unique = self.spark.sql("select 1+1;")

    def read_data(self):
        # Distinct, trimmed brand names from the monthly partitions
        sql_month = (
            "SELECT DISTINCT(TRIM(asin_brand_name)) AS asin_brand_name "
            "FROM dim_asin_detail "
            "WHERE site_name = 'us' AND date_type = 'month' "
            "AND date_info BETWEEN '2023-04' AND '2023-09';"
        )
        self.df_st_month = self.spark.sql(sql_month).cache()
        self.df_st_month.show(20, truncate=False)
        print("month:", self.df_st_month.count())
        # Distinct, trimmed brand names from the weekly partitions
        sql_week = (
            "SELECT DISTINCT(TRIM(asin_brand_name)) AS asin_brand_name "
            "FROM dim_asin_detail "
            "WHERE site_name = 'us' AND date_type = 'week' "
            "AND date_info BETWEEN '2023-14' AND '2023-39';"
        )
        self.df_st_week = self.spark.sql(sql_week).cache()
        self.df_st_week.show(20, truncate=False)
        print("week:", self.df_st_week.count())

    def handle_data(self):
        # Union the month and week DataFrames
        df_combined = self.df_st_month.union(self.df_st_week)
        # Keep a single row per asin_brand_name
        self.df_combined_unique = df_combined.drop_duplicates(['asin_brand_name'])
        self.df_combined_unique.show(20, truncate=False)
        print("combined:", self.df_combined_unique.count())

    def save_data(self):
        # Collect the result to a pandas DataFrame on the driver
        pdf = self.df_combined_unique.toPandas()
        # Split the output into CSV files of at most 1,000,000 rows each
        num_rows_per_file = 1000000
        num_files = (len(pdf) // num_rows_per_file) + (1 if len(pdf) % num_rows_per_file != 0 else 0)
        for i in range(num_files):
            start_idx = i * num_rows_per_file
            end_idx = start_idx + num_rows_per_file
            output_path = os.path.join("/root", f"asin_brand_{i + 1}.csv")
            # Write this slice to CSV
            pdf.iloc[start_idx:end_idx].to_csv(output_path, index=False)
        print(f"Data saved into {num_files} CSV files.")


if __name__ == '__main__':
    # run() is inherited from the Templates base class
    handle_obj = DwdStMeasure()
    handle_obj.run()