大数据开发知识点总结


一、HDFS


1.基本操作:

查看所有命令 hadoop fs


1、查看所有目录及其文件 hadoop fs -ls /


2、hdfs文件系统创建目录 hadoop fs -mkdir /input(用于测试代码)


2.1、hdfs文件系统创建目录(批量)


hadoop fs -mkdir -p /inout/tmp 在input文件夹内创建tmp文件夹


3、hdfs文件系统创建文件 hadoop fs -touchz /a.txt


4、hdfs文件系统删除文件


hadoop fs -rmr /a.txt


hadoop fs -rmr -skipTrash /a.txt(跳过回收站彻底删除)


5.hdfs上传本地文件   (注意:必须先建好hdfs上目录再put)


hadoop fs -put  t.txt /test 将本地文件t.txt上传至hdfs上test文件夹内;


hadoop fs -put /a.txt


6.查看hdfs文件内容


hadoop fs -cat /a.txt


hadoop fs -tail /a.txt (从尾部开始看)


hadoop fs -text /a.txt (查看二进制数据)


7、hdfs下载文件


hadoop fs -get /a.txt .


注意最后有一点,这个.代表下载到本地命令行所在目录;


8、递归删除目录


hadoop fs -rmr /input/tmp


9、查看hdfs文件的大小


hadoop fs -du -h /b.txt


-du -s或者-du -h


10、查看hdfs文件行数


hadoop fs -cat /b.txt | wc -l


最后是字母l


cat或者text 都可以


实战:


查看集群ip情况 cat /etc/hosts


查看hadoop版本


echo $HADOOP_HOME/


which $HADOOP_HOME/


运行集群脚本


sh -x run.sh


运行run脚本,最好是 -X调试模式


2.HDFS的优缺点有哪些?

(1)HDFS的优点

高容错性


①:数据自动保存多个副本。它通过增加副本的形式,提高容错性


②:某一个副本丢失以后,它可以自动恢复


适合批处理即就近原则


①:移动计算而非非数据,数据位置暴露给计算机框架


②:本地化,数据不移动,代码(任务)移动。


适合处理大数据


①:数据规模:能够处理数据规模达到GB、TB、甚至PB级别的数据


②:文件规模:能够处理百万规模以上的文件数量,数量相当之大


可构建在廉价机器上,通过多副本机制,提高可靠性


(2)HDFS的缺点

不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。寻址时间长,适合读取大文件,低延迟与高吞吐率。


不适合小文件存储


占用NameNode大量内存,寻找时间超过读取时间


不支持并发写入,文件随机修改


①:一个文件只能有一个写,不允许多个线程同时写


②:仅支持数据append(追加),不支持文件的修改


3.HDFS整体架构介绍


大数据开发知识点总结

1)Client:就是客户端。


(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行存储;


(2)与NameNode交互,获取文件的位置信息;


(3)与DataNode交互,读取或者写入数据;


(4)Client提供一些命令来管理HDFS,比如启动或者关闭HDFS;


(5)Client可以通过一些命令来访问HDFS;


2)NameNode:就是Master,它是一个主管、管理者。


(1)管理HDFS的名称空间;namespace


(2)管理数据块(Block)映射信息;


(3)配置副本策略(默认);3


(4)处理客户端读写请求。


3) DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。


(1)存储实际的数据块;


(2)执行数据块的读/写操作。


4) SecondaryNameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。


(1)辅助NameNode,分担其工作量;


(2)定期合并Fsimage和Edits,并推送给NameNode;


(3)在紧急情况下,可辅助恢复NameNode。


DataNode的工作机制?

一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。


DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。


心跳是每3秒一次,心跳返回结果带有NameNode给该DataNode的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。


什么是机架感知?什么时候会使用机架感知?

通俗的来说就是NN(NameNode)通过读取我们的配置来配置各个节点所在的机架信息,数据的流水线复制和HDFS复制副本时候。


4.HDFS数据写入(上传)流程是怎样的?

大数据开发知识点总结


一 HDFS读流程概括:


client跟namenode通信查询元数据,namenode通过查询元数据,找到文件块所在的datanode服务器


挑选一台datanode(就近原则,然后随机)服务器,请求建立socket流


datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验,大小为64k)


客户端以packet为单位接收,现在本地缓存,然后写入目标文件


详细流程


Client 发起文件上传请求,通过 RPC 与 NameNode 建立通讯,NameNode检查目标文件是否已存在,父目录是否存在,返回是否可以上传;


Client 请求第一个 block 该传输到哪些 DataNode 服务器上;


NameNode 根据配置文件中指定的备份数量及副本放置策略进行文件分配,返回可用的 DataNode 的地址,如A,B,C


Client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline),A 收到请求会继续调用 B,然后 B 调用 C,将整个pipeline 建立完成,后逐级返回 client;


Client 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位(默认 64K),A 收到一个 packet 就会传给 B,B 传给 C;A 每传一个 packet 会放入一个应答队列等待应答


数据被分割成一个个 packet 数据包在 pipeline 上依次传输,在pipeline 反方向上,逐个发送 ack(命令正确应答),最终由 pipeline中第一个 DataNode 节点 A 将 pipeline ack 发送给 client;


当一个 block 传输完成之后,client 再次请求 NameNode 上传第二个block 到服务器


5.HDFS数据读取(下载)流程是怎样的?

大数据开发知识点总结


HDFS写流程概括


客户端跟namenode通信请求上传文件,namenode检查目标文件是否已存在,父目录是否存在,用户是否有权限等


namenode返回是否可以上传


client请求第一个 block该传输到哪些datanode服务器上


namenode返回3个datanode服务器ABC


client请求3台dn中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将整个pipeline建立完成,逐级返回客户端


client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答


当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。


写流程详细:


Client 发起文件读取请求通过RPC与NameNode建立通讯,nameNode检查文件位置,来确定请求文件 block 所在的位置


NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode 地址;


这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后;


Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据


底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕;


当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;


读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读。


Read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据;最终读取来所有的 block 会合并成一个完整的最终文件。


hdfs-site.xml的3个主要属性是?

dfs.name.dir→决定的是元数据存储的路径和DFS的存储方式(磁盘或远端)


dfs.data.dir→决定的是数据存储的路径


fs.checkpoint.dir→用于 SecondaryNameNode


NN和2NN工作机制

大数据开发知识点总结


1. 第一阶段:NameNode启动


第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。


客户端对元数据进行增删改的请求。


NameNode记录操作日志,更新滚动日志。


NameNode在内存中对数据进行增删改。


2. 第二阶段:Secondary NameNode工作


Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。


Secondary NameNode请求执行CheckPoint。


NameNode滚动正在写的Edits日志。


将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。


Secondary NameNode加载编辑日志和镜像文件到内存,并合并。


生成新的镜像文件fsimage.chkpoint。


拷贝fsimage.chkpoint到NameNode。


NameNode将fsimage.chkpoint重新命名成fsimage。


NN和2NN工作机制详解:


Fsimage:NameNode内存中元数据序列化后形成的文件。


Edits:记录客户端更新元数据信息的每一步操作(可通过Edits运算出元数据)。


NameNode启动时,先滚动Edits并生成一个空的edits.inprogress,然后加载Edits和Fsimage到内存中,此时NameNode内存就持有最新的元数据信息。Client开始对NameNode发送元数据的增删改的请求,这些请求的操作首先会被记录到edits.inprogress中(查询元数据的操作不会被记录在Edits中,因为查询操作不会更改元数据信息),如果此时NameNode挂掉,重启后会从Edits中读取元数据的信息。然后,NameNode会在内存中执行元数据的增删改的操作。


由于Edits中记录的操作会越来越多,Edits文件会越来越大,导致NameNode在启动加载Edits时会很慢,所以需要对Edits和Fsimage进行合并(所谓合并,就是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage)。SecondaryNameNode的作用就是帮助NameNode进行Edits和Fsimage的合并工作。

SecondaryNameNode首先会询问NameNode是否需要CheckPoint(触发CheckPoint需要满足两个条件中的任意一个,定时时间到和Edits中数据写满了)。直接带回NameNode是否检查结果。SecondaryNameNode执行CheckPoint操作,首先会让NameNode滚动Edits并生成一个空的edits.inprogress,滚动Edits的目的是给Edits打个标记,以后所有新的操作都写入edits.inprogress,其他未合并的Edits和Fsimage会拷贝到SecondaryNameNode的本地,然后将拷贝的Edits和Fsimage加载到内存中进行合并,生成fsimage.chkpoint,然后将fsimage.chkpoint拷贝给NameNode,重命名为Fsimage后替换掉原来的Fsimage。NameNode在启动时就只需要加载之前未合并的Edits和Fsimage即可,因为合并过的Edits中的元数据信息已经被记录在Fsimage中。


二、Mapreduce


1.MapReduce工作流程

大数据开发知识点总结

大数据开发知识点总结

大数据开发知识点总结

Application运行流程:

mr程序最先启动MRAppMaster(AM),AM启动后根据本次application的信息,计算出需要的maptask实例数量,然后向RM申请机器启动相应数量的maptask进程,RM通过心跳感知目前集群container(容器)的工作繁忙情况,分配相应的container资源,相应containers的nodemanagr在各自节点上启动container。


2.Mapreduce具体流程总结:

1、inputformat:MR框架基础类之一,包含数据分割(Data Splits)和记录读取器(Record Reader)两部分。每个split包含后一个Block中开头部分的数据可以解决记录跨Block问题,每读取一条记录,调用一次map函数。


