大数据开发知识点总结(三)


十一、Spark


1.Spark介绍

1.1 什么是spark

基于内存的分布式计算框架

只负责算 不负责存

spark 在离线计算 功能上 类似于mapreduce的作用

1.2 为什么用spark

MapReduce的缺点


运行速度慢 (没有充分利用内存)

接口比较简单,仅支持Map Reduce

功能比较单一 只能做离线计算

不适合迭代计算(如机器学习、图计算等等),交互式处理(数据挖掘)

不适合流式处理(点击日志分析)

需要一种灵活的框架可同时进行批处理、流式计算、交互式计算


内存计算引擎,提供cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销

DAG引擎,减少多次计算之间中间结果写到HDFS的开销

使用多线程模型来减少task启动开销,shuffle过程中避免不必要的sort操作以及减少磁盘IO

spark的缺点是:吃内存,不太稳定

Spark优势


速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)spark中的job中间结果可以不落地,可以存放在内存中。 mapreduce中map和reduce任务都是以进程的方式运行着,而spark中的job是以线程方式运行在进程中。

易用性(可以通过java/scala/python/R开发spark应用程序)

通用性(可以使用spark sql/spark streaming/mlib/Graphx)

兼容性(spark程序可以运行在standalone/yarn/mesos)

2. RDD 的概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.


Dataset:一个数据集,简单的理解为集合,用于存放数据的


Distributed:它的数据是分布式存储,并且可以做分布式的计算


Resilient:弹性的


它表示的是数据可以保存在磁盘,也可以保存在内存中

数据分布式也是弹性的

弹性:并不是指他可以动态扩展,而是容错机制。

RDD会在多个节点上存储,就和hdfs的分布式道理是一样的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同的节点上

spark读取hdfs的场景下,spark把hdfs的block读到内存就会抽象为spark的partition。

spark计算结束,一般会把数据做持久化到hive,hbase,hdfs等等。我们就拿hdfs举例,将RDD持久化到hdfs上,RDD的每个partition就会存成一个文件,如果文件小于128M,就可以理解为一个partition对应hdfs的一个block。反之,如果大于128M,就会被且分为多个block,这样,一个partition就会对应多个block。

所有spark中对数据的操作最终都会转换成RDD的操作

spark sql

spark streaming

spark ml 、spark mllib

RDD是不可变的

父RDD 生成一个子 RDD 父RDD的状态不会变化

从容错的角度去做这样的设计


2.1 RDD的创建

创建RDD之前先要有spark context

conf = SparkConf().setAppName(appName).setMaster(master)

sc = SparkContext(conf=conf)

通过内存中的数据创建RDD

= [1, 2, 3, 4, 5]

distData = sc.parallelize(data)

创建RDD时可以指定 partition的数量(RDD会分成几份)一个partition会对应一个task,根据CPU的内核数来指定partition (1核对应2~4个partition)

从文件创建RDD 可以是HDFS支持的任何一种存储介质

可以从 hdfs、 数据库(mysql) 、本地文件系统、 hbase 这些地方加载数据创建RDD

