Flink内存模型网络缓冲器内存调优故障排除
1JVM
在大数据领域中,有很多开源框架(Hadoop、Spark、Storm)等都是基于JVM运行,可见JVM在大数据领域扮演的重要角色,所以在了解Flink内存时,我们需要先了解一下JVM。
JVM是可运行Java代码的假想计算机,包括程序计数器、Java虚拟机栈、本地方法栈、Java堆和方法区。JVM是运行在操作系统之上的,它与硬件没有直接的交互。
1。1JVM数据运行区
Java虚拟机在执行Java程序的过程中会把它在主存中管理的内存部分划分成多个区域,每个区域存放不同类型的数据。如下图所示:
1、程序计数器:是一个数据结构,用于保存当前正常执行的程序的内存地址。
Java虚拟机的多线程就是通过线程轮流切换并分配处理器时间来实现的,为了线程切换后能恢复到正确的位置,每条线程都需要一个独立的程序计数器,互不影响,该区域线程私有。
2、Java虚拟机栈:与线程生命周期相同,用于存储局部变量表,操作栈,方法返回值。局部变量表放着基本数据类型,还有对象的引用,该区域线程私有。
3、本地方法栈:跟虚拟机栈很像,不过它是为虚拟机使用到的Native方法服务,该区域线程私有。
4、方法区:储存虚拟机加载的类信息,常量,静态变量,编译后的代码,该区域线程共享。
5、Java堆:存放所有对象的实例。这一块区域在Java虚拟机启动的时候被创建,该区域被所有线程所共享,同时也是垃圾收集器的主要工作区域,因此这一部分区域除了被叫堆内内存以外,也被叫做GC堆(GarbageCollectedHeap)。
1。2堆内内存(onheapmemory)
堆内内存是Java垃圾收集器的主要工作区域,为了提高垃圾回收的效率,在堆内内存的内部又划分出了新生代、老年代、永久代。在新生代内存中又按照8:1:1的比例划分出了Eden、Survivor1、Survivor2三个区域。新生代:新生代有一个Eden区和两个Survivor区,首先将对象放入Eden区,如果空间不足就向Survivor1区上放,触发一次minorGC,如果仍然放不下就将存活的对象放入Survivor2区中,然后清空Eden和Survivor1区的内存。在某次GC过程中,如果发现仍然又放不下的对象,就将这些对象放入老年代内存里去。老年代:大对象以及长期存活的对象直接进入老年代。永久代:永久存储区是一个常驻内存区域,用于存放JDK自身所携带的Class、Interface的元数据,也就是说它存储的是运行环境必须的类信息,被装载进此区域的数据是不会被垃圾回收器回收掉的,关闭JVM才会释放此区域所占用的内存。
如果出现java。lang。OutOfMemoryError:PermGenspace,说明是Java虚拟机对永久代Perm内存设置不够。
1。3GC算法
由于堆内内存处理是容易出现问题的地方,忘记或者错误的内存回收会导致程序或系统的不稳定甚至崩溃,Java就提供GC功能自动监测对象是否超过作用域从而达到自动回收内存的目的。
关于堆内存和永久区的垃圾回收,Java提供的GC算法包含:引用计数法,标记清除算法,复制算法,标记压缩算法,分代收集算法引用计数法:引用计数器的实现很简单,对于一个对象A,只要有任何一个对象引用了A,则A的引用计数器就加1,当引用失效时,引用计数器就减1。只要对象A的引用计数器的值为0,则对象A就不可能再被使用。缺点:1、无法处理循环引用情况,会造成内存泄漏。2、对系统性能产生影响。标记清除算法:将垃圾回收分为两个阶段:标记阶段和清除阶段,首先标记出所有需要回收的对象,在标记完成后统一回收所有被标记的对象。缺点:1、效率问题,2、空间问题。标记清除之后会产生大量不连续的内存碎片,空间碎片太多会导致以后程序在运行过程中需要分配较大对象时,无法找到足够的连续内存而提前触发另一次垃圾收集动作。复制算法:将可用内存按容量划分为大小相等的两块,每次只试用其中的一块,当这一块内存用完时,将存活的对象复制到另外一块内存上面,然后清除使用内存中的所有对象。适用于初生代。标记压缩算法:首先标记出所有需要回收的对象,然后让所有存活的对象都向一端移动,然后清理掉端边界以外的内存。适用于老年代分代收集算法:初生代使用复制算法,老年代使用标记压缩算法。
1。4堆外内存(offheapmemory)
虽然Java提供了多种算法进行垃圾回收,但仍然无法彻底解决堆内内存过大带来的长时间的GC停顿的问题,以及操作系统对堆内内存不可知的问题。
基于上述问题,Java虚拟机开辟出了堆外内存(offheapmemory)。堆外内存意味着把一些对象的实例分配在Java虚拟机堆内内存以外的内存区域,这些内存直接受操作系统(而不是虚拟机)管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。同时因为这部分区域直接受操作系统的管理,别的进程和设备(例如GPU)可以直接通过操作系统对其进行访问,减少了从虚拟机中复制内存数据的过程。importjava。nio。ByteBuffer;importsun。nio。ch。DirectBuffer;publicclassTestDirectByBuffer{publicstaticvoidmain(String〔〕args)throwsException{while(true){ByteBufferbufferByteBuffer。allocateDirect(12810241024)}}}1234567891011
优点:可以很方便的自主开辟很大的内存空间,对大内存的伸缩性很好;减少垃圾回收带来的系统停顿时间;直接受操作系统控制,可以直接被其他进程和设备访问,减少了原本从虚拟机复制的过程;特别适合那些分配次数少,读写操作很频繁的场景。
缺点:容易出现内存泄漏,并且很难排查;堆外内存的数据结构不直观,当存储结构复杂的对象时,会浪费大量的时间对其进行串行化。
1。5堆外内存与堆内内存联系
虽然堆外内存本身不受垃圾回收算法的管辖,但是因为其是由ByteBuffer所创造出来的,因此这个buffer自身作为一个实例化的对象,其自身的信息(例如堆外内存在主存中的起始地址等信息)必须存储在堆内内存中,具体情况如下图所示。
1。6JVM内存管理缺陷
由于在JVM内存中存储大量的数据(包括缓存和高效处理)时,JVM内存会面临很多问题,包括如下:Java对象存储密度低。Java的对象在内存中存储包含3个主要部分:对象头、实例数据、对齐填充部分。例如,一个只包含boolean属性的对象占16byte:对象头占8byte,boolean属性占1byte,为了对齐达到8的倍数额外占7byte。而实际上只需要一个bit(18字节)就够了。FullGC会极大地影响性能。尤其是为了处理更大数据而开了很大内存空间的JVM来说,GC会达到秒级甚至分钟级。OOM问题影响稳定性。OutOfMemoryError是分布式计算框架经常会遇到的问题,当JVM中所有对象大小超过分配给JVM的内存大小时,就会发生OutOfMemoryError错误,导致JVM崩溃,分布式框架的健壮性和性能都会受到影响。缓存未命中问题。CPU进行计算的时候,是从CPU缓存中获取数据。现代体系的CPU会有多级缓存,而加载的时候是以CacheLine为单位加载。如果能够将对象连续存储,这样就会大大降低CacheMiss。使得CPU集中处理业务,而不是空转。2Flink内存管理
基于JVM内存存在一些问题,并且在大数据场景下,无法在内存中存储海量数据,计算效率无法提高。Flink社区采用自主内存管理设计。
Flink并不是将大量对象存在堆内存上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫做MemorySegment,它代表了一段固定长度的内存(默认大小为32KB),也是Flink中最小的内存分配单元,并且提供了非常高效的读写方法,很多运算可以直接操作二进制数据,不需要反序列化即可执行。每条记录都会以序列化的形式存储在一个或多个MemorySegment中。如果需要处理的数据多于可以保存在内存中的数据,Flink的运算符会将部分数据溢出到磁盘。
2。1Flink内存模型
Flink总体内存类图如下:
主要包含JobManager内存模型和TaskManager内存模型。
2。2JobManager内存模型
FlinkJobManager内存类图如虚线部分:
在1。11中,Flink对JM端的内存配置进行了修改,使它的选项和配置方式与TM端的配置方式保持一致。
配置JobManager的总进程内存jobmanager。heap。size:1024mjobmanager。memory。process。size:4096mjobmanager。memory。heap。size:2048mjobmanager。memory。offheap。size:1536m12345678910111213141516
2。3TaskManager内存模型
TaskManager内存模型如下图所示:
TaskManager内存模型一共包含3大部分,分别为总体内存、JVMHeap堆上内存、OffHeap堆外内存等。
2。3。1总体内存
1、TotalProcessMemory:FlinkJava应用程序(包括用户代码)和JVM运行整个进程所消耗的总内存。
总进程内存(TotalProcessMemory)Flink总内存JVM元空间JVM执行开销配置项:默认size:1728mbtaskmanager。memory。process。size:1728m12
2、TotalFlinkMemory:仅FlinkJava应用程序消耗的内存,包括用户代码,但不包括JVM为其运行而分配的内存。
Flink总内存Framework堆内外task堆内外networkmanagedMemory配置项:默认size:1280mbtaskmanager。memory。flink。size:1280mb12
2。3。2JVMHeap(JVM堆上内存)
1、FrameworkHeap:框架堆内存
Flink框架本身使用的内存,即TaskManager本身所占用的堆上内存,不计入Slot的资源中。配置参数:默认128MBtaskmanager。memory。framework。heap。size128MB12
2、TaskHeap:任务堆内存
如果内存大小没有指定,它将被推导出为总Flink内存减去框架堆内存、框架堆外内存、任务堆外内存、托管内存和网络内存。
Task执行用户代码时所使用的堆上内存。配置参数:taskmanager。memory。task。heap。size12
2。3。3OffHeapMempry(JVM堆外内存)
1、Managedmemory:托管内存
由Flink管理的原生托管内存,保留用于排序、哈希表、中间结果缓存和RocksDB状态后端。
托管内存由Flink管理并分配为原生内存(堆外)。以下工作负载使用托管内存:
流式作业可以将其用于RocksDB状态后端。流和批处理作业都可以使用它进行排序、哈希表、中间结果的缓存。流作业和批处理作业都可以使用它在Python进程中执行用户定义的函数。
托管内存配置时如果两者都设置,则大小将覆盖分数。如果大小和分数均未明确配置,则将使用默认分数。配置参数:默认size:128mb,fraction:0。4taskmanager。memory。managed。size128mbtaskmanager。memory。managed。fraction0。4123
2、DirectMemory:JVM直接内存FrameworkOffHeapMemory:Flink框架堆外内存。即TaskManager本身所占用的对外内存,不计入Slot资源。配置参数:默认128MBtaskmanager。memory。framework。offheap。size128MB12TaskOffHeap:Task堆外内存。专用于Flink框架的堆外直接(或本机)内存。配置参数:默认0taskmanager。memory。task。offheap。size012NetworkMemory:网络内存。网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区。配置参数:默认min:64mb,max:1gb,fraction:0。1taskmanager。memory。network。min:64mbtaskmanager。memory。network。max:1gbtaskmanager。memory。network。fraction:0。11234
3、JVMmetaspace:JVM元空间。
FlinkJVM进程的元空间大小,默认为256MB。配置参数:默认size:256mbtaskmanage。memory。jvmmetaspace。size:256mb12
4、JVMOverhead:JVM执行开销。
JVM执行时自身所需要的内容,包括线程堆栈、IO、编译缓存等所使用的内存,这是一个上限分级成分的的总进程内存。配置参数:taskmanager。memory。jvmoverhead。min192mbtaskmanager。memory。jvmoverhead。max1gbtaskmanager。memory。jvmoverhead。fraction0。112343Flink内存数据结构
Flink的内存管理和操作系统管理内存一样,将内存划分为内存段、内存页等结构。
3。1Flink内存段
内存段在Flink内部叫MemorySegment,是Flink中最小的内存分配单元,默认大小32KB。它既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer),同时提供了对二进制数据进行读取和写入的方法。
HeapMemorySegment:用来分配堆上内存;
HybridMemorySegment:用来分配堆外内存和堆上内存;2017年以后的版本实际上只使用了HybridMemorySegment。
通过一个案例介绍Flink在序列化和反序列化过程中如何使用MemorySegment:
如上图所示,当创建一个Tuple3对象时,包含三个层面,一是int类型,一是double类型,还有一个是Person。Person对象包含两个字段,一是int型的ID,另一个是String类型的name,
(1)在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到Tuple3会把int类型通过IntSerializer进行序列化操作,此时int只需要占用四个字节。
(2)Person类会被当成一个Pojo对象来进行处理,PojoSerializer序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由MemorySegment去支持。
3。2Flink内存页
内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。使用时就无需关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。
3。2。1DataInputView
DataInputView是从MemorySegment数据读取抽象视图,继承自java。io。DataInput。InputView中持有多个MemorySegment的引用(MemorySegment〔〕),这一组MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。如下图为继承关系图:
3。2。2DataInputView
DataOutputView是从MemorySegment数据读取抽象视图,继承自java。io。DataOutput。OutputView中持有多个MemorySegment的引用(MemorySegment〔〕),这一组MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。如下图为继承关系图:
3。2。3Buffer
Buffer是具有引用计数的MemorySegment实例的包装器。用来将上游Task算子处理完毕的结果交给下游时定义的一个抽象或者内存对象。
Buffer的接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。Flink在各个TaskManger之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了一个MemorySegment。Buffer接口类图如下:
Buffer的底层是MemorySegment,Buffer申请和释放由Flink自行管理,Flink引入了引用数的概念。当有新的Buffer消费者时,引用数加1,当消费者消费完Buffer时,引用数减1,最终当引用数变为0时,就可以将Buffer释放重用了。
3。2。4Buffer资源池
Buffer资源池在Flink中叫作BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer的通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。
BufferPool的类体系如下:
4网络缓冲器(NetworkBuffer)
网络缓冲器(NetworkBuffer)是网络交换数据的包装,其对应于MemorySegment内存段。
NetworkBuffer,顾名思义,就是在网络传输中使用到的Buffer(实际非网络传输也会用到)。Flink经过网络传输的上下游Task的设计会比较类似生产者消费者模型。
如果没有这个缓冲区,那么生产者或消费者会消耗大量时间在等待下游拿数据和上游发数据的环节上。加上这个缓冲区,生产者和消费者解耦开,任何一方短时间内的抖动理论上对另一方的数据处理都不会产生太大影响。如下图所示:
这是对于单进程内生产者消费者模型的一个图示,事实上,如果两个Task在同一个TaskManager内,那么使用的就是上述模型,
对于不同TM内、或者需要跨网络传输的TM之间,利用生产者消费者模型来进行数据传输的原理图如下:
可以看到,在NettyServer端,buffer只存在LocalBufferPool中,subpartition自己并没有缓存buffer或者独享一部分buffer,而在接收端,channel有自己独享的一部分buffer(ExclusiveBuffers),也有一部分共享的buffer(FloatingBuffers),所以,NetworkBuffer的使用同时存在于发送端和接收端。
由此可见,TaskManager内需要的buffers数量等于这个TaskManager内的所有Task中的发送端和接收端使用到的networkbuffer总和。明确了NetworkBuffer使用的位置,我们可以结合一些参数计算出作业实际所需的NetworkBuffer数量。5Flink内存调优
了解了FlinkJobManagerMemory和TaskManagerMemory的内存模型和数据结构之后,应该针对不同的部署情况,配置不同的内存,下面我们针对不同的部署方式介绍内存如何调优。
5。1为Standalone配置内存
建议为Standalone配置Flink总内存,设置JobManager和TaskManager的flink。size大小,声明为Flink本身提供了多少内存。配置方式如下:参数配置:taskmanager。memory。flink。size:jobmanager。memory。flink。size:123
5。2为Containers(容器)配置内存
建议为容器化部署(Kubernetes或Yarn)配置总进程内存,设置process。size大小,它声明了总共应该分配多少内存给FlinkJVM进程,并对应于请求容器的大小。配置方式如下:参数配置:taskmanager。memory。process。size:jobmanager。memory。process。size:123
注意:如果你配置了总Flink内存,Flink会隐式添加JVM内存组件来推导总进程内存,并请求一个具有该推导大小的内存的容器。
警告:如果Flink或用户代码分配超出容器大小的非托管堆外(本机)内存,作业可能会失败,因为部署环境可能会杀死有问题的容器。
5。3为statebackends(状态后端)配置内存
为statebackends(状态后端)配置内存时,这仅与TaskManager相关。
在部署Flink流应用程序时,所使用的状态后端类型将决定集群的最佳内存配置。
5。3。1HashMap状态后端
运行无状态作业或使用HashMapStateBackend时,将托管内存设置为零。这将确保为JVM上的用户代码分配最大数量的堆内存。配置如下:配置参数:设置size:0taskmanager。memory。managed。size:012
5。3。2RocksDB状态后端
该EmbeddedRocksDBStateBackend使用本机内存。默认情况下,RocksDB设置为将本机内存分配限制为托管内存的大小。因此,为你的状态保留足够的托管内存非常重要。如果禁用默认的RocksDB内存控制,RocksDB分配的内存超过请求的容器大小(总进程内存)的限制,则可以在容器化部署中终止TaskManager。
5。4为batchJob(批处理作业)配置内存
为batchJob(批处理作业)配置内存时,这仅与TaskManager相关。
Flink的批处理操作符利用托管内存来更高效地运行。这样做时,可以直接对原始数据执行某些操作,而无需反序列化为Java对象。这意味着托管内存配置对应用程序的性能有实际影响。Flink将尝试分配和使用为批处理作业配置的尽可能多的托管内存,但不会超出其限制。这可以防止OutOfMemoryError’s,因为Flink准确地知道它必须利用多少内存。如果托管内存不足,Flink会优雅地溢出到磁盘。6故障排除
6。1非法配置异常
如果看到从TaskExecutorProcessUtils或JobManagerProcessUtils抛出的IllegalConfigurationException,通常表明存在无效的配置值(例如负内存大小、大于1的分数等)或配置冲突。请重新配置内存参数。
6。2Java堆空间异常
如果报OutOfMemoryError:Javaheapspace异常,通常表示JVMHeap太小。可以尝试通过增加总内存来增加JVM堆大小。也可以直接为TaskManager增加任务堆内存或为JobManager增加JVM堆内存。
还可以为TaskManagers增加框架堆内存,但只有在确定Flink框架本身需要更多内存时才应该更改此选项。
6。3直接缓冲存储器异常
如果报OutOfMemoryError:Directbuffermemory异常,通常表示JVM直接内存限制太小或存在直接内存泄漏。检查用户代码或其他外部依赖项是否使用了JVM直接内存,以及它是否被正确考虑。可以尝试通过调整直接堆外内存来增加其限制。可以参考如何为TaskManagers、JobManagers和Flink设置的JVM参数配置堆外内存。
6。4元空间异常
如果报OutOfMemoryError:Metaspace异常,通常表示JVM元空间限制配置得太小。可以尝试加大JVM元空间TaskManagers或JobManagers选项。
6。5网络缓冲区数量不足
如果报IOException:Insufficientnumberofnetworkbuffers异常,这仅与TaskManager相关。通常表示配置的网络内存大小不够大。可以尝试通过调整以下选项来增加网络内存:参数配置:taskmanager。memory。network。mintaskmanager。memory。network。maxtaskmanager。memory。network。fraction1234
6。6超出容器内存异常
这个对应5。2节为容器配置内存。如果Flink容器尝试分配超出其请求大小(Yarn或Kubernetes)的内存,这通常表明Flink没有预留足够的本机内存。当容器被部署环境杀死时,可以通过使用外部监控系统或从错误消息中观察到这一点。
如果在JobManager进程中遇到这个问题,还可以通过设置排除可能的JVMDirectMemory泄漏的选项来开启JVMDirectMemory的限制使用以下命令进行配置参数配置:jobmanager。memory。enablejvmdirectmemorylimit:12
如果使用RocksDBStateBackend,并且内存控制被禁用:可以尝试增加TaskManager的托管内存。在保存点或完整检查点期间启用内存控制和非堆内存增加,这可能是由于glibc内存分配器而发生的。
可以尝试为TaskManagers添加环境变量MALLOCARENAMAX1,或者增加JVM开销。