2、Map:每一个切片对应一个map,map输出的数据,放入环形溢写缓冲区,缓冲区默认100M,达到80M进行溢写,写入到本地文件。


3、Shuffle:shuffle是MapReduce计算框架的核心,包括了Partion, Sort, Spill, Meger, Combiner, Copy, Memery, Disk等分组动作;


3.1、partition对map的内容根据kv对进行分区


3.2、sort(快速排序),溢写到磁盘


3.3、数据合并combiner(①减少数据写入磁盘的数据量 ② 减少网络传输的数据量 , 数据压缩)


AM监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)。Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。


3.4、fetch (通过RM,reduce找到指定的map主动fetch数据)


3.5、溢写,排序(归并排序)


3.6、merger(数据合并①减少数据量 ② 提高执行效率)


4、reduce(汇总,聚合的过程)


5、output(hdfs)


3.Shuffle过程中涉及两次排序:

1.快速排序:

sort阶段,环形缓冲区达到80%时,对数据进行快速排序,排序按照key的索引进行字典顺序排序,然后开始进行溢写,从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 。


(1)算法步骤


1.从数列中挑出一个元素,称为 “基准”(pivot);


2.重新排序数列,所有元素比基准值小的摆放在基准前面,所有元素比基准值大的摆在基准的后面(相同的数可以到任一边)。


在这个分区退出之后,该基准就处于数列的中间位置。这个称为分区(partition)操作;


3. 递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序;


2.归并排序

在小的文件merge成大文件时采用,归并排序在map端和reduce端都可能出现。


算法步骤:


1.申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列;


2.设定两个指针,最初位置分别为两个已经排序序列的起始位置;


3.比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置;


4.重复步骤 3 直到某一指针达到序列尾;


5.将另一序列剩下的所有元素直接复制到合并序列尾。


MR流程中涉及的快排、归并排序,可参考以下文章

图文详解—十大经典排序算法_珞沫的博客-CSDN博客_排序算法


三、海量数据处理实例分析



1、海量日志数据,提取出某日访问百度次数最多的那个IP。

解决方案:首先是将这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有个2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。


2、有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词。

解决方案:顺序读文件中,对于每个词x,取hash(x)%5000,然后按照该值存到5000个小文件(记为x0,x1,...x4999)中。这样每个文件大概是200k左右。如果其中的有的文件超过了1M大小,还可以按照类似的方法继续往下分,直到分解得到的小文件的大小都不超过1M。 对每个小文件,统计每个文件中出现的词以及相应的频率(可以采用trie树/hash_map等),并取出出现频率最大的100个词(可以用含100个结点的最小堆),并把100个词及相应的频率存入文件,这样又得到了5000个文件。下一步就是把这5000个文件进行归并(类似与归并排序)的过程了。


3、有10个文件,每个文件1G,每个文件的每一行存放的都是用户的query,每个文件的query都可能重复。要求你按照query的频度排序。

方案1: 顺序读取10个文件,按照hash(query)%10的结果将query写入到另外10个文件(记为)中。这样新生成的文件每个的大小大约也1G(假设hash函数是随机的)。 找一台内存在2G左右的机器,依次对用hash_map(query, query_count)来统计每个query出现的次数。利用快速/堆/归并排序按照出现次数进行排序。将排序好的query和对应的query_cout输出到文件中。这样得到了10个排好序的文件(记为)。


对这10个文件进行归并排序(内排序与外排序相结合)。


方案2: 一般query的总量是有限的,只是重复的次数比较多而已,可能对于所有的query,一次性就可以加入到内存了。这样,我们就可以采用trie树/hash_map等直接来统计每个query出现的次数,然后按出现次数做快速/堆/归并排序就可以了。


方案3: 与方案1类似,但在做完hash,分成多个文件后,可以交给多个文件来处理,采用分布式的架构来处理(比如MapReduce),最后再进行合并。


4、 给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url。

方案1:可以估计每个文件安的大小为5G×64=320G,远远大于内存限制的4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。通读文件a,对每个url求取hash(url)%1000,然后根据所取得的值将url分别存储到1000个小文件(记为a0,a1,...,a999)中。这样每个小文件的大约为300M。通读文件b,采取和a相同的方式将url分别存储到1000小文件(记为b0,b1,...,b999)。这样处理后,所有可能相同的url都在对应的小文件(a0vsb0,a1vsb1,...,a999vsb999)中,不对应的小文件不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。求每对小文件中相同的url时,可以把其中一个小文件的url存储到hash_set中。然后遍历另一个小文件的每个url,看其是否在刚才构建的hash_set中,如果是,那么就是共同的url,存到文件里面就可以了。


方案2:如果允许有一定的错误率,可以使用Bloom filter,4G内存大概可以表示340亿bit。将其中一个文件中的url使用Bloom filter映射为这340亿bit,然后挨个读取另外一个文件的url,检查是否与Bloom filter,如果是,那么该url应该是共同的url(注意会有一定的错误率)。


