Commit 4bfb7e81 by wangrui

消息推送调整

parent 5c0104c0
......@@ -217,7 +217,7 @@ class DwdAsinToPg(Templates):
self.df_save = self.df_save.fillna({"asin_is_variation": 0})
self.df_save.show(10, truncate=False)
print("self.df_save.count:", self.df_save.count())
users = ["fangxingjun", "wangrui4", "pengyanbing"]
users = ["fangxingjun", "chenyuanjie", "pengyanbing"]
title = f"dwd_asin_to_pg: {self.site_name}, {self.date_type}, {self.date_info}"
content = f"整合asin完成--等待导出到pg提供爬虫使用--self.df_save.count: {self.df_save.count()}"
CommonUtil().send_wx_msg(users=users, title=title, content=content)
......
......@@ -95,18 +95,18 @@ def handle_new_store_collections(new_collect_store_id):
except:
pass
print("推送失败")
CommonUtil.send_wx_msg(['wangrui4'], f"\u26A0店铺收藏消息推送失败\u26A0", f"任务信息: {cmd} 请注意检查!")
CommonUtil.send_wx_msg(['chenyuanjie'], f"\u26A0店铺收藏消息推送失败\u26A0", f"任务信息: {cmd} 请注意检查!")
else:
print("dwt执行失败")
print("错误信息为:============")
print(error.decode())
CommonUtil.send_wx_msg(['wangrui4'], f"\u26A0店铺收藏更新失败\u26A0", f"任务信息: {cmd} 请注意检查!")
CommonUtil.send_wx_msg(['chenyuanjie'], f"\u26A0店铺收藏更新失败\u26A0", f"任务信息: {cmd} 请注意检查!")
else:
print("dws执行失败")
print("错误信息为:============")
print(error.decode())
CommonUtil.send_wx_msg(['wangrui4'], f"\u26A0店铺收藏更新失败\u26A0", f"任务信息: {cmd} 请注意检查!")
CommonUtil.send_wx_msg(['chenyuanjie'], f"\u26A0店铺收藏更新失败\u26A0", f"任务信息: {cmd} 请注意检查!")
if __name__ == '__main__':
......
......@@ -81,7 +81,7 @@ def update_es_fileds(spark, df_main, date_info_list, site_name, run_type):
print(f"elasticsearch {index_name} 更新完毕!")
except Exception as e:
print("An error occurred while writing to Elasticsearch:", str(e))
CommonUtil.send_wx_msg(['wujicang', 'wangrui4'], '\u26A0 es用户标记信息更新失败', f'es更新用户标记信息失败:{site_name}, {date_info}')
CommonUtil.send_wx_msg(['wujicang', 'chenyuanjie'], '\u26A0 es用户标记信息更新失败', f'es更新用户标记信息失败:{site_name}, {date_info}')
pass
print("elasticsearch 所有数据全部更新完毕")
......
......@@ -54,7 +54,7 @@ class EsStDetail(TemplatesMysql):
# 正式导出需入导出记录表
if result_type == 'formal':
CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
principal='wangrui4', priority=3, export_tools_type=2,
principal='chenyuanjie', priority=3, export_tools_type=2,
belonging_to_process='流量选品')
def get_date_from_week(self):
......
......@@ -157,6 +157,6 @@ if __name__ == '__main__':
# print(schema_flag)
if not schema_flag:
msg = f"数据表:{hive_table} {msg_params},计算数据存在验证不通过,请检查数据是否异常!!具体信息请查看日志!!"
CommonUtil.send_wx_msg(['chenjianyun', 'wangrui4'], f"\u26A0 {hive_table} {msg_params}流程数据导出前验证异常", msg)
CommonUtil.send_wx_msg(['chenjianyun'], f"\u26A0 {hive_table} {msg_params}流程数据导出前验证异常", msg)
spark_session.stop()
pass
\ No newline at end of file
......@@ -59,7 +59,7 @@ class DolProcessApi(Templates):
},
warning_Type="ALL"
)
users = ["fangxingjun", "wangrui4", "pengyanbing", "chenyuanjie"]
users = ["fangxingjun", "pengyanbing", "chenyuanjie"]
title = f"参数信息--{self.site_name}, {self.date_type}, {self.date_info}"
content = f"触发调度--{process_df_name}"
CommonUtil().send_wx_msg(users=users, title=title, content=content)
......
......@@ -23,8 +23,7 @@ def check_hadoop():
'wujicang',
'huangjian',
'fangxingjun',
'chenjianyun',
'wangrui4'
'chenjianyun'
]
print("发送微信消息中!")
CommonUtil.send_wx_msg(users, "hadoop健康检查", content)
......
......@@ -23,8 +23,7 @@ def check_hadoop():
'wujicang',
'huangjian',
'fangxingjun',
'chenjianyun',
'wangrui4'
'chenjianyun'
]
print("发送微信消息中!")
CommonUtil.send_wx_msg(users, "hadoop健康检查", content)
......
......@@ -41,6 +41,6 @@ if __name__ == '__main__':
}
)
CommonUtil.send_wx_msg(["wangrui4", "chenyuanjie", "zhouyuchen"], "【周重跑-新aba(四分位计算)流程】导出完成", "集群升级——启动!")
CommonUtil.send_wx_msg(["chenyuanjie", "zhouyuchen"], "【周重跑-新aba(四分位计算)流程】导出完成", "集群升级——启动!")
pass
......@@ -22,7 +22,7 @@ if __name__ == '__main__':
else:
CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
principal='wangrui4', priority=2, export_tools_type=1, belonging_to_process='买家搜索词')
principal='chenyuanjie', priority=2, export_tools_type=1, belonging_to_process='买家搜索词')
db_type = 'postgresql_cluster'
print("导出到PG-Cluster库中")
year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
......
......@@ -14,7 +14,7 @@ if __name__ == '__main__':
date_info = CommonUtil.get_sys_arg(3, None)
CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
principal='wangrui4', priority=3, export_tools_type=1, belonging_to_process='最小产品线市场数据汇总')
principal='zhouyuchen', priority=3, export_tools_type=1, belonging_to_process='最小产品线市场数据汇总')
db_type = 'mysql'
export_tb = f"keepa_{site_name}_asin_bsr_rank"
......@@ -55,4 +55,4 @@ if __name__ == '__main__':
client.close()
print("success")
CommonUtil.send_wx_msg(["wangrui4", "heqinsi"], f"{export_tb} 导出完成", f"时间:{date_info} 最小产品线市场数据明细信息导出完成")
CommonUtil.send_wx_msg(["zhouyuchen", "heqinsi"], f"{export_tb} 导出完成", f"时间:{date_info} 最小产品线市场数据明细信息导出完成")
......@@ -13,7 +13,7 @@ if __name__ == '__main__':
date_info = CommonUtil.get_sys_arg(3, None)
CommonUtil.judge_is_work_hours(site_name=site_name, date_type=date_type, date_info=date_info,
principal='wangrui4', priority=3, export_tools_type=1, belonging_to_process='最小产品线市场数据汇总')
principal='zhouyuchen', priority=3, export_tools_type=1, belonging_to_process='最小产品线市场数据汇总')
db_type = 'mysql'
export_tb = f"keepa_{site_name}_asin_bsr_rank_sum_level_1"
......@@ -55,4 +55,4 @@ if __name__ == '__main__':
client.close()
print("success")
CommonUtil.send_wx_msg(["wangrui4", "heqinsi"], f"{export_tb} 导出完成", f"时间:{date_info} 最小产品线市场数据汇总信息导出完成")
CommonUtil.send_wx_msg(["zhouyuchen", "heqinsi"], f"{export_tb} 导出完成", f"时间:{date_info} 最小产品线市场数据汇总信息导出完成")
......@@ -24,7 +24,7 @@ if __name__ == '__main__':
db_type = 'postgresql_test'
print("导出到测试库中")
else:
CommonUtil.judge_is_work_hours(site_name=site_name, date_info=date_info, principal='wangrui4',
CommonUtil.judge_is_work_hours(site_name=site_name, date_info=date_info, principal='chenyuanjie',
priority=4, export_tools_type=1, belonging_to_process='自然asin汇总')
db_type = "postgresql_cluster"
print("导出到PG集群库中")
......
......@@ -42,7 +42,7 @@ if __name__ == '__main__':
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
......
......@@ -49,7 +49,7 @@ if __name__ == '__main__':
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
......@@ -70,7 +70,7 @@ if __name__ == '__main__':
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
pass
......@@ -70,7 +70,7 @@ if __name__ == '__main__':
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
......@@ -91,7 +91,7 @@ if __name__ == '__main__':
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
pass
......@@ -35,7 +35,7 @@ if __name__ == '__main__':
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
......@@ -56,7 +56,7 @@ if __name__ == '__main__':
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
pass
......@@ -37,7 +37,7 @@ if __name__ == '__main__':
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['zhouyuchen']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
......@@ -58,7 +58,7 @@ if __name__ == '__main__':
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['zhouyuchen']
)
pass
......@@ -36,7 +36,7 @@ if __name__ == '__main__':
site_name=site_name,
query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4']
msg_usr=['chenyuanjie']
)
assert check_flag, f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!"
......@@ -57,6 +57,6 @@ if __name__ == '__main__':
partition_dict=partition_dict,
import_query=query,
hive_tb_name=hive_tb,
msg_usr=['wangrui4'])
msg_usr=['chenyuanjie'])
pass
......@@ -52,7 +52,7 @@ class CommonUtil(object):
__hive_home__ = "/opt/datasophon/hive-3.1.0/bin/hive"
__hadoop_home__ = "/opt/module/hadoop/bin/hadoop"
__msg_usr__ = ['wujicang', 'huangjian', 'fangxingjun', 'chenjianyun', 'wangrui4']
__msg_usr__ = ['wujicang', 'huangjian', 'fangxingjun', 'chenjianyun']
_date_time_format = "yyyy-MM-dd HH:mm:ss"
......@@ -1590,7 +1590,7 @@ outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
@staticmethod
def judge_is_work_hours(site_name: str = 'us', date_type: str = None, date_info: str = None,
principal: str = "wangrui4, huangjian", priority: int = 1, export_tools_type: int = 1,
principal: str = "chenyuanjie", priority: int = 1, export_tools_type: int = 1,
belonging_to_process: str = None):
"""
导出任务时间约束:控制数据导出任务在非上班时间段进行
......
......@@ -606,7 +606,7 @@ class Templates(object):
if self.consumer_type == 'latest' and self.test_flag == 'normal' and script_name in ['kafka_flow_asin_detail', 'kafka_asin_detail']:
if script_name == 'kafka_flow_asin_detail':
kafka_field = 'kafka_flow_state'
wx_users = ['wangrui4', 'pengyanbing']
wx_users = ['chenyuanjie', 'pengyanbing']
wx_msg = f"站点: {self.site_name} 日期类型: {self.date_type} {self.date_info} asin详情实时消费数据到es准备工作已完成,可以开启详情爬取!"
elif script_name == 'kafka_asin_detail':
kafka_field = 'kafka_state'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment