import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil, DbTypes
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils


def export_detail(engine, type, site_name, day):
    """Export the monthly bsr/nsr asin_detail data for the month containing *day*.

    Skips the export when a matching partition row already exists in the
    ``time_partitions`` table of the PG cluster.

    :param engine: DB engine (from DBUtil) used to query ``time_partitions``
    :param type: ranking type, 'bsr' or 'nsr' (name shadows the builtin but is
                 kept unchanged for caller compatibility)
    :param site_name: site code, e.g. 'us'
    :param day: date string in '%Y-%m-%d' format
    """
    # Monthly partition suffix, e.g. '2023-02-01' -> '2023_02'.
    suffix = CommonUtil.reformat_date(day, "%Y-%m-%d", "%Y_%m")
    # NOTE(review): SQL is f-string-built; inputs here are internal constants,
    # but the pattern has no wildcards, so LIKE acts as equality — confirm intent.
    rows = DBUtil.engine_exec_sql(
        engine,
        f"select * from time_partitions where partition::varchar like '{site_name}_{type}_asin_detail_{suffix}';")
    already_exported = bool(list(rows))
    if not already_exported:
        print(f"{type}:{site_name}:{day} asin_detail 导出中")
        # Delegate the actual sqoop export to the standalone script.
        os.system(
            f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 /tmp/wjc_py/sqoop_export/dwt_bsr_nsr_asin_detail_new.py {site_name} {day} {type}")


if __name__ == '__main__':
    engine = DBUtil.get_db_engine(DbTypes.postgresql_cluster.name, "us")

    # Walk every (rank type, site) combination and export any day partition
    # that has not yet been copied to the PG cluster.
    for rank_type in ['bsr', 'nsr']:
        for site_name in ['us']:
            # HDFS partition directories, e.g. '...=2023-03-01'.
            parts = HdfsUtils.read_list(
                f"/home/big_data_selection/dwd/dwd_{rank_type}_asin_rank/site_name={site_name}/date_type=last30day")
            # Keep only the date value after '=' in each partition dir name.
            exist_days = [part[part.find("=") + 1:] for part in parts]
            exist_days.reverse()
            for day in exist_days:
                # Skip days on or before 2023-02-01
                # (lexicographic compare is safe for ISO '%Y-%m-%d' dates).
                if day <= '2023-02-01':
                    continue
                # Check whether this day's rank partition was already exported.
                suffix = CommonUtil.reformat_date(day, "%Y-%m-%d", "%Y_%m_%d")
                exported = bool(list(DBUtil.engine_exec_sql(
                    engine,
                    f"select * from time_partitions where partition::varchar like '{site_name}_{rank_type}_asin_rank_{suffix}';")))
                if not exported:
                    print(f"{rank_type}:{site_name}:{day} asin_rank 导出中")
                    # Delegate the per-day export to the standalone sqoop script.
                    os.system(
                        f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 /tmp/wjc_py/sqoop_export/dwd_bsr_nsr_asin_rank_day.py {site_name} {day} {rank_type}")
                # On the first day of a month, also export the monthly detail table.
                if day.endswith("-01"):
                    export_detail(engine, rank_type, site_name, day)

                # Record the export in the workflow bookkeeping table (MySQL).
                update_sql = f"""
                 insert ignore into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type,freeze_flag)
                               values ('{site_name}', '{day}', '导出PG数据库', 14, '{site_name}_{rank_type}_asin_rank', 'day', '{rank_type.upper()}榜单', '是', '{rank_type.upper()}榜单对应的TOP100ASIN','postgresql_cluster','enable')
                """
                print(update_sql)
                DBUtil.exec_sql(DbTypes.mysql.name, "us", update_sql, dispose_flag=True)

                # Ensure pre-existing rows carry the current export target / freeze flag.
                update_sql = f"""
                  update workflow_everyday   set export_db_type ='postgresql_cluster', freeze_flag='enable'
                        where table_name = '{site_name}_{rank_type}_asin_rank' and report_date in ('{day}');
            """
                DBUtil.exec_sql(DbTypes.mysql.name, "us", update_sql, dispose_flag=True)
                print(update_sql)
                print(f"{rank_type}:{site_name}:{day} success")