#!/usr/bin/env bash

db_type=mysql
maps=10
queue=default
db=selection
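
# Callers are expected to override db_type (mysql / postgresql / pgsqltest / pg_cluster),
# maps, queue and db as needed before calling the functions below.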

get_db_info(){
	# Resolve database connection info (URL scheme, host, port, credentials) based on db_type
	if [ "$db_type" == "mysql" ]
	then
		db_url=mysql
		host=rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com
		port=3306
		username=adv_yswg
		password=HCL1zcUgQesaaXNLbL37O5KhpSAy0c
	elif [ "$db_type" == "postgresql" ]
	then
		db_url=postgresql
		host=192.168.10.216
		port=5432
		username=postgres
		password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
	elif [ "$db_type" == "pgsqltest" ]
	then
		db_url=postgresql
		host=192.168.10.217
		port=5433
		username=postgres
		password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
	elif [ "$db_type" == "pg_cluster" ]
	then
		db_url=postgresql
		host=192.168.10.221
		port=6432
		username=postgres
		password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
	else
		echo "Unknown db_type '${db_type}': unable to resolve database credentials"
		return 1
	fi
	echo "host":${host}, "port":${port}, "username":${username}, "password":${password}
	echo jdbc:$db_url://$host:$port/$db

}

import_data(){
	# Import data from MySQL or PostgreSQL into Hive via Sqoop
	get_db_info
	# Remove the corresponding HDFS target path
	hdfs dfs -rm -r ${hdfs_path}
	# Run the Sqoop import
	/opt/module/sqoop-1.4.6/bin/sqoop import -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 --append \
	--connect jdbc:$db_url://$host:$port/$db \
	--username $username \
	--password $password \
	--target-dir ${hdfs_path} \
	--query "${query}" \
	--fields-terminated-by '\t' \
	--hive-drop-import-delims \
	--input-null-string '\\N' \
	--input-null-non-string '\\N' \
	--compress \
	--compression-codec lzop \
	--num-mappers 1 \
	--outdir "/tmp/sqoop/"
	echo "Indexing the lzo files"
	/opt/module/hadoop/bin/hadoop jar \
		/opt/module/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar \
		com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=${queue} \
		${hdfs_path}
	# Recover the table partitions in the Hive metastore
	/opt/module/hive/bin/hive -e "set hive.msck.path.validation=ignore; MSCK REPAIR TABLE big_data_selection.${hive_table};"
}
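
# Example usage (a sketch, not part of the original script; table, path and query
# names below are assumed placeholders). The caller must set db_type, hdfs_path,
# query and hive_table before calling import_data; note that a free-form Sqoop
# --query import requires the literal \$CONDITIONS token in its WHERE clause.
# db_type=postgresql
# hive_table=dim_example
# hdfs_path=/origin_data/selection/dim_example/dt=2022-01-01
# query="select id, name from dim_example where 1=1 and \$CONDITIONS"
# import_data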

export_data(){
	# Export data from Hive to MySQL or PostgreSQL via Sqoop
	get_db_info
	/opt/module/sqoop-1.4.6/bin/sqoop export -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
	--connect jdbc:$db_url://$host:$port/$db \
	--username $username \
	--password $password \
	--table ${import_table} \
	--input-fields-terminated-by '\001' \
	--hcatalog-database big_data_selection \
	--hcatalog-table ${hive_table} \
	--hcatalog-partition-keys $p_keys \
	--hcatalog-partition-values $p_values \
	--input-null-string '\\N' \
	--input-null-non-string '\\N' \
	--num-mappers ${maps} \
	--columns ${cols} \
	--outdir "/tmp/sqoop/"
}
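
# Example usage (a sketch with assumed placeholder names). export_data expects
# import_table (target table in MySQL/PG), hive_table, p_keys, p_values and cols
# to be set by the caller.
# db_type=mysql
# import_table=ods_example
# hive_table=ods_example
# p_keys=dt
# p_values=2022-01-01
# cols="id,name"
# export_data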

truncate_data(){
	# Truncate a PG (partitioned) table via sqoop eval
	echo "准备执行truncate语句,清除表:${truncate_table}"
	get_db_info
	/opt/module/sqoop-1.4.6/bin/sqoop eval -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
	--connect jdbc:$db_url://$host:$port/$db \
	--username $username \
	--password $password \
	--query "truncate table ${truncate_table};"
}
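
# Example usage (a sketch; the table name is an assumed placeholder).
# db_type=postgresql
# truncate_table=ods_example
# truncate_data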

query_data(){
	# Run a simple ad-hoc query via sqoop eval
	echo "Query to be executed: ${query}"
	get_db_info
	/opt/module/sqoop-1.4.6/bin/sqoop eval -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
	--connect jdbc:$db_url://$host:$port/$db \
	--username $username \
	--password $password \
	--query "${query}"
}
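
# Example usage (a sketch; the query is an assumed placeholder).
# db_type=mysql
# query="select count(*) from ods_example"
# query_data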