5、腾讯面试题:给40亿个不重复的unsigned int的整数,没排过序的,然后再给一个数,如何快速判断这个数是否在那40亿个数当中?

方案1:申请512M的内存,一个bit位代表一个unsigned int值。读入40亿个数,设置相应的bit位,读入要查询的数,查看相应bit位是否为1,为1表示存在,为0表示不存在。


方案2:因为2^32为40亿多,所以给定一个数可能在,也可能不在其中;这里我们把40亿个数中的每一个用32位的二进制来表示假设这40亿个数开始放在一个文件中。然后将这40亿个数分成两类: 1.最高位为0 2.最高位为1 并将这两类分别写入到两个文件中,其中一个文件中数的个数<=20亿,而另一个>=20亿(这相当于折半了);与要查找的数的最高位比较并接着进入相应的文件再查找再然后把这个文件为又分成两类: 1.次最高位为0 2.次最高位为1,并将这两类分别写入到两个文件中,其中一个文件中数的个数<=10亿,而另一个>=10亿(这相当于折半了); 与要查找的数的次最高位比较并接着进入相应的文件再查找。 ....... 以此类推,就可以找到了,而且时间复杂度为O(logn),方案2完。


附:这里,再简单介绍下,位图方法: 使用位图法判断整形数组是否存在重复 判断集合中存在重复是常见编程任务之一,当集合中数据量比较大时我们通常希望少进行几次扫描,这时双重循环法就不可取了。位图法比较适合于这种情况,它的做法是按照集合中最大元素max创建一个长度为max+1的新数组,然后再次扫描原数组,遇到几就给新数组的第几位置上1,如遇到5就给新数组的第六个元素置1,这样下次再遇到5想置位时发现新数组的第六个元素已经是1了,这说明这次的数据肯定和以前的数据存在着重复。这种给新数组初始化时置零其后置一的做法类似于位图的处理方法故称位图法。它的运算次数最坏的情况为2N。如果已知数组的最大值即能事先给新数组定长的话效率还能提高一倍。


四、YARN


1.YARN核心组件

大数据开发知识点总结


1.RM(ResourceManager)资源管理器


RM是一个全局的资源管理器,负责整个系统的资源管理和分配。


它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM),通俗讲是用于管理NodeManager节点的资源,包括cup、内存等。


2.NodeManager(NM)+ (DataNode 硬盘 CPU 内存)


NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。


3.Applications Manager(应用程序管理器)new ApplicationMaster 监控所有job的任务运行结果的监控 --> 客户端


负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。是AM的AM。


4.ApplicationMaster(AM)job过程监控


ApplicationMaster 管理在YARN内运行的每个应用程序实例。每个应用程序对应一个ApplicationMaster。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配),通俗讲是管理发起的任务,随着任务创建而创建,任务的完成而结束。


5.Container(重要) --> 资源接口 --> map reduce  mapContainer reduceContainer spark --> sparkContainer


Container是YARN中的资源抽象,它封装了NodeManager节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。(Container的具有优先级别,包括:队列 普通用户 VIP会员, 权重高的一定是先执行)


2.YARN工作流程

大数据开发知识点总结


概括:

步骤1:用户将应用程序提交到 ResourceManager 上;


步骤2:ResourceManager 为应用程序 ApplicationMaster 申请资源,并与某个 NodeManager 通信启动第一个 Container,以启动ApplicationMaster;


步骤3:ApplicationMaster 与 ResourceManager 注册进行通信,为内部要执行的任务申请资源,一旦得到资源后,将于 NodeManager 通信,以启动对应的 Task;


步骤4:所有任务运行完成后,ApplicationMaster 向 ResourceManager 注销,整个应用程序运行结束。


详细流程:

- client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等


- ResourceManager启动一个NodeManager的一个container用于运行ApplicationMaster


- 启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳


- ApplicationMaster向ResourceManager发送请求,申请相应数目的container


- 申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container


- NM启动container


- container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息


- 应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回


3.YARN默认的调度器分别是什么,及他们区别?

YARN调度器主要分为三类:


- 1、FIFO :先进先出,同一个队列中现先提交的先执行,后面等待


- 2、Capacity Scheduler(容量调度器): 允许创建多个任务队列,每个队列使用所有资源的一部分。多个任务队列可 以同时执行。但是一个队列内部还是先进先出。


- 3、Fair Scheduler(公平调度): 第一个程序在启动时可以占用其他队列资源(100%占用),当其他队列有 任务提交时,占用资源的队列需要将资源还给该任务。还资源的时候,效率 比较慢。


五、Zookeeper


大数据开发知识点总结


1.请简单介绍下Zookeeper?(重要)

