# author: wangrui
# date: 2023/8/4 16:54
"""Polling job: scan newly collected stores and trigger dws/dwt recalculation.

Runs in a loop between 09:00 and 21:00. Each pass queries PostgreSQL for
store ids collected today (``calculate_state = 1``, crawled within the last
30 minutes), marks them in-progress, runs the dws and dwt spark shell jobs
for them, marks them done and notifies the workflow service.
"""
import os
import sys
import datetime as DT
import subprocess
import time
from datetime import datetime

import requests

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory on sys.path
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil


def is_in_time_range(start_time, end_time, check_time):
    """Return True if *check_time* falls in [start_time, end_time].

    Handles windows that wrap past midnight (start > end), in which case the
    window is the union [start, 24:00) ∪ [00:00, end].
    """
    if start_time <= end_time:
        return start_time <= check_time <= end_time
    return start_time <= check_time or check_time <= end_time


def scan_new_store_collections_info(spark, pg_con_info):
    """Query PG for stores collected today that still need calculation.

    :param spark: active Spark session used for the JDBC read.
    :param pg_con_info: dict with ``url``/``username``/``pwd`` keys, or None.
    :return: comma-joined seller_id string, or None when there is nothing
             to process (no connection info or an empty result set).
    """
    if pg_con_info is None:
        return None
    sql = """
        select data_id as seller_id
        from user_collection_syn
        where data_type = 2
          and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD')
          and calculate_state = 1
          and EXTRACT(EPOCH FROM (now() - last_crawling_time)) / 3600 < 0.5
          and data_id is not null
        group by data_id
    """
    df = SparkUtil.read_jdbc_query(session=spark,
                                   url=pg_con_info['url'],
                                   pwd=pg_con_info['pwd'],
                                   username=pg_con_info['username'],
                                   query=sql)
    # Collect once instead of count() + collect() — avoids running the JDBC
    # query twice as two separate Spark jobs.
    rows = df.select("seller_id").collect()
    if not rows:
        return None
    # str() guards against numeric data_id values; harmless for strings.
    new_collect_store_id = ",".join(str(row[0]) for row in rows)
    print(new_collect_store_id)
    return new_collect_store_id


def _update_calculate_state(store_id_list, from_state, to_state):
    """Move today's rows for *store_id_list* from one calculate_state to another.

    NOTE(review): values are interpolated into the SQL text (ids originate
    from our own DB, not user input) — switch to bind parameters if
    DBUtil.exec_sql supports them.
    """
    quoted_ids = ', '.join(f"'{store_id}'" for store_id in store_id_list)
    sql = (
        f"update user_collection_syn set calculate_state = {to_state} "
        f"where data_id in ({quoted_ids}) "
        f"and to_char(last_crawling_time, 'YYYY-MM-DD') = to_char(current_date, 'YYYY-MM-DD') "
        f"and calculate_state = {from_state}"
    )
    DBUtil.exec_sql('postgresql', 'us', sql)


def _run_shell(cmd):
    """Run *cmd* through the shell; return (succeeded, stdout, stderr) bytes.

    NOTE(review): shell=True with interpolated ids — safe only because ids
    come from our own DB; do not feed external input through this path.
    """
    print(cmd)
    process = subprocess.Popen(cmd, shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    result, error = process.communicate()
    return process.returncode == 0, result, error


def _notify_workflow(cmd):
    """Best-effort POST telling the workflow service the integration finished.

    Failures are reported via WeChat but never abort the job.
    """
    host = "https://selection.yswg.com.cn/soundasia_selection"
    url = f"{host}/workflow/common/emit"
    data = {
        "type": "用户收藏整合",
        # Serialize explicitly so the form payload is a deterministic string
        # rather than whatever str(datetime) the HTTP layer produces.
        "date_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "extraJson": {}
    }
    try:
        requests.post(url=url, data=data, timeout=15)
        print("推送完成")
    except Exception:
        print("推送失败")
        CommonUtil.send_wx_msg(['wangrui4'],
                               f"\u26A0店铺收藏消息推送失败\u26A0",
                               f"任务信息: {cmd} 请注意检查!")


def handle_new_store_collections(new_collect_store_id):
    """Run the dws then dwt spark jobs for the given stores and update state.

    Reads the module-level ``site_name``/``date_type``/``date_info`` set in
    __main__. State transitions: 1 -> 2 before the jobs, 2 -> 3 after both
    succeed. Any job failure is reported via WeChat.
    """
    if new_collect_store_id is None:
        return
    store_id_list = new_collect_store_id.split(",")
    _update_calculate_state(store_id_list, from_state=1, to_state=2)

    cmd = (f"/mnt/run_shell/spark_shell/dws/spark_dws_user_collect_store_asin_detail.sh "
           f"{site_name} {date_type} {date_info} real_time {new_collect_store_id}")
    ok, result, error = _run_shell(cmd)
    if not ok:
        print("dws执行失败")
        print("错误信息为:============")
        print(error.decode())
        CommonUtil.send_wx_msg(['wangrui4'],
                               f"\u26A0店铺收藏更新失败\u26A0",
                               f"任务信息: {cmd} 请注意检查!")
        return
    print("dws开始执行")
    print("dws执行成功")
    print("执行信息为:============")
    print(result.decode())

    cmd = (f"/mnt/run_shell/spark_shell/dwt/spark_dwt_user_store_collections_info.sh "
           f"{site_name} {date_type} {date_info} real_time {new_collect_store_id}")
    ok, result, error = _run_shell(cmd)
    if not ok:
        print("dwt执行失败")
        print("错误信息为:============")
        print(error.decode())
        CommonUtil.send_wx_msg(['wangrui4'],
                               f"\u26A0店铺收藏更新失败\u26A0",
                               f"任务信息: {cmd} 请注意检查!")
        return
    print("dwt开始执行")
    print("dwt执行成功")
    print("执行信息为:============")
    print(result.decode())

    _update_calculate_state(store_id_list, from_state=2, to_state=3)
    _notify_workflow(cmd)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-1
    start_time = DT.time(9, 0)   # window opens 09:00
    end_time = DT.time(21, 0)    # window closes 21:00
    while True:
        current_time = DT.datetime.now().time()
        if not is_in_time_range(start_time, end_time, current_time):
            print("新增店铺更新任务不在执行时间内")
            sys.exit(0)
        # NOTE(review): a session is requested every iteration; presumably
        # get_spark_sessionV3 returns a cached session — confirm, otherwise
        # hoist this out of the loop to avoid leaking sessions.
        spark = SparkUtil.get_spark_sessionV3("new_store_collections_info")
        pg_con_info = DBUtil.get_connection_info('postgresql', f'{site_name}')
        new_collect_store_id = scan_new_store_collections_info(spark, pg_con_info)
        if new_collect_store_id is not None:
            handle_new_store_collections(new_collect_store_id)
        else:
            print("当前没有新增店铺,10min后继续扫描")
            time.sleep(600)  # sleep 10 minutes