rdd = sc.textFile(‘file:///root/tmp/test.txt’)


2.2 RDD的三类算子

transformation


所有的transformation 都是延迟执行的,只要不调用action 不会执行,只是记录过程


transformation 这一类算子返回值还是 rdd


rdd.transformation 还会得到新的rdd


map(func) 将func函数作用到数据集的每一个元素上,生成一个新的RDD返回


filter(func) 选出所有func返回值为true的元素,生成一个新的RDD返回


flatMap(func) 会先执行map的操作,再将所有对象合并为一个对象


union() 取并集


intersection() 交集


groupByKey() 以元组中的第0个元素作为key,进行分组,返回一个新的RDD 结果中 value是一个Iterable


reducebykey(func) 将key相同的键值对,按照Function进行计算


sortbykey(ascending=True, numPartitions=None, keyfunc=)Sorts this RDD, which is assumed to consist of (key, value) pairs.按照关键词排序,排完后函数操作


action


会触发之前的rdd所有的transformation

获取最终的结果

collect 所有的结果都会加载到内存中

reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

fitst 第一个

take(num) 前n个

count()

persist


数据存储,可以存到内存,也可以是磁盘

3. spark-core 实战

详情见spark-core 实战 通过spark实现ip地址查询


4. spark集群架构

大数据开发知识点总结(三)


Application

用户自己写的Spark应用程序,批处理作业的集合。Application的main方法为应用程序的入口,用户通过Spark的API,定义了RDD和对RDD的操作。


Client:客户端进程,负责提交作业到Master。


Master(类比与ResourceManager)


Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动Driver和Executor。

Worker(类比于NodeManager)


Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor。

Driver(类比于ApplicationMaster)


一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。

DAGScheduler: 实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。

TaskScheduler:实现Task分配到Executor上执行。

Stage:一个Spark作业一般包含一到多个Stage。

Task:一个Stage包含一到多个Task,通过多个Task实现并行运行的功能。

Executor(类比于Container):即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Driver的命令Launch Task,一个Executor可以执行一到多个Task。

Spark考点总结



一、你是怎么理解Spark,它的特点是什么?


       Spark是一个基于内存的,用于大规模数据处理(离线计算、实时计算、快速查询(交互式查询))的统一分析引擎。


大数据开发知识点总结(三)



它内部的组成模块,包含SparkCore,SparkSQL,SparkStreaming,SparkMLlib,SparkGraghx等…

       它的特点:

       Spark计算速度是MapReduce计算速度的10-100倍


易用

       MR支持1种计算模型,Spsark支持更多的计算模型(算法多)


通用

       Spark 能够进行离线计算、交互式查询(快速查询)、实时计算、机器学习、图计算


兼容性

       Spark支持大数据中的Yarn调度,支持mesos。可以处理hadoop计算的数据。


二、Spark有几种部署方式,请分别简要论述


       1) Local:运行在一台机器上,通常是练手或者测试环境。


       2)Standalone:构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统。


       3)Yarn: Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。


       4)Mesos:国内大环境比较少用。


三、Spark提交作业的参数


       因为我们Spark任务是采用的Shell脚本进行提交,所以一定会涉及到几个重要的参数,而这个也是在面试的时候容易被考察到的“细节”。


  

executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存大小,默认1G
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M



四、简述Spark的作业提交流程


       Spark的任务提交方式实际上有两种,分别是YarnClient模式和YarnCluster模式。大家在回答这个问题的时候,也需要分类去介绍。千万不要被冗长的步骤吓到,一定要学会总结差异,发现规律,通过图形去增强记忆。


YarnClient 运行模式介绍

大数据开发知识点总结(三)



在YARN Client模式下,Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存。


       ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。

     


YarnCluster 模式介绍

大数据开发知识点总结(三)



        在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。


       Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。


五、你是如何理解Spark中血统(RDD)的概念?它的作用是什么?


概念

       RDD是弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算 的集合。


作用

       提供了一个抽象的数据模型,将具体的应用逻辑表达为一系列转换操作(函数)。另外不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)


       如果还想锦上添花,可以添上这一句:


RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies,用来解决数据容错时的高效性以及划分任务时候起到重要作用


简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数?


Spark的宽窄依赖问题是SparkCore部分的重点考察内容,多数出现在笔试中,大家需要注意。


       窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖


       宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)


那Stage是如何划分的呢?


   根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。


每个stage又根据什么决定task个数?


   Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。

大数据开发知识点总结(三)



六、列举Spark常用的transformation和action算子,有哪些算子会导致Shuffle?


       我们在Spark开发过程中,避不开与各种算子打交道,其中Spark 算子分为transformation 和 action 算子,下面列出一些常用的算子,具体的功能还需要小伙伴们自行去了解。

transformation
 map
 mapRartition
 flatMap
 filter
 …
action
 reduce
 collect
 first
 take

 如果面试官问你,那小伙叽,有哪些会引起Shuffle过程的Spark算子呢?


       你只管自信的回答:


reduceByKey

groupByKey

…ByKey


七、reduceByKey与groupByKey的区别,哪一种更具优势?


       既然你上面都提到 reduceByKey 和groupByKey ,那哪一种更具优势,你能简单分析一下吗?


       能问这样的问题,已经暗示面试官的水平不低了,那么我们该如何回答呢:


       reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。


       groupByKey:按照key进行分组,直接进行shuffle


       所以,在实际开发过程中,reduceByKey比groupByKey,更建议使用。但是需要注意是否会影响业务逻辑。


     


八、Repartition和Coalesce 的关系与区别,能简单说说吗?


       这道题就已经开始参和有“源码”的味道了,为什么呢?


       1)关系:


       两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)


       2)区别:


       repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。


       一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。


     


十、简述下Spark中的缓存(cache和persist)与checkpoint机制,并指出两者的区别和联系


       关于Spark缓存和检查点的区别,大致可以从这3个角度去回答:


