七、Sqoop实战
1 Mysql数据导入HDFS上.
1. 全量导入:
将mysql表中全部数据都导入HDFS,如果HDFS中存在这个目录的话就会报错,默认存储的HDFS目录是 /user/root/XXX.
bin/sqoop import (在sqoop的安装目录内,import表名是导入)
--connect jdbc:mysql://192.168.52.130:3306/userdb (连接:协议:数据库类型://ip地址:端口号/数据库)
--username root (用户名 root)
--password 123456 (密码 123456)
--table emp (表 emp)
--m 1 (--num-mappers:使用几个mapper,写1就可以)
若要导入到HDFS指定目录下,并指定字段之间的分隔符:
使用参数 --target-dir 来指定导出目的地,
使用参数 --delete-target-dir 来判断导出目录是否存在,如果存在就删掉.
使用参数 --fields-terminated-by '\t' 使用''\t''来切割字段,sqoop默认是使用','逗号进行分割的.
bin/sqoop import (在sqoop的安装目录内,import表名是导入)
--connect jdbc:mysql://192.168.52.130:3306/userdb (连接:协议:数据库类型://ip地址:端口号/数据库)
--username root (用户名 root)
--password 123456 (密码 123456)
--table emp (表 emp)
--delete-target-dir (如果指定目录存在就删除它)
--target-dir /sqoop/emp (导入到指定目录)
--fields-terminated-by '\t' (指定字段分割符为\t)
--m 1 (--num-mappers:使用几个mapper,写1就可以)
2.增量导入:
将数据库中某一字段,增加的导入,在HDFS上单独形成一个文件.
注意:增量导入的时候,一定不能加参数--delete-target-dir否则会报错
bin/sqoop import --connect jdbc:mysql://192.168.52.130:3306/myhive --username root --password 123456 --table emp --incremental append (表明增量导入) --check-column id (检查哪个字段,这里检查的是mysql数据库表中的id字段) --last-value 4 (id字段最后一个id是4,那增量导入的话就是从id=5开始往后导入) --m 1
3.减量导入:
设置where条件,通过条件可以判断减少的数据或增加的数据,控制更加灵活一些,例如可以通过表创建时间来判断数据是哪一天生成的等,每个表均设置3个字段,create_time(表创建时间),update_time(表更新时间),is_delete(是否删除)
注意:where条件的地方需要使用“”双引号引起来,否则where条件失效
bin/sqoop import \ --connect jdbc:mysql://192.168.52.130:3306/userdb \ --username root \ --password admin \ --table emp \ --incremental append \ --where "create_time > '2019-02-14 00:00:00' and is_delete='1' and create_time < '2019-02-14 23:59:59'" \ --target-dir /sqoop/incement \ --check-column id \ --m 1
4.SQL语句查找导入HDFS
我们还可以通过 –query参数来指定我们的sql语句,通过sql语句来过滤我们的数据进行导入
bin/sqoop import --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --delete-target-dir --query 'select phno from emp_conn where 1=1 and $CONDITIONS' --target-dir /sqoop/emp_conn --m 1
注意事项:
使用sql语句来进行查找是不能加参数--table
并且必须要添加where条件,
并且where条件后面必须带一个$CONDITIONS 这个字符串,
并且这个sql语句必须用单引号,不能用双引号.
2. Mysql数据导入Hive上.
1.将我们mysql表当中的数据直接导入到hive表中的话,需要将hive的一个叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包拷贝到sqoop的lib目录下
cp /export/servers/hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
2.将我们mysql当中的数据导入到hive表当中来,需要准备hive数据库与表
hive (default)> create database sqooptohive; hive (default)> use sqooptohive; hive (sqooptohive)> create external table emp_hive(id int,name string,deg string,salary int ,dept string) row format delimited fields terminated by '\t';
3.导入语句
bin/sqoop import --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --table emp --fields-terminated-by '\t' (字段之间的分隔符) --hive-import (将数据从mysql数据库中导入到hive表中) --hive-table qooptohive.emp_hive (后面接要创建的hive表,数据库中的某个表,使用"."连接) --hive-overwrite (覆盖掉在hive表中已经存在的数据) --delete-target-dir --m 1
注意:我们还可以导入关系表到hive并自动创建hive表,导入
bin/sqoop import --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --table emp_conn --hive-import --hive-database sqooptohive (--hive-database 后面直接接数据库名) --m 1
3.Sqoop的数据导出
将数据从HDFS把文件导出到mysql数据库,导出前,目标表必须存在于目标数据库中。
数据是在HDFS当中的如下目录/sqoop/emp,数据内容如下
1201,gopal,manager,50000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1202,manisha,Proof reader,50000,TP,2018-06-15 18:54:32.0,2018-06-17 20:26:08.0,1 1203,khalil,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1 1204,prasanth,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 21:05:52.0,0 1205,kranthi,admin,20000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1.创建mysql表
CREATE TABLE `emp_out` ( `id` INT(11) DEFAULT NULL, `name` VARCHAR(100) DEFAULT NULL, `deg` VARCHAR(100) DEFAULT NULL, `salary` INT(11) DEFAULT NULL, `dept` VARCHAR(10) DEFAULT NULL, `create_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `is_delete` BIGINT(20) DEFAULT '1' ) ENGINE=INNODB DEFAULT CHARSET=utf8;
2.执行导出命令:通过export来实现数据的导出,将hdfs的数据导出到mysql当中去
bin/sqoop export --connect jdbc:mysql://192.168.52.130:3306/userdb --username root --password 123456 --table emp_out --export-dir /sqoop/emp --input-fields-terminated-by ","
3.验证mysql表数据
八、Hbase--分布式列存储NOSQL数据库
1、Hbase数据存储在hdfs,少量存内存
2、hbase适合海量稀疏数据存储
hbase属于nosql数据库,列存储
3、与传统关系型数据库对比:
行存储:传统关系型数据mysql、oracle
优点:保证数据完整性,写入检查
缺点:读的过程会产生冗余信息
列存储:Nosql数据库
优点:读的过程不会产生冗余
缺点:写入效率差,不保证完整性
4、Hbase优点:
(1)存储海量数据
(2)快速随机访问
(3)进行大量的改写操作
Hbase的优点及应用场景:
半结构化或非结构化数据:
对于数据结构字段不够确定或杂乱无章非常难按一个概念去进行抽取的数据适合用HBase,因为HBase支持动态添加列。
记录很稀疏:
RDBMS的行有多少列是固定的。为null的列浪费了存储空间。HBase为null的Column不会被存储,这样既节省了空间又提高了读性能。
多版本号数据:
依据Row key和Column key定位到的Value能够有随意数量的版本号值,因此对于须要存储变动历史记录的数据,用HBase是很方便的。比方某个用户的Address变更,用户的Address变更记录也许也是具有研究意义的。
仅要求最终一致性:
对于数据存储事务的要求不像金融行业和财务系统这么高,只要保证最终一致性就行。(比如HBase+elasticsearch时,可能出现数据不一致)
高可用和海量数据以及很大的瞬间写入量:
WAL解决高可用,支持PB级数据,put性能高
适用于插入比查询操作更频繁的情况。比如,对于历史记录表和日志文件。(HBase的写操作更加高效)
业务场景简单:
不需要太多的关系型数据库特性,列入交叉列,交叉表,事务,连接等。
Hbase的缺点:
单一RowKey固有的局限性决定了它不可能有效地支持多条件查询[2]
不适合于大范围扫描查询
不直接支持 SQL 的语句查询
5、Hbase结构:rowkey -> Column Family -> Column Qualifer列族具体列
rowkey行键
table的主键,table中的记录按照rowkey 的字典序进行排序
Column Family列族
hbase表中的每个列,都归属与某个列族。列族是表的schema的一部分(而列不是),必须在使用表之前定义。
Timestamp时间戳
每次数据操作对应的时间戳,可以看作是数据的version number版本号
Column列
列族下面的具体列
属于某一个ColumnFamily,类似于我们mysql当中创建的具体的列
cell单元格
由{row key, column( = + ), version} 唯一确定的单元
cell中的数据是没有类型的,全部是以字节数组进行存储
6、Hbase逻辑模型:三维有序
Rowkey -> Column Family -> Column Qualifier -> Timestamp
rowkey行(正序, 从小到大)、column列(正序从小到大)、timestamp时间(倒叙从大到小)
面试点:为什么说hbase表的列族不宜超过3个?
a、列族数量决定store, 一个store至少有一个memstore,而memstore占内存
b、如果列族越多的话,造成更多的flush会产生更多IO
flush的最小单位是region, 一个region中的某个列族做flush , 其余的列族也会做flush
频繁的flush产生更多的storeFile,storeFile增多就会产生更多compaction操作
compaction操作和flush都是重IO操作
c、列族过多,split操作会出现数据不均匀的情况
散列原则:
前提:服务器的配置不是很好并且对查询速度要求不是很高
rowkey设计为:random+时间
目的:防止某一个或某几个regionserver成为热点
有序原则:
前提:服务器本身的配置要高一些, 会出现一个或是多个region热点效应
rowkey设计为:时间+random
Hbase shell 基础
list_namespace 查看所有数据,类似于show database;
scan 'hbase:meta' 查看元数据信息
--创建表 'cf1','cf2' 表示列族
create 'badou_20_a','cf1','cf2'
-- 查看表的结构
describe 'badou_20_a'
-- 删除cf1列族
alter 'badou_20_a',{NAME=>'cf1',METHOD=>'delete'}
-- 查看存在哪些表
list
exists 'badou_20_a'
-- 保留两个版本的数据, IN_MEMORY数据保存到内存中
alter 'badou_20_a',{NAME=>'cf2',VERSIONS=>2,IN_MEMORY=>true}
-- 删除表
disable 'badou_20_a' : 将表转换为去激活的状态
drop 'badou_20_a' : 删除表
-- 激活表
enable 'badou_20_a'
-- 插入记录
put 'badou_20','1003','cf2:name','root'
put 'badou_20','1004','cf2:name','scott'
-- 获取记录
scan 'badou_20' 注意 hbase表的数据量特别大的时候, scan 慎用
-- 根据rowkey 查询
get 'badou_20','1001'
-- 根据列族获取
get 'badou_20','1001',{COLUMN=>'cf2:name'}
-- 根据列族和指定的时间戳进行获取
get 'badou_20','1001',{COLUMN=>'cf2:name',TIMESTAMP=>1615465406738}
-- 查询表的记录
count 'badou_20'-- 强制刷出内存的数据到HDFS
flush 'badou_20'
-- 清除表的数据,保留表的结构
truncate 'order'
9、hbase shell 进阶
-- 修改 badou_20 版本为2
put 'badou_20','1001','cf2:name','max' put 'badou_20','1001','cf2:name','avg' alter 'badou_20',{NAME=>'cf2',VERSIONS=>2}
如何显示两个版本?
scan 'badou_20',{VERSIONS=>2} get 'badou_20','1001',{COLUMN=>'cf2:name',VERSIONS=>2} get 'badou_20','1001',{COLUMN=>'cf2',VERSIONS=>2}
-- 修改表的版本
alter 'badou_20',{NAME=>'cf2',VERSIONS=>3} alter 'badou_20',{NAME=>'cf2',VERSIONS=>4}
-- TTL 按照规定的时间对数据进行超时间设置
TTL => 'FOREVER' 表示数据永不过期 TTL => '60 SECONDS 表示一分钟之前的数据会过期 create 'tt_table',{NAME=>'cf1',TTL=>60} 1616311193758 put 'tt_table','rowkey001','cf1:age','30',1616311993900
九、Flume实战
1、采集目录到HDFS
采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去
根据需求,首先定义以下3大要素
采集源,即source——监控文件目录 : spooldir
下沉目标,即sink——HDFS文件系统 : hdfs sink
source和sink之间的传递通道——channel,可用file channel 也可以用内存channel
配置文件编写:
#定义三大组件的名称 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # 配置source组件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false #配置拦截器 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname # 配置sink组件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1
Channel参数解释:
capacity:默认该通道中最大的可以存储的event数量
trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量
keep-alive:event添加到通道中或者移出的允许时间
2、采集文件到HDFS
采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs
根据需求,首先定义以下3大要素
采集源,即source——监控文件内容更新 : exec ‘tail -F file’
下沉目标,即sink——HDFS文件系统 : hdfs sink
Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel
配置文件编写:
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1 # Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log agent1.sources.source1.channels = channel1 #configure host for source agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname # Describe sink1 agent1.sinks.sink1.type = hdfs #a1.sinks.k1.channel = c1 agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true # Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600 # Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 更多source和sink组件:
Flume支持众多的source和sink类型,详细手册可参考官方文档
Flume 1.9.0 User Guide — Apache Flume
十、Kafka介绍
1.Apache Kafka简介
Kakfa最初由Linkedin公司开发,使用 Scala 编写,拥有高吞吐、可持久化、可水平扩展的基于发布/订阅模式的分布式消息队列,支持分区策略、多副本策略,基于zookeeper协调的分布式消息系统,主要应用于大数据的实时或离线数据处理、日志收集以及实时指标监控等领域。
2.Kafka常用术语
消息:message,消息是kafka的基本数据单元,代表着一条一条的数据,为了提高网络和存储的利用率,生产者会批量发送消息到Kafka,并在发送之前对消息进行压缩。
主题:topic,主题是kafka对消息的分类,是一个逻辑概念,可以看作消息集合,用于接收不同业务的消息。
分区:partition,类似数据库的分区表,通常topic下会多个分区,每个分区内的数据是有序的,同一个topic的多个分区kafka不保证消息的顺序性,一个分区在逻辑上对应一个Log,对应磁盘上的一个文件夹。
偏移量:offset,偏移量是表示消息在分区中的位置,kafka存储的文件是按照offset.log的格式来命名的,便于快速查找。
副本:replicas,副本是针对分区而言的,kafka对消息做了冗余备份,目的就是为了容灾,每个分区可以指定多个副本来冗余数据,分为leader partition和follower partition,只有leader partition对外提供服务,follower partition单纯是从leader partition同步数据,因此会存在多份相同的数据。
生产者:producer,生产者是kafka集群的上游,顾名思义就是往kafka输入数据的客户端。
消费者:comsumer,消费者是kafka集群的下游,与生产者相辅相成,kafka类似一个仓库,生产者负责生产消息往仓库放,自然得有消费者从仓库里拿消息,不然仓库容易爆满。
消费者组:Comsumer Group,简称CG,这个比较容易理解,就是将多个消费者捆绑起来,组团消费消息,一个Consumer只能属于一个Consumer Group,Kafka还通过Consumer Group实现了消费者的水平扩展和故障转移。
节点:broker,一个broker就是一个kafka server实例,多个broker组成kafka集群,主要用于接收生产者发送过来的消息,写入磁盘,同时可以接收消费者和其他broker的请求。
重新负载均衡:rebalance,当消费者组的消费者实例出现变化时,例如新增消费者或者减少消费者,都会触发kafka的Rebalance机制,这个过程比较耗性能,要尽量避免这个过程被触发。
Kafka架构
我们把架构分主从架构和对等架构,主从架构就是分为管理节点和工作节点,职责不同,如HDFS 、spark、flink;对等架构则不区分节点属性,所有的实例职责都是一样的,kafka的架构有点类似于对等架构,但又不完全是。Kafka的设计理念之一就是同时提供离线处理和实时处理。
节点:broker,一个broker就是一个kafka server实例,多个broker组成kafka集群,主要用于接收生产者发送过来的消息,写入磁盘,同时可以接收消费者和其他broker的请求。
重新负载均衡:rebalance,当消费者组的消费者实例出现变化时,例如新增消费者或者减少消费者,都会触发kafka的Rebalance机制,这个过程比较耗性能,要尽量避免这个过程被触发。
Kafka架构
我们把架构分主从架构和对等架构,主从架构就是分为管理节点和工作节点,职责不同,如HDFS 、spark、flink;对等架构则不区分节点属性,所有的实例职责都是一样的,kafka的架构有点类似于对等架构,但又不完全是。Kafka的设计理念之一就是同时提供离线处理和实时处理。
其中leader partition和 follower partition的工作原理如下,正常情况下,只有leader partition对外提供服务,follower partition负责从leader partition拉取数据,当leader发送故障时,follower拥有被选举为新leader的权利。
3.Kafka的优势
支持数据离线和实时处理
能保证消息的可靠性传递
支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错
高吞吐率,每秒处理百万级的消息量
高并发,支持数千个客户端同时读写
支持在线水平扩展
kafka为什么能实现读写都这么快呢?
答:离不开kafka顺序读写机制和零拷贝数据传输,减少了寻址的时间消耗,降低了读写延迟,同时有利于快到定位消息偏移量,零拷贝机制可以提高数据传输的效率,减少IO资源的占用。
欢迎分享,(联系QQ/微信:337407980)