"""
author: 方星钧(ffman)
description: pyspark程序继承模板
table_read_name: 无
table_save_name: 无
table_save_level: 无
version: 1.0
created_date: 2022-05-07
updated_date: 2022-05-07
"""
import calendar
import json
import os
import sys
import time
import traceback
import uuid

import pandas as pd
import redis

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录

from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from kafka import KafkaConsumer, TopicPartition
from pyspark.sql.types import *
from utils.db_util import DbTypes, DBUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql import Window
from utils.common_util import CommonUtil
import subprocess
import requests


class Templates(object):
    def __init__(self):
        """Set the default state used by the template's methods.

        Besides assigning plain defaults, this calls
        ``DBUtil.get_db_engine(...)`` to build ``self.engine_mysql`` and
        creates a ``redis.Redis`` client object in ``self.client``.
        ``self.schema`` and ``self.batch_size_history`` are read by
        ``kafka_stream`` / ``run_kafka`` but are NOT initialised here.
        """
        # Site
        self.site_name = str()
        # Date-related fields
        self.year_quarter = str()
        self.year_month = str()
        self.year_week = str()
        self.year = int()
        self.quarter = int()
        self.month = int()
        self.week = int()
        self.day = int()
        self.year_month_days_dict = dict()
        self.year_quarter_tuple = tuple()
        self.year_month_tuple = tuple()
        self.year_week_tuple = tuple()
        self.date_info_tuple = tuple()
        self.last_30_date_tuple = tuple()
        self.date_type = str()  # week/4_week/month/quarter
        self.date_info = str()  # 2022-1/2022-1/2022-1/2022-1
        self.app_name = str()
        # Spark-related fields
        self.db_save = str()  # name of the Hive table to write to
        self.db_name = "big_data_selection"  # passed to SparkUtil in create_spark_object
        # self.db_name = "selection_off_line"
        self.df_save = object()  # DataFrame to be saved (placeholder until assigned)
        self.df_week = object()  # assigned by get_year_week_tuple
        self.df_date = object()  # assigned by get_date_info_tuple
        self.partitions_num = int()  # partition count used when repartitioning before a write
        self.partitions_by = list()  # partition columns of the target Hive table
        # self.reset_partitions()
        # date_type is still '' at this point, so this call leaves partitions_by empty.
        self.reset_partitions_by()
        self.spark = None  # SparkSession used by the job
        self.topic_name = str()
        # my_kafka
        self.consumer = object()
        # Kafka connection, authentication and security parameters.
        # NOTE(review): broker addresses and credentials are hard-coded in source.
        # self.kafka_servers = "192.168.10.221:9092,192.168.10.220:9092,192.168.10.210:9092"
        # self.kafka_servers_producer = "61.145.136.61:19092,61.145.136.61:29092,61.145.136.61:39092"
        self.kafka_servers = "192.168.10.218:9092,192.168.10.219:9092,192.168.10.220:9092"
        # NOTE(review): the value below contains literal single quotes inside the
        # string; it is not used elsewhere in this class -- confirm it is intended.
        self.kafka_servers_producer = "'61.145.136.61:19092,61.145.136.61:29092,61.145.136.61:49092,61.145.136.61:59092'"
        self.kafka_security_protocol = "SASL_PLAINTEXT"
        self.kafka_sasl_mechanism = "PLAIN"
        self.kafka_username = "consumer"
        self.kafka_password = "J2#aLmPq7zX"
        # NOTE(review): 'lastest' looks like a misspelling of 'latest'. The methods
        # in this class compare against 'latest', so with this default run_kafka
        # takes the history branch unless the value is overridden.
        self.consumer_type = 'lastest'
        self.processing_time = 300  # trigger interval in seconds (see kafka_stream)
        self.check_path = str()  # streaming checkpoint location
        # Streaming query object
        self.query = None
        # Database connection
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")
        # Spider (crawler) type
        self.spider_type = "asin详情"
        # Starting offset for history consumption; 0 means "ask the broker"
        # (see get_kafka_partitions_data)
        self.beginning_offsets = 0
        # Test flag
        self.test_flag = 'normal'
        self.beginning_offsets_dict = {}  # initial per-partition offsets for history consumption
        # Redis client used for locking (see acquire_lock / release_lock)
        self.client = redis.Redis(host='192.168.10.224', port=6379, db=9, password='yswg2023')

    def create_spark_object(self, app_name=None):
        """Create and return a SparkSession via ``SparkUtil``.

        A streaming session is requested when ``self.topic_name`` is not the
        empty string; otherwise a regular session is requested. Both calls
        receive ``app_name`` and ``self.db_name``.
        """
        if self.topic_name != '':
            print("创建实时相关SparkSession对象")
            return SparkUtil.get_stream_spark(app_name, self.db_name)
        print("创建非实时相关SparkSession对象")
        return SparkUtil.get_spark_session(app_name, self.db_name)

    # 针对消费kafka得到的dataframe去重
    def deduplication_kafka_data(self, kafka_df, deduplicaiton_key_field, deduplication_time_field):
        """Keep one row per key, preferring the largest time value.

        Rows are ranked with ``row_number`` inside each
        ``deduplicaiton_key_field`` partition, ordered by
        ``deduplication_time_field`` descending with nulls last; only the
        first-ranked row of each partition is returned. The helper column
        ``k_rank`` is dropped before returning.
        """
        print(f"数据去重清洗,清洗依据字段: {deduplicaiton_key_field}, 排序依据字段: {deduplication_time_field}")
        newest_first = F.col(deduplication_time_field).desc_nulls_last()
        per_key = Window.partitionBy(deduplicaiton_key_field).orderBy(newest_first)
        ranked = kafka_df.withColumn("k_rank", F.row_number().over(window=per_key))
        return ranked.filter("k_rank=1").drop("k_rank")

    def create_kafka_df_object(self, consumer_type=str(), topic_name=str(), starting_offsets_json=str(), ending_offsets_json=str(), schema=StructType()):
        """Read a Kafka topic into a DataFrame of parsed JSON fields.

        ``consumer_type == "latest"`` builds a streaming read
        (``spark.readStream``) starting at ``"latest"``.
        ``consumer_type == "history"`` builds a batch read (``spark.read``)
        bounded by ``starting_offsets_json`` / ``ending_offsets_json``; when
        ``self.spider_type`` is ``'asin详情'`` and the batch is non-empty it is
        also de-duplicated on ``asin`` by ``asinUpdateTime``.
        Any other ``consumer_type`` returns ``None``.

        The message ``value`` is cast to string, parsed with ``schema`` and
        flattened to top-level columns.
        """
        if consumer_type not in ("latest", "history"):
            return None

        is_stream = consumer_type == "latest"
        jaas_config = f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_username}" password="{self.kafka_password}";'
        reader_options = {
            "kafka.bootstrap.servers": self.kafka_servers,
            "subscribe": topic_name,
            "kafka.security.protocol": self.kafka_security_protocol,
            "kafka.sasl.mechanism": self.kafka_sasl_mechanism,
            "kafka.sasl.jaas.config": jaas_config,
            "failOnDataLoss": "false",
        }
        if is_stream:
            # Streaming read
            reader_options["startingOffsets"] = consumer_type
            reader = self.spark.readStream
        else:
            # Batch read over an explicit offset window
            reader_options["startingOffsets"] = starting_offsets_json
            reader_options["endingOffsets"] = ending_offsets_json
            reader = self.spark.read

        reader = reader.format("kafka")
        for option_name, option_value in reader_options.items():
            reader = reader.option(option_name, option_value)

        parsed_value = F.from_json(F.col("value").cast("string"), schema=schema).alias("data")
        kafka_df = reader.load().select(parsed_value).select("data.*")

        if not is_stream and self.spider_type == 'asin详情' and kafka_df.count() > 0:
            kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime")
        return kafka_df

    def get_kafka_object_by_python(self, topic_name="us_asin_detail"):
        """Return a ``KafkaConsumer`` subscribed to ``topic_name``.

        The consumer uses this instance's broker list and SASL settings, and
        decodes each message value as UTF-8 JSON.
        """
        def decode_json(raw):
            return json.loads(raw.decode('utf-8'))

        consumer_settings = {
            "bootstrap_servers": self.kafka_servers,
            "value_deserializer": decode_json,
            "security_protocol": self.kafka_security_protocol,  # or 'SASL_SSL' when SSL is used
            "sasl_mechanism": self.kafka_sasl_mechanism,
            "sasl_plain_username": self.kafka_username,
            "sasl_plain_password": self.kafka_password,
        }
        return KafkaConsumer(topic_name, **consumer_settings)

    # @staticmethod
    def get_kafka_partitions_data(self, consumer=None, topic_name="us_asin_detail"):
        """Collect start/end offsets for every partition of ``topic_name``.

        Returns a dict keyed by partition id; each value is a dict with
        ``beginning_offsets``, ``end_offsets`` and ``data_count`` (their
        difference). The start offset is asked from the broker only while
        ``self.beginning_offsets`` is 0; otherwise ``self.beginning_offsets``
        is used for every partition.
        """
        offsets_by_partition = {}
        for partition_id in consumer.partitions_for_topic(topic_name):
            topic_partition = TopicPartition(topic_name, partition_id)

            if self.beginning_offsets == 0:
                first_offset = consumer.beginning_offsets([topic_partition])[topic_partition]
            else:
                first_offset = self.beginning_offsets
            last_offset = consumer.end_offsets([topic_partition])[topic_partition]

            offsets_by_partition[partition_id] = {
                "beginning_offsets": first_offset,
                "end_offsets": last_offset,
                # message count is the distance between the two offsets
                "data_count": last_offset - first_offset,
            }
        print("partition_data_count:", offsets_by_partition)
        return offsets_by_partition

    def get_kafka_df_by_spark(self, schema=None, consumption_type="lastest", topics="us_asin_detail"):
        """Open a streaming Kafka read and return its parsed JSON columns.

        Args:
            schema: schema handed to ``from_json`` for the message value.
            consumption_type: value for the Kafka source ``startingOffsets``
                option. The historical default ``"lastest"`` is a misspelling
                that the Kafka source cannot parse, so it is mapped to
                ``"latest"`` here; every other value is passed through
                unchanged.
            topics: value for the ``subscribe`` option.

        Returns:
            A streaming DataFrame whose columns are the fields of ``schema``.
        """
        # Keep the old default in the signature for backward compatibility but
        # translate it to the spelling Spark actually accepts.
        if consumption_type == "lastest":
            consumption_type = "latest"

        jaas_config = (
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="{self.kafka_username}" password="{self.kafka_password}";'
        )
        kafka_df = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", self.kafka_servers) \
            .option("subscribe", topics) \
            .option("kafka.security.protocol", self.kafka_security_protocol) \
            .option("kafka.sasl.mechanism", self.kafka_sasl_mechanism) \
            .option("kafka.sasl.jaas.config", jaas_config) \
            .option("startingOffsets", consumption_type) \
            .load() \
            .select(F.from_json(F.col("value").cast("string"), schema=schema).alias("data")) \
            .select("data.*")
        return kafka_df

    def get_year_month_days_dict(self, year=2022):
        self.year_month_days_dict = {month: calendar.monthrange(year, month)[-1] for month in range(1, 13)}

    def get_date_info_tuple(self):
        """Resolve ``self.date_info`` into the dates it covers.

        Loads the whole ``dim_date_20_to_30`` table into pandas, then,
        depending on ``self.date_type``, fills ``self.date_info_tuple`` and
        some of ``self.year`` / ``self.month`` / ``self.week`` / ``self.day``.
        Values obtained from ``split("-")`` are strings, not ints.

        Every branch indexes ``[0]`` on the matching rows, so a ``date_info``
        with no matching row raises ``IndexError``.
        """
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;")
        df = self.df_date.toPandas()
        if self.date_type == 'day':
            df_loc = df.loc[df.date == f'{self.date_info}']
            # Unlike the other branches this stores a string such as "('2024-01-01')",
            # not a tuple.
            self.date_info_tuple = f"('{tuple(df_loc.date)[0]}')"
            self.year, self.month, self.day = self.date_info.split("-")
            self.week = list(df_loc.year_week)[0].split("-")[-1]
        if self.date_type in ['week', 'week_old', 'month', 'month_old']:
            # df_loc = df.loc[df[f'year_{self.date_type}'] == f"{self.date_info}"]
            # self.date_info_tuple = tuple(df_loc.date)
            if self.date_type in ['week', 'week_old']:
                # All dates whose year_week equals date_info.
                df_loc = df.loc[df[f'year_week'] == f"{self.date_info}"]
                self.date_info_tuple = tuple(df_loc.date)
                self.year, self.week = self.date_info.split("-")
                self.month = list(df_loc.year_month)[0].split("-")[-1]
            if self.date_type in ['month', 'month_old']:
                # All dates whose year_month equals date_info.
                df_loc = df.loc[df[f'year_month'] == f"{self.date_info}"]
                self.date_info_tuple = tuple(df_loc.date)
                self.year, self.month = self.date_info.split("-")
        if self.date_type == '4_week':
            # Row for week_day == 1 of the requested week; its id anchors the window.
            df_loc = df.loc[(df[f'year_week'] == f"{self.date_info}") & (df.week_day == 1)]
            current_id = tuple(df_loc.id)[0]
            # ids 0, 7, 14 and 21 rows earlier -- assumes ids are consecutive per day; TODO confirm.
            id_tuple = (current_id, current_id - 7 * 1, current_id - 7 * 2, current_id - 7 * 3)
            df_4_week = df.loc[df.id.isin(id_tuple)]  # 4
            df_4_week = df.loc[df.year_week.isin(df_4_week.year_week)]  # 4*7
            self.date_info_tuple = tuple(sorted(list(df_4_week.date)))
            self.year, self.week = self.date_info.split("-")
            # self.year is overwritten with the table's value for the anchor row.
            self.year = tuple(df_loc.year)[0]
            self.month = list(df_loc.year_month)[0].split("-")[-1]
        if self.date_type == 'last30day':
            df_loc = df.loc[df.date == f'{self.date_info}']
            day_end_id = list(df_loc.id)[0]
            # Subtract 29 to get the id of the first day of the 30-day window.
            day_begin_id = (int(day_end_id) - 29)
            df_loc = df.loc[(df.id >= day_begin_id) & (df.id <= day_end_id)]
            self.date_info_tuple = tuple(df_loc.date)
            self.year, self.month, self.day = self.date_info.split("-")
        print("self.date_info_tuple:", self.date_info_tuple)

    def get_year_week_tuple(self):
        """Resolve ``self.date_info`` into the ``year_week`` values it covers.

        Loads the ``week_day = 1`` rows of ``dim_date_20_to_30`` into pandas,
        rewrites ``year_month`` (``_`` -> ``-``) and ``year_quarter``
        (``_quarter_`` -> ``-``), then fills ``self.year_week_tuple`` and the
        related year/week/month/quarter attributes according to
        ``self.date_type``.

        Returns:
            The pandas DataFrame built from the query (after the rewrites).
        """
        self.df_week = self.spark.sql(f"select * from dim_date_20_to_30 where week_day=1;")
        # self.df_week = self.spark.sql(f"select * from dim_week_20_to_30;")
        df = self.df_week.toPandas()
        df.year_month = df.year_month.apply(lambda x: x.replace("_", "-"))
        df.year_quarter = df.year_quarter.apply(lambda x: x.replace("_quarter_", "-"))
        if self.date_type in ['week']:
            self.year_week = self.date_info
            # self.year, self.week = int(self.year_week.split("-")[0]), int(self.year_week.split("-")[1])
            self.year, self.week = self.year_week.split("-")[0], self.year_week.split("-")[1]
            # Stored as a string such as "('2024-01')", not as a tuple.
            self.year_week_tuple = f"('{self.year_week}')"
        if self.date_type in ['4_week']:
            self.year_week = self.date_info
            self.year, self.week = self.year_week.split("-")[0], self.year_week.split("-")[1]
            df_week = df.loc[df.year_week == self.year_week]
            # NOTE(review): when no row matches, current_id is None and the
            # subtraction on the next line raises TypeError.
            current_id = list(df_week.id)[0] if list(df_week.id) else None
            id_tuple = (current_id, current_id - 7*1, current_id - 7*2, current_id - 7*3)
            df_4_week = df.loc[df.id.isin(id_tuple)]
            self.year_week_tuple = tuple(df_4_week.year_week) if tuple(df_4_week.year_week) else ()
            # The week_day filter repeats the condition already applied in the SQL above.
            df_week = df.loc[(df.year_week == self.date_info) & (df.week_day == 1)]
            self.year = tuple(df_week.year)[0]
            self.month = tuple(df_week.month)[0]
            print(f"self.year:{self.year}, self.month:{self.month}")
        if self.date_type in ['month', 'month_old', 'month_week']:
            self.year_month = self.date_info
            self.year, self.month = self.year_month.split("-")[0], self.year_month.split("-")[1]
            df_month = df.loc[df.year_month == self.year_month]
            self.year_week_tuple = tuple(df_month.year_week) if tuple(df_month.year_week) else ()
        if self.date_type in ['quarter']:
            self.year_quarter = self.date_info
            self.year, self.quarter = self.year_quarter.split("-")[0], self.year_quarter.split("-")[1]
            df_quarter = df.loc[df.year_quarter == self.year_quarter]
            self.year_week_tuple = tuple(df_quarter.year_week) if tuple(df_quarter.year_week) else ()
        print("self.year_week_tuple:", self.year_week_tuple)
        return df

    def reset_partitions(self, partitions_num=10):
        print("重置分区数")
        if self.site_name in ['us']:
            self.partitions_num = partitions_num
        elif self.site_name in ['uk', 'de']:
            self.partitions_num = partitions_num // 2 if partitions_num // 2 > 0 else 1
        elif self.site_name in ['es', 'fr', 'it']:
            self.partitions_num = partitions_num // 4 if partitions_num // 4 > 0 else 1

    def reset_partitions_by(self):
        if self.date_type in ['week', '4_week']:
            self.partitions_by = ['site_name', 'dt']
        if self.date_type in ['month']:
            self.partitions_by = ['site_name', 'dm']
        if self.date_type in ['quarter']:
            self.partitions_by = ['site_name', 'dq']

    def read_data(self):
        """Read step; a no-op here. Called first by run(), run_kafka() and run_insert()."""
        pass

    def handle_data(self):
        """Processing step; a no-op here. Called after read_data() by run(), run_kafka() and run_insert()."""
        pass

    @staticmethod
    def save_data_common(df_save=None, db_save=None, partitions_num=None, partitions_by=None):
        """Append ``df_save`` to the Hive table ``db_save``.

        The DataFrame is repartitioned to ``partitions_num`` and written with
        ``saveAsTable`` in ``append`` mode, partitioned by ``partitions_by``.
        """
        print("当前存储的表名为:", db_save)
        writer = df_save.repartition(partitions_num).write
        writer.saveAsTable(name=db_save, format='hive', mode='append', partitionBy=partitions_by)

    def save_data(self):
        """Append ``self.df_save`` to ``self.db_save`` using this instance's partition settings."""
        self.save_data_common(df_save=self.df_save, db_save=self.db_save, partitions_num=self.partitions_num,
                              partitions_by=self.partitions_by)

    # 采用insert overwrite模式覆写数据,覆写模式一定要保证dataFrame的字段顺序与表字段顺序一致
    @staticmethod
    def insert_data_overwrite(df_save=None, db_save=None, partitions_num=None):
        """Overwrite table ``db_save`` with ``df_save`` via ``insertInto``.

        The DataFrame is repartitioned to ``partitions_num`` first. As the
        comment above this method states, the DataFrame's column order must
        match the table's column order.
        """
        print("当前覆写得表名为:", db_save)
        writer = df_save.repartition(partitions_num).write
        writer.insertInto(tableName=db_save, overwrite=True)

    def insert_data(self):
        """Overwrite ``self.db_save`` with ``self.df_save`` (see insert_data_overwrite)."""
        self.insert_data_overwrite(df_save=self.df_save, db_save=self.db_save, partitions_num=self.partitions_num)

    def run(self):
        """Run the batch pipeline once: read_data(), handle_data(), then save_data().

        Exceptions are not caught here; they propagate to the caller.
        """
        self.read_data()
        self.handle_data()
        self.save_data()

    def start_process_instance(self):
        """No-op here. Called by kafka_stream_stop() and at the end of kafka_history()."""
        pass

    def kafka_stream_stop(self):
        try:
            self.start_process_instance()  # 开启海豚调度
            if self.query is not None:
                self.query.awaitTermination()
                self.query.stop()  # 退出实时消费
            if self.spark is not None:
                self.spark.stop()
            exit(0) # 退出程序
        except Exception as e:
            print(e, traceback.format_exc())

    def kafka_consumption_is_finished(self):
        while True:
            try:
                # if self.site_name == 'us':
                #     # sql = f"SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}' ORDER BY created_at desc LIMIT 1;"
                #     sql = f"""
                #         SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}'
                #         and date_info in
                #         -- (SELECT MAX(year_week) as date_info from date_20_to_30 WHERE `year_month` = '2024-02' and week_day =1
                #         (SELECT year_week as date_info from date_20_to_30 WHERE `year_month` = '{self.date_info}' and week_day =1
                #         )
                #         ORDER BY created_at desc LIMIT 1;
                #
                #     """
                # else:
                #     sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_info='{self.date_info}' and page='{self.spider_type}' ORDER BY created_at desc LIMIT 1;"
                sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='{self.spider_type}' and spider_state=3;"
                print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}")
                df = pd.read_sql(sql, con=self.engine_mysql)
                if df.shape[0]:
                    status_val = list(df.status_val)[0]
                    if int(status_val) == 3:
                        print(f"spider_type:{self.spider_type}已经爬取完毕, 退出kafka消费和停止程序")
                        if self.consumer_type == "latest":
                            if HdfsUtils.delete_hdfs_file_with_checkpoint(self.check_path):
                                print("实时消费正常完成,删除对应的检查点文件")
                            self.kafka_stream_stop()
                    else:
                        print(f"spider_type:{self.spider_type}还在爬取中, 继续下一个批次数据消费")
                break
            except Exception as e:
                print(f"判断判断爬虫'{self.spider_type}'是否结束---出现异常, 等待20s", e, traceback.format_exc())
                time.sleep(20)
                self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")
                continue

    def kafka_stream(self, processing_time):
        """Start the streaming query and block until it terminates.

        Each micro-batch is passed to ``handle_kafka_stream_templates`` and a
        batch is triggered every ``processing_time`` seconds. Unless
        ``self.test_flag`` is ``'test'``, a checkpoint location is configured:
        ``self.check_path`` defaults to ``/tmp/kafka/<topic_name>`` when
        empty, and ``HdfsUtils.is_checkpoint_exist`` is called on it first.
        Reads ``self.schema``, which ``__init__`` does not set.
        """
        kafka_df = self.create_kafka_df_object(consumer_type="latest", topic_name=self.topic_name, schema=self.schema)
        with_checkpoint = not (self.test_flag == 'test')
        if with_checkpoint:
            if self.check_path == "":
                self.check_path = f"/tmp/kafka/{self.topic_name}"
            print("检查点目录为:", self.check_path)
            HdfsUtils.is_checkpoint_exist(self.check_path)

        stream_writer = kafka_df.writeStream.outputMode("append").format("console")
        stream_writer = stream_writer.foreachBatch(self.handle_kafka_stream_templates)
        stream_writer = stream_writer.trigger(processingTime=f'{processing_time} seconds')
        if with_checkpoint:
            stream_writer = stream_writer.option("checkpointLocation", self.check_path)

        self.query = stream_writer.start()
        self.query.awaitTermination()

    def handle_kafka_stream_templates(self, kafka_df, epoch_id):
        """Per-batch callback registered with ``foreachBatch`` in kafka_stream.

        De-duplicates non-empty batches when the spider type is ``'asin详情'``,
        passes the batch to ``handle_kafka_stream`` and, when
        ``self.test_flag`` is ``'normal'``, checks whether the spider has
        finished.
        """
        needs_dedup = self.spider_type == 'asin详情' and kafka_df.count() > 0
        if needs_dedup:
            kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime")
        self.handle_kafka_stream(kafka_df, epoch_id)
        if self.test_flag != 'normal':
            return
        self.kafka_consumption_is_finished()

    def handle_kafka_stream(self, kafka_df, epoch_id):
        """Per-batch streaming handler; a no-op here. Called by handle_kafka_stream_templates()."""
        pass

    def get_offsets_by_history(self):
        if self.db_save in ['spider_asin_detail', 'spider_asin_search']:
            sql = f"select * from selection.kafka_offset_history_detail " \
                  f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                  f"and date_info='{self.date_info}' and topic='{self.topic_name}';"
            print(f"sql: {sql}")
            df = pd.read_sql(sql, con=self.engine_mysql)
            if df.shape[0] == 1:
                end_offsets_json = list(df.end_offsets_json)[0]
                print(f"end_offsets_json: {end_offsets_json}")
                # self.beginning_offsets_dict = json.loads(end_offsets_json)  # history消费时, 初始的偏移量
                self.beginning_offsets_dict = eval(end_offsets_json)  # history消费时, 初始的偏移量

    def record_offsets_by_history(self, end_offsets_dict):
        """Insert the batch's end offsets into ``selection.kafka_offset_history_detail``.

        Only runs when ``self.db_save`` is ``'spider_asin_detail'`` or
        ``'spider_asin_search'``; otherwise it just prints a message.
        ``end_offsets_dict`` is serialised with ``json.dumps`` and embedded in
        the SQL text.

        NOTE(review): the ``ON DUPLICATE KEY UPDATE`` clause does not assign
        ``end_offsets_json`` (that line is commented out inside the SQL), so
        for an existing key the stored offsets are left as they were --
        confirm this is intended.
        """
        if self.db_save in ['spider_asin_detail', 'spider_asin_search']:
            # Serialise the dict to a JSON string
            end_offsets_json = json.dumps(end_offsets_dict)

            sql = f"""
                INSERT INTO selection.kafka_offset_history_detail 
                (site_name, date_type, date_info, topic, end_offsets_json) 
                VALUES 
                ('{self.site_name}', '{self.date_type}', '{self.date_info}', '{self.topic_name}', '{end_offsets_json}')
                ON DUPLICATE KEY UPDATE 
                site_name = VALUES(site_name), 
                date_type = VALUES(date_type), 
                date_info = VALUES(date_info), 
                topic = VALUES(topic);
--                 end_offsets_json = VALUES(end_offsets_json);
                """
            print(f"记录爬虫历史消费的偏移量: {self.db_save}--sql: {sql}")

            # engine.begin() runs the statement inside a transaction block.
            with self.engine_mysql.begin() as conn:
                conn.execute(sql)
        else:
            print(f"只有爬虫才需要记录历史消费的偏移量: {self.db_save}")
            pass


    def kafka_history(self, topic_name, batch_size_history, schema):
        """Consume a topic's existing data in offset windows until caught up.

        Steps, as implemented below:
          1. Read each partition's start/end offsets with a python consumer.
          2. Raise the start offsets to any values loaded by
             ``get_offsets_by_history()``.
          3. Loop: refresh offsets from the broker, build a window of at most
             ``batch_size_history`` messages per partition, read it with
             ``create_kafka_df_object(consumer_type="history", ...)``, pass it
             to ``handle_kafka_history_templates`` and record the end offsets.
          4. When every partition's window reaches its end offset, process
             that last batch, call ``start_process_instance()`` and stop.

        Any exception in an iteration is printed and the loop resumes after
        10 seconds.

        NOTE(review): a partition's ``beginning_offsets`` is advanced while
        the window is being built, before the batch is read and handled. If
        handling then raises, the next iteration appears to start from the
        advanced offsets, so the failed window would not be read again --
        confirm.
        NOTE(review): the inner retry loop around ``create_kafka_df_object``
        has no delay and no attempt limit.
        """
        consumer = self.get_kafka_object_by_python(topic_name=topic_name)
        partition_offsets_dict = self.get_kafka_partitions_data(consumer=consumer, topic_name=topic_name)
        partition_num = len(partition_offsets_dict)
        beginning_offsets_dict = {}
        end_offsets_dict = {}
        # self.beginning_offsets_dict = {"0": 69355, "1": 69761, "2": 70827, "3": 69609, "4": 71099, "5": 69922, "6": 70054, "7": 70798}
        self.get_offsets_by_history()  # load offsets recorded by earlier history runs
        if self.beginning_offsets_dict != {}:
            # Start from whichever is larger: the broker's first offset or the recorded one.
            for key, value in self.beginning_offsets_dict.items():
                beginning_offsets = int(partition_offsets_dict[int(key)]['beginning_offsets'])
                beginning_offsets = max(beginning_offsets, int(value))
                partition_offsets_dict[int(key)]['beginning_offsets'] = beginning_offsets

        while True:
            try:
                # Refresh offsets: while the topic is still being produced to, or old
                # data is being deleted, the window bounds must follow the broker.
                partition_offsets_dict_check = self.get_kafka_partitions_data(consumer=consumer, topic_name=topic_name)
                print("partition_offsets_dict:", partition_offsets_dict)
                print("partition_offsets_dict_check:", partition_offsets_dict_check)
                # Production case -- always take the newest end_offsets
                for key, value in partition_offsets_dict_check.items():
                    partition_offsets_dict[key]['end_offsets'] = value['end_offsets']
                    # Deletion case -- move beginning_offsets forward if the broker's is larger
                    if value['beginning_offsets'] > partition_offsets_dict[key]['beginning_offsets']:
                        partition_offsets_dict[key]['beginning_offsets'] = value['beginning_offsets']

                # num counts partitions whose window reaches the partition's end offset.
                num = 0
                for key, value in partition_offsets_dict.items():
                    # Window start
                    beginning_offsets = value['beginning_offsets']
                    beginning_offsets_dict[str(key)] = beginning_offsets
                    # Window end, capped at the partition's end offset
                    end_offsets = value['beginning_offsets'] + batch_size_history
                    end_offsets_partition = value['end_offsets']
                    end_offsets_dict[str(key)] = min(end_offsets, end_offsets_partition)
                    if end_offsets >= end_offsets_partition:
                        num += 1
                    else:
                        # The next window for this partition starts where this one ends.
                        partition_offsets_dict[key]['beginning_offsets'] = end_offsets

                # Offsets are keyed by partition id (as a string) under the topic name.
                starting_offsets_json = json.dumps({topic_name: beginning_offsets_dict})
                ending_offsets_json = json.dumps({topic_name: end_offsets_dict})
                print(f"starting_offsets_json: {starting_offsets_json}, ending_offsets_json:{ending_offsets_json}")
                # Retry building the batch DataFrame until it succeeds.
                while True:
                    try:
                        kafka_df = self.create_kafka_df_object(
                            consumer_type="history",
                            topic_name=topic_name,
                            schema=schema,
                            starting_offsets_json=starting_offsets_json,
                            ending_offsets_json=ending_offsets_json,
                        )
                        break
                    except Exception as e:
                        print(f"当前批次-历史消费出现报错--继续消费, 报错信息: {e}")
                        continue
                print(f"kafka_df.count():{kafka_df.count()}")

                if num >= partition_num:
                    self.handle_kafka_history_templates(kafka_df=kafka_df)  # final batch
                    self.record_offsets_by_history(end_offsets_dict=end_offsets_dict)
                    self.start_process_instance()  # start the scheduler workflow before leaving
                    break
                else:
                    self.handle_kafka_history_templates(kafka_df=kafka_df)
                    self.record_offsets_by_history(end_offsets_dict=end_offsets_dict)
                    time.sleep(10)
                    continue

            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                continue

    def handle_kafka_history_templates(self, kafka_df):
        """Process one history batch, then check whether the spider has finished."""
        self.handle_kafka_history(kafka_df)
        self.kafka_consumption_is_finished()

    def handle_kafka_history(self, kafka_df):
        """History batch handler; a no-op here. Called by handle_kafka_history_templates()."""
        pass

    # 组成yarn上提交任务的任务名称
    def get_app_name(self):
        script_name = sys.argv[0].split("/")[-1].split(".")[0]
        if self.test_flag != 'normal':
            return f"{script_name}: {self.site_name}, {self.date_type}, {self.date_info}, {self.consumer_type}, {self.test_flag}"
        else:
            return f"{script_name}: {self.site_name}, {self.date_type}, {self.date_info}, {self.consumer_type}"

    # 获取yarn上任务示例的applicationID
    def get_application_ids(self):
        """Return the ids of RUNNING YARN applications named ``self.app_name``.

        Queries the ResourceManager REST endpoint used by this file. Returns
        an empty list when no application matches (the response's ``apps``
        value may be ``null`` in that case) or when the request fails; ids
        collected before a failure are still returned.

        The previous version caught ``subprocess.CalledProcessError``, which
        ``requests`` never raises, so HTTP and connection errors escaped, and
        it called ``len()`` on a ``null`` ``apps`` value.
        """
        application_ids = []
        try:
            response = requests.get(
                f"http://hadoop15:8088/ws/v1/cluster/apps?state=RUNNING&name={self.app_name}",
                timeout=30,  # do not hang forever if the ResourceManager is unreachable
            )
            response.raise_for_status()
            apps = response.json().get('apps') or {}
            for app in apps.get('app') or []:
                application_ids.append(app['id'])
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print("Error running command:", e)
        return application_ids

    # 用于标记记录表中实时消费准备阶段已完成
    def modify_kafka_state(self):
        # 正式的实时消费才修改状态
        script_name = sys.argv[0].split("/")[-1].split(".")[0]
        if self.consumer_type == 'latest' and self.test_flag == 'normal' and script_name in ['kafka_flow_asin_detail', 'kafka_asin_detail']:
            if script_name == 'kafka_flow_asin_detail':
                kafka_field = 'kafka_flow_state'
                wx_users = ['chenyuanjie', 'pengyanbing']
                wx_msg = f"站点: {self.site_name} 日期类型: {self.date_type} {self.date_info} asin详情实时消费数据到es准备工作已完成,可以开启详情爬取!"
            elif script_name == 'kafka_asin_detail':
                kafka_field = 'kafka_state'
                wx_users = ['fangxingjun', 'pengyanbing']
                wx_msg = f"站点: {self.site_name}, {self.date_type}, {self.date_info} asin详情实时消费数据到redis准备工作已完成,可以开启详情爬取!"
            else:
                pass
            try:
                sql = f"UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'"
                DBUtil.exec_sql('mysql', 'us', sql)
                CommonUtil.send_wx_msg(wx_users, f"asin详情kafka消费", wx_msg)
            except Exception as e:
                print(e, traceback.format_exc())
                CommonUtil.send_wx_msg(wx_users, f"\u26A0asin详情kafka实时消费\u26A0",
                                       f"站点: {self.site_name} asin详情实时消费准备失败,请等待处理!")
        else:
            pass

    def run_kafka(self):
        """Entry point for Kafka jobs, guarded against duplicate submissions.

        Looks up running applications with this job's name:
          * more than one -> kill every application whose id compares greater
            than the smallest id (plain string comparison), then exit;
          * exactly one -> run the job: optional finished-check, read_data(),
            modify_kafka_state(), handle_data(), then streaming
            (``consumer_type == 'latest'``) or history consumption;
          * none -> print a message and exit.

        Reads ``self.batch_size_history`` and ``self.schema``, which
        ``__init__`` does not set.
        """
        application_ids = self.get_application_ids()
        print("当前任务id列表为: ", application_ids)
        running_count = len(application_ids)
        if running_count > 1:
            print("任务进程已启动,请不要重复开启!")
            earliest_id = min(application_ids)
            duplicates = [app_id for app_id in application_ids if app_id > earliest_id]
            for application_id in duplicates:
                subprocess.run(f"yarn application -kill {application_id}", shell=True, check=False)
            exit(0)
        elif running_count == 1:
            print("实时消费正常开启!")
            if self.test_flag == 'normal' and self.consumer_type == 'latest':
                self.kafka_consumption_is_finished()
            self.read_data()
            self.modify_kafka_state()
            self.handle_data()
            if self.consumer_type != 'latest':
                self.kafka_history(topic_name=self.topic_name, batch_size_history=self.batch_size_history, schema=self.schema)
            else:
                self.kafka_stream(processing_time=self.processing_time)
        else:
            print("任务未成功开启!")
            exit(0)

    def run_insert(self):
        """Run the pipeline once, writing with insert_data() (overwrite) instead of save_data()."""
        self.read_data()
        self.handle_data()
        self.insert_data()

    def acquire_lock(self, lock_name, timeout=60):
        """Try to take a Redis lock under key ``lock_name``.

        A fresh UUID is written with ``nx=True`` and an expiry of ``timeout``
        seconds.

        Returns:
            ``(lock_acquired, lock_value)``: the result of the Redis ``set``
            call and the UUID string that was attempted. Pass ``lock_value``
            to ``release_lock``.
        """
        token = str(uuid.uuid4())
        was_set = self.client.set(lock_name, token, nx=True, ex=timeout)
        return was_set, token

    def release_lock(self, lock_name, lock_value):
        """Delete key ``lock_name`` only if it still holds ``lock_value``.

        The compare-and-delete runs as one Lua script on the Redis server.
        Returns the script's result: the ``del`` result when the value
        matched, otherwise 0.
        """
        compare_and_delete = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.client.eval(compare_and_delete, 1, lock_name, lock_value)