位置

       Persist 和 Cache将数据保存在内存,Checkpoint将数据保存在HDFS


生命周期

       Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法,Checkpoint永久存储不会被删除。


RDD依赖关系

       Persist 和 Cache,不会丢掉RDD间的依赖链/依赖关系,CheckPoint会斩断依赖链。


九、简述Spark中共享变量(广播变量和累加器)的基本原理与用途


       关于Spark中的广播变量和累加器的基本原理和用途,答案较为固定,大家无需刻意去记忆。


       累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。


       广播变量是在每个机器上缓存一份,不可变,只读的,相同的变量,该节点每个任务都能访问,起到节省资源和优化的作用。它通常用来高效分发较大的对象。


 

十、当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?


       嗯,有点“调优”的味道,感觉真正的“风暴”即将到来,这道题还是很好回答的,我们只需要减少连接数据库的次数即可。


       使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。

   


十一、能介绍下你所知道和使用过的Spark调优吗?


资源参数调优

num-executors:设置Spark作业总共要用多少个Executor进程来执行

executor-memory:设置每个Executor进程的内存

executor-cores:设置每个Executor进程的CPU core数量

driver-memory:设置Driver进程的内存

spark.default.parallelism:设置每个stage的默认task数量

开发调优

避免创建重复的RDD

尽可能复用同一个RDD

对多次使用的RDD进行持久化

尽量避免使用shuffle类算子

使用map-side预聚合的shuffle操作

使用高性能的算子

①使用reduceByKey/aggregateByKey替代groupByKey

②使用mapPartitions替代普通map

③使用foreachPartitions替代foreach

④使用filter之后进行coalesce操作

⑤使用repartitionAndSortWithinPartitions替代repartition与sort类操作


广播大变量

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能。


使用Kryo优化序列化性能

优化数据结构

在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。


十二、如何使用Spark实现TopN的获取(描述思路或使用伪代码)?


能让你使用伪代码来描述这已经非常“苛刻”了,但是不慌,这里提供3种思路供大家参考:


方法1:

       (1)按照key对数据进行聚合(groupByKey)


       (2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)


       注意:当数据量太大时,会导致OOM


方法2:

       (1)取出所有的key


       (2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序


方法3:

       (1)自定义分区器,按照key进行分区,使不同的key进到不同的分区


       (2)对每个分区运用spark的排序算子进行排序


Spark面试干货

Spark面试干货总结!(8千字长文、27个知识点、21张图)


Spark实战

参考:Spark实践_GoAl的博客-CSDN博客_外国spark实践


十二、Flink


1、简单介绍一下 Flink


       Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。Flink提供了诸多高抽象层的API以便用户编写分布式任务:


       DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。


       DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。


       Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。


       此外,Flink 还针对特定的应用领域提供了领域库,例如: Flink ML,Flink 的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法, Gelly,Flink 的图计算库,提供了图计算的相关API及多种图计算算法实现。


2、Flink相比传统的Spark Streaming区别?


       这个问题是一个非常宏观的问题,因为两个框架的不同点非常之多。但是在面试时有非常重要的一点一定要回答出来:Flink 是标准的实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型 。


       下面我们就分几个方面介绍两个框架的主要区别:


架构模型Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要包含:Jobmanager、Taskmanager和Slot。

任务调度Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。

时间机制Spark Streaming 支持的时间机制有限,只支持处理时间。 Flink 支持了流处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持 watermark 机制来处理滞后数据。

容错机制对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。Flink 则使用两阶段提交协议来解决这个问题。

3、Flink的组件栈有哪些?


       根据 Flink 官网描述,Flink 是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。

大数据开发知识点总结(三)

 自下而上,每一层分别代表:Deploy 层:该层主要涉及了Flink的部署模式,在上图中我们可以看出,Flink 支持包括local、Standalone、Cluster、Cloud等多种部署模式 。


       Runtime 层:Runtime层提供了支持 Flink 计算的核心实现,比如:支持分布式 Stream 处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务 。


       API层: API 层主要实现了面向流(Stream)处理和批(Batch)处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API,后续版本,Flink有计划将DataStream和DataSet API进行统一 。


       Libraries层: 该层称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的实现计算框架,也分别对应于面向流处理和面向批处理两类。面向流处理支持:CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作);面向批处理支持:FlinkML(机器学习库)、Gelly(图处理)。


4、Flink 的运行必须依赖 Hadoop组件吗?


       Flink可以完全独立于Hadoop,在不依赖Hadoop组件下运行。但是做为大数据的基础设施,Hadoop体系是任何大数据框架都绕不过去的。Flink可以集成众多Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink可以和Yarn集成做资源调度,也可以读写HDFS,或者利用HDFS做检查点。


