"""
For each file under a site's dim_asin_features_parquet HDFS directory, count its rows and
write the per-file and cumulative counts to the us_pictures_index_info MySQL table.
"""
import os
import sys

import pandas as pd

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates_mysql import TemplatesMysql
from utils.templates import Templates
# from ..utils.templates import Templates
# window functions / UDF helpers for grouped sorting (not used in this job)
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from py4j.java_gateway import java_import


class DwdGetEachParquetCount(Templates):
    """Count the rows of every file under the site's dim_asin_features_parquet directory."""

    def __init__(self, site_name='us'):
        super(DwdGetEachParquetCount, self).__init__()
        self.site_name = site_name
        self.engine = TemplatesMysql().engine
        self.db_save = 'dwd_get_each_parquet_count'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}")
        self.df_features = self.spark.sql(f"select 1+1;")  # warm-up query to initialize the session
        self.df_save = pd.DataFrame()
        self.hdfs_file_path = f'hdfs://hadoop5:8020/home/big_data_selection/dim/dim_asin_features_parquet/site_name={self.site_name}/'
        self.hdfs_file_list = self.get_hdfs_file_list()
        self.index_count = 0  # running total of rows counted so far

    def get_hdfs_file_list(self):
        """List the file names under the site's HDFS parquet directory via the Hadoop FileSystem API."""
        # import the Hadoop Path class into the py4j JVM view
        java_import(self.spark._jvm, 'org.apache.hadoop.fs.Path')
        # fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration(self.hdfs_file_path))
        # status = fs.listStatus(self.spark._jvm.org.apache.hadoop.fs.Path())
        fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration())
        path = self.spark._jvm.org.apache.hadoop.fs.Path(self.hdfs_file_path)
        status = fs.listStatus(path)
        hdfs_file_list = [file_status.getPath().getName() for file_status in status]
        return hdfs_file_list
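    # Note (assumption about the directory layout): listStatus also returns bookkeeping
    # entries such as _SUCCESS when the directory was written by Spark; if only data files
    # are wanted, the list could be filtered, e.g.
    #   [name for name in hdfs_file_list if not name.startswith('_')]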

    def read_data(self, hive_path):
        # read the file as raw text and count its lines; this count is used as the row count
        df = self.spark.read.text(hive_path)
        index_count = df.count()
        return df, index_count

    def handle_data(self):
        pass
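    # A possible alternative (sketch only, not part of the original job; the method name
    # read_data_parquet is hypothetical): for parquet part files the count can be taken
    # from spark.read.parquet, which counts actual records rather than raw text lines.
    def read_data_parquet(self, hive_path):
        df = self.spark.read.parquet(hive_path)
        return df, df.count()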

    def save_data(self):
        # clear the target table first
        with self.engine.begin() as conn:
            sql = f"truncate us_pictures_index_info;"
            conn.execute(sql)
        # then append the collected counts
        self.df_save.to_sql("us_pictures_index_info", con=self.engine, if_exists="append", index=False)
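    # Note (assumption about the environment): with SQLAlchemy 2.x the raw SQL string above
    # would need to be wrapped, e.g. conn.execute(sqlalchemy.text(sql)); the 1.x-style call
    # is kept here as written.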

    def run(self):
        data_list = []
        for index, hdfs_file in enumerate(self.hdfs_file_list):
            hive_path = self.hdfs_file_path + hdfs_file
            df, index_count = self.read_data(hive_path)
            # [file index, file path, rows in this file, rows accumulated before this file]
            data_list.append([index, hive_path, index_count, self.index_count])
            print([index, hive_path, index_count, self.index_count])
            self.index_count += index_count
        self.df_save = pd.DataFrame(data=data_list, columns=['index', 'hive_path', 'current_counts', 'all_counts'])
        # self.df_save.to_csv("/root/hdfs_parquet_block_info.csv")
        self.save_data()


if __name__ == '__main__':
    handle_obj = DwdGetEachParquetCount()
    handle_obj.run()
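    # e.g. to run the same count for another marketplace (hypothetical invocation; 'de' is
    # only an example value for the existing site_name parameter):
    # handle_obj = DwdGetEachParquetCount(site_name='de')
    # handle_obj.run()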