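"""
dwd_asin_features_parquet.py

Read the dim_asin_features_parquet text files for one site from HDFS, split each
tab-separated line into id / asin / embedding columns, tag every input file with a
block index, then write the result to Hive (partitioned by site_name and block,
via the shared Templates.save_data helper) and dump the embedding column of each
block to a local Parquet file.
"""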
import ast
import os
import shutil
import sys
import pandas as pd

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# Spark SQL helpers: window/UDF functions and column types
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType
from py4j.java_gateway import java_import
import pyarrow as pa
import pyarrow.parquet as pq


class DwdAsinFeaturesParquet(Templates):

    def __init__(self, site_name='us', block_size=100000):
        super(DwdAsinFeaturesParquet, self).__init__()
        self.site_name = site_name
        self.db_save = 'dwd_asin_features_parquet'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        self.df_save = self.spark.sql("select 1+1;")  # placeholder DataFrame, replaced in run()
        self.partitions_by = ['site_name', 'block']
        self.partitions_num = 1
        self.hdfs_file_path = f'hdfs://hadoop5:8020/home/big_data_selection/dim/dim_asin_features_parquet/site_name={self.site_name}/'
        self.hdfs_file_list = self.get_hdfs_file_list()
        self.index_count = 0  # running row count, used to offset the index column for each block

    def get_hdfs_file_list(self):
        # make the Hadoop FileSystem API available through the JVM gateway
        java_import(self.spark._jvm, 'org.apache.hadoop.fs.Path')
        fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration())
        path = self.spark._jvm.org.apache.hadoop.fs.Path(self.hdfs_file_path)
        status = fs.listStatus(path)
        hdfs_file_list = [file_status.getPath().getName() for file_status in status]
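        # For illustration only: the returned list is expected to hold the bare file
        # names under the site_name partition directory, e.g. something like
        # ['part-00000', 'part-00001', ...]; the actual names depend on how the
        # dim_asin_features_parquet files were written.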
        return hdfs_file_list

    def read_data(self, hive_path):
        # read the file as plain text; each line ends up in a single 'value' column
        df = self.spark.read.text(hive_path)
        return df

    def handle_data(self, df, index):
        # split each tab-separated line into its individual fields
        split_df = df.select(F.split(df['value'], '\t').alias('split_values'))

        # each line is expected to contain three fields: id, asin, embedding
        final_df = split_df.select(
            split_df['split_values'].getItem(0).alias('id'),
            split_df['split_values'].getItem(1).alias('asin'),
            split_df['split_values'].getItem(2).alias('embedding')
        )
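        # Illustrative example (hypothetical data, not taken from the source files):
        # an input line "123\tB00EXAMPLE\t[0.1, 0.2, 0.3]" is split into
        # id='123', asin='B00EXAMPLE', embedding='[0.1, 0.2, 0.3]' (all still strings here).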
        print("分块前分区数量:", final_df.rdd.getNumPartitions())  # 642

        # 显示处理后的 DataFrame 的内容
        # final_df.show()
        # 从hdfs读取parquet文件,进行split切分的时候是字符串类型-->转换成数值类型
        final_df = final_df.withColumn('id', final_df['id'].cast('bigint'))  # 然后你可以安全地转换

        # 添加索引列
        final_df = final_df.withColumn("index", F.monotonically_increasing_id() + self.index_count)
        final_df.show()
        # UDF that parses the embedding string into a list of floats
        str_to_list_udf = F.udf(lambda s: ast.literal_eval(s), ArrayType(FloatType()))
        # apply the UDF to the embedding column
        final_df = final_df.withColumn("embedding", str_to_list_udf(final_df["embedding"]))

        # final_df.write.mode('overwrite').parquet("hdfs://hadoop5:8020/home/ffman/parquet")

        final_df = final_df.withColumn("block", F.lit(index))
        final_df = final_df.withColumn("site_name", F.lit(self.site_name))
        index_count = final_df.count()

        return final_df, index_count

    @staticmethod
    def save_data_to_local(df, local_path):
        print("saving to local path:", local_path)
        # collect the Spark DataFrame to the driver and convert it to an Arrow table
        pandas_df = df.toPandas()
        table = pa.Table.from_pandas(pandas_df)

        # write the Arrow table as a single local Parquet file
        pq.write_table(table, local_path)
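        # The file can later be read back with pyarrow/pandas, e.g.
        # pq.read_table(local_path).to_pandas() (shown only as an illustration).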

    def save_data_to_hive(self):
        # delegate to the shared Templates.save_data(), which writes self.df_save to Hive
        self.save_data()

    def save_data_all(self, df, local_path):
        # convenience wrapper: write to Hive and keep a local copy of the embedding column
        self.save_data_to_hive()
        self.save_data_to_local(df.select("embedding"), local_path)

    def run(self):
        embeddings_dir = r"/mnt/ffman/embeddings/folder"
        # if os.path.exists(embeddings_dir):
        #     shutil.rmtree(embeddings_dir)
        os.makedirs(embeddings_dir, exist_ok=True)  # do not fail when the directory already exists
        for index, hdfs_file in enumerate(self.hdfs_file_list):
            hive_path = self.hdfs_file_path + hdfs_file
            print("hive_path:", hive_path)
            df = self.read_data(hive_path=hive_path)
            self.df_save, index_count = self.handle_data(df=df, index=index)
            local_path = f"{embeddings_dir}/part_{index}.parquet"
            self.index_count += index_count
            print("index_count, self.index_count:", index_count, self.index_count)
            self.save_data_to_hive()
            self.save_data_to_local(df=self.df_save.select("embedding"), local_path=local_path)
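        # Net effect of run(): one Hive block partition per input file, plus one local
        # part_<index>.parquet file per block that keeps only the embedding column.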


if __name__ == '__main__':
    handle_obj = DwdAsinFeaturesParquet()
    handle_obj.run()