5、你们的Flink集群规模多大?


       大家注意,这个问题看起来是问你实际应用中的Flink集群规模,其实还隐藏着另一个问题:Flink可以支持多少节点的集群规模?在回答这个问题时候,可以将自己生产环节中的集群规模、节点、内存情况说明,同时说明部署模式(一般是Flink on Yarn),除此之外,用户也可以同时在小集群(少于5个节点)和拥有 TB 级别状态的上千个节点上运行 Flink 任务。


6、Flink的基础编程模型了解吗?

大数据开发知识点总结(三)



 上图是来自Flink官网的运行流程图。通过上图我们可以得知,Flink 程序的基本构建是数据输入来自一个 Source,Source 代表数据的输入端,经过 Transformation 进行转换,然后在一个或者多个Sink接收器中结束。数据流(stream)就是一组永远不会停止的数据记录流,而转换(transformation)是将一个或多个流作为输入,并生成一个或多个输出流的操作。执行时,Flink程序映射到 streaming dataflows,由流(streams)和转换操作(transformation operators)组成。


7、Flink集群有哪些角色?各自有什么作用?

大数据开发知识点总结(三)




        Flink 程序在运行时主要有 TaskManager,JobManager,Client 三种角色。


       其中JobManager扮演着集群中的管理者Master的角色,它是整个集群的协调者,负责接收Flink Job,协调检查点,Failover 故障恢复等,同时管理Flink集群中从节点TaskManager。


       TaskManager是实际负责执行计算的Worker,在其上执行Flink Job的一组Task,每个TaskManager负责管理其所在节点上的资源信息,如内存、磁盘、网络,在启动的时候将资源的状态向JobManager汇报。


       Client是Flink程序提交的客户端,当用户提交一个Flink程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需要从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。


8、说说 Flink 资源管理中 Task Slot 的概念

大数据开发知识点总结(三)




        在Flink架构角色中我们提到,TaskManager是实际负责执行计算的Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task或多个subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。简单的说,TaskManager会将自己节点上管理的资源分为不同的Slot:固定大小的资源子集。这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。


9、说说 Flink 的常用算子?


       Flink 最常用的常用算子包括:Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。Filter:过滤掉指定条件的数据。KeyBy:按照指定的key进行分组。Reduce:用来进行结果汇总合并。Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)    


10、说说你知道的Flink分区策略?


       要搞懂什么是分区策略,需要清楚分区策略是用来决定数据如何发送至下游。目前 Flink 支持了8种分区策略的实现。

大数据开发知识点总结(三)

        在Flink架构角色中我们提到,TaskManager是实际负责执行计算的Worker,TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task或多个subtask。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念。简单的说,TaskManager会将自己节点上管理的资源分为不同的Slot:固定大小的资源子集。这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,Slot只会做内存的隔离。没有做CPU的隔离。


9、说说 Flink 的常用算子?


       Flink 最常用的常用算子包括:Map:DataStream → DataStream,输入一个参数产生一个参数,map的功能是对输入的参数进行转换操作。Filter:过滤掉指定条件的数据。KeyBy:按照指定的key进行分组。Reduce:用来进行结果汇总合并。Window:窗口函数,根据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)    


10、说说你知道的Flink分区策略?


       要搞懂什么是分区策略,需要清楚分区策略是用来决定数据如何发送至下游。目前 Flink 支持了8种分区策略的实现。

大数据开发知识点总结(三)


        slot是指 taskmanager 的并发执行能力,假设我们将 taskmanager.numberOfTaskSlots 配置为3 那么每一个 taskmanager 中分配3个 TaskSlot, 3个 taskmanager 一共有9个TaskSlot。

大数据开发知识点总结(三)


        parallelism是指taskmanager实际使用的并发能力。假设我们把 parallelism.default 设置为1,那么9个 TaskSlot 只能用1个,有8个空闲。


13、Flink有没有重启策略?说说有哪几种?


       Flink 实现了多种重启策略。


固定延迟重启策略(Fixed Delay Restart Strategy)

故障率重启策略(Failure Rate Restart Strategy)

没有重启策略(No Restart Strategy)

Fallback重启策略(Fallback Restart Strategy)

14、用过Flink中的分布式缓存吗?如何使用?


       Flink实现的分布式缓存和Hadoop有异曲同工之妙。目的是在本地读取文件,并把他放在 taskmanager 节点中,防止task重复拉取。