ZooKeeper是一个分布式的,开放源码的,用于分布式应用程序的协调服务。zookeeper服务端有两种模式:单机的独立模式和集群的仲裁模式,所谓仲裁是指一切事件只要满足多数派同意就执行,不需要等到集群中的每个节点反馈才执行。Zookeeper本身也是服从主从架构的,在仲裁模式下会有一个主要的节点作为Leader(领导者),而其余集群中的节点作为Follower(公民),对某一事件是否执行,leader都会先征询各个follower的反馈信息再做决定,如果多数派同意,leader就将命令下发到所有的follower去执行。

2.Leader选举

zookeeper的leader选举,leader的选举会发生在集群启动时和运行中leader挂了,概括选举过程也是少数服从多数选出新leader。
Zookeeper所提供的服务主要是通过它以下三部分组成:
Zookeeper = Znode(数据节点,也是简约版文件系统)+ 原语(可以理解成Zookeeper的命令) + Watcher(通知机制,类似监听器)
Znode可以分为持久节点和临时节点,在用Zookeeper的命令create创建文件时默认时一个持久节点,而临时节点是会随着会话关闭而删除。另外也可以创建为有序节点,在创建时追加一个自增数字的标识
Watcher通知机制:
Watcher通知机制类似于监听器的过程,即有注册 + 监听事件+ 回调函数,客户端在znode上注册一个Watcher监视器,当znode上数据出现变化,watcher监测到此变化就会通知客户端。

3.Zookeeper读写流程

在Client向Follwer发出一个读写的请求
Follwer把请求发送给Leader,Leader接收到以后开始发起投票并通知每个Follwer进行投票
Follwer把投票结果发送给Leader
Leader将结果汇总后如果需要读取或写入,则开始执行同时把读写操作通知给Follwer,然后commit
Follower执行并把请求结果返回给Client

4.加入Zookeeper中某个Follower出故障了怎么办?(重要)

这会启动Zookeeper的状态同步过程。具体来说如下:
在完成leader选举后,各Follower和leader进行连接通信,并在每一次事务执行时,Follower都会把自己的最大事务ID发送给leader,当某个Follower出故障后,leader就根据原先该Follower发送的zxid确定同步点,向它同步记录最大zxid之后的内容。
当完成同步后,会通知Follower已成为为update状态,Follower受到update消息后,就可以重新接受客户端的请求继续工作。

大数据开发知识点总结



zookeeper应用场景

以下各应用场景实码:https://github.com/suzhida/demo_zookeeper/tree/master/src/main/java/com/demo

1.数据发布与订阅(配置中心)

Zookeeper 实现数据的发布和订阅

数据发布与订阅,即所谓的配置中心,顾名思义就是发布者将数据发布到 ZooKeeper 节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和动态更新。对于:数据量通常比较小。数据内容在运行时动态变化。集群中各机器共享,配置一致。
这样的全局配置信息就可以发布到 ZooKeeper上,让客户端(集群的机器)去订阅该消息。
发布/订阅系统一般有两种设计模式,分别是推(Push)和拉(Pull)模式。
推模式:服务端主动将数据更新发送给所有订阅的客户端
拉模式:客户端主动发起请求来获取最新数据,通常客户端都采用定时轮询拉取的方式
ZooKeeper 采用的是推拉相结合的方式:
客户端想服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送 Watcher 事件通知,客户端接收到这个消息通知后,需要主动到服务端获取最新的数据
发布方Provider的代码:
package com.zhuyun.release.subscribe;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class Provider {
public static void main(String[] args) throws Exception {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("触发了" + event.getPath() + "的" + event.getType() + "事件!");
}
};
ZooKeeper zk = new ZooKeeper("192.168.10.203:2181", 20, watcher);

Stat stat = zk.exists("/message", watcher);
if (stat == null) { //假如节点不存在,则先创建节点
zk.create("/message", "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

//向该节点发送消息
zk.setData("/message", "hello world".getBytes(), -1);
zk.close();
}
}

订阅方Consumer的代码:
package com.zhuyun.release.subscribe;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

public class Consumer implements Watcher{
ZooKeeper zk;
String hostPort;
String znode;

public Consumer(String hostPort,String znode) throws Exception{
this.hostPort = hostPort;
this.znode = znode;

zk = new ZooKeeper(hostPort, 3000, this);
//第一次获取节点消息,同时添加watcher
System.out.println("消息内容:" + new String(zk.getData(znode, true, null)));
}

@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
try {
//当节点消息变化时,触发该操作:获取变化后的消息,同时再添加watcher
System.out.println("你有新的消息:" + new String(zk.getData("/message", true, null)));

} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public static void main(String[] args) throws Exception {
new Consumer("192.168.10.203:2181","/message");

System.in.read();
}
}

