# dwt_new_store_collect_info.py
# author : wangrui
# date : 2023/8/4 16:54
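"""
Polls user_collection_syn in Postgres for newly collected stores (data_type = 2)
crawled within the last half hour, then triggers the dws and dwt Spark shell jobs
for those stores and notifies the workflow endpoint. Runs in a loop between
09:00 and 21:00, rescanning every 10 minutes when no new stores are found.
"""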

import datetime as DT
import os
import subprocess
import sys
import time
from datetime import datetime

import requests

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil


def is_in_time_range(start_time, end_time, check_time):
    """Return True if check_time falls within [start_time, end_time], handling windows that wrap past midnight."""
    if start_time <= end_time:
        return start_time <= check_time <= end_time
    else:
        return start_time <= check_time or check_time <= end_time

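# e.g. a window that wraps past midnight (22:00-06:00) still matches 01:00:
# is_in_time_range(DT.time(22, 0), DT.time(6, 0), DT.time(1, 0))  # -> True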

def scan_new_store_collections_info(spark, pg_con_info):
    """Query Postgres for seller ids of stores collected today and crawled within the last 30 minutes."""
    if pg_con_info is None:
        return None
    sql = """
          select data_id as seller_id
            from user_collection_syn
            where data_type = 2 and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD')
            and calculate_state = 1 and EXTRACT(EPOCH FROM (now() - last_crawling_time)) / 3600 < 0.5
            and data_id is not null
            group by data_id
        """
    df_new_store_collections_info = SparkUtil.read_jdbc_query(session=spark, url=pg_con_info['url'],
                                                              pwd=pg_con_info['pwd'],
                                                              username=pg_con_info['username'], query=sql)
    # collect once instead of count() + collect(), which would trigger two Spark actions
    seller_id_rows = df_new_store_collections_info.select("seller_id").collect()
    if not seller_id_rows:
        return None
    new_collect_store_id = ",".join(row[0] for row in seller_id_rows)
    print(new_collect_store_id)
    return new_collect_store_id


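# calculate_state lifecycle, as inferred from the updates below:
#   1 = newly crawled and pending, 2 = dws/dwt computation running, 3 = computation finished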
def handle_new_store_collections(new_collect_store_id):
    if new_collect_store_id is not None:
        new_collect_store_id_list = new_collect_store_id.split(",")
        query_store = ', '.join(f"'{value}'" for value in new_collect_store_id_list)
        # mark the stores as "computation running" before launching the Spark jobs
        before_sql = f"""
            update user_collection_syn set calculate_state = 2
            where data_id in ({query_store})
            and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD')
            and calculate_state = 1
        """
        DBUtil.exec_sql('postgresql', 'us', before_sql)
        cmd = f"""/mnt/run_shell/spark_shell/dws/spark_dws_user_collect_store_asin_detail.sh {site_name} {date_type} {date_info} real_time {new_collect_store_id}"""
        print(cmd)
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result, error = process.communicate()
        if not process.returncode:
            print("dws job succeeded")
            print("execution output: ============")
            print(result.decode())
            cmd = f"""/mnt/run_shell/spark_shell/dwt/spark_dwt_user_store_collections_info.sh {site_name} {date_type} {date_info} real_time {new_collect_store_id}"""
            print(cmd)
            process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            result, error = process.communicate()
            if not process.returncode:
                print("dwt job succeeded")
                print("execution output: ============")
                print(result.decode())
                # mark the stores as finished now that both jobs have succeeded
                after_sql = f"""
                    update user_collection_syn set calculate_state = 3
                    where data_id in ({query_store})
                    and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD')
                    and calculate_state = 2
                """
                DBUtil.exec_sql('postgresql', 'us', after_sql)
                host = "https://selection.yswg.com.cn/soundasia_selection"
                url = f"{host}/workflow/common/emit"
                data = {
                    "type": "用户收藏整合",  # workflow type ("user collection integration"); kept verbatim as the backend likely matches on it
                    "date_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # serialize explicitly instead of posting a datetime object
                    "extraJson": {}
                }
                try:
                    requests.post(url=url, data=data, timeout=15)
                    print("workflow notification sent")
                except Exception:
                    print("workflow notification failed")
                    CommonUtil.send_wx_msg(['wangrui4'], "\u26A0Store collection notification push failed\u26A0", f"Task info: {cmd}, please check!")
            else:
                print("dwt job failed")
                print("error output: ============")
                print(error.decode())
                CommonUtil.send_wx_msg(['wangrui4'], "\u26A0Store collection update failed\u26A0", f"Task info: {cmd}, please check!")
        else:
            print("dws job failed")
            print("error output: ============")
            print(error.decode())
            CommonUtil.send_wx_msg(['wangrui4'], "\u26A0Store collection update failed\u26A0", f"Task info: {cmd}, please check!")


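# Usage (illustrative invocation; actual values depend on the deployment):
#   python dwt_new_store_collect_info.py us week 2022-1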
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    while True:
        start_time = DT.time(9, 0)  # execution window opens at 09:00
        end_time = DT.time(21, 0)  # execution window closes at 21:00
        current_time = DT.datetime.now().time()
        if is_in_time_range(start_time, end_time, current_time):
            spark = SparkUtil.get_spark_sessionV3("new_store_collections_info")
            pg_con_info = DBUtil.get_connection_info('postgresql', site_name)
            new_collect_store_id = scan_new_store_collections_info(spark, pg_con_info)
            if new_collect_store_id is not None:
                handle_new_store_collections(new_collect_store_id)
            else:
                print("No newly collected stores; rescanning in 10 minutes")
                time.sleep(600)  # sleep 10 minutes
        else:
            print("New-store update task is outside its execution window")
            sys.exit(0)