# author : wangrui
# data : 2023/8/4 16:54

import os
import sys
import datetime as DT
import time
import requests

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
import subprocess
from utils.common_util import CommonUtil
from datetime import datetime


def is_in_time_range(start_time, end_time, check_time):
    if start_time <= end_time:
        return start_time <= check_time <= end_time
    else:
        return start_time <= check_time or check_time <= end_time


def scan_new_store_collections_info(spark, pg_con_info):
    if pg_con_info is not None:
        sql = f"""
              select data_id as seller_id 
                from user_collection_syn 
                where data_type = 2 and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD') 
                and calculate_state = 1 and EXTRACT(EPOCH FROM ( now() - last_crawling_time)) / 3600 < 0.5 
                and data_id is not null 
                group by data_id
            """
        df_new_store_collections_info = SparkUtil.read_jdbc_query(session=spark, url=pg_con_info['url'],
                                                                  pwd=pg_con_info['pwd'],
                                                                  username=pg_con_info['username'], query=sql)
        if df_new_store_collections_info.count() == 0:
            return None
        else:
            seller_id_list = df_new_store_collections_info.select("seller_id").collect()
            new_collect_store_list = [row[0] for row in seller_id_list]
            new_collect_store_id = ",".join(new_collect_store_list)
            print(new_collect_store_id)
            return new_collect_store_id


def handle_new_store_collections(new_collect_store_id):
    if new_collect_store_id is not None:
        new_collect_store_id_list = new_collect_store_id.split(",")
        before_sql1 = f"""
            update user_collection_syn set calculate_state = 2 where data_id
        """
        query_store = ', '.join([f"'{value}'" for value in new_collect_store_id_list])
        before_sql2 = f" in ({query_store})"
        before_sql3 = f""" and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD') and calculate_state = 1"""
        before_sql = before_sql1 + before_sql2 + before_sql3
        DBUtil.exec_sql('postgresql', 'us', before_sql)
        cmd = f"""/mnt/run_shell/spark_shell/dws/spark_dws_user_collect_store_asin_detail.sh {site_name} {date_type} {date_info} real_time {new_collect_store_id}"""
        print(cmd)
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result, error = process.communicate()
        if not process.returncode:
            print("dws开始执行")
            print("dws执行成功")
            print("执行信息为:============")
            print(result.decode())
            cmd = f"""/mnt/run_shell/spark_shell/dwt/spark_dwt_user_store_collections_info.sh {site_name} {date_type} {date_info} real_time {new_collect_store_id}"""
            print(cmd)
            process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            result, error = process.communicate()
            if not process.returncode:
                print("dwt开始执行")
                print("dwt执行成功")
                print("执行信息为:============")
                print(result.decode())
                after_sql1 = f"""
                 update user_collection_syn set calculate_state = 3 where data_id  
                """
                query_store = ', '.join([f"'{value}'" for value in new_collect_store_id_list])
                after_sql2 = f" in ({query_store})"
                after_sql3 = f""" and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD') and calculate_state = 2"""
                after_sql = after_sql1 + after_sql2 + after_sql3
                DBUtil.exec_sql('postgresql', 'us', after_sql)
                host = "https://selection.yswg.com.cn/soundasia_selection"
                url = f"{host}/workflow/common/emit"
                data = {
                    "type": "用户收藏整合",
                    "date_time": datetime.now(),
                    "extraJson": {}
                }
                try:
                    requests.post(url=url, data=data, timeout=15)
                    print("推送完成")
                except:
                    pass
                    print("推送失败")
                    CommonUtil.send_wx_msg(['wangrui4'], f"\u26A0店铺收藏消息推送失败\u26A0", f"任务信息: {cmd} 请注意检查!")

            else:
                print("dwt执行失败")
                print("错误信息为:============")
                print(error.decode())
                CommonUtil.send_wx_msg(['wangrui4'], f"\u26A0店铺收藏更新失败\u26A0", f"任务信息: {cmd} 请注意检查!")
        else:
            print("dws执行失败")
            print("错误信息为:============")
            print(error.decode())
            CommonUtil.send_wx_msg(['wangrui4'], f"\u26A0店铺收藏更新失败\u26A0", f"任务信息: {cmd} 请注意检查!")


if __name__ == '__main__':
    site_name = sys.argv[1]  # 参数1:站点
    date_type = sys.argv[2]  # 参数2:类型:week/4_week/month/quarter
    date_info = sys.argv[3]  # 参数3:年-周/年-月/年-季, 比如: 2022-1
    while True:
        start_time = DT.time(9, 0)  # 下午19:00
        end_time = DT.time(21, 0)  # 次日早上8:00
        current_time = DT.datetime.now().time()
        if is_in_time_range(start_time, end_time, current_time):
            spark = SparkUtil.get_spark_sessionV3(f"new_store_collections_info")
            pg_con_info = DBUtil.get_connection_info('postgresql', f'{site_name}')
            new_collect_store_id = scan_new_store_collections_info(spark, pg_con_info)
            if new_collect_store_id is not None:
                handle_new_store_collections(new_collect_store_id)
            else:
                print("当前没有新增店铺,10min后继续扫描")
                time.sleep(600)  # 休眠15分钟
        else:
            print("新增店铺更新任务不在执行时间内")
            sys.exit(0)