An In-Depth Look at Memory Management in the Big Data Processing Engine Flink
Flink can use both on-heap and off-heap memory; the memory model is shown in the figure:

Flink's memory is divided into on-heap and off-heap memory. By purpose, it falls into task memory, network memory, managed memory, and framework memory. The task, network, and managed portions count toward slot memory, while framework memory is shared by the whole TaskManager.

On-heap memory holds the memory used by user code, the HeapStateBackend, and framework execution.

Off-heap memory is not virtualized by the JVM; it maps directly to operating-system memory addresses. It includes framework execution memory as well as JVM off-heap memory such as direct and native memory.

Direct memory can be used for network transfer buffers. Network memory falls under direct memory; Flink uses it to achieve zero copy, reducing the number of copies between kernel space and user space and thus performing I/O more efficiently.

The JVM metaspace stores the metadata of the classes the JVM has loaded; the more classes are loaded, the more space it needs. JVM overhead covers the JVM's other costs, such as native memory, the code cache, and thread stacks.

Managed memory is used mainly by the RocksDBStateBackend and by batch operators, and it also falls under native memory. The RocksDBStateBackend is backed by RocksDB, which is built on an LSM-tree data structure; each state corresponds to a column family with its own write buffer, so RocksDB's native memory footprint is roughly blockCache + writeBufferNum * writeBufferSize + index. Off-heap memory is shared between processes, and having the JVM manage a very large heap is expensive, so using off-heap memory avoids that cost. Off-heap memory has drawbacks too: monitoring and debugging are more complex, and for short-lived segments on-heap allocation is cheaper. In some situations Flink operates directly on binary data, avoiding deserialization overhead, and if the data to be processed exceeds the memory limit, part of it is spilled to disk.
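To make the breakdown above concrete, the sketch below sets the main TaskManager memory options programmatically. This is a hedged illustration: the taskmanager.memory.* keys are the ones introduced with Flink 1.10's memory model, and the sizes are placeholder values, not recommendations.

```java
import org.apache.flink.configuration.Configuration;

public class MemoryConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.process.size", "4g");         // total TM process memory
        conf.setString("taskmanager.memory.task.heap.size", "1g");       // on-heap task memory
        conf.setString("taskmanager.memory.managed.size", "1g");         // off-heap managed memory
        conf.setString("taskmanager.memory.network.fraction", "0.1");    // network (direct) buffers
        conf.setString("taskmanager.memory.jvm-metaspace.size", "256m"); // JVM metaspace
    }
}
```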
Analogous to the page mechanism in an operating system, Flink manages memory in pages. The corresponding data structures are the DataViews and MemorySegment. A MemorySegment is Flink's smallest unit of memory allocation, 32 KB by default, and it may live on the heap or off it; Flink accesses both kinds of memory through the MemorySegment abstraction. Together with Flink's serialization mechanism (covered in the next section), MemorySegment provides methods for reading and writing binary data, and Flink uses DataInputView and DataOutputView to read and write that binary data. HeapMemorySegment manages on-heap memory only, while HybridMemorySegment manages both on-heap and off-heap memory. When a MemorySegment manages JVM heap memory, it holds a reference to a byte array and operates relative to that array; HeapMemorySegment works through this internal byte-array reference.
```java
public abstract class MemorySegment {

    /**
     * The heap byte array object relative to which we access the memory.
     *
     * <p>Is non-<tt>null</tt> if the memory is on the heap, and is <tt>null</tt>, if the memory
     * is off the heap. If we have this buffer, we must never void this reference, or the memory
     * segment will point to undefined addresses outside the heap and may in out-of-order
     * execution cases cause segmentation faults.
     */
    protected final byte[] heapMemory;

    /**
     * The address to the data, relative to the heap memory byte array. If the heap memory byte
     * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
     */
    protected long address;
}
```
HeapMemorySegment is used to allocate on-heap memory.
```java
public final class HeapMemorySegment extends MemorySegment {

    /**
     * An extra reference to the heap memory, so we can let byte array checks fail by the
     * built-in checks automatically without extra checks. This byte-array reference points
     * to the memory backing this segment.
     */
    private byte[] memory;

    @Override
    public void free() {
        super.free();
        this.memory = null;
    }

    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }
}
```
HybridMemorySegment supports both on-heap and off-heap memory through the JVM's Unsafe operations. If the reference object o is non-null (the on-heap case), the address that follows is a relative offset, and the call operates on that object (for example, an array) at the given relative position. If o is null (the off-heap case), the memory being operated on is not JVM heap memory; the address is the absolute address of some memory block, and the call operates on that block directly.
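The following runnable sketch, which uses sun.misc.Unsafe directly and is not Flink code, shows this dual addressing: the same putLong/getLong calls work against a heap array or an absolute address depending on whether the base object is null (works on JDK 8/11-era runtimes):

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class UnsafeAddressingSketch {
    public static void main(String[] args) throws Exception {
        // Obtain the Unsafe instance reflectively (the usual trick outside the JDK).
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        // On-heap: non-null base object, offset relative to the array's data start.
        byte[] heapMemory = new byte[16];
        long base = unsafe.arrayBaseOffset(byte[].class);
        unsafe.putLong(heapMemory, base, 42L);
        System.out.println(unsafe.getLong(heapMemory, base)); // 42

        // Off-heap: null base object, the "offset" is an absolute memory address.
        long address = unsafe.allocateMemory(16);
        unsafe.putLong(null, address, 42L);
        System.out.println(unsafe.getLong(null, address)); // 42
        unsafe.freeMemory(address);
    }
}
```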
```java
public final class HybridMemorySegment extends MemorySegment {

    @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                // on-heap: wrap the backing byte array directly
                return ByteBuffer.wrap(heapMemory, offset, length);
            } else {
                // off-heap: hand out a duplicate of the direct buffer, limited to the range
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                } catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        } else {
            throw new IllegalStateException("segment has been freed");
        }
    }
}
```
Flink creates MemorySegments through MemorySegmentFactory; a small usage sketch follows. For data access that spans multiple MemorySegments, Flink abstracts an access view over them: DataInputView for reading and DataOutputView for writing, shown after the sketch.
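A hedged usage sketch of the factory (allocateUnpooledSegment, putLong, getLong, and free are part of Flink's MemorySegment API):

```java
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentSketch {
    public static void main(String[] args) {
        // Allocate one 32 KB on-heap segment (the default page size).
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
        segment.putLong(0, 123L);               // write binary data at offset 0
        System.out.println(segment.getLong(0)); // read it back
        segment.free();
    }
}
```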
```java
/**
 * This interface defines a view over some memory that can be used to sequentially read the
 * contents of the memory. The view is typically backed by one or more
 * {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataInputView extends DataInput {

    // Implementations hold references to a group of MemorySegments; together they can be
    // viewed as one memory page, and Flink reads the data in the segments sequentially.

    /**
     * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at
     * offset {@code off}. It returns the number of read bytes or -1 if there is no more
     * data left.
     *
     * @param b byte array to store the data to
     * @param off offset into byte array
     * @param len byte length to read
     * @return the number of actually read bytes or -1 if there is no more data left
     */
    int read(byte[] b, int off, int len) throws IOException;
}
```
DataOutputView is the view for writing data. The output view holds references to several MemorySegments, and Flink writes into the segments sequentially.
```java
/**
 * This interface defines a view over some memory that can be used to sequentially write
 * contents to the memory. The view is typically backed by one or more
 * {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataOutputView extends DataOutput {

    // Implementations hold references to a list of MemorySegments and write into them
    // sequentially.

    /**
     * Copies {@code numBytes} bytes from the source to this view.
     *
     * @param source The source to copy the bytes from.
     * @param numBytes The number of bytes to copy.
     */
    void write(DataInputView source, int numBytes) throws IOException;
}
```
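As a usage sketch, Flink's byte-array-backed implementations of these views, DataOutputSerializer and DataInputDeserializer, make the sequential read/write contract easy to see (a minimal example, assuming a Flink 1.x classpath):

```java
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class ViewSketch {
    public static void main(String[] args) throws Exception {
        DataOutputSerializer out = new DataOutputSerializer(64); // initial buffer size
        out.writeInt(7);
        out.writeUTF("flink");

        // Read the bytes back in the same order they were written.
        DataInputDeserializer in =
                new DataInputDeserializer(out.getSharedBuffer(), 0, out.length());
        System.out.println(in.readInt()); // 7
        System.out.println(in.readUTF()); // flink
    }
}
```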
Flink manages the managed memory described in the previous section through the MemoryManager. Managed memory uses only off-heap memory. It is used mainly for sorting, hashing, and caching in batch processing (per community discussions, stream processing will also make use of it in the future), and in stream processing it provides part of the RocksDBStateBackend's memory. The MemoryManager manages MemorySegments through a memory pool.
```java
/**
 * The memory manager governs the memory that Flink uses for sorting, hashing, caching or
 * off-heap state backends (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s
 * of equal size or in reserved chunks of certain size. Operators allocate the memory either by
 * requesting a number of memory segments or by reserving chunks. Any allocated memory has to be
 * released to be reused later.
 *
 * <p>The memory segments are represented as off-heap unsafe memory regions (both via
 * {@link HybridMemorySegment}). Releasing a memory segment will make it re-claimable by the
 * garbage collector, but does not necessarily immediately release the underlying memory.
 */
public class MemoryManager {

    /**
     * Allocates a set of memory segments from this memory manager.
     *
     * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
     *
     * @param owner The owner to associate with the memory segment, for the fallback release.
     * @param target The list into which to put the allocated memory pages.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown, if this memory manager does not have the
     *     requested amount of memory pages any more.
     */
    public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)
            throws MemoryAllocationException {
        // ...
    }

    private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
        segment.free();
        if (segments != null) {
            segments.remove(segment);
        }
    }
}

// MemorySegment#free(), which freeSegment() above invokes:
/**
 * Frees this memory segment.
 *
 * <p>After this operation has been called, no further operations are possible on the memory
 * segment and will fail. The actual memory (heap or off-heap) will only be released after this
 * memory segment object has become garbage collected.
 */
public void free() {
    // this ensures we can place no more data and trigger
    // the checks for the freed segment
    address = addressLimit + 1;
}
```
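From an operator's perspective, the allocate/release cycle looks roughly like the sketch below (a hedged fragment assuming an already-constructed MemoryManager and an owner object):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryManager;

public class ManagedMemorySketch {
    static void useManagedPages(MemoryManager memoryManager, Object owner) throws Exception {
        List<MemorySegment> pages = new ArrayList<>();
        memoryManager.allocatePages(owner, pages, 8); // request 8 pages for this owner
        try {
            // ... sorting / hashing / caching over the pages ...
        } finally {
            memoryManager.release(pages); // hand the pages back for reuse
        }
    }
}
```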
For the network memory mentioned in the previous section, Flink adds a layer of buffer encapsulation, the NetworkBuffer, whose underlying storage is again a MemorySegment. Flink manages buffers through buffer pools: each TaskManager has one NetworkBufferPool shared by all tasks on that TaskManager, and each task's LocalBufferPool obtains its memory from the NetworkBufferPool. All of this is off-heap memory allocated by Flink.
When an upstream operator writes data to a ResultPartition, it requests buffer resources and writes the data into MemorySegments via a BufferBuilder; when a downstream operator consumes data from a ResultSubpartition, it reads the data from the MemorySegments via a BufferConsumer. BufferBuilders and BufferConsumers correspond one to one. This flow is also tied to Flink's backpressure mechanism, as shown in the figure.
```java
/**
 * A buffer pool used to manage a number of {@link Buffer} instances from the
 * {@link NetworkBufferPool}.
 *
 * <p>Buffer requests are mediated to the network buffer pool to ensure deadlock-free operation
 * of the network stack by limiting the number of buffers per local buffer pool. It also
 * implements the default mechanism for buffer recycling, which ensures that every buffer is
 * ultimately returned to the network buffer pool.
 *
 * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}).
 * It will then lazily return the required number of buffers to the {@link NetworkBufferPool}
 * to match its new size.
 */
class LocalBufferPool implements BufferPool {

    @Nullable
    private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            returnExcessMemorySegments();

            if (availableMemorySegments.isEmpty()) {
                segment = requestMemorySegmentFromGlobal();
            }
            // segment may have been released by buffer pool owner
            if (segment == null) {
                segment = availableMemorySegments.poll();
            }
            if (segment == null) {
                availabilityHelper.resetUnavailable();
            }
            if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
                if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                    availabilityHelper.resetUnavailable();
                }
            }
        }
        return segment;
    }
}

/**
 * A result partition for data produced by a single task.
 *
 * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}.
 * Essentially, a result partition is a collection of {@link Buffer} instances. The buffers are
 * organized in one or more {@link ResultSubpartition} instances, which further partition the
 * data depending on the number of consuming tasks and the data {@link DistributionPattern}.
 *
 * <p>Tasks which consume a result partition have to request one of its subpartitions. The
 * request happens either remotely (see {@link RemoteInputChannel}) or locally (see
 * {@link LocalInputChannel}). The life-cycle of each result partition has three (possibly
 * overlapping) phases: Produce, Consume, Release.
 */
public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {

    @Override
    public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        checkInProduceState();
        return bufferPool.requestBufferBuilderBlocking(targetChannel);
    }
}
```
Flink implements a custom serialization mechanism for the basic data types it supports. Flink dataset objects have relatively fixed layouts, so only one copy of the schema information needs to be stored, saving space. Serialization is the conversion between Java objects and binary data. Flink uses TypeInformation's createSerializer method to create a serializer for each type, which then performs serialization and deserialization. The type information is extracted by the TypeExtractor from method signatures and class information when the StreamTransformation is built, and is stored in the StreamConfig.
```java
// From TypeInformation<T>:
/**
 * Creates a serializer for the type. The serializer may use the ExecutionConfig
 * for parameterization.
 *
 * @param config The config used to parameterize the serializer.
 * @return A serializer for this type.
 */
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);

/**
 * A utility for reflection analysis on classes, to determine the return type of
 * implementations of transformation functions.
 */
@Public
public class TypeExtractor {

    /**
     * Creates a {@link TypeInformation} from the given parameters.
     *
     * <p>If the given {@code instance} implements {@link ResultTypeQueryable}, its information
     * is used to determine the type information. Otherwise, the type information is derived
     * based on the given class information.
     *
     * @param instance instance to determine type information for
     * @param baseClass base class of {@code instance}
     * @param clazz class of {@code instance}
     * @param returnParamPos index of the return type in the type arguments of {@code clazz}
     * @param <OUT> output type
     * @return type information
     */
    @SuppressWarnings("unchecked")
    @PublicEvolving
    public static <OUT> TypeInformation<OUT> createTypeInfo(
            Object instance, Class<?> baseClass, Class<?> clazz, int returnParamPos) {
        if (instance instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<OUT>) instance).getProducedType();
        } else {
            return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
        }
    }
}
```
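A short usage sketch (standard Flink 1.x API): obtain the TypeInformation for a tuple type and create its serializer from it:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;

public class SerializerSketch {
    public static void main(String[] args) {
        TypeInformation<Tuple2<String, Integer>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
        TypeSerializer<Tuple2<String, Integer>> serializer =
                typeInfo.createSerializer(new ExecutionConfig());
        // e.g. a TupleSerializer composed of per-field serializers
        System.out.println(serializer);
    }
}
```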
For nested data types, Flink serializes from the innermost fields outward, and the inner serialization results are composed into the outer result. During deserialization, the binary data is read sequentially from memory and turned back into Java objects according to the offsets. Flink's built-in serialization achieves high storage density, since only the values of the corresponding types need to be stored.
On top of MemorySegment, Flink's Table module uses the BinaryRow data structure, which further reduces deserialization overhead: when deserialization is required, only the fields that are actually needed are deserialized, rather than the whole object.
You can also register subtypes and custom serializers. Types that Flink cannot serialize itself are handed to Kryo, and if Kryo cannot handle them either, Avro is used as a forced fallback. Kryo's serialization performance is lower than that of Flink's built-in mechanism, so during development you can call env.getConfig().disableGenericTypes() to disable Kryo and stick to the data types covered by Flink's own serializers.
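For example (a hedged sketch; MyPojo is a hypothetical user type), registering types up front and failing fast on any Kryo fallback looks like this:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register a (hypothetical) user type with Kryo up front.
        env.getConfig().registerKryoType(MyPojo.class);

        // Throw instead of silently falling back to Kryo for unrecognized types.
        env.getConfig().disableGenericTypes();
    }

    public static class MyPojo {
        public int id;
    }
}
```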
Reading from the CPU's L1, L2, and L3 caches is much faster than reading from main memory; cache accesses are many times faster than main-memory accesses. Another important program property is the principle of locality: programs tend to reuse the data and instructions they used recently. There are two kinds of locality: temporal locality means recently accessed items are likely to be accessed again soon, and spatial locality means items at nearby addresses tend to be accessed close together in time.
Designing cache-friendly data structures around these two properties can substantially improve cache hit rates and locality. Flink applies this mainly to sorting. Normally a pointer references a <key, value> record, and comparing records means following each pointer to the actual data, which causes random memory access and poor cache locality. Flink instead stores a serialized fixed-length key together with the pointer, so the keys are laid out contiguously in memory, avoiding random access and improving the CPU cache hit rate. When comparing two records, the keys are compared first: if they differ, the result is known immediately and only the pointers need to be swapped, never the actual data; if the keys are equal, the data the pointers reference is compared. A simplified sketch of this layout follows.
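Below is a self-contained sketch (not Flink's NormalizedKeySorter) of the fixed-length-key-plus-pointer layout; a 4-byte ASCII prefix stands in for a serialized, order-preserving key:

```java
import java.util.Arrays;

public class KeyPointerSortSketch {
    public static void main(String[] args) {
        String[] records = {"cherry", "apple", "banana"};

        // One packed long per record: high 32 bits = fixed-length key (the first 4 ASCII
        // bytes, which preserves lexicographic order), low 32 bits = pointer (the record's
        // index). Real implementations fall back to comparing the full records when the
        // fixed-length prefixes are equal.
        long[] entries = new long[records.length];
        for (int i = 0; i < records.length; i++) {
            long key = 0;
            for (int j = 0; j < 4; j++) {
                char c = j < records[i].length() ? records[i].charAt(j) : 0;
                key = (key << 8) | (c & 0xFF);
            }
            entries[i] = (key << 32) | i;
        }

        // Sorting touches only this contiguous array: sequential access, good cache
        // locality, and "swaps" move 8-byte entries instead of whole records.
        Arrays.sort(entries);

        for (long e : entries) {
            System.out.println(records[(int) (e & 0xFFFFFFFFL)]); // apple, banana, cherry
        }
    }
}
```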