import os
import sys

import pandas as pd

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# Window functions for grouped-sort UDFs
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DimAsinFeaturesParquet(Templates):

    def __init__(self, site_name='us', block_size=100000):
        super(DimAsinFeaturesParquet, self).__init__()
        self.site_name = site_name
        self.block_size = block_size
        self.db_save = 'dim_asin_features_parquet'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.block_size}")
        self.df_asin_features = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        # self.partitions_by = ['site_name', 'block']
        self.partitions_by = ['site_name']
        self.partitions_num = 600

    def read_data(self):
        sql = "select id, asin, img_vector as embedding from ods_asin_extract_features;"
        print("sql:", sql)
        self.df_save = self.spark.sql(sql).cache()

    def handle_data(self):
        # self.df_save = self.df_save.withColumn('block', F.floor(self.df_save['id'] / self.block_size))
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))

    def handle_data_old(self):
        """
        Building the global index with a window function pulls all rows into a single
        partition, so only one CPU does the work and performance drops.
        """
        # Add the index column -- index
        window = Window.orderBy("id")
        self.df_asin_features = self.df_asin_features.withColumn("index", F.row_number().over(window) - 1)  # starts at 0
        self.df_asin_features.show(20)
        # Derive the partition (block) column
        self.df_asin_features = self.df_asin_features.withColumn('block', F.floor(self.df_asin_features['index'] / self.block_size))
        self.df_asin_features = self.df_asin_features.withColumn('site_name', F.lit(self.site_name))
        self.df_save = self.df_asin_features

    def handle_data_old2(self):
        print("partition count before blocking:", self.df_asin_features.rdd.getNumPartitions())  # 642
        from pyspark.sql.functions import spark_partition_id
        num_partitions = 500  # tune this to your data volume and cluster resources
        # Step 1: range-partition the data and sort within each partition
        self.df_asin_features = self.df_asin_features.repartitionByRange(num_partitions, "id").sortWithinPartitions("id")
        print("partition count after blocking:", self.df_asin_features.rdd.getNumPartitions())

        # Step 2: add a local index inside each partition
        # def add_index_in_partition(df):
        #     # Use a window function to add an index inside each partition
        #     window = Window.orderBy("id")
        #     df = df.withColumn("index", F.row_number().over(window) - 1)  # starts at 0
        #     return df

        # from pyspark.sql.functions import pandas_udf, PandasUDFType
        # from pyspark.sql import DataFrame
        #
        # @pandas_udf("id long, embedding string, index long, block long, site_name string", PandasUDFType.GROUPED_MAP)
        # def add_index_in_partition(pdf: pd.DataFrame) -> pd.DataFrame:
        #     # Use pandas cumcount to add an index inside each partition
        #     pdf['index'] = pdf.sort_values('id').groupby().cumcount()
        #     return pdf

        from pyspark.sql.functions import pandas_udf, PandasUDFType

        @pandas_udf("id long, embedding string, index long", PandasUDFType.GROUPED_MAP)
        def add_index_in_partition(df):
            df = df.sort_values('id')
            df['index'] = range(len(df))  # or: df['index'] = df.reset_index().index
            return df

        self.df_asin_features = self.df_asin_features.groupby(spark_partition_id()).apply(add_index_in_partition)
        # Build the global index from the per-partition indices
        self.df_asin_features = self.df_asin_features.withColumn("index", F.sum("index").over(Window.orderBy("id")))
        # Derive the partition (block) column
        self.df_asin_features = self.df_asin_features.withColumn('block', F.floor(self.df_asin_features['index'] / self.block_size))
        self.df_asin_features = self.df_asin_features.withColumn('site_name', F.lit(self.site_name))
        # Hand off for saving
        self.df_save = self.df_asin_features
        # self.df_save.show(10)


if __name__ == '__main__':
    handle_obj = DimAsinFeaturesParquet(block_size=200000)
    handle_obj.run()
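# ---------------------------------------------------------------------------
# Hedged sketch, not wired into the job above: the docstring of handle_data_old
# notes that a global Window.orderBy("id") index collapses all rows into one
# partition. A commonly used alternative is RDD.zipWithIndex(), which assigns a
# contiguous 0-based index without funnelling everything onto a single task
# (indices follow partition order, so range-partition and sort by "id" first if
# an id-ordered index matters). The helper name `with_global_index` and the
# usage lines are illustrative assumptions, not part of the original pipeline.
#
# def with_global_index(df):
#     """Append a contiguous 0-based 'index' column via RDD.zipWithIndex()."""
#     rdd = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
#     return rdd.toDF(df.columns + ["index"])
#
# # Usage sketch (hypothetical):
# # df = spark.sql("select id, asin, img_vector as embedding from ods_asin_extract_features")
# # df = df.repartitionByRange(500, "id").sortWithinPartitions("id")
# # df = with_global_index(df)
# # df = df.withColumn("block", F.floor(F.col("index") / F.lit(block_size)))
# ---------------------------------------------------------------------------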