<span style="color:#000000"><span style="background-color:#f5f7ff"><code class="language-java">val env <span style="color:#ac9739">=</span> ExecutionEnvironment<span style="color:#999999">.</span>getExecutionEnvironment
<span style="color:#6b7394">// register a file from HDFS</span>
env<span style="color:#999999">.</span><span style="color:#3d8fd1">registerCachedFile</span><span style="color:#999999">(</span><span style="color:#ac9739">"hdfs:///path/to/your/file"</span><span style="color:#999999">,</span> <span style="color:#ac9739">"hdfsFile"</span><span style="color:#999999">)</span>
<span style="color:#6b7394">// register a local executable file (script, executable, ...)</span>
env<span style="color:#999999">.</span><span style="color:#3d8fd1">registerCachedFile</span><span style="color:#999999">(</span><span style="color:#ac9739">"file:///path/to/exec/file"</span><span style="color:#999999">,</span> <span style="color:#ac9739">"localExecFile"</span><span style="color:#999999">,</span> <span style="color:#c76b29">true</span><span style="color:#999999">)</span>
<span style="color:#6b7394">// define your program and execute</span>
<span style="color:#999999">.</span><span style="color:#999999">.</span><span style="color:#999999">.</span>
val input<span style="color:#ac9739">:</span> DataSet<span style="color:#999999">[</span>String<span style="color:#999999">]</span> <span style="color:#ac9739">=</span> <span style="color:#999999">.</span><span style="color:#999999">.</span><span style="color:#999999">.</span>
val result<span style="color:#ac9739">:</span> DataSet<span style="color:#999999">[</span>Integer<span style="color:#999999">]</span> <span style="color:#ac9739">=</span> input<span style="color:#999999">.</span><span style="color:#3d8fd1">map</span><span style="color:#999999">(</span><span style="color:#6679cc">new</span> MyMapper<span style="color:#999999">(</span><span style="color:#999999">)</span><span style="color:#999999">)</span>
<span style="color:#999999">.</span><span style="color:#999999">.</span><span style="color:#999999">.</span>
env<span style="color:#999999">.</span><span style="color:#3d8fd1">execute</span><span style="color:#999999">(</span><span style="color:#999999">)</span>
</code></span></span>

15、说说Flink中的广播变量,使用时需要注意什么?


       我们知道Flink是并行的,计算过程可能不在一个 Slot 中进行,那么有一种情况即:当我们需要访问同一份数据。那么Flink中的广播变量就是为了解决这种情况。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。


16、说说Flink中的窗口?


       说说Flink中的窗口?

大数据开发知识点总结(三)


Flink 支持两种划分窗口的方式,按照time和count。如果根据时间划分窗口,那么它就是一个time-window 。如果根据数据划分窗口,那么它就是一个 count-window。flink支持窗口的两个重要属性(size和interval)如果size=interval,那么就会形成tumbling-window(无重叠数据) ;如果size>interval,那么就会形成sliding-window(有重叠数据) 如果size< interval, 那么这种窗口将会丢失数据。比如每5秒钟,统计过去3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。通过组合可以得出四种基本窗口:
time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))
time-sliding-window 有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))
count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)
count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)
17、说说Flink中的状态存储?
        Flink在做计算的过程中经常需要存储中间状态,来避免数据丢失和状态恢复。选择的状态存储策略不同,会影响状态持久化如何和 checkpoint 交互。
        Flink提供了三种状态存储方式:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
18、Flink中的时间有哪几类
        Flink 中的时间和其他流式计算系统的时间一样分为三类:事件时间,摄入时间,处理时间三种。如果以 EventTime为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime。如果以IngesingtTime 为基准来定义时间窗口将形成 IngestingTimeWindow,以 source 的systemTime为准。如果以 ProcessingTime 基准来定义时间窗口将形成 ProcessingTimeWindow,以 operator 的systemTime 为准。
19、Flink中的水印是什么概念,起到什么作用?
        Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 一般来讲Watermark经常和Window一起被用来处理乱序事件。
20、Flink Table & SQL 熟悉吗?TableEnvironment这个类有什么作用
        TableEnvironment是Table API和SQL集成的核心概念。这个类主要用来:
在内部 catalog 中注册表
注册外部 catalog
执行SQL查询
注册用户定义(标量,表或聚合)函数
将DataStream或DataSet转换为表
持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
21、Flink SQL的实现原理是什么?是如何实现 SQL 解析的呢?
大数据开发知识点总结(三)


        首先大家要知道 Flink 的SQL解析是基于Apache Calcite这个开源框架。

       基于此,一次完整的SQL解析过程如下:


