import os
import sys

# Make the project root importable when this script is run directly
sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

"""
asin下架实时表
"""


class DimAsinErrState(object):

    def __init__(self, site_name):
        self.site_name = site_name
        app_name = f"{self.__class__.__name__}:{site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
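        # Target Hive table holding the latest delist state per ASIN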
        self.hive_tb = "dim_asin_err_state"

    def run(self):
        now_date = CommonUtil.format_now("%Y-%m-%d")
        day_30_before = CommonUtil.get_day_offset(now_date, -30)
        # ASINs delisted within the last 30 days (craw_state = 2 marks a delisted ASIN)
        sql = f"""
            select asin,
                   date_info as asin_unlaunch_time
            from dwd_day_asin
            where site_name = '{site_name}'
              and date_info >= '{day_30_before}'
              and craw_state = 2
"""
        df_day = self.spark.sql(sql).cache()

        sql_all = f"""
        select asin, date_format(updated_at, 'yyyy-MM-dd') as asin_unlaunch_time
        from ods_asin_err_state
        where site_name = '{site_name}'
        """
        df_save = self.spark.sql(sql_all)

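        # Merge the 30-day incremental delistings with the full history and keep
        # the most recent delist date per ASIN. E.g. if df_day has
        # ('B0EXAMPLE', '2024-05-01') and the history has ('B0EXAMPLE', '2024-04-15'),
        # the merged row keeps '2024-05-01' (values here are illustrative).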
        df_save = df_save.unionByName(df_day).groupBy("asin").agg(
            F.max(F.col("asin_unlaunch_time")).alias("asin_unlaunch_time")
        ).withColumn("site_name", F.lit(self.site_name))

        partition_dict = {
            "site_name": self.site_name,
        }
        # Repartition to 2 output partitions to limit the number of small files
        df_save = df_save.repartition(2)

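        # Project helper: writes df_save into the site_name partition of hive_tb.
        # Per its name this appears to insert-or-update the partition; the exact
        # semantics depend on the CommonUtil implementation.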
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict=partition_dict,
            df_save=df_save
        )
        print("success")


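# Usage: pass the site code as the first CLI argument, e.g.
#   spark-submit dim_asin_err_state.py us
# (script filename assumed here)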
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    assert site_name is not None, "site_name argument is required"
    obj = DimAsinErrState(site_name)
    obj.run()