#!/usr/bin/env bash
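# Default job settings; callers are expected to override db_type/maps/queue/db
# as needed before invoking the functions below.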
db_type=mysql
maps=10
queue=default
db=selection
get_db_info(){
    # Resolve database connection info (host/port/credentials) from db_type
    if [ "$db_type" = "mysql" ]; then
        db_url=mysql
        host=rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com
        port=3306
        username=adv_yswg
        password=HCL1zcUgQesaaXNLbL37O5KhpSAy0c
    elif [ "$db_type" = "postgresql" ]; then
        db_url=postgresql
        host=192.168.10.216
        port=5432
        username=postgres
        password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
    elif [ "$db_type" = "pgsqltest" ]; then
        db_url=postgresql
        host=192.168.10.217
        port=5433
        username=postgres
        password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
    elif [ "$db_type" = "pg_cluster" ]; then
        db_url=postgresql
        host=192.168.10.221
        port=6432
        username=postgres
        password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
    else
        echo "Error: failed to resolve database credentials, unknown db_type '${db_type}'"
        return 1
    fi
    echo "host:${host}, port:${port}, username:${username}, password:${password}"
    echo "jdbc:${db_url}://${host}:${port}/${db}"
}
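# Example (hypothetical): selecting the test PostgreSQL instance before any
# sqoop call. get_db_info only sets and echoes the connection variables; the
# lines below are an assumption about how a caller would use it, not part of
# this script's own flow.
#   db_type=pgsqltest
#   get_db_info    # prints host/port/username/password and the JDBC URL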
import_data(){
    # Import from MySQL/PostgreSQL into Hive (staged as LZO files on HDFS)
    get_db_info
    # Remove the target HDFS path so the import starts from a clean directory
    hdfs dfs -rm -r ${hdfs_path}
    # Run the import
    /opt/module/sqoop-1.4.6/bin/sqoop import -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 --append \
        --connect jdbc:$db_url://$host:$port/$db \
        --username $username \
        --password $password \
        --target-dir ${hdfs_path} \
        --query "${query}" \
        --fields-terminated-by '\t' \
        --hive-drop-import-delims \
        --null-string '\\N' \
        --null-non-string '\\N' \
        --compress \
        --compression-codec lzop \
        --num-mappers 1 \
        --outdir "/tmp/sqoop/"
    # Build LZO indexes so the files are splittable for downstream MapReduce jobs
    echo "Indexing lzo files"
    /opt/module/hadoop/bin/hadoop jar \
        /opt/module/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar \
        com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=${queue} \
        ${hdfs_path}
    # Recover the Hive table partitions for the newly written path
    /opt/module/hive/bin/hive -e "set hive.msck.path.validation=ignore; MSCK REPAIR TABLE big_data_selection.${hive_table};"
}
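# Example usage (hypothetical): a caller would set hive_table, hdfs_path and
# query before invoking import_data. With --query, sqoop requires the literal
# $CONDITIONS token in the WHERE clause. The table/path names below are
# illustrative assumptions, not values taken from this script.
#   db_type=postgresql
#   hive_table=ods_product
#   hdfs_path=/warehouse/big_data_selection/ods/ods_product/dt=2021-01-01
#   query="select * from product where \$CONDITIONS"
#   import_data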
export_data(){
    # Export from Hive into MySQL/PostgreSQL via HCatalog
    get_db_info
    /opt/module/sqoop-1.4.6/bin/sqoop export -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
        --connect jdbc:$db_url://$host:$port/$db \
        --username $username \
        --password $password \
        --table ${import_table} \
        --input-fields-terminated-by '\001' \
        --hcatalog-database big_data_selection \
        --hcatalog-table ${hive_table} \
        --hcatalog-partition-keys $p_keys \
        --hcatalog-partition-values $p_values \
        --input-null-string '\\N' \
        --input-null-non-string '\\N' \
        --num-mappers ${maps} \
        --columns ${cols} \
        --outdir "/tmp/sqoop/"
}
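# Example usage (hypothetical): exporting one Hive partition into a relational
# table. import_table/hive_table/cols/p_keys/p_values are supplied by the
# caller; the names here are illustrative assumptions only.
#   db_type=mysql
#   import_table=product_report
#   hive_table=ads_product_report
#   cols=product_id,report_date,sales
#   p_keys=dt
#   p_values=2021-01-01
#   export_data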
truncate_data(){
    # Truncate a PostgreSQL (partitioned) table via sqoop eval
    echo "About to run truncate on table: ${truncate_table}"
    get_db_info
    /opt/module/sqoop-1.4.6/bin/sqoop eval -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
        --connect jdbc:$db_url://$host:$port/$db \
        --username $username \
        --password $password \
        --query "truncate table ${truncate_table};"
}
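# Example usage (hypothetical): clearing a target table before a full re-export.
# The table name is an illustrative assumption.
#   db_type=pg_cluster
#   truncate_table=product_report
#   truncate_data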
query_data(){
    # Run an ad-hoc SQL statement against the source database via sqoop eval
    echo "About to run query: ${query}"
    get_db_info
    /opt/module/sqoop-1.4.6/bin/sqoop eval -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
        --connect jdbc:$db_url://$host:$port/$db \
        --username $username \
        --password $password \
        --query "${query}"
}
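# Example usage (hypothetical): running an arbitrary statement, e.g. deleting an
# old day's rows before exporting a fresh batch. Table/column names are
# illustrative assumptions.
#   db_type=postgresql
#   query="delete from product_report where report_date = '2021-01-01'"
#   query_data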