用户使用对外提供Stream SQL的语法开发业务应用

用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最终形成calcite的逻辑计划

采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的Flink物理计划

对物理计划采用janino codegen生成代码,生成用低阶API DataStream 描述的流应用,提交到Flink平台执行

Flink中级

22、Flink是如何支持批流一体的?

大数据开发知识点总结(三)


        本道面试题考察的其实就是一句话:Flink的开发者认为批处理是流处理的一种特殊情况。批处理是有限的流处理。Flink 使用一个引擎支持了DataSet API 和 DataStream API。


23、Flink是如何做到高效的数据交换的?


       在一个Flink Job中,数据需要在不同的task中进行交换,整个数据交换是由 TaskManager 负责的,TaskManager 的网络组件首先从缓冲buffer中收集records,然后再发送。Records 并不是一个一个被发送的,二是积累一个批次再发送,batch 技术可以更加高效的利用网络资源。


24、Flink是如何做容错的?


       Flink实现容错主要靠强大的CheckPoint机制和State机制。Checkpoint 负责定时制作分布式快照、对程序中的状态进行备份;State 用来存储计算过程中的中间状态。


24、Flink 分布式快照的原理是什么?


       Flink的分布式快照是根据Chandy-Lamport算法量身定做的。简单来说就是持续创建分布式数据流及其状态的一致快照。


大数据开发知识点总结(三)


        核心思想是在 input source 端插入 barrier,控制 barrier 的同步来实现 snapshot 的备份和 exactly-once 语义。


25、Flink是如何保证Exactly-once语义的?


       Flink通过实现两阶段提交和状态保存来实现端到端的一致性语义。 分为以下几个步骤:


开始事务(beginTransaction)创建一个临时文件夹,来写把数据写入到这个文件夹里面

预提交(preCommit)将内存中缓存的数据写入文件并关闭

预提交(preCommit)将内存中缓存的数据写入文件并关闭

预提交(preCommit)将内存中缓存的数据写入文件并关闭

若失败发生在预提交成功后,正式提交前。可以根据状态来提交预提交的数据,也可删除预提交的数据

26、Flink 的 kafka 连接器有什么特别的地方?


       Flink源码中有一个独立的connector模块,所有的其他connector都依赖于此模块,Flink 在1.9版本发布的全新kafka连接器,摒弃了之前连接不同版本的kafka集群需要依赖不同版本的connector这种做法,只需要依赖一个connector即可。

     


27、说说 Flink的内存管理是如何做的?


       Flink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上。此外,Flink大量的使用了堆外内存。如果需要处理的数据超出了内存限制,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据实现了自己的序列化框架。理论上Flink的内存管理分为三部分:


       Network Buffers:这个是在TaskManager启动的时候分配的,这是一组用于缓存网络数据的内存,每个块是32K,默认分配2048个,可以通过“taskmanager.network.numberOfBuffers”修改。


       Memory Manage pool:大量的Memory Segment块,用于运行时的算法(Sort/Join/Shuffle等),这部分启动的时候就会分配。下面这段代码,根据配置文件中的各种参数来计算内存的分配方法。(heap or off-heap,这个放到下节谈),内存的分配支持预分配和lazy load,默认懒加载的方式。


       User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager本身的数据结构。


28、说说 Flink的序列化如何做的?


       Java本身自带的序列化和反序列化的功能,但是辅助信息占用空间比较大,在序列化对象时记录了过多的类信息。Apache Flink摒弃了Java原生的序列化方法,以独特的方式处理数据类型和序列化,包含自己的类型描述符,泛型类型提取和类型序列化框架。TypeInformation 是所有类型描述符的基类。它揭示了该类型的一些基本属性,并且可以生成序列化器。TypeInformation 支持以下几种类型:


       BasicTypeInfo: 任意Java 基本类型或 String 类型


       BasicArrayTypeInfo: 任意Java基本类型数组或 String 数组


       WritableTypeInfo: 任意 Hadoop Writable 接口的实现类


       WritableTypeInfo: 任意 Hadoop Writable 接口的实现类


       CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)


       CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)


       GenericTypeInfo: 任意无法匹配之前几种类型的类


       针对前六种类型数据集,Flink皆可以自动生成对应的TypeSerializer,能非常高效地对数据集进行序列化和反序列化。


