# dwt_new_store_collect_info.py
# author : wangrui
# date : 2023/8/4 16:54

import os
import sys
import datetime as DT
import time
import requests

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
import subprocess
from utils.common_util import CommonUtil
from datetime import datetime


def is_in_time_range(start_time, end_time, check_time):
    """Return whether ``check_time`` lies in the inclusive window.

    When ``start_time <= end_time`` the window is the plain interval
    ``[start_time, end_time]``. Otherwise the window is treated as wrapping
    around (e.g. across midnight) and a value matches when it is at or after
    ``start_time`` or at or before ``end_time``.
    """
    same_day_window = start_time <= end_time
    if not same_day_window:
        # Wrapped window: either tail of the range is a hit.
        return start_time <= check_time or check_time <= end_time
    return start_time <= check_time <= end_time


def scan_new_store_collections_info(spark, pg_con_info):
    """Look up store collections that were crawled in the last half hour.

    Queries ``user_collection_syn`` for rows with ``data_type = 2`` and
    ``calculate_state = 1`` whose ``last_crawling_time`` is today and less
    than 0.5 hours old, grouped by ``data_id``.

    Args:
        spark: Session handed to ``SparkUtil.read_jdbc_query``.
        pg_con_info: Mapping providing the ``url``, ``username`` and ``pwd``
            entries used for the JDBC read, or ``None``.

    Returns:
        The matching seller ids joined with ``,``, or ``None`` when
        ``pg_con_info`` is ``None`` or the query returns no rows.
    """
    if pg_con_info is None:
        return None

    sql = """
              select data_id as seller_id 
                from user_collection_syn 
                where data_type = 2 and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD') 
                and calculate_state = 1 and EXTRACT(EPOCH FROM ( now() - last_crawling_time)) / 3600 < 0.5 
                and data_id is not null 
                group by data_id
            """
    df_new_store_collections_info = SparkUtil.read_jdbc_query(session=spark, url=pg_con_info['url'],
                                                              pwd=pg_con_info['pwd'],
                                                              username=pg_con_info['username'], query=sql)

    # Collect exactly once. Calling count() and then collect() triggers two
    # separate actions, i.e. the time-windowed query would run twice and the
    # second run could come back empty, yielding "" instead of None.
    seller_rows = df_new_store_collections_info.select("seller_id").collect()
    if not seller_rows:
        return None

    new_collect_store_id = ",".join(row[0] for row in seller_rows)
    print(new_collect_store_id)
    return new_collect_store_id


def _quote_sql_literals(values):
    """Render values as a comma-separated list of single-quoted SQL literals."""
    # Double embedded single quotes so an id cannot terminate the literal.
    return ", ".join("'" + str(value).replace("'", "''") + "'" for value in values)


def _set_calculate_state(store_ids, from_state, to_state):
    """Move today's rows for ``store_ids`` from ``from_state`` to ``to_state``."""
    sql = (
        f"update user_collection_syn set calculate_state = {to_state} "
        f"where data_id in ({_quote_sql_literals(store_ids)}) "
        "and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD') "
        f"and calculate_state = {from_state}"
    )
    # NOTE(review): the database key is hard-coded to 'us' while the scan
    # side connects with site_name — confirm this is intentional.
    DBUtil.exec_sql('postgresql', 'us', sql)


def _run_spark_step(label, script_path, store_id_arg):
    """Run one shell script step and report the outcome.

    Returns a ``(succeeded, cmd)`` tuple; ``succeeded`` is true when the
    script exits with status 0. On failure the stderr output is printed and
    a WeChat alert is sent.
    """
    import shlex  # local import keeps this block self-contained

    # site_name / date_type / date_info are module globals set by the
    # __main__ section. Quote every argument because the command goes
    # through a shell; plain alphanumeric/comma values are left untouched.
    step_args = (site_name, date_type, date_info, "real_time", store_id_arg)
    cmd = " ".join([script_path] + [shlex.quote(str(arg)) for arg in step_args])
    print(cmd)
    completed = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if completed.returncode == 0:
        print(f"{label}开始执行")
        print(f"{label}执行成功")
        print("执行信息为:============")
        # errors="replace": undecodable log bytes must not abort the workflow.
        print(completed.stdout.decode(errors="replace"))
        return True, cmd

    print(f"{label}执行失败")
    print("错误信息为:============")
    print(completed.stderr.decode(errors="replace"))
    CommonUtil.send_wx_msg(['chenyuanjie'], "\u26A0店铺收藏更新失败\u26A0", f"任务信息: {cmd} 请注意检查!")
    return False, cmd


def _emit_collection_event(cmd):
    """Best-effort POST announcing that the collection data was integrated."""
    host = "https://selection.yswg.com.cn/soundasia_selection"
    url = f"{host}/workflow/common/emit"
    data = {
        "type": "用户收藏整合",
        "date_time": datetime.now(),
        "extraJson": {}
    }
    try:
        # NOTE(review): the HTTP status is not inspected, so an error response
        # still counts as delivered — confirm whether that is acceptable.
        requests.post(url=url, data=data, timeout=15)
    except Exception as exc:
        # Deliberately best-effort, but no longer a bare except, so
        # KeyboardInterrupt / SystemExit propagate.
        print("推送失败")
        print(repr(exc))
        CommonUtil.send_wx_msg(['chenyuanjie'], "\u26A0店铺收藏消息推送失败\u26A0", f"任务信息: {cmd} 请注意检查!")
    else:
        print("推送完成")


def handle_new_store_collections(new_collect_store_id):
    """Run the dws and dwt refresh steps for newly collected stores.

    Steps, each gated on the previous one succeeding:
      1. mark today's matching rows ``calculate_state`` 1 -> 2;
      2. run the dws shell script;
      3. run the dwt shell script;
      4. mark the rows 2 -> 3 and POST a workflow event.

    A failing script step prints stderr, sends a WeChat alert and stops.
    NOTE(review): in that case the rows stay at ``calculate_state = 2`` and
    nothing in this function resets them — confirm how they are retried.

    Args:
        new_collect_store_id: Comma-separated store ids, or ``None``.
            ``None`` and strings without any non-empty id are a no-op.

    Returns:
        None.
    """
    if new_collect_store_id is None:
        return

    # Drop empty fragments so "" or "a,,b" never yields an '' SQL literal.
    store_ids = [value for value in new_collect_store_id.split(",") if value]
    if not store_ids:
        return
    store_id_arg = ",".join(store_ids)

    _set_calculate_state(store_ids, from_state=1, to_state=2)

    succeeded, _ = _run_spark_step(
        "dws", "/mnt/run_shell/spark_shell/dws/spark_dws_user_collect_store_asin_detail.sh", store_id_arg)
    if not succeeded:
        return

    succeeded, dwt_cmd = _run_spark_step(
        "dwt", "/mnt/run_shell/spark_shell/dwt/spark_dwt_user_store_collections_info.sh", store_id_arg)
    if not succeeded:
        return

    _set_calculate_state(store_ids, from_state=2, to_state=3)
    _emit_collection_event(dwt_cmd)


if __name__ == '__main__':
    # Command-line arguments. These stay module-level globals because
    # handle_new_store_collections reads them when building its commands.
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1

    # Polling window: 09:00 through 21:00 (inclusive).
    window_open = DT.time(9, 0)
    window_close = DT.time(21, 0)

    while True:
        # Leave the loop (and the process) once the clock is outside the window.
        if not is_in_time_range(window_open, window_close, DT.datetime.now().time()):
            print("新增店铺更新任务不在执行时间内")
            sys.exit(0)

        spark = SparkUtil.get_spark_sessionV3("new_store_collections_info")
        pg_con_info = DBUtil.get_connection_info('postgresql', site_name)
        pending_store_ids = scan_new_store_collections_info(spark, pg_con_info)

        if pending_store_ids is None:
            print("当前没有新增店铺,10min后继续扫描")
            time.sleep(600)  # wait 10 minutes before the next scan
            continue

        # Work found: process it, then rescan immediately.
        handle_new_store_collections(pending_store_ids)