# dim_asin_features_parquet_old.py
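"""
Build the dim_asin_features_parquet dataset: read ASIN image-feature embeddings
from ods_asin_extract_features, assign every row a global index, derive a block
column (index // block_size), and save the result partitioned by site_name and
block (the write itself is presumably handled by the Templates base class via run()).
"""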
import os
import sys
import pandas as pd

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
# Window functions for per-group ordering
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DimAsinFeaturesParquet(Templates):

    def __init__(self, site_name='us', block_size=100000):
        super(DimAsinFeaturesParquet, self).__init__()
        self.site_name = site_name
        self.block_size = block_size
        self.db_save = 'dim_asin_features_parquet'  # name of the target dataset
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.block_size}")
        # Placeholder DataFrames, overwritten in read_data() / handle_data()
        self.df_asin_features = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        # Partition columns and partition count, presumably consumed by the
        # Templates base class when writing the parquet output
        self.partitions_by = ['site_name', 'block']
        self.partitions_num = 50

    def read_data(self):
        sql = f"select id, img_vector as embedding from ods_asin_extract_features;"
        print("sql:", sql)
        self.df_asin_features = self.spark.sql(sql).cache()

    def handle_data_old(self):
        """
        开窗这种方式进行全局索引,会导致所有的数据在1个分区里面,从而只能有1个cpu运行,降低了性能
        """
        # 添加索引列 -- index
        window = Window.orderBy("id")
        self.df_asin_features = self.df_asin_features.withColumn("index", F.row_number().over(window) - 1)  # 从0开始
        self.df_asin_features.show(20)
        # 生成分区列
        self.df_asin_features = self.df_asin_features.withColumn('block', F.floor(self.df_asin_features['index'] / self.block_size))
        self.df_asin_features = self.df_asin_features.withColumn('site_name', F.lit(self.site_name))

        self.df_save = self.df_asin_features
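
    def handle_data_zip_with_index(self):
        """
        A minimal alternative sketch (not part of the original pipeline): assign the
        global index with RDD.zipWithIndex(), which numbers rows consecutively across
        partitions without funnelling the data into a single window partition.
        The method name is hypothetical; Templates.run() presumably calls handle_data().
        """
        cols = self.df_asin_features.columns  # ['id', 'embedding']
        # Sort by id first (a distributed sort), then zip each row with its index;
        # zipWithIndex yields (Row, index), so flatten back into columns.
        rdd_indexed = self.df_asin_features.orderBy("id").rdd.zipWithIndex().map(
            lambda pair: (*pair[0], pair[1]))
        self.df_asin_features = rdd_indexed.toDF(cols + ['index'])
        # Same block / site_name derivation as the other handlers
        self.df_asin_features = self.df_asin_features.withColumn(
            'block', F.floor(F.col('index') / self.block_size))
        self.df_asin_features = self.df_asin_features.withColumn(
            'site_name', F.lit(self.site_name))
        self.df_save = self.df_asin_features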

    def handle_data(self):
        print("分块前分区数量:", self.df_asin_features.rdd.getNumPartitions())  # 642

        from pyspark.sql.functions import spark_partition_id
        num_partitions = 500  # 你需要根据你的数据和资源来调整这个参数  #

        # 第一步: 对数据进行预分区和排序
        self.df_asin_features = self.df_asin_features.repartitionByRange(num_partitions, "id").sortWithinPartitions("id")
        print("分块后分区数量:", self.df_asin_features.rdd.getNumPartitions())

        # Step 2: add an index inside each partition
        # Earlier attempt (window function per partition):
        # def add_index_in_partition(df):
        #     # Use a window function to add an index inside each partition
        #     window = Window.orderBy("id")
        #     df = df.withColumn("index", F.row_number().over(window) - 1)  # starts at 0
        #     return df

        # Earlier attempt (pandas cumcount):
        # from pyspark.sql.functions import pandas_udf, PandasUDFType
        # from pyspark.sql import DataFrame
        #
        # @pandas_udf("id long, embedding string, index long, block long, site_name string", PandasUDFType.GROUPED_MAP)
        # def add_index_in_partition(pdf: pd.DataFrame) -> pd.DataFrame:
        #     # Use pandas cumcount to add an index inside each partition
        #     pdf['index'] = pdf.sort_values('id').groupby().cumcount()
        #     return pdf

        from pyspark.sql.functions import pandas_udf, PandasUDFType

        # Step 2: add a local (within-partition) index with a grouped-map pandas UDF.
        # Keep the partition id so the global offset can be derived afterwards.
        @pandas_udf("id long, embedding string, pid int, local_index long", PandasUDFType.GROUPED_MAP)
        def add_index_in_partition(df):
            df = df.sort_values('id')
            df['local_index'] = range(len(df))  # or: df.reset_index(drop=True).index
            return df

        self.df_asin_features = self.df_asin_features.withColumn("pid", spark_partition_id())
        self.df_asin_features = self.df_asin_features.groupby("pid").apply(add_index_in_partition)

        # Step 3: turn the local index into a global one. repartitionByRange orders the
        # partitions by "id", so each partition's offset is the total row count of all
        # lower-numbered partitions. (The previous F.sum("index").over(Window.orderBy("id"))
        # computed a running total rather than a row number and pulled everything back
        # into a single partition; the window below only runs over num_partitions rows.)
        offsets = (self.df_asin_features.groupBy("pid")
                   .agg(F.count(F.lit(1)).alias("cnt"))
                   .withColumn("offset", F.sum("cnt").over(Window.orderBy("pid")) - F.col("cnt")))
        self.df_asin_features = (self.df_asin_features
                                 .join(offsets.select("pid", "offset"), on="pid")
                                 .withColumn("index", F.col("offset") + F.col("local_index"))
                                 .drop("offset", "local_index"))

        # Generate the partition columns
        self.df_asin_features = self.df_asin_features.withColumn('block', F.floor(self.df_asin_features['index'] / self.block_size))
        self.df_asin_features = self.df_asin_features.withColumn('site_name', F.lit(self.site_name))

        # Save
        self.df_save = self.df_asin_features.drop("pid")
        # self.df_save.show(10)
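        # Note: the PandasUDFType.GROUPED_MAP decorator is deprecated since Spark 3.0.
        # On Spark >= 3.0 the same local-indexing step can use a plain function with
        # GroupedData.applyInPandas, e.g. (sketch, reusing the names above):
        #
        #   def add_local_index(pdf):
        #       pdf = pdf.sort_values('id')
        #       pdf['local_index'] = range(len(pdf))
        #       return pdf
        #
        #   self.df_asin_features = self.df_asin_features.groupby("pid").applyInPandas(
        #       add_local_index, schema="id long, embedding string, pid int, local_index long")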


if __name__ == '__main__':
    handle_obj = DimAsinFeaturesParquet(block_size=200000)
    handle_obj.run()