Commit c7f5d019 by fangxingjun

新增导入数据时,跳过数量校验参数;新增导出数据功能

parent f3ab9b3c
......@@ -6,14 +6,14 @@ import json
import orjson, requests, time
from typing import List, Optional, Union
DEFAULT_USER = "fangxingjun"
DEFAULT_USER_TOKEN = "5f1b2e9c3a4d7f60"
# DEFAULT_USER = "fangxingjun"
# DEFAULT_USER_TOKEN = "fxj_token_123"
# sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
# try:
# from user import DEFAULT_USER, DEFAULT_USER_TOKEN
# except Exception as e:
# from .user import DEFAULT_USER, DEFAULT_USER_TOKEN
try:
from user import DEFAULT_USER, DEFAULT_USER_TOKEN
except Exception as e:
from .user import DEFAULT_USER, DEFAULT_USER_TOKEN
# 新增:默认微信通知人(用于任务失败时告警)
NOTIFY_DEFAULT_USERS = ['pengyanbing', 'hezhe', 'chenyuanjie', 'fangxingjun']
......@@ -205,7 +205,10 @@ class RemoteEngine:
payload = {
"users": self.alert_users, # 也可不传,由服务端默认;这里显式传入
"title": title,
"content": content
"content": content,
"msgtype": "textcard",
"user": self.user,
"user_token": self.user_token,
}
for url in self.urls:
try:
......@@ -270,6 +273,7 @@ class RemoteEngine:
print(f"更新 {rows_updated} 行")
return df
def sqoop_raw_import(self,
# site_name: str,
# db_type: str,
......@@ -285,6 +289,7 @@ class RemoteEngine:
sqoop_home: str = "/opt/module/sqoop-1.4.6/bin/sqoop",
job_name: str = None,
dry_run: bool = False,
check_count: bool = True,
clean_hdfs: bool = True, # 是否删除hdfs路径, 默认删除
timeout_sec: int = 36000):
print("site_name:", self.site_name, "db_type:", self.db_type)
......@@ -304,6 +309,7 @@ class RemoteEngine:
"sqoop_home": sqoop_home,
"job_name": job_name or f"sqoop_task---{hive_table}",
"dry_run": dry_run,
"check_count": check_count, # ✅ 新增
"hdfs_path": hdfs_path,
"clean_hdfs": clean_hdfs,
"timeout_sec": timeout_sec
......@@ -366,6 +372,64 @@ class RemoteEngine:
print("\n✅ 全部步骤通过。")
def sqoop_raw_export(self,
                     hive_table: str,
                     import_table: str,
                     partitions: dict = None,
                     default_db: str = "big_data_selection",
                     queue_name: str = "default",
                     m: int = 1,
                     cols: str = None,
                     outdir: str = "/tmp/sqoop/",
                     sqoop_home: str = "/opt/module/sqoop-1.4.6/bin/sqoop",
                     dry_run: bool = False,
                     timeout_sec: int = 36000):
    """Export a Hive table to MySQL / PostgreSQL through the remote service.

    Every argument is forwarded unchanged in the body of a
    ``sqoop/raw_export`` request, together with ``self.site_name`` and
    ``self.db_type``. ``partitions`` is sent as an empty dict when it is
    falsy. ``timeout_sec`` is sent in the body and is also passed as the
    ``timeout`` argument of ``self._request``.

    The ``cmd``, ``msg``, ``stdout`` and ``stderr`` fields of the response
    are printed when they are truthy, followed by ``ok`` and ``code``.

    Raises:
        RuntimeError: when the response's ``ok`` field is missing or falsy.
            ``self._notify_textcard`` is called with a summary of the
            failure before the exception is raised.
    """
    print("site_name:", self.site_name, "db_type:", self.db_type)

    request_body = {
        "site_name": self.site_name,
        "db_type": self.db_type,
        "hive_table": hive_table,
        "import_table": import_table,
        "default_db": default_db,
        "partitions": partitions if partitions else {},
        "queue_name": queue_name,
        "m": m,
        "cols": cols,
        "outdir": outdir,
        "sqoop_home": sqoop_home,
        "dry_run": dry_run,
        "timeout_sec": timeout_sec
    }
    result = self._request("sqoop/raw_export", request_body, timeout=timeout_sec)

    print("\n===== SQOOP EXPORT =====")
    if "cmd" in result:
        command = result["cmd"]
        if command:
            print("CMD:", command)

    # Optional diagnostic fields, echoed in a fixed order when present.
    labelled_fields = (
        ("MSG:", "msg"),
        ("\nSTDOUT:\n", "stdout"),
        ("\nSTDERR:\n", "stderr"),
    )
    for label, field in labelled_fields:
        text = result.get(field)
        if text:
            print(label, text)

    print("OK:", result.get("ok"), " CODE:", result.get("code"))

    if result.get("ok", False):
        print("\n✅ Sqoop export 成功。")
        return

    # Failure path: send the alert first, then surface the error to the caller.
    summary_parts = [
        f"任务:{hive_table} → {import_table}\n",
        f"code:{result.get('code')}\n",
        f"msg:{self._short(result.get('msg'))}\n",
        f"stderr:{self._short(result.get('stderr'))}",
    ]
    self._notify_textcard(
        title="Sqoop 导出失败",
        content="".join(summary_parts)
    )
    raise RuntimeError("Sqoop 导出失败。详见上面日志。")
def begin(self):
return RemoteTransaction(self.db_type, self.site_name,
self.session, self.urls,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment