import json
import os
import sys
import time
import traceback

import pandas as pd

sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType


class DimStAsinInfo(Templates):
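    """Consume ASIN detail records from Kafka in offset-bounded batches with Spark.

    Connection helpers such as create_spark_object, get_kafka_object_by_python,
    get_kafka_partitions_data and the kafka_servers attribute are assumed to be
    provided by the Templates base class.
    """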

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='latest', topic_name="us_asin_detail", batch_size=100000):
        super(DimStAsinInfo, self).__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.consumer_type = consumer_type  # consume real-time ("latest") or historical data
        self.topic_name = topic_name  # Kafka topic name
        self.batch_size = batch_size
        self.batch_size_history = batch_size // 10
        self.db_save = 'kafka_test_001'

        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        self.schema = self.init_schema()

        # Connect to MySQL
        self.engine = self.get_connection()

    def get_connection(self):
        return TemplatesMysql(site_name="us").mysql_connect()

    def judge_spider_asin_detail_is_finished(self):
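        """Check workflow_progress (retrying on DB errors); if the ASIN detail crawl is
        marked finished (status_val=3), stop Spark and exit the program."""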
        while True:
            try:
                sql = f'SELECT * from workflow_progress WHERE page="ASIN详情" and site_name="{self.site_name}" and date_type="{self.date_type}" and date_info="{self.date_info}" and status_val=3'
                df = pd.read_sql(sql, con=self.engine)
                if df.shape[0] == 1:
                    print(f"ASIN详情状态为3, 抓取完成并终止程序, site_name:{self.site_name}, date_type:{self.date_type}, date_info:{self.date_info}")
                    self.spark.stop()
                    quit()  # 退出程序
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()

    @staticmethod
    def init_schema():
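        """Schema for the JSON value of each Kafka message (one ASIN detail record)."""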
        schema = StructType([
            StructField("asin", StringType(), True),
            StructField("week", StringType(), True),
            StructField("title", StringType(), True),
            StructField("img_url", StringType(), True),
            StructField("rating", StringType(), True),
            StructField("total_comments", StringType(), True),
            StructField("price", FloatType(), True),
            StructField("rank", StringType(), True),
            StructField("category", StringType(), True),
            StructField("launch_time", StringType(), True),
            StructField("volume", StringType(), True),
            StructField("weight", StringType(), True),
            StructField("page_inventory", IntegerType(), True),
            StructField("buy_box_seller_type", IntegerType(), True),
            StructField("asin_vartion_list", IntegerType(), True),
            StructField("title_len", IntegerType(), True),
            StructField("img_num", IntegerType(), True),
            StructField("img_type", StringType(), True),
            StructField("activity_type", StringType(), True),
            StructField("one_two_val", StringType(), True),
            StructField("three_four_val", StringType(), True),
            StructField("eight_val", StringType(), True),
            StructField("qa_num", IntegerType(), True),
            StructField("five_star", IntegerType(), True),
            StructField("four_star", IntegerType(), True),
            StructField("three_star", IntegerType(), True),
            StructField("two_star", IntegerType(), True),
            StructField("one_star", IntegerType(), True),
            StructField("low_star", IntegerType(), True),
            StructField("together_asin", StringType(), True),
            StructField("brand", StringType(), True),
            StructField("ac_name", StringType(), True),
            StructField("material", StringType(), True),
            StructField("node_id", StringType(), True),
            StructField("data_type", IntegerType(), True),
            StructField("sp_num", StringType(), True),
            StructField("describe", StringType(), True),
            StructField("date_info", StringType(), True),
            StructField("weight_str", StringType(), True),
            StructField("package_quantity", StringType(), True),
            StructField("pattern_name", StringType(), True),
            StructField("seller_id", StringType(), True),
            StructField("variat_num", IntegerType(), True),
            StructField("site_name", StringType(), True),
            StructField("best_sellers_rank", StringType(), True),
            StructField("best_sellers_herf", StringType(), True),
            StructField("account_url", StringType(), True),
            StructField("account_name", StringType(), True),
            StructField("parentAsin", StringType(), True),
            StructField("asinUpdateTime", StringType(), True),
        ])
        return schema

    @staticmethod
    def clean_kafka_df(df):
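        """Rename seller_id to account_id and keep only the columns needed downstream."""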
        df = df.withColumnRenamed("seller_id", "account_id")
        # cols_python = ["asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
        #         "brand", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
        #         "volume", "weight", "weight_str", "launchTime", "total_comments", "page_inventory"]
        # oneCategoryRank, aoVal, bsrOrders, bsrOrdersSale
        # siteName volumeFormat weightFormat asinUpdateTime
        # java那边插件的字段名称
        cols_java = ['asin', 'parentAsin', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'price', 'rating',
                    'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'siteName', 'buyBoxSellerType',
                    'volume', 'volumeFormat', 'weight', 'weightFormat', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime']
        df = df.select("asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
                        "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
                        "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id")
        return df

    def get_topic_name(self):
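        """Resolve the Kafka topic: US monthly data has its own topic ({site}_asin_detail_month),
        all other cases use {site}_asin_detail."""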
        if self.site_name == "us" and self.date_type == "month":
            self.topic_name = f"{site_name}_asin_detail_{self.date_type}"
        else:
            self.topic_name = f"{site_name}_asin_detail"

    def handle_history(self):
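        """Consume the topic's historical backlog: walk the partitions' offset range in
        windows of batch_size_history and convert each window to a pandas DataFrame."""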
        self.get_topic_name()
        consumer = self.get_kafka_object_by_python(topic_name=self.topic_name)
        partition_data_count = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name)

        beginning_offsets_list = []
        end_offsets_list = []
        for values in partition_data_count.values():
            beginning_offsets_list.append(values['beginning_offsets'])
            end_offsets_list.append(values['end_offsets'])

        min_offset = min(beginning_offsets_list)
        max_offset = max(end_offsets_list)

        for start_offset in range(min_offset, max_offset+1, self.batch_size_history):
            end_offset = start_offset + self.batch_size_history
            starting_offsets_json = json.dumps({self.topic_name: {str(p): start_offset for p in partition_data_count.keys()}})
            ending_offsets_json = json.dumps({self.topic_name: {str(p): end_offset for p in partition_data_count.keys()}})
            kafka_df = self.spark.read \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", self.topic_name) \
                .option("startingOffsets", starting_offsets_json) \
                .option("endingOffsets", ending_offsets_json) \
                .option("failOnDataLoss", "false") \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
                .select("data.*")
            print(f"kafka_df.count():{kafka_df.count()}, start_offset:{start_offset}, end_offset:{end_offset}")
            pdf = kafka_df.toPandas()
            # pdf.to_sql()
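            # A minimal persistence sketch (assumption: target table name, reusing self.engine):
            # pdf.to_sql("us_asin_detail_history", con=self.engine, if_exists="append", index=False)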

        # Stop the SparkSession
        self.spark.stop()

    def run(self):
        self.handle_history()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    consumer_type = sys.argv[4]  # arg 4: consumer type: latest (real-time) or history
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info, consumer_type=consumer_type, batch_size=100000)
    handle_obj.run()
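
# Example invocation (script path and cluster settings are assumptions):
#   spark-submit dim_st_asin_info.py us month 2022-10 latest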