调度工具(ETL任务流)
1.区别ETL作业调度工具和任务流调度工具
kettle是一个ETL工具,ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程)。
kettle中文名称叫水壶,该项目的主程序员MATT 希望把各种数据放到一个壶里,然后以一种指定的格式流出。
所以他的重心是用于数据
oozie是一个工作流,Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。
oozie工作流中是有数据流动的,但是重心是在于工作流的定义。
二者虽然都有相关功能及数据的流动,但是其实用途是不一样的。 2.ETL作业调度工具2.1Sqoop调度工具2.1.1列举出所有数据库
查看帮助 bin/sqoop help
列举出所有linux上的数据库 bin/sqoop list-databases --connect jdbc:mysql://localhost:3306 --username root --password root
列举出所有Window上的数据库 bin/sqoop list-databases --connect jdbc:mysql://192.168.22.36:3306 --username root --password root
查看数据库下的所有表bin/sqoop list-tables --connect jdbc:mysql://localhost:3306/mysql --username root --password root2.12导入数据库表数据到HDFS
(1)确定mysql服务启动正常
查询控制端口和查询进程来确定,一下两种办法可以确认mysql是否在启动状态
办法1:查询端口 $ netstat -tulpn
MySQL监控的TCP的3306端口,如果显示3306,证明MySQL服务在运行中
办法二:查询进程
可以看见mysql的进程 ps -ef | grep mysqld
没有指定数据导入到哪个目录,默认是/user/root/表名 bin/sqoop import --connect jdbc:mysql://192.168.77.137/zhjy --password 123456 --username root --table zf_jygz_thjc --m 1 --fields-terminated-by " " 或是 bin/sqoop import --connect jdbc:mysql://192.168.77.137/zhjy --password 123456 --username root --table zf_jygz_thjc --m 5 --split-by ZF_BH(一般在设置-m>1时使用) --fields-terminated-by " "
原因:
如果表中有主键,m的值可以设置大于1的值;如果没有主键只能将m值设置成为1;或者要将m值大于1,需要使用--split-by指定一个字段
设置了-m 1 说明只有一个maptask执行数据导入,默认是4个maptask执行导入操作,但是必须指定一个列来作为划分依据
导入数据到指定目录
在导入表数据到HDFS使用Sqoop导入工具,我们可以指定目标目录。使用参数 --target-dir来指定导出目的地,使用参数—delete-target-dir来判断导出目录是否存在,如果存在就删掉 bin/sqoop import --connect jdbc:mysql://192.168.77.137/zhjy --username root --password 123456 --delete-target-dir --如果目录存在,将目录删除 --table zf_jygz_thjc --target-dir /user/zhjy --指定保存目录 --m 1 --fields-terminated-by " "
查询导入 bin/sqoop import --connect jdbc:mysql://192.168.72.133:3306/company --username root --password root --target-dir /user/company --delete-target-dir --num-mappers 1 --fields-terminated-by " " --query "select name,sex from staff where id <=1 and $CONDITIONS;"
提示:must contain "$CONDITIONS" in WHERE clause。
where id <=1 匹配条件
$CONDITIONS:传递作用。
如果 query 后使用的是双引号,则 $CONDITIONS前必须加转义符,防止 shell 识别为自己的变量。
--query时不能使用--table一起使用
需要指定--target-dir路径
导入到hdfs指定目录并指定要求 bin/sqoop import --connect jdbc:mysql://192.168.72.133:3306/company --username root --password root #提高数据库到hadoop的传输速度 --direct --table staff --delete-target-dir #导入指定列,涉及到多列,用逗号分隔 --column id,sex --target-dir /user/company --num-mappers 1 #指定分隔符 --fields-terminated-by " " #指定导出存储格式 --as-textfile #指定数据压缩(压缩,解压缩方式) --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec
数据导出储存方式(数据存储文件格式---( textfil parquet)--as-textfileImports data as plain text (default)--as-parquetfile Imports data to Parquet Files)
导入表数据子集到HDFS bin/sqoop import --connect jdbc:mysql://172.16.43.67:3306/userdb --username root --password root --table emp_add --target-dir /sqoop/emp_add -m 1 --delete-target-dir --where "city = "sec-bad""
sqoop导入blob数据到hive
对于CLOB,如xml文本,sqoop可以迁移到Hive表,对应字段存储为字符类型。
对于BLOB,如jpg图片,sqoop无法直接迁移到Hive表,只能先迁移到HDFS路径,然后再使用Hive命令加载到Hive表。迁移到HDFS后BLOB字段存储为16进制形式。 bin/sqoop-import --connect jdbc:mysql://192.168.77.137:3306/zhjy --username root --password 123456 --table ceshi --columns "id,name,photo" --split-by id -m 4 --inline-lob-limit=16777126 设置内联的LOB对象的大小 --target-dir /user/hive/warehouse/ods.db/ceshi
2.1.3导入关系表到Hive
第一步:导入需要的jar包
将我们mysql表当中的数据直接导入到hive表中的话,我们需要将hive的一个叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包拷贝到sqoop的lib目录下 cp /export/servers/hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
第二步:开始导入 day=`date -d "yesterday" +%Y%m%d` sqoop import --导入数据 --connect jdbc:mysql://10.2.111.87:3306/ehl_apmp --连接url --username root --用户名 --password root --密码 --table zf_jygz_thjc --要导入的表 -m 1 --maptask --hive-drop-import-delims --导入时删除数据库中特殊字符 --hive-overwrite --覆盖导入 --hive-import --导入到hive表中 --hive-database ods --导入到hive中哪个数据库 --hive-table ods_zf_jygz_thjc --导入到hive中哪个表 --fields-terminated-by " " --字段分隔符 --lines-terminated-by " " --指定行分隔符 --null-string "N" --字符串类型为null是代替字符 --null-non-string "N" --字非符串类型为null是的代替字符 --hive-partition-key day --hive表的分区字段 --hive-partition-value "$day" --指定导入表的分区值
导入关系表到hive并自动创建hive表
们也可以通过命令来将我们的mysql的表直接导入到hive表当中去 sqoop import --connect jdbc:mysql://10.2.111.87:3306/ehl_apmp --username root --password root --table $1 --hive-import --hive-database ods --create-hive-table --fields-terminated-by " " --null-string "N" --null-non-string "N" --split-by code -m 4
通过这个命令,我们可以直接将我们mysql表当中的数据以及表结构一起倒入到hive当中去 2.1.4增量导入
--incremental 增量模式。
append id 是获取一个某一列的某个值。
lastmodified "2016-12-15 15:47:35" 获取某个时间后修改的所有数据
-append 附加模式
-merge-key id 合并模式
--check-column 用来指定一些列,可以去指定多个列;通常的是指定主键id
--last -value 从哪个值开始增量
==注意:增量导入的时候,一定不能加参数--delete-target-dir 否则会报错==
第一种增量导入方式(不常用)
1.Append方式
使用场景:有个订单表,里面每个订单有一个唯一标识的自增列id,在关系型数据库中以主键的形式存在。之前已经将id在0-1000之间的编号的订单导入到HDFS 中;如果在产生新的订单,此时我们只需指定incremental参数为append,--last-value参数为1000即可,表示只从id大于1000后开始导入。
(1)创建一个MySQL表 CREATE TABLE orders( o_id INT PRIMARY KEY AUTO_INCREMENT, o_name VARCHAR(255), o_price INT ); INSERT INTO orders(o_id,o_name,o_price) VALUES(1,"联想",5000); INSERT INTO orders(o_id,o_name,o_price) VALUES(2,"海尔",3000); INSERT INTO orders(o_id,o_name,o_price) VALUES(3,"雷神",5000); INSERT INTO orders(o_id,o_name,o_price) VALUES(4,"JACK JONES",800); INSERT INTO orders(o_id,o_name,o_price) VALUES(5,"真维斯",200);
(2)创建一个hive表(表结构与mysql一致) bin/sqoop import --connect jdbc:mysql://192.168.22.30:3306/userdb --username root --password root --table emp --target-dir /sqoop/increment --num-mappers 1 --incremental append --check-column id --last-value 1202
注意:
append 模式不支持写入到hive表中
2.lastModify方式
此方式要求原有表有time字段,它能指定一个时间戳,让sqoop把该时间戳之后的数据导入到HDFS;因为后续订单可能状体会变化,变化后time字段时间戳也会变化,此时sqoop依然会将相同状态更改后的订单导入HDFS,当然我们可以只当merge-key参数为order-id,表示将后续新的记录和原有记录合并。
# 将时间列大于等于阈值的数据增量导入HDFS sqoop import --connect jdbc:mysql://192.168.xxx.xxx:3316/testdb --username root --password transwarp --query "select order_id, name from order_table where $CONDITIONS" --target-dir /user/root/order_all --split-by id -m 4 --incremental lastmodified --merge-key order_id --check-column time # remember this date !!! --last-value "2014-11-09 21:00:00"
使用 lastmodified 方式导入数据,要指定增量数据是要 --append(追加)还是要 --merge-key(合并)last-value 指定的值是会包含于增量导入的数据中。
第二种增量导入方式(推荐)
==通过where条件选取数据更加精准== yesterday=`date -d "yesterday" +%Y_%m_%d` where="update_time >= "${yesterday}"" day=`date -d "yesterday" +%Y-%m-%d` sqoop import --导入数据 --connect jdbc:mysql://10.2.111.87:3306/ehl_apmp --连接url --username root --用户名 --password root --密码 --table zf_jygz_thjc --要导入的表 -m 1 --maptask--hive-drop-import-delims --导入时删除数据库中特殊字符 --hive-overwrite --覆盖导入 --hive-import --导入到hive表中 --hive-database ods --导入到hive中哪个数据库 --hive-table ods_zf_jygz_thjc --导入到hive中哪个表 --fields-terminated-by " " --字段分隔符 --lines-terminated-by " " --指定行分隔符 --columns "zf_bh,zf_xm" --导入的字段(可选) --where "${where}" --条件导入 --null-string "N" --字符串类型为null是代替字符 --null-non-string "N" --字非符串类型为null是的代替字符 --hive-partition-key day --hive表的分区字段 --hive-partition-value "$day" --指定导入表的分区值
2.1.5从RDBMS到HBase bin/sqoop import --connect jdbc:mysql://192.168.22.30:3306/userdb --username root --password root --table emp --columns "id,name,sex" --column-family "info" --hbase-create-table --hbase-row-key "id" --hbase-table "hbase_test" --split-by id --num-mappers 1
会报错
原因:sqoop1.4.6 只支持 HBase1.0.1 之前的版本的自动创建 HBase 表的功能。
解决方案:手动创建 HBase 表 hbase> create "hbase_staff","info"2.1.6从HDFS到RDBMS
导出前,目标表必须存在与目标数据库中
默认操作是将文件中的数据使用insert语句插入到表中
数据是在HDFS当中的如下目录/sqoop/emp,数据内容如下 1201,gopal,manager,50000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1202,manisha,Proof reader,50000,TP,2018-06-15 18:54:32.0,2018-06-17 20:26:08.0,1 1203,khalil,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1204,prasanth,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 21:05:52.0,0 1205,kranthi,admin,20000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
第一步:创建MySQL表 CREATE TABLE `emp_out` ( `id` INT(11) DEFAULT NULL, `name` VARCHAR(100) DEFAULT NULL, `deg` VARCHAR(100) DEFAULT NULL, `salary` INT(11) DEFAULT NULL, `dept` VARCHAR(10) DEFAULT NULL, `create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `is_delete` BIGINT(20) DEFAULT "1" ) ENGINE=INNODB DEFAULT CHARSET=utf8;
第二步:执行导出命令
通过export来实现数据的导出,将hdfs的数据导出到mysql当中去
全量导出 bin/sqoop export --connect jdbc:mysql://172.16.43.67:3306/userdb --username root --password admin --table emp_out --export-dir /sqoop/emp --columns id,name (当文件数据与表结构一致时,可以不指定) --input-fields-terminated-by ","
增量导出 bin/sqoop export --connect jdbc:mysql://192.168.77.137:3306/zhjy --username root --password 123456 --table emp_out --update-key id --update-mode allowinsert (新增的数据被导出) --export-dir "/user/hive/warehouse/ods_ceshi/part-m-00000" --input-null-string "N" --input-null-non-string "N" --input-fields-terminated-by "," -m 1
更新导出 bin/sqoop export --connect jdbc:mysql://192.168.77.137:3306/zhjy --username root --password 123456 --table emp_out --update-key id --update-mode updateonly (只能导出修改后的数据,不能导出新增的数据) --export-dir "/user/hive/warehouse/ods_ceshi/part-m-00000" --input-null-string "N" --input-null-non-string "N" --input-fields-terminated-by "," -m 1
总结:
参数介绍
--update-key 后面也可以接多个关键字列名,可以使用逗号隔开,Sqoop将会匹配多个关键字后再执行更新操作。
--export-dir 参数配合--table或者--call参数使用,指定了HDFS上需要将数据导入到MySQL中的文件集目录。
--update-mode updateonly和allowinsert。 默认模式为updateonly,如果指定--update-mode模式为allowinsert,可以将目标数据库中原来不存在的数据也导入到数据库表中。即将存在的数据更新,不存在数据插入。
组合测试及说明
1、当指定update-key,且关系型数据库表存在主键时:
A、allowinsert模式时,为更新目标数据库表存的内容,并且原来不存在的数据也导入到数据库表;
B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;
2、当指定update-key,且关系型数据库表不存在主键时:
A、allowinsert模式时,为全部数据追加导入到数据库表;
B、updateonly模式时,为更新目标数据库表存的内容,并且原来不存在的数据也不导入到数据库表;
3、当不指定update-key,且关系型数据库表存在主键时:
A、allowinsert模式时,报主键冲突,数据无变化;
B、updateonly模式时,报主键冲突,数据无变化;
4、当不指定update-key,且关系型数据库表不存在主键时:
A、allowinsert模式时,为全部数据追加导入到数据库表;
B、updateonly模式时,为全部数据追加导入到数据库表;
实际案例:
(1)mysql批量导入hive #!/bin/bash source /etc/profile num=0 list="table1 table2 table3" for i in $list; do echo "$sum" echo "$i" echo "sqoop开始批量导入......" sqoop import --connect jdbc:mysql://localhost:3306/test --username root --password 123456 --table person --hive-table db.$i --delete-target-dir --hive-overwrite --hive-import & num=$(expr $num + 1) if [$sum -gt 4 ]; then { echo "等待批量任务完成" wait echo "开始下一批导入" num = 0 } fi done echo "等待最后一批任务完成" wait echo "全部导入完成"
使用shell脚本: #!/bin/sh export SQOOP_HOME=/usr/share/sqoop-1.4.4 hostname="192.168.1.199" user="root" password="root" database="test" table="tags" curr_max=0 function db_to_hive(){ ${SQOOP_HOME}/bin/sqoop import --connect jdbc:mysql://${hostname}/${database} --username ${user} --password ${password} --table ${table} --split-by docid --hive-import --hive-table lan.ding --fields-terminated-by " " --incremental append --check-column docid --last-value ${curr_max} result=`mysql -h${hostname} -u${user} -p${password} ${database}<