29、Flink中的Window出现了数据倾斜,你有什么解决办法?


       window产生数据倾斜指的是数据在不同的窗口内堆积的数据量相差过多。本质上产生这种情况的原因是数据源头发送的数据量速度不同导致的。出现这种情况一般通过两种方式来解决:


在数据进入窗口前做预聚合

重新设计窗口聚合的key

30、Flink中在使用聚合函数 GroupBy、Distinct、KeyBy 等函数时出现数据热点该如何解决?


       数据倾斜和数据热点是所有大数据框架绕不过去的问题。处理这类问题主要从3个方面入手:


在业务上规避这类问题

       例如一个假设订单场景,北京和上海两个城市订单量增长几十倍,其余城市的数据量不变。这时候我们在进行聚合的时候,北京和上海就会出现数据堆积,我们可以单独数据北京和上海的数据。


Key的设计上

       把热key进行拆分,比如上个例子中的北京和上海,可以把北京和上海按照地区进行拆分聚合。


参数设置

       Flink 1.9.0 SQL(Blink Planner) 性能优化中一项重要的改进就是升级了微批模型,即 MiniBatch。原理是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐和减少数据的输出量。


31、Flink任务延迟高,想解决这个问题,你会如何入手?


       在Flink的后台任务管理中,我们可以看到Flink的哪个算子和task出现了反压。最主要的手段是资源调优和算子调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。


   

32、Flink是如何处理反压的?


       Flink 内部是基于 producer-consumer 模型来进行消息传递的,Flink的反压设计也是基于这个模型。Flink 使用了高效有界的分布式阻塞队列,就像 Java 通用的阻塞队列(BlockingQueue)一样。下游消费者消费变慢,上游就会受到阻塞。


33、Flink的反压和Strom有哪些不同?


       Storm 是通过监控 Bolt 中的接收队列负载情况,如果超过高水位值就会将反压信息写到 Zookeeper ,Zookeeper 上的 watch 会通知该拓扑的所有 Worker 都进入反压状态,最后 Spout 停止发送 tuple。Flink中的反压使用了高效有界的分布式阻塞队列,下游消费变慢会导致发送端阻塞。二者最大的区别是Flink是逐级反压,而Storm是直接从源头降速。

     

34、 Operator Chains(算子链)这个概念你了解吗?


        为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。这就是我们所说的算子链。

     

35、说说Flink1.9的新特性?


支持hive读写,支持UDF

Flink SQL TopN和GroupBy等优化

Checkpoint跟savepoint针对实际业务场景做了优化

Flink state查询

36、消费kafka数据的时候,如何处理脏数

可以在处理前加一个fliter算子,将不符合规则的数据过滤出去。


Flink高级

37、Flink Job的提交流程


       用户提交的Flink Job会被转化成一个DAG任务运行,分别是:StreamGraph、JobGraph、ExecutionGraph,Flink中JobManager与TaskManager,JobManager与Client的交互是基于Akka工具包的,是通过消息驱动。整个Flink Job的提交还包含着ActorSystem的创建,JobManager的启动,TaskManager的启动和注册。


38、Flink所谓"三层图"结构是哪几个"图"?


       一个Flink任务的DAG生成计算图大致经历以下三个过程:


       StreamGraph 最接近代码所表达的逻辑层面的计算拓扑结构,按照用户代码的执行顺序向StreamExecutionEnvironment添加StreamTransformation构成流式图。


       JobGraph 从StreamGraph生成,将可以串联合并的节点进行合并,设置节点之间的边,安排资源共享slot槽位和放置相关联的节点,上传任务所需的文件,设置检查点配置等。相当于经过部分初始化和优化处理的任务图。


       ExecutionGraph 由JobGraph转换而来,包含了任务具体执行所需的内容,是最贴近底层实现的执行图。


39、JobManger在集群中扮演了什么角色?


       JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlot 资源并命令 TaskManager 启动从客户端中获取的应用。JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。


40、JobManger在集群启动过程中起到什么作用?


       JobManager的职责主要是接收Flink作业,调度Task,收集作业状态和管理TaskManager。它包含一个Actor,并且做如下操作:


       RegisterTaskManager: 它由想要注册到JobManager的TaskManager发送。注册成功会通过AcknowledgeRegistration消息进行Ack。


       SubmitJob: 由提交作业到系统的Client发送。提交的信息是JobGraph形式的作业描述信息。


       CancelJob: 请求取消指定id的作业。成功会返回CancellationSuccess,否则返回CancellationFailure。


       UpdateTaskExecutionState: 由TaskManager发送,用来更新执行节点(ExecutionVertex)的状态。成功则返回true,否则返回false。


       RequestNextInputSplit: TaskManager上的Task请求下一个输入split,成功则返回NextInputSplit,否则返回null。


       JobStatusChanged: 它意味着作业的状态(RUNNING, CANCELING, FINISHED,等)发生变化。这个消息由ExecutionGraph发送。


41、TaskManager在集群中扮演了什么角色?


       TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager节点,然后启动并运行任务。TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。可以看出,Flink 的任务运行其实是采用多线程的方式,这和 MapReduce 多 JVM 进行的方式有很大的区别,Flink 能够极大提高 CPU 使用效率,在多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理。


42、TaskManager在集群启动过程中起到什么作用?


       TaskManager的启动流程较为简单:


        启动类:org.apache.flink.runtime.taskmanager.TaskManager


       核心启动方法 : selectNetworkInterfaceAndRunTaskManager


       启动后直接向JobManager注册自己,注册完成后,进行部分模块的初始化。


43、Flink 计算资源的调度是如何实现的?


       TaskManager中最细粒度的资源是Task slot,代表了一个固定大小的资源子集,每个TaskManager会将其所占有的资源平分给它的slot。


       通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。


       通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。

大数据开发知识点总结(三)

44、简述Flink的数据抽象及数据交换过程?


       Flink 为了避免JVM的固有缺陷例如java对象存储密度低,FGC影响吞吐和响应等,实现了自主管理内存。MemorySegment就是Flink的内存抽象。默认情况下,一个MemorySegment可以被看做是一个32kb大的内存块的抽象。这块内存既可以是JVM里的一个byte[],也可以是堆外内存(DirectByteBuffer)。在MemorySegment这个抽象之上,Flink在数据从operator内的数据对象在向TaskManager上转移,预备被发给下个节点的过程中,使用的抽象或者说内存对象是Buffer。对接从Java对象转为Buffer的中间对象是另一个抽象StreamRecord。

     


45、Flink 中的分布式快照机制是如何实现的?


       Flink的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。 这些快照充当一致性checkpoint,系统可以在发生故障时回滚。 Flink用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。 它受到分布式快照的标准Chandy-Lamport算法的启发,专门针对Flink的执行模型而定制。

大数据开发知识点总结(三)


        barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。 一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向checkpoint协调器确认快照n完成。在所有sink确认快照后,意味快照着已完成。一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。

     


46、简单说说FlinkSQL是如何实现的?


       Flink 将 SQL 校验、SQL 解析以及 SQL 优化交给了Apache Calcite。Calcite 在其他很多开源项目里也都应用到了,譬如 Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架构中处于核心的地位,如下图所示。

大数据开发知识点总结(三)

        构建抽象语法树的事情交给了 Calcite 去做。SQL query 会经过 Calcite 解析器转变成 SQL 节点树,通过验证后构建成 Calcite 的抽象语法树(也就是图中的 Logical Plan)。另一边,Table API 上的调用会构建成 Table API 的抽象语法树,并通过 Calcite 提供的 RelBuilder 转变成 Calcite 的抽象语法树。然后依次被转换成逻辑执行计划和物理执行计划。在提交任务后会分发到各个 TaskManager 中运行,在运行时会使用 Janino 编译器编译代码后运行。


本段参考文章:大数据面试杀招 | Flink,大数据时代的“王者”_Alice菌的博客


Flink面试:

史上最全干货!Flink面试大全总结(全文6万字、110个知识点、160张图)

史上最全Flink面试大全总结

https://mp.weixin.qq.com/s/fCXwf9r3wP4rplDhM_fuvA

十二、大数据优秀博客总结:

Alice菌的博客_大数据梦想家_CSDN博客-Hadoop,Scala,大数据实战项目领域博主


yuan_more的博客_五分钟学大数据_CSDN博客-大数据,hive,大数据面试领域博主


王傲旗的大数据之路_大数据面试宝典_CSDN博客-大数据面试,大数据,Scala领域博主


是故事啊~关注我~_大数据学习僧_CSDN博客-Hive,Mysql,Hadoop领域博主


珞沫的博客_CSDN博客-leetcode,数据结构与算法,Hadoop领域博主


不温卜火_CSDN博客-Hadoop,Spark,Hive领域博主


我的祖传代码_大数据私房菜_CSDN博客-数据仓库,Hive,Spark领域博主


欢迎分享,(联系QQ/微信:337407980)

打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

评论