首先启动订阅方监听,再启动发布方发布消息,每次发布一条消息,订阅方都能收到该消息,结果如下:
消息内容:我是谁?
你有新的消息:hello
你有新的消息:你好
你有新的消息:hello world

2.命名服务

命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务,远程对象等等——这些我们都可以统称他们为名字。其中较为常见的就是一些分布式服务框架(如 RPC)中的服务地址列表。通过在 ZooKeepr 里创建顺序节点,能够很容易创建一个全局唯一的路径,这个路径就可以作为一个名字。ZooKeeper 的命名服务即生成全局唯一的 ID。

3.分布式协调服务/通知

ZooKeeper 中特有 Watcher 注册与异步通知机制,能够很好的实现分布式环境下不同机器,甚至不同系统之间的通知与协调,从而实现对数据变更的实时处理。使用方法通常是不同的客户端如果机器节点发生了变化,那么所有订阅的客户端都能够接收到相应的 Watcher 通知,并做出相应的处理。ZooKeeper 的分布式协调/通知,是一种通用的分布式系统机器间的通信方式。

4.Master选举

Master 选举可以说是 ZooKeeper 最典型的应用场景了。比如 HDFS 中 Active NameNode 的选举、YARN 中 Active ResourceManager 的选举和 HBase 中 Active HMaster 的选举等。针对 Master 选举的需求,通常情况下,我们可以选择常见的关系型数据库中的主键特性来实现:希望成为 Master 的机器都向数据库中插入一条相同主键 ID 的记录,数据库会帮我们进行主键冲突检查,也就是说,只有一台机器能插入成功,那么,我们就认为向数据库中成功插入数据的客户端机器成为 Master。依靠关系型数据库的主键特性确实能够很好地保证在集群中选举出唯一的一个 Master。但是,如果当前选举出的 Master 挂了,那么该如何处理?谁来告诉我 Master 挂了呢? 显然,关系型数据库无法通知我们这个事件。但是,ZooKeeper 可以做到!利用 ZooKeepr 的强一致性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性,即 ZooKeeper 将会保证客户端无法创建一个已经存在的数据单元节点。也就是说,如果同时有多个客户端请求创建同一个临时节点,那么最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很容易地在分布式环境中进行 Master 选举了。成功创建该节点的客户端所在的机器就成为了 Master。同时,其他没有成功创建该节点的客户端,都会在该节点上注册一个子节点变更的 Watcher,用于监控当前 Master 机器是否存活,一旦发现当前的 Master 挂了,那么其他客户端将会重新进行 Master 选举。这样就实现了 Master 的动态选举。

5.分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁又分为排他锁和共享锁两种
(1)排它锁:
ZooKeeper 如何实现排它锁?
定义锁
ZooKeeper 上的一个机器节点可以表示一个锁
获得锁
把 ZooKeeper 上的一个节点看作是一个锁,获得锁就通过创建临时节点的方式来实现。ZooKeeper 会保证在所有客户端中,最终只有一个客户端能够创建成功,那么就可以认为该客户端获得了锁。同时,所有没有获取到锁的客户端就需要到 /exclusive_lock 节点上注册一个子节点变更的 Watcher 监听事件,以便实时监听到 lock 节点的变更情况。
释放锁
因为锁是一个临时节点,释放锁有两种方式
当前获得锁的客户端机器发生宕机或重启,那么该临时节点就会被删除,释放锁。
正常执行完业务逻辑后,客户端就会主动将自己创建的临时节点删除,释放锁。
无论在什么情况下移除了 lock 节点,ZooKeeper 都会通知所有在 /exclusive_lock 节点上注册了节点变更 Watcher 监听的客户端。这些客户端在接收到通知后,再次重新发起分布式锁获取,即重复获取锁过程。
(2)共享锁:
共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren 方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。


六、Hive


Hive:基于Hadoop一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的 sql 查询功能,可以将 sql 语句转换为 MapReduce 任务进行运行。适合数据仓库的统计分析。


1、Hive架构

大数据开发知识点总结大数据开发知识点总结

2、三大组件

1、用户接口:包括 CLI、JDBC/ODBC、WebGUI。


CLI(command line interface)为 shell 命令行,进行交互式执行SQL:直接与Driver进行交互。CLI启动的时候,会同时启动一个 Hive 副本


JDBC/ODBC 驱动是 Hive 的 JAVA 实现,作为JAVA的API:JDBC是通过Thift Server来接入,然后发送给Driver


WebGUI 是通过浏览器访问 Hive。


HiveServer2基于Thrift, 允许远程客户端使用多种编程语言如Java、Python向Hive提交请求


2、Metastore:存储元数据


Hive 将元数据存储在数据库中,如MySQL、derby


Hive 中的元数据包括表的名字、表的列、分区及其属性、表的属性(是否为外部表等)、表的数据所在目录等。


3、Driver(驱动模块):包括解释器、编译器、优化器、执行器


