import os
import sys
import pandas as pd
from sqlalchemy import text

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from utils.templates import Templates
# from ..utils.templates import Templates
from py4j.java_gateway import java_import


class PicturesHdfsIndex(Templates):
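    """Build a row-count index of the picture-feature slice files on HDFS.

    For every part file under pictures_dim_features_slice of the given site,
    the job records the file's own row count and the cumulative count of all
    preceding files, then writes the result to <site_name>_pictures_hdfs_index.
    """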

    def __init__(self, site_name='us'):
        super().__init__()
        self.site_name = site_name
        self.engine_pg = TemplatesMysql().engine_pg
        self.db_save = f'{self.site_name}_pictures_hdfs_index'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}")
        self.df_features = self.spark.sql("select 1+1;")  # dummy query to verify the Spark session is usable
        self.df_save = pd.DataFrame()
        self.hdfs_file_path = f'hdfs://nameservice1:8020/home/big_data_selection/dim/pictures_dim_features_slice/site_name={self.site_name}/'
        self.hdfs_file_list = self.get_hdfs_file_list()
        self.index_count = 0

    def get_hdfs_file_list(self):
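        """List the file names under the site's pictures_dim_features_slice directory on HDFS."""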
        # Make org.apache.hadoop.fs.Path available in the Spark JVM gateway
        java_import(self.spark._jvm, 'org.apache.hadoop.fs.Path')

        fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(self.spark._jsc.hadoopConfiguration())
        path = self.spark._jvm.org.apache.hadoop.fs.Path(self.hdfs_file_path)
        status = fs.listStatus(path)

        hdfs_file_list = [file_status.getPath().getName() for file_status in status]
        return hdfs_file_list

    def read_data(self, hdfs_path):
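        """Read one HDFS file as plain text and return the DataFrame together with its row count."""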
        df = self.spark.read.text(hdfs_path)
        index_count = df.count()
        return df, index_count

    def handle_data(self):
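        """Placeholder; no per-file transformation is needed, counting happens in run()."""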
        pass

    def save_data(self):
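        """Truncate the index table, then append the newly built DataFrame."""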
        # Truncate the target table before writing
        with self.engine_pg.begin() as conn:
            sql = text(f"truncate {self.db_save};")
            conn.execute(sql)
        # Append the freshly built index rows
        self.df_save.to_sql(self.db_save, con=self.engine_pg, if_exists="append", index=False)

    def run(self):
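        """Count the rows of every slice file and persist the index to the database."""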
        data_list = []
        for index, hdfs_file in enumerate(self.hdfs_file_list):
            hdfs_path = self.hdfs_file_path + hdfs_file
            df, index_count = self.read_data(hdfs_path)
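            # Row layout: file index, full HDFS path, rows in this file, cumulative rows of all preceding files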
            data_list.append([index, hdfs_path, index_count, self.index_count])
            print([index, hdfs_path, index_count, self.index_count])
            self.index_count += index_count
        self.df_save = pd.DataFrame(data=data_list, columns=['index', 'hdfs_path', 'current_counts', 'all_counts'])
        # self.df_save.to_csv("/root/hdfs_parquet_block_info.csl")
        self.save_data()


if __name__ == '__main__':
    site_name = sys.argv[1]  # argument 1: site name, e.g. 'us'
    handle_obj = PicturesHdfsIndex(site_name=site_name)
    handle_obj.run()
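
# Example invocation (assumed; adjust the script name and submission method to the actual deployment):
#   spark-submit pictures_hdfs_index.py us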