import os
import sys


# Make the project root importable (utils package) when running this file directly.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.templates import Templates
from pyspark.sql import functions as F


class FlowAsinLast30days(Templates):
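    """Join last-30-days ASIN flow detail from Elasticsearch with the
    asin -> parent_asin mapping from Hive, flag rows without a parent ASIN,
    and persist the result to a Hive table.

    run() is inherited from Templates and is assumed to drive
    read_data -> handle_data -> save_data in order.
    """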

    def __init__(self):
        super().__init__()
        self.db_save = "tmp_flow_asin_last30days"
        self.spark = self.create_spark_object(app_name="FlowAsinLast30days")
        self.partitions_num = 20
        self.partition_dict = {}
        # Placeholder DataFrames; each is populated by the steps below.
        self.df_es = self.spark.sql("select 1+1")
        self.df_parent = self.spark.sql("select 1+1")
        self.df_joined = self.spark.sql("select 1+1")
        self.df_save = self.spark.sql("select 1+1")

    def read_data(self):
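        """Load the last-4-week ASIN detail index from Elasticsearch and the
        asin/parent_asin mapping from the ods_asin_variat Hive table."""
        # Full-index read via the elasticsearch-hadoop Spark SQL connector;
        # the match_all query pulls every document.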
        self.df_es = self.spark.read.format("org.elasticsearch.spark.sql")\
            .option("es.nodes", "192.168.10.217")\
            .option("es.port", "9200")\
            .option("es.net.http.auth.user", "elastic")\
            .option("es.net.http.auth.pass", "selection2021.+")\
            .option("es.resource", "us_st_detail_last_4_week")\
            .option("es.query", '{"query": {"match_all": {}}}')\
            .load()

        columns = ["asin", "first_category_rank", "asin_bought_month", "total_comments", "variation_num", "site_name", "account_name"]
        self.df_es = self.df_es.select(columns).cache()
        self.df_es.show()

        sql = f"""
        select
            asin,
            parent_asin
        from
            ods_asin_variat;
        """
        self.df_parent = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
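        """Left-join the ES data to its parent ASINs and add complementary
        0/1 flags marking whether a parent ASIN was matched."""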
        # self.df_parent = self.df_parent.groupby(["parent_asin"]).agg(F.count("asin").alias("variation_num"))
        self.df_joined = self.df_es.join(self.df_parent, "asin", "left")
        # Complementary flags: parent_asin_is_null is 1 when no parent ASIN was
        # matched by the left join, parent_asin_exist is 1 when one was.
        self.df_joined = self.df_joined\
            .withColumn("parent_asin_is_null", F.when(F.col("parent_asin").isNull(), F.lit(1)).otherwise(F.lit(0)))\
            .withColumn("parent_asin_exist", F.when(F.col("parent_asin").isNotNull(), F.lit(1)).otherwise(F.lit(0)))

    def save_data(self):
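        """Clear the table's HDFS directory, then write the joined result
        into the target Hive table."""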
        self.df_save = self.df_joined
        hdfs_path_asin_info = CommonUtil.build_hdfs_path(self.db_save, partition_dict=self.partition_dict)
        print(f"Clearing HDFS directory: {hdfs_path_asin_info}")
        HdfsUtils.delete_file_in_folder(hdfs_path_asin_info)
        print(f"Saving to table: {self.db_save}")
        self.df_save.write.saveAsTable(name=self.db_save, format='hive', mode='append')
        print("success")


if __name__ == '__main__':
    obj = FlowAsinLast30days()
    obj.run()