import ast
import os
import shutil
import sys

import pandas as pd

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates
# window helpers for grouped-sorting UDFs
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType
from py4j.java_gateway import java_import
import pyarrow as pa
import pyarrow.parquet as pq


class DwdAsinFeaturesParquet(Templates):
    """Split the ASIN embedding files of one site into numbered blocks,
    persist each block through the Templates save pipeline and dump the
    embedding column of each block to a local Parquet file."""

    def __init__(self, site_name='us', block_size=100000):
        super(DwdAsinFeaturesParquet, self).__init__()
        self.site_name = site_name
        self.db_save = f'dwd_asin_features_parquet'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'block']
        self.partitions_num = 1
        self.hdfs_file_path = f'hdfs://hadoop5:8020/home/big_data_selection/dim/dim_asin_features_parquet/site_name={self.site_name}/'
        self.hdfs_file_list = self.get_hdfs_file_list()
        self.index_count = 0

    def get_hdfs_file_list(self):
        # import the Hadoop Path class through the Py4J gateway
        java_import(self.spark._jvm, 'org.apache.hadoop.fs.Path')
        # fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration(self.hdfs_file_path))
        # status = fs.listStatus(self.spark._jvm.org.apache.hadoop.fs.Path())
        fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration())
        path = self.spark._jvm.org.apache.hadoop.fs.Path(self.hdfs_file_path)
        status = fs.listStatus(path)
        hdfs_file_list = [file_status.getPath().getName() for file_status in status]
        return hdfs_file_list
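
    # Example of what the listing typically yields (an assumption: the dim
    # directory is made up of Spark part files, so the names are illustrative):
    #
    #     self.hdfs_file_list
    #     # ['part-00000-....c000', 'part-00001-....c000', ...]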

    def read_data(self, hive_path):
        # each source file is read as raw text lines: id \t asin \t embedding
        df = self.spark.read.text(hive_path)
        return df

    def handle_data(self, df, index):
        # split each raw text line on tabs so every field becomes its own column
        split_df = df.select(F.split(df['value'], '\t').alias('split_values'))
        # the data is known to carry three fields: id, asin, embedding
        final_df = split_df.select(
            split_df['split_values'].getItem(0).alias('id'),
            split_df['split_values'].getItem(1).alias('asin'),
            split_df['split_values'].getItem(2).alias('embedding')
        )
        print("partitions before re-blocking:", final_df.rdd.getNumPartitions())  # 642
        # show the processed DataFrame
        # final_df.show()
        # the split produces string columns --> cast id to a numeric type so it is safe to use
        final_df = final_df.withColumn('id', final_df['id'].cast('bigint'))
        # add an index column, offset by the number of rows already processed
        final_df = final_df.withColumn("index", F.monotonically_increasing_id() + self.index_count)
        final_df.show()
        # UDF that parses the embedding string into a list of floats
        str_to_list_udf = F.udf(lambda s: ast.literal_eval(s), ArrayType(FloatType()))
        # apply the UDF to the embedding column
        final_df = final_df.withColumn("embedding", str_to_list_udf(final_df["embedding"]))
        # final_df.write.mode('overwrite').parquet("hdfs://hadoop5:8020/home/ffman/parquet")
        final_df = final_df.withColumn("block", F.lit(index))
        final_df = final_df.withColumn("site_name", F.lit(self.site_name))
        index_count = final_df.count()
        return final_df, index_count
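
    # Illustrative input/output for handle_data (the sample line below is an
    # assumption inferred from the tab split and ast.literal_eval above):
    #
    #     value = "123\tB0EXAMPLE1\t[0.12, -0.03, 0.88]"
    #     # becomes one row: id=123 (bigint), asin='B0EXAMPLE1',
    #     # embedding=[0.12, -0.03, 0.88] (array<float>), plus index/block/site_name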

    @staticmethod
    def save_data_to_local(df, local_path):
        # df.write.mode('append').parquet(whole_path)
        # df.write.mode('overwrite').parquet(local_path)
        print("writing block to local path:", local_path)
        # collect to the driver and convert the DataFrame to an Arrow Table
        df = df.toPandas()
        table = pa.Table.from_pandas(df)
        # save to Parquet
        pq.write_table(table, local_path)
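
    # Usage sketch (assumption: each block is small enough to collect, since
    # toPandas() pulls the whole DataFrame onto the driver before pyarrow writes
    # it out). Called from run() roughly as:
    #
    #     self.save_data_to_local(df=self.df_save.select("embedding"),
    #                             local_path=f"{embeddings_dir}/part_{index}.parquet")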

    def save_data_to_hive(self):
        self.save_data()

    def save_data_all(self, df, local_path):
        self.save_data_to_hive()
        self.save_data_to_local(df.select("embedding"), local_path)

    def run(self):
        embeddings_dir = rf"/mnt/ffman/embeddings/folder"
        # if os.path.exists(embeddings_dir):
        #     shutil.rmtree(embeddings_dir)
        os.makedirs(embeddings_dir, exist_ok=True)
        for index, hdfs_file in enumerate(self.hdfs_file_list):
            hive_path = self.hdfs_file_path + hdfs_file
            print("hive_path:", hive_path)
            df = self.read_data(hive_path=hive_path)
            self.df_save, index_count = self.handle_data(df=df, index=index)
            # local_path = rf"hdfs://hadoop5:8020/home/ffman/embeddings/folder_test/part_{index}"
            local_path = rf"{embeddings_dir}/part_{index}.parquet"
            self.index_count += index_count
            print("index_count, self.index_count:", index_count, self.index_count)
            self.save_data_to_hive()
            self.save_data_to_local(df=self.df_save.select("embedding"), local_path=local_path)


if __name__ == '__main__':
    handle_obj = DwdAsinFeaturesParquet()
    handle_obj.run()
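
# Usage sketch (assumption: utils.templates.Templates provides create_spark_object()
# and save_data() as used above). To re-block another site, e.g. the UK marketplace:
#
#     handle_obj = DwdAsinFeaturesParquet(site_name='uk')
#     handle_obj.run()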