通过该模块对输入进行解析编译,对需求的计算进行优化,然后按照指定的步骤进行(通常启动多个MR任务来执行)


解释器、编译器、优化器、执行器完成 HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在 HDFS 中,并在随后由 MapReduce 调用执行


3.Hive 内部表和外部表区别?:

(1)是否直接通过external


(2)删除外部表,元数据得到删除,但是数据不会真正删除,针对内部表,元数据和数据都被删除


(3)在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表中的数据并不是由它自己来管理的!而内部表则不一样


注意:内部表和外部表场景:


内部表:逻辑处理的中间过程生成的中间表,或者一些临时表,直接删除即可


外部表:可以用户存储一些日志信息,数据不会被删除


4.Hive与关系型数据库区别:

Hive表示纯逻辑,只有表定义,不存数据。读多写少,不支持数据的改写和删除

大数据开发知识点总结

5.order by ,sort by , distribute by , cluster by 的区别?

1.Order by会对所给的全部数据进行全局排序,只启动一个reduce来处理。


Sort by是局部排序,它可以根据数据量的大小启动一到多个reducer来工作,并在每个reduce中单独排序。


3.Distribute by 类似于mr中的partition,采用hash算法,在map端将查询结果中hash值相同的结果分发到对应的reduce中,结合sort by使用。


4.Cluster by 可以看作是distribute by 和sort by的结合,当两者后面所跟的字段列名相同时,效果就等同于使用cluster by,但是cluster by最终的结果只能是降序,无法指定升序和降序。


注:不带count,sum这些聚合函数的,都不会走mapreduce。


6.Hive最全函数介绍:

UDF:普通函数:1对1的关系,select语句,例如:数据格式


UDAF:聚合函数:多对1的关系,结合group by联合使用  


UDTF:生成函数:1对多


用 UDF 函数解析公共字段;用 UDTF 函数解析事件字段。

自定义 UDF:继承 UDF,重写 evaluate 方法

自定义 UDTF:继承自 GenericUDTF,重写 3 个方法:initialize(自定义输出的列名和类型),

process(将结果返回 forward(result),close 。

为什么要自定义 UDF/UDTF,因为自定义函数,可以自己埋点 Log 打印日志,出错或者数据异常,方便调试。


Hive窗口函数:

Hive命令行窗口查看函数定义  :desc function 函数名;


HIve窗口函数实战可参考:Hive开窗函数总结_Abysscarry的博客-CSDN博客_hive的开窗函数


Hive常用日期函数

unix_timestamp:返回当前或指定时间的时间戳    

from_unixtime:将时间戳转为日期格式

current_date:当前日期

current_timestamp:当前的日期加时间

to_date:抽取日期部分

year:获取年

month:获取月

day:获取日

hour:获取时

minute:获取分

second:获取秒

weekofyear:当前时间是一年中的第几周

dayofmonth:当前时间是一个月中的第几天

months_between: 两个日期间的月份

add_months:日期加减月

datediff:两个日期相差的天数

date_add:日期加天数

date_sub:日期减天数

last_day:日期的当月的最后一天


日期处理函数实例

1)date_format函数(根据格式整理日期)


hive (gmall)> select date_format('2019-02-10','yyyy-MM');


2019-02


2)date_add函数(加减日期)


hive (gmall)> select date_add('2019-02-10',1);


2019-02-11


hive (gmall)> select date_add('2019-02-10',-1);


2019-02-09


3)next_day函数                                                                                                                                            


(1)取当前天的下一个周一


hive (gmall)> select next_day('2019-02-12','MO');


2019-02-18


说明:星期一到星期日的英文(Monday,Tuesday、Wednesday、Thursday、Friday、Saturday、Sunday)


(2)取当前周的周一


hive (gmall)> select date_add(next_day('2019-02-12','MO'),-7);


2019-02-11


4)last_day函数(求当月最后一天日期)


hive (gmall)> select last_day('2019-02-10');


2019-02-28


常用取整函数

round: 四舍五入

ceil:  向上取整

floor: 向下取整


常用字符串操作函数

upper: 转大写

lower: 转小写

length: 长度

trim:  前后去空格

lpad: 向左补齐,到指定长度

rpad:  向右补齐,到指定长度

regexp_replace: SELECT regexp_replace('100-200', '(\\d+)', 'num') ;

使用正则表达式匹配目标字符串,匹配成功后替换!


集合操作

size: 集合中元素的个数

map_keys: 返回map中的key

map_values: 返回map中的value

array_contains: 判断array中是否包含某个元素

sort_array: 将array中的元素排序


7.Hive的数据管理:

1)内表外表(2)Partition辅助查询,缩小查询范围,加快数据检索速度(3)Bucket 控制reduce数量


hive分桶:

