import json
import os
import re
import sys
import time
import traceback
import zlib

import pandas as pd
import redis
from datetime import datetime

sys.path.append("/opt/module/spark-3.2.0-bin-hadoop3.2/demo/py_demo/")

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.templates_mysql import TemplatesMysql
# from ..utils.templates_mysql import TemplatesMysql
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.types import *
from yswg_utils.common_udf import udf_rank_and_category
# from ..yswg_utils.common_udf import udf_rank_and_category
from yswg_utils.common_df import get_node_first_id_df
from kafka import KafkaConsumer, TopicPartition
from yswg_utils.common_udf import parse_weight_str
# from ..yswg_utils.common_udf import parse_weight_str


class DimStAsinInfo(Templates):
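    """Consume asin detail records from Kafka, enrich them (AO value, BSR rank/category,
    BSR orders, self-asin flag, sku counts), and write each row into a Redis hash."""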

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01', consumer_type='latest', topic_name="us_asin_detail", batch_size=100000):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.consumer_type = consumer_type  # consume real-time ('latest') or historical data
        self.topic_name = topic_name  # Kafka topic name
        self.batch_size = batch_size
        self.batch_size_history = int(batch_size / 10)
        # Connect to the Redis server (db index chosen per site)
        self.redis_db = {
            "us": 0,
            "uk": 1,
            "de": 2,
            "es": 3,
            "fr": 4,
            "it": 5,
        }
        self.client = redis.Redis(host='192.168.10.224', port=6379, db=self.redis_db[self.site_name], password='yswg2023')
        self.db_save = 'kafka_asin_detail'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        self.get_date_info_tuple()
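        # Placeholder DataFrames; the real data is loaded later in read_data() / process_batch()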
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_st_asin = self.spark.sql(f"select 1+1;")
        self.df_bs_report = self.spark.sql(f"select 1+1;")
        self.df_asin_bs = self.spark.sql(f"select 1+1;")
        self.df_self_asin = self.spark.sql(f"select 1+1;")
        self.df_asin_sku = self.spark.sql(f"select 1+1;")
        self.df_asin_templates = self.spark.sql("select asin_zr_counts, asin_sp_counts, asin_sb1_counts,asin_sb2_counts,asin_sb3_counts,asin_ac_counts,asin_bs_counts,asin_er_counts,asin_tr_counts from dwd_asin_measure limit 0")
        self.df_asin_counts = self.spark.sql("select asin_zr_counts, asin_sp_counts, asin_sb1_counts,asin_sb2_counts,asin_sb3_counts,asin_ac_counts,asin_bs_counts,asin_er_counts,asin_tr_counts from dwd_asin_measure limit 0")
        self.schema = self.init_schema()
        # self.u_rank_and_category = self.spark.udf.register("u_rank_and_category", udf_rank_and_category, schema)
        schema = StructType([
            StructField('asin_bs_cate_1_rank', StringType(), True),
            StructField('rank_and_category', StringType(), True),
        ])
        self.u_rank_and_category = self.spark.udf.register("u_rank_and_category", self.udf_rank_and_category, schema)
        self.u_cal_crc32 = self.spark.udf.register("u_cal_crc32", self.udf_cal_crc32, IntegerType())
        self.u_cal_bkdr = self.spark.udf.register("u_cal_bkdr", self.udf_cal_bkdr, IntegerType())
        self.u_extract_dimensions = self.spark.udf.register("u_extract_dimensions", self.udf_extract_dimensions, StringType())
        self.u_extract_weight = self.spark.udf.register("u_extract_weight", self.udf_extract_weight, StringType())
        self.pattern_1_rank_str = {
            "us": r"(\d+).*?See Top 100 in ",
            "uk": r"(\d+).*?See Top 100 in ",
            "de": r"(\d+).*?Siehe Top 100 in ",
            "es": r"(\d+).*?Ver el Top 100 en ",
            "fr": r"(\d+).*?Voir les 100 premiers en ",
            "it": r"(\d+).*?Visualizza i Top 100 nella categoria "
        }  # match the rank within the first-level (top) category
        self.pattern_str = {
            "us": r"(\d+ in [\w&' ]+)",
            "uk": r"(\d+ in [\w&' ]+)",
            "de": r"Nr. (\d+ in [\w&' ]+)",
            "es": r"nº(\d+ en [\w&' ]+)",
            "fr": r"(\d+ en [\w&' ]+)",
            "it": r"n. (\d+ in [\w&' ]+)",
        }  # match rank and category pairs
        self.replace_str = {
            "us": "See Top 100 in ",
            "uk": "See Top 100 in ",
            "de": "Siehe Top 100 in ",
            "es": "Ver el Top 100 en ",
            "fr": "Voir les 100 premiers en ",
            "it": "Visualizza i Top 100 nella categoria ",
        }  # "See Top 100 in ..." strings stripped before matching

        # Connect to MySQL
        self.engine = self.get_connection()

    def get_connection(self):
        return TemplatesMysql(site_name="us").mysql_connect()

    def judge_spider_asin_detail_is_finished(self):
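        """Stop the Spark job once workflow_progress marks the ASIN detail crawl as finished (status_val=3)."""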
        while True:
            try:
                sql = f'SELECT * from workflow_progress WHERE page="ASIN详情" and site_name="{self.site_name}" and date_type="{self.date_type}" and date_info="{self.date_info}" and status_val=3'
                df = pd.read_sql(sql, con=self.engine)
                if df.shape[0] == 1:
                    print(f"ASIN详情状态为3, 抓取完成并终止程序, site_name:{self.site_name}, date_type:{self.date_type}, date_info:{self.date_info}")
                    self.spark.stop()
                    quit()  # 退出程序
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()

    def fetch_self_asin(self):
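        """Load the in-house asin list ({site}_self_asin) from MySQL as a Spark DataFrame (asin, isSelfAsin=1)."""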
        while True:
            try:
                sql = f"""SELECT asin, 1 as isSelfAsin from {self.site_name}_self_asin"""
                df_self_asin = pd.read_sql(sql, con=self.engine)
                schema = StructType([
                    StructField("asin", StringType(), True),
                    StructField("isSelfAsin", IntegerType(), True),
                ])
                self.df_self_asin = self.spark.createDataFrame(df_self_asin, schema=schema).cache()
                self.df_self_asin.show(10, truncate=False)
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()

    @staticmethod
    def udf_extract_dimensions(volume_str, asin_volume):
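        """Parse the scraped dimension strings and return "length*width*height<unit>" (missing parts stay None)."""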
        # Determine the unit type from asin_volume
        # pattern = r'\b\w+\b'
        pattern = r'[a-z]+'
        matches = re.findall(pattern, asin_volume)

        # Collect the matched unit words in a set
        type_set = set()
        for word in matches:
            if word in ['inches', 'inch']:
                type_set.add('inches')
            elif word in ['cm', 'centímetros', 'centimetres']:
                type_set.add('cm')
            elif word in ['milímetros', 'millimeter', 'mm']:
                type_set.add('mm')
            elif word in ['metros']:
                type_set.add('m')

        # Derive the unit string from the number of distinct units
        if len(type_set) == 1:
            asin_volume_type = list(type_set)[0]
        elif len(type_set) >= 2:
            asin_volume_type = ','.join(type_set)
        else:
            asin_volume_type = 'none'

        # Parse length / width / height
        length, width, height = None, None, None
        if asin_volume_type == 'cm,inches':
            num_inches = volume_str.find('inch')
            num_cm = volume_str.find('cm')
            volume_str = volume_str[:num_inches] if num_cm > num_inches else volume_str[num_cm:num_inches]
        dimensions = re.findall(r"(\d+(\.\d+)?)", volume_str)
        dimensions = [float(dim[0]) for dim in dimensions]

        if len(dimensions) == 1:
            length = dimensions[0]
        elif len(dimensions) == 2:
            if asin_volume_type == 'none':
                if "l" in volume_str and "w" in volume_str:
                    length, width = dimensions
                elif "w" in volume_str and "h" in volume_str:
                    width, height = dimensions
                elif "l" in volume_str and "h" in volume_str:
                    length, height = dimensions
                elif "d" in volume_str and "w" in volume_str:
                    length, width = dimensions
                elif "d" in volume_str and "h" in volume_str:
                    length, height = dimensions
            else:
                length, width = dimensions
        elif len(dimensions) == 3:
            length, width, height = dimensions
        elif len(dimensions) >= 4:
            length, width, height = dimensions[:3]

        return f"{length}*{width}*{height}{asin_volume_type}"

    @staticmethod
    def udf_extract_weight(weight_str: str):
        """
        解析重量字符串获取重量和单位,逗号分隔
        :param weight_str:
        :param site_name:
        :return:
        """
        val = None
        # weight_type = 'pounds' if site_name == 'us' else 'grams'
        weight_type = 'g'
        if weight_str is not None:
            if 'pounds' in weight_str:
                match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}pounds", weight_str)
                val = round(float(match.group(1)) * 1000 * 0.454, 3) if match else None
            elif 'ounces' in weight_str:
                match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}ounces", weight_str)
                val = round(float(match.group(1)) / 16 * 1000 * 0.454, 3) if match else None
            elif any(substring in weight_str for substring in ['kilogram', ' kg']):
                weight_str = weight_str.replace(' kg', ' kilogram')
                match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}kilogram", weight_str)
                val = round(float(match.group(1)) * 1000, 3) if match else None
            elif any(substring in weight_str for substring in ['milligrams']):
                match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}milligrams", weight_str)
                val = round(float(match.group(1)) / 1000, 3) if match else None
            elif ' gram' in weight_str:
                match = re.search(r"(\d+\.{0,}\d{0,})\D{0,} gram", weight_str)
                val = round(float(match.group(1)), 3) if match else None
            elif ' g' in weight_str:
                match = re.search(r"(\d+\.{0,}\d{0,})\D{0,} g", weight_str)
                val = round(float(match.group(1)), 3) if match else None
        if val:
            return f"{val}{weight_type}"
        else:
            return f"{val}"

    def fetch_asin_sku_count(self):
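        """Load per-asin auction and sku counts from product_audit_asin_sku (MySQL) as a Spark DataFrame."""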
        while True:
            try:
                sql = f"""SELECT asin,count(id) as auctionsNum,count((case when sku!='' then sku else NULL end)) as skusNumCreat  
                     from product_audit_asin_sku  
                    --  where asin in ('B085WYH539')
                     GROUP BY asin 
                    """
                df_asin_sku = pd.read_sql(sql, con=self.engine)
                schema = StructType([
                    StructField("asin", StringType(), True),
                    StructField("auctionsNum", IntegerType(), True),
                    StructField("skusNumCreat", IntegerType(), True),
                ])
                self.df_asin_sku = self.spark.createDataFrame(df_asin_sku, schema=schema).cache()
                self.df_asin_sku.show(10, truncate=False)
                break
            except Exception as e:
                print(e, traceback.format_exc())
                time.sleep(10)
                self.engine = self.get_connection()

    @staticmethod
    def init_schema():
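        """Schema of the asin-detail JSON payload coming from Kafka."""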
        schema = StructType([
            StructField("asin", StringType(), True),
            StructField("week", StringType(), True),
            StructField("title", StringType(), True),
            StructField("img_url", StringType(), True),
            StructField("rating", StringType(), True),
            StructField("total_comments", StringType(), True),
            StructField("price", FloatType(), True),
            StructField("rank", StringType(), True),
            StructField("category", StringType(), True),
            StructField("launch_time", StringType(), True),
            StructField("volume", StringType(), True),
            StructField("weight", StringType(), True),
            StructField("page_inventory", IntegerType(), True),
            StructField("buy_box_seller_type", IntegerType(), True),
            StructField("asin_vartion_list", IntegerType(), True),
            StructField("title_len", IntegerType(), True),
            StructField("img_num", IntegerType(), True),
            StructField("img_type", StringType(), True),
            StructField("activity_type", StringType(), True),
            StructField("one_two_val", StringType(), True),
            StructField("three_four_val", StringType(), True),
            StructField("eight_val", StringType(), True),
            StructField("qa_num", IntegerType(), True),
            StructField("five_star", IntegerType(), True),
            StructField("four_star", IntegerType(), True),
            StructField("three_star", IntegerType(), True),
            StructField("two_star", IntegerType(), True),
            StructField("one_star", IntegerType(), True),
            StructField("low_star", IntegerType(), True),
            StructField("together_asin", StringType(), True),
            StructField("brand", StringType(), True),
            StructField("ac_name", StringType(), True),
            StructField("material", StringType(), True),
            StructField("node_id", StringType(), True),
            StructField("data_type", IntegerType(), True),
            StructField("sp_num", StringType(), True),
            StructField("describe", StringType(), True),
            StructField("date_info", StringType(), True),
            StructField("weight_str", StringType(), True),
            StructField("package_quantity", StringType(), True),
            StructField("pattern_name", StringType(), True),
            StructField("seller_id", StringType(), True),
            StructField("variat_num", IntegerType(), True),
            StructField("site_name", StringType(), True),
            StructField("best_sellers_rank", StringType(), True),
            StructField("best_sellers_herf", StringType(), True),
            StructField("account_url", StringType(), True),
            StructField("account_name", StringType(), True),
            StructField("parentAsin", StringType(), True),
            StructField("asinUpdateTime", StringType(), True),
            StructField("follow_sellers", StringType(), True),
        ])
        return schema

    @staticmethod
    def udf_cal_crc32(asin, key_size):
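        """Return CRC32(asin) modulo key_size; used as the outer Redis key (key_outer) in rename_cols()."""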
        # CRC32 checksum + modulo

        # Byte representation of the asin string
        bytes_str = bytes(asin, 'utf-8')
        # CRC-32 checksum via zlib
        checksum = zlib.crc32(bytes_str)
        # Interpret the checksum as a signed 32-bit integer (two's complement)
        checksum_signed = (checksum & 0xFFFFFFFF) - (1 << 32) if checksum & (1 << 31) else checksum

        def java_mod(x, y):
            # return x % y if x * y > 0 else x % y - y  # sign-aware variant
            return abs(x) % y  # ignore the sign

        # Take the modulo
        result = java_mod(checksum_signed, key_size)
        return result

    @staticmethod
    def udf_cal_bkdr(asin):
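        """Return a simple multiplicative hash of the asin; used as the inner Redis field (key_inner)."""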
        # BKDR-style string hash
        hash = 0
        for c in asin:
            hash = (hash * 33 + ord(c)) % 65535  # mod 65535 to keep the value bounded
        return hash

    @staticmethod
    def udf_rank_and_category(best_sellers_rank, pattern_1_rank_str, pattern_str, replace_str):
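        """Return (asin_bs_cate_1_rank, rank_and_category) extracted from the raw best-sellers-rank text."""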
        best_sellers_rank = str(best_sellers_rank).replace(",", "")
        matches = re.findall(pattern_1_rank_str, best_sellers_rank)
        asin_bs_cate_1_rank = matches[0] if matches else None
        best_sellers_rank = best_sellers_rank.replace(replace_str, "")
        matches = re.findall(pattern_str, best_sellers_rank)
        rank_and_category = "&&&&".join([rank_cate.replace(",", "") for rank_cate in matches]) if matches else None
        return asin_bs_cate_1_rank, rank_and_category

    def df_read_data_by_kafka(self):
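        """Build a streaming DataFrame over the <site>_asin_detail topic and parse the JSON value with self.schema."""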
        # .option("my_kafka.bootstrap.servers", "113.100.143.162:39092") \
        # .option("startingOffsets", "lastest")  # 偏移量, lastest, earliest
        # .select(F.from_json("value", schema=self.schema).alias("data")) \
        kafka_df = self.spark.readStream \
            .format("my_kafka") \
            .option("my_kafka.bootstrap.servers", "192.168.10.221:9092,192.168.10.220:9092,192.168.10.210:9092") \
            .option("subscribe", f"{self.site_name}_asin_detail") \
            .option("startingOffsets", "lastest") \
            .load() \
            .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
            .select("data.*")

        # assign_option = f"""{{"{self.site_name}_asin_detail": {{"7": 0}}}}"""
        # # .option("subscribe", f""""{self.site_name}_asin_detail": {"7": 0}""")
        # kafka_df = self.spark.readStream \
        #     .format("my_kafka") \
        #     .option("my_kafka.bootstrap.servers", "192.168.10.221:9092,192.168.10.220:9092,192.168.10.210:9092") \
        #     .option("subscribe", f"{self.site_name}_asin_detail")\
        #     .option("assign", assign_option) \
        #     .option("startingOffsets", "lastest") \
        #     .load() \
        #     .selectExpr("CAST(value AS STRING) AS value") \
        #     .select(F.from_json("value", schema=self.schema).alias("data")) \
        #     .select("data.*")

        #"""{"your_topic_name": {"0": 100, "1": 200}}"""
        # .option("my_kafka.fetch.max.bytes", "10485760") \
        # .option("my_kafka.max.partition.fetch.bytes", "10485760") \
        return kafka_df

    def read_data(self):
        print("1.1 读取dim_st_asin_info表, 计算ao值")
        sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sql)
        self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type']).cache()
        self.df_st_asin.show(10, truncate=False)
        print("1.2 读取ods_one_category_report表")
        if int(self.year) == 2022 and int(self.month) < 3:
            sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as asin_bsr_orders from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';"
        else:
            sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, orders as asin_bsr_orders from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';"
        print("sql:", sql)
        self.df_bs_report = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_report.show(10, truncate=False)
        print("1.3 读取bsr一级分类表")
        self.df_asin_bs = get_node_first_id_df(self.site_name, self.spark)
        self.df_asin_bs = self.df_asin_bs.withColumnRenamed("category_first_id", "asin_bs_cate_1_id")
        self.df_asin_bs.show(10, truncate=False)
        print("1.4 读取内部asin表")
        # sql = f"select asin, 1 as isSelfAsin from ods_self_asin where site_name='{self.site_name}';"
        # print("sql:", sql)
        # self.df_self_asin = self.spark.sql(sqlQuery=sql).cache()
        # self.df_self_asin.show(10, truncate=False)
        self.fetch_self_asin()
        # Read the asin -> sku count relationship
        print("1.5 Read the asin/sku count relationship")
        self.fetch_asin_sku_count()

    def handle_data(self):
        # Compute the asin AO value
        self.df_asin_counts = self.handle_st_asin_counts()
        self.df_asin_counts = self.df_asin_counts.select("asin", "asin_ao").cache()

    def handle_asin_bs_category_rank(self, df):
        df = df.withColumn(
            'bs_str', self.u_rank_and_category(
                'best_sellers_rank',
                F.lit(self.pattern_1_rank_str[self.site_name]),
                F.lit(self.pattern_str[self.site_name]),
                F.lit(self.replace_str[self.site_name])
            )
        )
        df = df.withColumn('asin_bs_cate_1_rank', df.bs_str.getField('asin_bs_cate_1_rank')) \
            .withColumn('rank_and_category', df.bs_str.getField('rank_and_category')) \
            .drop('bs_str', 'best_sellers_rank')
        df.show(10, truncate=False)
        return df

    def handle_st_asin_counts(self):
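        """Pivot st-asin rows by data_type and compute the AO value: (sb1+sb2+sb3+sp counts) / zr counts."""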
        self.df_st_asin = self.df_st_asin.withColumn(
            f"asin_data_type",
            F.concat(F.lit(f"asin_"), self.df_st_asin.data_type, F.lit(f"_counts"))
        )
        df_asin_counts = self.df_st_asin.groupby([f'asin']). \
            pivot(f"asin_data_type").count()

        df_asin_counts = self.df_asin_templates.unionByName(df_asin_counts, allowMissingColumns=True)  # guard against missing crawler data breaking the pipeline
        df_asin_counts = df_asin_counts.fillna(0)
        # df.show(10, truncate=False)
        df_asin_counts = df_asin_counts.withColumn(
            f"asin_sb_counts",
            df_asin_counts[f"asin_sb1_counts"] + df_asin_counts[f"asin_sb2_counts"] + df_asin_counts[f"asin_sb3_counts"]
        )
        df_asin_counts = df_asin_counts.withColumn(
            f"asin_adv_counts",
            df_asin_counts[f"asin_sb_counts"] + df_asin_counts[f"asin_sp_counts"]
        )
        df_asin_counts = df_asin_counts.withColumn(
            f"asin_ao",
            df_asin_counts[f"asin_adv_counts"] / df_asin_counts[f"asin_zr_counts"]
        )  # do not fill null with 0; null means no zr-type search terms matched this asin
        df_asin_counts = df_asin_counts.withColumn("asin_ao", F.round(df_asin_counts["asin_ao"], 4))

        df_asin_counts.show(10, truncate=False)
        return df_asin_counts

    @staticmethod
    def clean_kafka_df(df):
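        """Rename seller_id to account_id and keep only the columns needed by the downstream joins."""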
        df = df.withColumnRenamed("seller_id", "account_id")
        # cols_python = ["asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
        #         "brand", "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
        #         "volume", "weight", "weight_str", "launchTime", "total_comments", "page_inventory"]
        # oneCategoryRank, aoVal, bsrOrders, bsrOrdersSale
        # siteName volumeFormat weightFormat asinUpdateTime
        # field names expected by the Java-side consumer
        cols_java = ['asin', 'parentAsin', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'price', 'rating',
                    'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'siteName', 'buyBoxSellerType',
                    'volume', 'volumeFormat', 'weight', 'weightFormat', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime']
        df = df.select("asin", "parentAsin", "variat_num", "best_sellers_rank", "best_sellers_herf", "price", "rating",
                        "brand", "account_id", "account_name", "account_url", "buy_box_seller_type",
                        "volume", "weight", "weight_str", "launch_time", "total_comments", "page_inventory", "asinUpdateTime", "site_name", "node_id")
        return df

    def rename_cols(self, df):
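        """Compute the Redis keys (key_outer/key_inner) and rename columns to the camelCase names used by the Java side."""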
        # Compute the Redis keys (outer key and inner field)
        df = df.withColumn(
            'key_outer', self.u_cal_crc32('asin', F.lit(self.batch_size))
        )
        df = df.withColumn(
            'key_inner', self.u_cal_bkdr('asin')
        )
        df.show(5, truncate=False)

        df = df.withColumnRenamed("variat_num", "asinVarNum")
        df = df.withColumnRenamed("asin_bs_cate_1_rank", "oneCategoryRank")
        df = df.withColumnRenamed("rank_and_category", "bestSellersRank")  # 解析后的
        df = df.withColumnRenamed("best_sellers_herf", "lastHerf")
        df = df.withColumnRenamed("asin_ao", "aoVal")
        df = df.withColumnRenamed("asin_bsr_orders", "bsrOrders")
        df = df.withColumnRenamed("asin_bsr_orders_sale", "bsrOrdersSale")
        df = df.withColumnRenamed("brand", "brandName")
        df = df.withColumnRenamed("account_id", "accountId")
        df = df.withColumnRenamed("account_name", "accountName")
        df = df.withColumnRenamed("account_url", "accountUrl")
        df = df.withColumnRenamed("buy_box_seller_type", "buyBoxSellerType")
        df = df.withColumnRenamed("launch_time", "launchTime")
        df = df.withColumnRenamed("total_comments", "totalComments")
        df = df.withColumnRenamed("page_inventory", "pageInventory")
        df = df.select('asin', 'parentAsin', 'asinVarNum', 'oneCategoryRank', 'bestSellersRank', 'lastHerf', 'aoVal', 'price', 'rating',
                    'bsrOrders', 'bsrOrdersSale', 'brandName', 'accountId', 'accountName', 'accountUrl', 'buyBoxSellerType',
                    'volume', 'weight', 'launchTime', 'totalComments', 'pageInventory', 'asinUpdateTime',
                       "site_name", "key_outer", "key_inner")
        return df

    def process_batch(self, df, epoch_id):
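        """foreachBatch handler: enrich one micro-batch (rank/category, AO, BSR orders, self-asin, sku counts) and save it to Redis; an empty batch triggers the crawl-finished check."""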
        try:
            count = df.count()
            print("当前批次传输的数据量为df.count():", count)
            if count == 0:
                self.judge_spider_asin_detail_is_finished()

            # Make sure the schema is defined to avoid NoneType errors
            if not self.schema:
                raise ValueError("Schema is not defined")
            # df.show(5, truncate=False)
            print("df.columns:", df.columns)
            # df = df.select("asin", "launch_time", "volume", "weight", "weight_str", "node_id", "variat_num", "best_sellers_rank", "best_sellers_herf", "seller_id", "account_url", "account_name", "site_name")
            df = self.clean_kafka_df(df=df)
            # df.show(5, truncate=False)
            # # Extract rank and category
            df_bs = self.handle_asin_bs_category_rank(df=df.select("asin", "best_sellers_rank"))
            # join
            df_save = df.join(
                df_bs, on='asin', how='left'
            ).join(
                self.df_asin_counts, on='asin', how='left'
            ).join(
                self.df_asin_bs, on='node_id', how='left'
            ).join(
                self.df_self_asin, on='asin', how='left'
            ).join(
                self.df_asin_sku, on='asin', how='left'
            )
            df_save = df_save.na.fill({"isSelfAsin": 0})
            # Compute BSR orders and sales
            df_save = df_save.join(
                self.df_bs_report, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left'
            )
            df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders)
            df_save = self.rename_cols(df=df_save)
            self.save_to_redis(df=df_save)
        except Exception as e:
            print(e, traceback.format_exc())

        # # Join with the data read from Kafka
        # joined_df = df.join(self.df_asin_title, "asin", how='left')
        # # Apply transformation / aggregation logic here
        # result_df = joined_df.groupBy("asin").count()
        # result_df.show(10, truncate=False)
        print("epoch_id:", epoch_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    def start_stream(self, processing_time=600):
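        """Start the structured-streaming query (foreachBatch -> process_batch) with the given trigger interval in seconds."""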
        # kafka_df = self.df_read_data_by_kafka()
        if self.date_type == "month":
            date_type = "_month"
        else:
            date_type = ""
        topics = f"{self.site_name}_asin_detail{date_type}"
        kafka_df = self.get_kafka_df_by_spark(schema=self.schema, consumption_type="latest", topics=topics)
        query = kafka_df.writeStream \
            .outputMode("append") \
            .format("console") \
            .option("checkpointLocation", "/root/tmp") \
            .foreachBatch(self.process_batch) \
            .trigger(processingTime=f'{processing_time} seconds').start()
        query.awaitTermination()

    def save_to_redis(self, df):
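        """Write each row as a JSON string into a Redis hash: HSET <key_outer> <key_inner> <row_json>.

        A reader would fetch one record back with e.g. client.hget(key_outer, key_inner)
        (illustrative call; not used in this job).
        """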
        # Convert the Spark DataFrame to a pandas DataFrame
        pdf = df.toPandas()
        # Iterate over the pandas rows and write them into Redis
        for index, row in pdf.iterrows():
            # Build a composite key, or pick whichever key layout fits
            # 1. outer key 10197, composite inner key "10197:15931"
            # redis_key = f"{row['key_outer']}:{row['key_inner']}"  #
            # # Store the value in Redis - a single value here; a dict could hold several fields
            # self.client.set(redis_key, row['value'])
            # row_json = row.to_json(orient='split')
            # self.client.set(redis_key, row_json)
            # 2. outer key 10197, inner field 15931
            # redis_key = row['key_outer']
            # redis_field = row['key_inner']
            # row_json = row.to_json(orient='split')
            # self.client.hset(redis_key, redis_field, row_json)

            # 3. hash map: outer key 10197, inner field 15931
            redis_key = row['key_outer']
            redis_field = row['key_inner']
            row_dict = row.to_dict()
            # row_dict = {k: str(v).lower().replace("none", "").replace("nan", "") for k, v in row_dict.items()}  # make sure every value is a string
            row_dict = {k: str(v).replace("None", "").replace("none", "").replace("NaN", "").replace("nan", "") for k, v in row_dict.items()}  # make sure every value is a string, blanking None/NaN
            # Note: every value is already a str at this point, so the numeric format branch below never fires
            row_dict = {k: format(v, ".2f") if isinstance(v, (int, float)) else str(v).replace("None", "").replace(
                    "nan", "") for k, v in row_dict.items()}

            del row_dict["key_outer"]
            del row_dict["key_inner"]
            row_json = json.dumps(row_dict)
            self.client.hset(redis_key, redis_field, row_json)

    def get_topic_name(self):
        if self.site_name == "us" and self.date_type == "month":
            self.topic_name = f"{site_name}_asin_detail_{self.date_type}"
        else:
            self.topic_name = f"{site_name}_asin_detail"

    def handle_history(self):
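        """Replay the topic in bounded batches by advancing per-partition offset windows of size batch_size_history."""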
        self.get_topic_name()
        consumer = self.get_kafka_object_by_python(topic_name=self.topic_name)
        partition_offsets_dict = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name)
        partition_num = len(partition_offsets_dict)
        beginning_offsets_dict = {}
        end_offsets_dict = {}
        while True:
            num = 0
            for key, value in partition_offsets_dict.items():
                # starting offset for this partition
                beginning_offsets_dict[str(key)] = value['beginning_offsets']
                # ending offset for this batch window
                end_offsets = value['beginning_offsets'] + self.batch_size_history
                end_offsets_partition = value['end_offsets']
                end_offsets_dict[str(key)] = min(end_offsets, end_offsets_partition)
                if end_offsets >= end_offsets_partition:
                    num += 1
                else:
                    partition_offsets_dict[key]['beginning_offsets'] = end_offsets

            starting_offsets_json = json.dumps({self.topic_name: beginning_offsets_dict})
            ending_offsets_json = json.dumps({self.topic_name: end_offsets_dict})
            print(f"starting_offsets_json: {starting_offsets_json}, ending_offsets_json:{ending_offsets_json}")
            kafka_df = self.spark.read \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", self.topic_name) \
                .option("kafka.security.protocol", self.kafka_security_protocol) \
                .option("kafka.sasl.mechanism", self.kafka_sasl_mechanism) \
                .option("kafka.sasl.jaas.config",
                        f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_username}" password="{self.kafka_password}";') \
                .option("failOnDataLoss", "true") \
                .option("startingOffsets", starting_offsets_json) \
                .option("endingOffsets", ending_offsets_json) \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
                .select("data.*")
            print(f"kafka_df.count():{kafka_df.count()}")

            if num >= partition_num:
                break
            else:
                continue

    def handle_history_old(self):
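        """Legacy replay path: iterate one global offset range (min beginning offset to max end offset) across all partitions."""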
        self.get_topic_name()
        consumer = self.get_kafka_object_by_python(topic_name=self.topic_name)
        partition_data_count = self.get_kafka_partitions_data(consumer=consumer, topic_name=self.topic_name)

        beginning_offsets_list = []
        end_offsets_list = []
        for values in partition_data_count.values():
            beginning_offsets_list.append(values['beginning_offsets'])
            end_offsets_list.append(values['end_offsets'])

        min_offset = min(beginning_offsets_list)
        # min_offset = max(beginning_offsets_list)
        max_offset = max(end_offsets_list)
        print(f"min_offset:{min_offset}, max_offset:{max_offset}")
        # max_offset = max(partition_data_count.values())
        # for start_offset in range(0, max_offset+1, self.batch_size_history):
        # self.batch_size_history = 100
        for start_offset in range(min_offset, max_offset+1, self.batch_size_history):
            end_offset = min(start_offset + self.batch_size_history, max_offset)
            starting_offsets_json = json.dumps({self.topic_name: {str(p): start_offset for p in partition_data_count.keys()}})
            ending_offsets_json = json.dumps({self.topic_name: {str(p): end_offset for p in partition_data_count.keys()}})
            # .option("failOnDataLoss", "true")  # 设置 failOnDataLoss 为 true, 默认为False


            kafka_df = self.spark.read \
                .format("kafka") \
                .option("kafka.bootstrap.servers", self.kafka_servers) \
                .option("subscribe", self.topic_name) \
                .option("kafka.security.protocol", self.kafka_security_protocol) \
                .option("kafka.sasl.mechanism", self.kafka_sasl_mechanism) \
                .option("kafka.sasl.jaas.config", f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{self.kafka_username}" password="{self.kafka_password}";') \
                .option("failOnDataLoss", "true") \
                .option("startingOffsets", starting_offsets_json) \
                .option("endingOffsets", ending_offsets_json) \
                .load() \
                .select(F.from_json(F.col("value").cast("string"), schema=self.schema).alias("data")) \
                .select("data.*")
            print(f"kafka_df.count():{kafka_df.count()}, start_offset:{start_offset}, end_offset:{end_offset}")
            self.handle_batch_history(df=kafka_df)

        # Stop the SparkSession
        self.spark.stop()

    def handle_batch_history(self, df):
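        """Enrich one historical batch with the same joins as process_batch and save it to Redis."""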
        try:
            print("df.columns:", df.columns)
            # df = df.select("asin", "launch_time", "volume", "weight", "weight_str", "node_id", "variat_num", "best_sellers_rank", "best_sellers_herf", "seller_id", "account_url", "account_name", "site_name")
            df.show(10, truncate=False)
            df = self.clean_kafka_df(df=df)
            # df.show(5, truncate=False)
            # # Extract rank and category
            df_bs = self.handle_asin_bs_category_rank(df=df.select("asin", "best_sellers_rank"))
            # join
            df_save = df.join(
                df_bs, on='asin', how='left'
            ).join(
                self.df_asin_counts, on='asin', how='left'
            ).join(
                self.df_asin_bs, on='node_id', how='left'
            ).join(
                self.df_self_asin, on='asin', how='left'
            ).join(
                self.df_asin_sku, on='asin', how='left'
            )
            # Compute BSR orders and sales
            df_save = df_save.join(
                self.df_bs_report, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left'
            )
            df_save = df_save.withColumn("asin_bsr_orders_sale", df_save.price * df_save.asin_bsr_orders)
            df_save = self.rename_cols(df=df_save)
            self.save_to_redis(df=df_save)
        except Exception as e:
            print(e, traceback.format_exc())

    def run(self):

        # self.read_data()
        # self.handle_data()
        if self.consumer_type == 'latest':
            self.start_stream(processing_time=300)
        else:
            self.handle_history()

        # # Cast the message value to string and register a temporary view
        # stringifiedDF = self.my_kafka.selectExpr("CAST(value AS STRING)")
        # stringifiedDF.createOrReplaceTempView("KafkaData")
        # # Set up the streaming query with a periodic trigger
        # query = stringifiedDF.writeStream.foreachBatch(self.process_batch).trigger(processingTime='600 seconds').start()
        # # Wait for the query to terminate
        # query.awaitTermination()


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us/uk/de/es/fr/it
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    consumer_type = sys.argv[4]  # arg 4: consumer type: latest (real-time) or history
    handle_obj = DimStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info, consumer_type=consumer_type, batch_size=100000)
    # handle_obj.run()
    handle_obj.run_kafka()
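    # Usage sketch (hypothetical invocation; script name and spark-submit options depend on the deployment):
    #   spark-submit this_script.py us month 2023-01 latest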