import os
import sys
import pandas as pd

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# window functions / UDFs for the grouped-sort indexing approaches below
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DimAsinFeaturesParquet(Templates):

    def __init__(self, site_name='us', block_size=100000):
        super(DimAsinFeaturesParquet, self).__init__()
        self.site_name = site_name
        self.block_size = block_size
        self.db_save = f'dim_asin_features_parquet'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.block_size}")
        self.df_asin_features = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        # self.partitions_by = ['site_name', 'block']
        self.partitions_by = ['site_name']
        self.partitions_num = 600

    def read_data(self):
        sql = f"select id, asin, img_vector as embedding from ods_asin_extract_features;"
        print("sql:", sql)
        self.df_save = self.spark.sql(sql).cache()

    def handle_data(self):
        # self.df_save = self.df_save.withColumn('block', F.floor(self.df_save['id'] / self.block_size))
        self.df_save = self.df_save.withColumn('site_name', F.lit(self.site_name))
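
    def save_data_sketch(self):
        """
        Hedged sketch only: the actual write is presumably handled by Templates.save_data
        / run(), which is not shown in this file. This just illustrates what a parquet
        write partitioned by self.partitions_by could look like; the output path below
        is hypothetical, not the real target.
        """
        output_path = "/tmp/dim_asin_features_parquet"  # hypothetical path
        (self.df_save
             .repartition(self.partitions_num)
             .write
             .mode("overwrite")
             .partitionBy(*self.partitions_by)
             .parquet(output_path))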

    def handle_data_old(self):
        """
        开窗这种方式进行全局索引,会导致所有的数据在1个分区里面,从而只能有1个cpu运行,降低了性能
        """
        # 添加索引列 -- index
        window = Window.orderBy("id")
        self.df_asin_features = self.df_asin_features.withColumn("index", F.row_number().over(window) - 1)  # 从0开始
        self.df_asin_features.show(20)
        # 生成分区列
        self.df_asin_features = self.df_asin_features.withColumn('block', F.floor(self.df_asin_features['index'] / self.block_size))
        self.df_asin_features = self.df_asin_features.withColumn('site_name', F.lit(self.site_name))

        self.df_save = self.df_asin_features
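
    def handle_data_zip_index_sketch(self):
        """
        Hedged sketch, not wired into run(): rdd.zipWithIndex assigns a contiguous,
        0-based global index without pulling every row through a single-partition
        window (the problem described in handle_data_old). It operates on the
        DataFrame that read_data leaves in self.df_save in the current flow.
        """
        from pyspark.sql.types import LongType, StructField, StructType

        df = self.df_save
        # zipWithIndex pairs each row with its global position; it costs one extra
        # job to count partition sizes but keeps the data distributed.
        rdd_indexed = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
        schema = StructType(df.schema.fields + [StructField("index", LongType(), False)])
        df = self.spark.createDataFrame(rdd_indexed, schema)
        # same block / site_name derivation as handle_data_old, now on a scalable index
        df = df.withColumn("block", F.floor(F.col("index") / self.block_size))
        df = df.withColumn("site_name", F.lit(self.site_name))
        self.df_save = df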

    def handle_data_old2(self):
        print("分块前分区数量:", self.df_asin_features.rdd.getNumPartitions())  # 642

        from pyspark.sql.functions import spark_partition_id
        num_partitions = 500  # 你需要根据你的数据和资源来调整这个参数  #

        # 第一步: 对数据进行预分区和排序
        self.df_asin_features = self.df_asin_features.repartitionByRange(num_partitions, "id").sortWithinPartitions("id")
        print("分块后分区数量:", self.df_asin_features.rdd.getNumPartitions())

        # Step 2: add an index inside each partition
        # def add_index_in_partition(df):
        #     # use a window function to add an index inside each partition
        #     window = Window.orderBy("id")
        #     df = df.withColumn("index", F.row_number().over(window) - 1)  # 0-based
        #     return df

        # from pyspark.sql.functions import pandas_udf, PandasUDFType
        # from pyspark.sql import DataFrame
        #
        # @pandas_udf("id long, embedding string, index long, block long, site_name string", PandasUDFType.GROUPED_MAP)
        # def add_index_in_partition(pdf: pd.DataFrame) -> pd.DataFrame:
        #     # use pandas cumcount to add an index inside each partition
        #     pdf['index'] = pdf.sort_values('id').groupby().cumcount()
        #     return pdf

        from pyspark.sql.functions import pandas_udf, PandasUDFType

        @pandas_udf("id long, embedding string, index long", PandasUDFType.GROUPED_MAP)
        def add_index_in_partition(df):
            df = df.sort_values('id')
            df['index'] = range(len(df))  # or: df['index'] = df.reset_index().index
            return df

        self.df_asin_features = self.df_asin_features.groupby(spark_partition_id()).apply(add_index_in_partition)

        # add the global index (note: this window also has no partition key, so it again runs on a single partition)
        self.df_asin_features = self.df_asin_features.withColumn("index", F.sum("index").over(Window.orderBy("id")))

        # derive the partition columns
        self.df_asin_features = self.df_asin_features.withColumn('block', F.floor(self.df_asin_features['index'] / self.block_size))
        self.df_asin_features = self.df_asin_features.withColumn('site_name', F.lit(self.site_name))

        # store the result
        self.df_save = self.df_asin_features
        # self.df_save.show(10)
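
    def read_block_sketch(self, block_id=0):
        """
        Hedged usage sketch: how a downstream job could pull one fixed-size block of
        embeddings at a time if the table is written with the older
        ['site_name', 'block'] partition layout. The path below is hypothetical.
        """
        path = "/tmp/dim_asin_features_parquet"  # hypothetical, not the real warehouse path
        return (self.spark.read.parquet(path)
                .where(F.col("site_name") == self.site_name)
                .where(F.col("block") == block_id))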


if __name__ == '__main__':
    handle_obj = DimAsinFeaturesParquet(block_size=200000)
    handle_obj.run()