import os
import sys

# make the parent directory (the project root) importable so `utils` resolves
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.hdfs_utils import HdfsUtils
from utils.ssh_util import SSHUtil


def check_by_offset_day(offset: int):
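    """
    Check the last ``offset`` days of bsr/nsr ASIN-rank data for every site.

    For each day with no MySQL row: if the HDFS output exists, queue a sqoop
    re-export; if nothing was computed and the day is today, queue a restart
    of the DolphinScheduler workflow. Any miss is reported via
    ``CommonUtil.send_wx_msg``.
    """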
    # reference day: first CLI argument, defaulting to today
    day = CommonUtil.get_sys_arg(1, CommonUtil.format_now("%Y-%m-%d"))
    site_names = ["us", "uk", "de"]
    start_tasks = []   # DolphinScheduler workflows to (re)start
    export_tasks = []  # sqoop export commands to run over SSH
    err = []           # descriptions of days with missing data
    # one engine for the whole run: workflow_everyday is always queried from
    # the "us" MySQL instance, regardless of which site is being checked
    engine = DBUtil.get_db_engine("mysql", "us")
    for day_offset in range(offset):
        day_check = CommonUtil.get_day_offset(day, -day_offset)
        for rank_type in ['bsr', 'nsr']:
            for site_name in site_names:
                with engine.connect() as connection:
                    sql = f"""
                        select * from workflow_everyday
                        where site_name = '{site_name}'
                          and table_name = '{site_name}_{rank_type}_asin_rank'
                          and report_date = '{day_check}'
                        limit 1;
                    """
                    size = len(list(connection.execute(sql)))
                    if size == 0:
                        print(f"no data for {site_name} {rank_type} on {day_check}!!!")
                        # distinguish "computed but not exported" from "never computed":
                        # if the HDFS output exists, only the export step failed
                        path_exist = HdfsUtils.path_exist(CommonUtil.build_hdfs_path(f"dwd_{rank_type}_asin_rank", {
                            "site_name": site_name,
                            "date_type": "last30day",
                            "date_info": day_check,
                        }))
                        if path_exist:
                            # data exists on HDFS: queue a re-export to MySQL
                            export_tasks.append(
                                f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 "
                                f"/opt/module/spark/demo/py_demo/sqoop_export/dwd_bsr_nsr_asin_rank_day.py "
                                f"{site_name} {day_check} {rank_type}"
                            )
                        else:
                            err.append(f"no data for {site_name} {rank_type} on {day_check}!!!")
                            # only recompute when the missing day is the current day
                            if day_check == day:
                                start_tasks.append({
                                    "process_df_name": f"asin_{rank_type}榜单统计日流程_api",
                                    "startParams": {"site_name": site_name, "date_info": day_check, "wx_user": "wujicang"}
                                })

    # alert if any data is missing
    if len(err) > 0:
        print(err)
        CommonUtil.send_wx_msg(["wujicang", "pengyanbing"], "severe warning", ";".join(err))

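    # restart the computation workflows in DolphinScheduler and watch them run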
    for task in start_tasks:
        DolphinschedulerHelper.start_and_watch_process_instance(
            project_name="big_data_selection",
            process_df_name=task['process_df_name'],
            startParams=task['startParams'],
        )

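    # run each queued export command on the remote host over SSH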
    for task in export_tasks:
        client = SSHUtil.get_ssh_client()
        SSHUtil.exec_command_async(client, task, ignore_err=False)
        print(task)


if __name__ == '__main__':
    # one-off backfill: on 2024-05-18 check (and export) the last 60 days of
    # data; on any other day check only today
    if CommonUtil.format_now("%Y-%m-%d") == '2024-05-18':
        check_by_offset_day(60)
    else:
        check_by_offset_day(1)