分桶是将整个数据内容按照某列属性值去hash值进行区分,对取得的hash再做模运算(columnValue.hashCode % 桶数),具有相同结果的数据进入同一个文件中。


8.Hive实战数据分析

有以下几张数据表,请写出Hive SQL语句,实现以下需求。

注:分区字段为dt,代表日期。

1、某次经营活动中,商家发起了"异性拼团购",试着针对某个地区的用户进行推广,找出匹配用户。

参考实现:选出城市在北京,性别为男的10个用户名

select user_name
from user_info
where city=‘beijing’ and sex=‘male’
limit 10;

2、某天,发现食物类的商品卖的很好,你能找出几个资深吃货吗?
参考实现:选出在2019年6月18日,购买的商品类是food的用户名、购买数量、支付金额,并按照购买数量、支付金额倒序排序,取前10个用户。

select user_name, piece, pay_amount
from user_trade
where dt=‘2019-06-18’ and goods_category=‘food’
order by pay_amount desc, piece desc
limit 10;

3、试着对本公司2019年第一季度商品的热度与价值度进行分析。
参考实现:2019年1月到3月,每个品类有多少人购买,累计金额是多少。

select goods_category,
count(distinct user_name) as user_num,
sum(pay_amount) as total_amount
from user_trade
where dt between ‘2019-01-01’ and ‘2019-03-31’
group by goods_category;

4、2019年4月,支付金额超过5万元的用户,给VIP用户赠送优惠劵。

参考实现:2019年4月份,统计每位用户的支付金额,筛选出超过5万元的。

select user_name,
sum(pay_amount) as total_amount
from user_trade
where dt between ‘2019-04-01’ and ‘2019-04-30’
group by user_name
having sum(pay_amount)>50000;

5、去年的劳动节新用户推广活动价值分析,即拉新分析。

参考实现:统计2019年5月1日之后,每日激活用户的数量。

select sum(user_name) as user_num,
datediff(to_date(firstactivetime), ‘2019-05-01’)
from user_info
where to_date(firstactivetime) between ‘2019-05-01’ and ‘2019-05-31’
group by to_date(firstactivetime);

6、对用户的年龄段进行分析,观察分布情况。

参考实现:统计以下四个年龄段:20岁以下、20-30岁、30-40岁、40岁以上的用户数。

select case when age<20 then ‘20岁以下’
when age>=20 and age<30 then ‘20-30岁’
when age>=30 and age<40 then ‘30-40岁’
else ‘40岁以上’ end as age_type
count(distinct user_name) as user_num
from user_info
group by case when age<20 then ‘20岁以下’
when age>=20 and age<30 then ‘20-30岁’
when age>=30 and age<40 then ‘30-40岁’
else ‘40岁以上’ end;

7、去年王思聪的微博抽奖活动引起争议,我们想要观察用户等级随性别的分布情况。

参考实现:统计每个性别用户等级高低分布情况(level大于5为高级)

select sex.
if(level>5, ‘高’, ‘低’) as level_type,
count(distinct user_name) as user_num
from user_info
group by sex,
if(level>5, ‘高’, ‘低’);

8、分析每个月的拉新情况,可以倒推回运营效果。

参考实现:统计每个月激活用户的数量

select substr(firstactivetime, 1, 7) as active_month,
count(distinct user_name) as user_num
from user_info
group by substr(firstactivetime, 1, 7);

9、找出不同手机品牌的用户分布情况。

参考实现:按照手机品牌分组,统计每个品牌的用户数量。

select extra2[‘phonebrand’] as phone_brand,
count(distinct user_name) as user_num
from user_info
group by extra2[‘phonebrand’];

10、找出在2018年具有VIP潜质的用户,发送VIP试用劵。

参考实现:2018年购买的商品品类在五个以上的用户

select user_name,
count(distinct goods_category) as category_num
from user_trade
where year(dt)=‘2018’
group by user_name
having count(distinct goods_category)>5;

11、激活天数距今超过300天的男女分布情况
select sex,
count(distinct user_name) as user_num
from user_info
where datediff(current_date(),to_date(firstactivetime))>300
group by sex;

12、不同性别、教育程度的分布情况
select sex,
extra2[“education”] as education,
count(distinct user_name) as user_num
from user_info
group by sex,
extra2[“education”];

13、2019年1月1日到2019年4月30日,每个时段的不同品类购买金额分布。

select substr(from_unixtime(pay_time, ‘yyyy-MM-dd HH’), 12),
goods_category,
sum(pay_amount) as total_amount
from user_trade
where dt between ‘2019-01-01’ and ‘2019-04-30’
group by
substr(from_unixtime(pay_time, ‘yyyy-MM-dd HH’), 12),
goods_category;
————————————————
版权声明:本文为CSDN博主「GoAI」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_36816848/article/details/106665176

欢迎分享,(联系QQ/微信:337407980)

打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

评论