#!/usr/bin/env bash
db_type=mysql
maps=10
queue=default
db=selection

get_db_info(){
  # Resolve database connection info (host/port/credentials) from db_type.
  if [ "$db_type" == "mysql" ]; then
    db_url=mysql
    host=rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com
    port=3306
    username=adv_yswg
    password=HCL1zcUgQesaaXNLbL37O5KhpSAy0c
  elif [ "$db_type" == "postgresql" ]; then
    db_url=postgresql
    host=192.168.10.216
    port=5432
    username=postgres
    password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
  elif [ "$db_type" == "pgsqltest" ]; then
    db_url=postgresql
    host=192.168.10.217
    port=5433
    username=postgres
    password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
  elif [ "$db_type" == "pg_cluster" ]; then
    db_url=postgresql
    host=192.168.10.221
    port=6432
    username=postgres
    password=fazAqRRVV9vDmwDNRNb593ht5TxYVrfTyHJSJ3BS
  else
    echo "Unknown db_type '${db_type}': failed to resolve database credentials"
    return 1
  fi
  # Print the resolved connection info (for debugging).
  echo "host:${host}, port:${port}, username:${username}, password:${password}"
  echo "jdbc:${db_url}://${host}:${port}/${db}"
}

import_data(){
  # Import from MySQL/PG into Hive as LZO-compressed files on HDFS.
  # Caller must set: hdfs_path, query, hive_table.
  get_db_info
  # Remove the target HDFS path so the import starts from a clean directory.
  hdfs dfs -rm -r "${hdfs_path}"
  # Import the data.
  /opt/module/sqoop-1.4.6/bin/sqoop import -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 --append \
    --connect "jdbc:${db_url}://${host}:${port}/${db}" \
    --username "$username" \
    --password "$password" \
    --target-dir "${hdfs_path}" \
    --query "${query}" \
    --fields-terminated-by '\t' \
    --hive-drop-import-delims \
    --null-string '\\N' \
    --null-non-string '\\N' \
    --compress \
    --compression-codec lzop \
    --num-mappers 1 \
    --outdir "/tmp/sqoop/"
  echo "Indexing the LZO files"
  /opt/module/hadoop/bin/hadoop jar \
    /opt/module/hadoop/share/hadoop/common/hadoop-lzo-0.4.20.jar \
    com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=${queue} \
    "${hdfs_path}"
  # Repair the Hive table's partition metadata.
  /opt/module/hive/bin/hive -e "set hive.msck.path.validation=ignore; MSCK REPAIR TABLE big_data_selection.${hive_table};"
}

export_data(){
  # Export from Hive to MySQL/PG via HCatalog.
  # Caller must set: import_table, hive_table, p_keys, p_values, cols.
  get_db_info
  /opt/module/sqoop-1.4.6/bin/sqoop export -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
    --connect "jdbc:${db_url}://${host}:${port}/${db}" \
    --username "$username" \
    --password "$password" \
    --table "${import_table}" \
    --input-fields-terminated-by '\001' \
    --hcatalog-database big_data_selection \
    --hcatalog-table "${hive_table}" \
    --hcatalog-partition-keys "$p_keys" \
    --hcatalog-partition-values "$p_values" \
    --input-null-string '\\N' \
    --input-null-non-string '\\N' \
    --num-mappers "${maps}" \
    --columns "${cols}" \
    --outdir "/tmp/sqoop/"
}

truncate_data(){
  # Truncate a PG (partitioned) table through sqoop eval.
  echo "About to run TRUNCATE, clearing table: ${truncate_table}"
  get_db_info
  /opt/module/sqoop-1.4.6/bin/sqoop eval -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
    --connect "jdbc:${db_url}://${host}:${port}/${db}" \
    --username "$username" \
    --password "$password" \
    --query "truncate table ${truncate_table};"
}

query_data(){
  # Run an ad-hoc SQL statement through sqoop eval.
  echo "Query to execute: ${query}"
  get_db_info
  /opt/module/sqoop-1.4.6/bin/sqoop eval -D mapred.job.queue.name=${queue} -D mapred.task.timeout=0 \
    --connect "jdbc:${db_url}://${host}:${port}/${db}" \
    --username "$username" \
    --password "$password" \
    --query "${query}"
}
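
# ---------------------------------------------------------------------------
# Usage sketch (an illustrative addition, not part of the original workflow):
# callers are assumed to source this file, set the variables a function reads,
# then invoke it. Every table, path, and query value below is a hypothetical
# placeholder, including the sourced filename.
#
#   source sqoop_common.sh                                 # hypothetical name
#
#   # Import: Sqoop's free-form --query import requires the literal
#   # $CONDITIONS token in the WHERE clause (single quotes keep it literal).
#   db_type=mysql
#   hive_table=ods_demo                                    # hypothetical
#   hdfs_path=/origin_data/selection/ods_demo              # hypothetical
#   query='select id, name from demo where $CONDITIONS'    # hypothetical
#   import_data
#
#   # Export a Hive partition back to PostgreSQL:
#   db_type=postgresql
#   import_table=dws_demo_report                           # hypothetical
#   hive_table=dws_demo_report                             # hypothetical
#   p_keys=dt
#   p_values=2021-06-01                                    # hypothetical
#   cols=id,name,cnt                                       # hypothetical
#   export_data
# ---------------------------------------------------------------------------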