Commit 3c29ca39 by chenyuanjie

流量选品-日流程-健壮性操作

parent 2ca6c058
......@@ -17,7 +17,7 @@ from pyspark.sql import functions as F
from utils.db_util import DbTypes, DBUtil
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
from utils.hdfs_utils import HdfsUtils
class DwdNsrBsrKeepaAsin(Templates):
......@@ -162,6 +162,33 @@ class DwdNsrBsrKeepaAsin(Templates):
content = f"整合asin完成--等待导出到pg提供爬虫使用--数量: {self.df_save.count()}"
CommonUtil().send_wx_msg(users=users, title=title, content=content)
# 更新30day分区
hdfs_day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": self.date_type, "date_info": self.date_info})
hdfs_30day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day", "date_info": "1970-01"})
hdfs_30day_copy = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day_copy", "date_info": "1970-01"})
if not HdfsUtils.path_exist(hdfs_day):
print(f"源目录不存在: {hdfs_day}")
wx_users = ['fangxingjun', 'chenyuanjie']
wx_msg = f"{hdfs_day} 目录数据不存在,请检查!"
CommonUtil.send_wx_msg(wx_users, "复制数据至30day分区", wx_msg)
sys.exit(1)
if HdfsUtils.path_exist(hdfs_30day_copy):
print(f"中间目录已存在,先清空: {hdfs_30day_copy}")
HdfsUtils.delete_file_in_folder(hdfs_30day_copy)
else:
print(f"中间目录不存在,创建: {hdfs_30day_copy}")
HdfsUtils.create_if_not_exist(hdfs_30day_copy)
os.system(f"hdfs dfs -cp {hdfs_day}/* {hdfs_30day_copy}/")
files = HdfsUtils.read_list(hdfs_30day_copy)
print(f"中间目录文件数: {len(files) if files else 0}")
if not HdfsUtils.path_exist(hdfs_30day):
print(f"目标目录不存在,创建: {hdfs_30day}")
HdfsUtils.create_if_not_exist(hdfs_30day)
HdfsUtils.exchange_path(hdfs_30day_copy, hdfs_30day)
print(f"交换完成! 30day与30day_copy已互换")
# 4. 修复hive元数据
CommonUtil.hive_cmd_exec(f"set hive.msck.path.validation=ignore; msck repair table big_data_selection.{self.db_save_cate};")
# def save_data(self):
# pass
......
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
if __name__ == '__main__':
site_name = sys.argv[1]
date_type = sys.argv[2]
date_info = sys.argv[3]
table_name_list = ['dwt_flow_asin', 'dim_fd_asin_info', 'ods_other_search_term_data',
'dwd_asin_measure', 'ods_one_category_report', 'ods_asin_keep_date']
for table_name in table_name_list:
if table_name in ['dim_fd_asin_info', 'ods_asin_keep_date']:
hdfs_month = CommonUtil.build_hdfs_path(table_name, {"site_name": site_name})
# 复制到copy表
table_name = f"{table_name}_30day"
hdfs_30day = CommonUtil.build_hdfs_path(table_name, {"site_name": site_name, "date_type": "30day"})
hdfs_30day_copy = CommonUtil.build_hdfs_path(table_name, {"site_name": site_name, "date_type": "30day_copy"})
else:
hdfs_month = CommonUtil.build_hdfs_path(table_name, {"site_name": site_name, "date_type": date_type, "date_info": date_info})
hdfs_30day = CommonUtil.build_hdfs_path(table_name, {"site_name": site_name, "date_type": "30day", "date_info": "1970-01"})
hdfs_30day_copy = CommonUtil.build_hdfs_path(table_name, {"site_name": site_name, "date_type": "30day_copy", "date_info": "1970-01"})
print(f"源目录: {hdfs_month}")
print(f"目标目录: {hdfs_30day}")
print(f"中间目录: {hdfs_30day_copy}")
# 1. 检查源目录是否存在
if not HdfsUtils.path_exist(hdfs_month):
print(f"源目录不存在: {hdfs_month}")
wx_users = ['fangxingjun', 'chenyuanjie']
wx_msg = f"{hdfs_month} 目录数据不存在,请检查!"
CommonUtil.send_wx_msg(wx_users, "复制数据至30day分区", wx_msg)
sys.exit(1)
# 2. 将源目录下的文件复制到中间目录
if HdfsUtils.path_exist(hdfs_30day_copy):
print(f"中间目录已存在,先清空: {hdfs_30day_copy}")
HdfsUtils.delete_file_in_folder(hdfs_30day_copy)
else:
print(f"中间目录不存在,创建: {hdfs_30day_copy}")
HdfsUtils.create_if_not_exist(hdfs_30day_copy)
# HdfsUtils.copy_file(hdfs_month, hdfs_30day_copy)
os.system(f"hdfs dfs -cp {hdfs_month}/* {hdfs_30day_copy}/")
files = HdfsUtils.read_list(hdfs_30day_copy)
print(f"中间目录文件数: {len(files) if files else 0}")
# 3. 将中间目录与目标目录交换分区名
if not HdfsUtils.path_exist(hdfs_30day):
print(f"目标目录不存在,创建: {hdfs_30day}")
HdfsUtils.create_if_not_exist(hdfs_30day)
HdfsUtils.exchange_path(hdfs_30day_copy, hdfs_30day)
print(f"交换完成! 30day与30day_copy已互换")
# 4. 修复hive元数据
CommonUtil.hive_cmd_exec(f"set hive.msck.path.validation=ignore; msck repair table big_data_selection.{table_name};")
print("success!")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment