Apache Arrow是是各種大數據工具(包括BigQuery)使用的一種流行格式,它是平面和分層數據的存儲格式。它是一種加快應用程序內存密集型。
數據處理和數據科學(xué)領(lǐng)域中的常用庫: 。諸如Apache Parquet,Apache Spark,pandas之類(lèi)的開(kāi)放源代碼項目以及許多商業(yè)或封閉源代碼服務(wù)都使用Arrow。它提供以下功能:
讓我們看一看在A(yíng)rrow出現之前事物是如何工作的:
我們可以看到,為了使Spark從Parquet文件中讀取數據,我們需要以Parquet格式讀取和反序列化數據。這要求我們通過(guò)將數據加載到內存中來(lái)制作數據的完整副本。首先,我們將數據讀入內存緩沖區,然后使用Parquet的轉換方法將數據(例如字符串或數字)轉換為我們的編程語(yǔ)言的表示形式。這是必需的,因為Parquet表示的數字與Python編程語(yǔ)言表示的數字不同。
由于許多原因,這對于性能來(lái)說(shuō)是一個(gè)很大的問(wèn)題:
現在,讓我們看一下Apache Arrow如何改進(jìn)這一點(diǎn):
Arrow無(wú)需復制和轉換數據,而是了解如何直接讀取和操作數據。為此,Arrow社區定義了一種新的文件格式以及直接對序列化數據起作用的操作??梢灾苯訌拇疟P(pán)讀取此數據格式,而無(wú)需將其加載到內存中并轉換/反序列化數據。當然,部分數據仍將被加載到RAM中,但您的數據不必放入內存中。Arrow使用其文件的內存映射功能,僅在必要和可能的情況下將盡可能多的數據加載到內存中。
Apache Arrow支持以下語(yǔ)言:
Arrow首先是提供用于內存計算的列式數據結構的庫,可以將任何數據解壓縮并解碼為Arrow柱狀數據結構,以便隨后可以對解碼后的數據進(jìn)行內存內分析。Arrow列格式具有一些不錯的屬性:隨機訪(fǎng)問(wèn)為O(1),每個(gè)值單元格在內存中的前一個(gè)和后一個(gè)相鄰,因此進(jìn)行迭代非常有效。
Apache Arrow定義了一種二進(jìn)制“序列化”協(xié)議,用于安排Arrow列數組的集合(稱(chēng)為“記錄批處理”),該數組可用于消息傳遞和進(jìn)程間通信。您可以將協(xié)議放在任何地方,包括磁盤(pán)上,以后可以對其進(jìn)行內存映射或讀入內存并發(fā)送到其他地方。
Arrow協(xié)議的設計目的是使您可以“映射”一個(gè)Arrow數據塊而不進(jìn)行任何反序列化,因此對磁盤(pán)上的Arrow協(xié)議數據執行分析可以使用內存映射并有效地支付零成本。該協(xié)議用于很多事情,例如Spark SQL和Python之間的流數據,用于針對Spark SQL數據塊運行pandas函數,這些被稱(chēng)為“ pandas udfs”。
Arrow是為內存而設計的(但是您可以將其放在磁盤(pán)上,然后再進(jìn)行內存映射)。它們旨在相互兼容,并在應用程序中一起使用,而其競爭對手Apache Parquet文件是為磁盤(pán)存儲而設計的。
優(yōu)點(diǎn):Apache Arrow為平面和分層數據定義了一種獨立于語(yǔ)言的列式存儲格式,該格式組織為在CPU和GPU等現代硬件上進(jìn)行高效的分析操作而組織。Arrow存儲器格式還支持零拷貝讀取,以實(shí)現閃電般的數據訪(fǎng)問(wèn),而無(wú)需序列化開(kāi)銷(xiāo)。
導入庫:
<dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-memory-netty</artifactId> <version>${arrow.version}</version> </dependency> <dependency> <groupId>org.apache.arrow</groupId> <artifactId>arrow-vector</artifactId> <version>${arrow.version}</version> </dependency>
在開(kāi)始之前,必須了解對于A(yíng)rrow的讀/寫(xiě)操作,使用了字節緩沖區。諸如讀取和寫(xiě)入之類(lèi)的操作是字節的連續交換。為了提高效率,Arrow附帶了一個(gè)緩沖區分配器,該緩沖區分配器可以具有一定的大小,也可以具有自動(dòng)擴展功能。支持分配管理的庫是arrow-memory-netty和arrow-memory-unsafe。我們這里使用netty。
用Arrow存儲數據需要一個(gè)模式,模式可以通過(guò)編程定義:
package com.gkatzioura.arrow; import java.io.IOException; import java.util.List; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.arrow.vector.types.pojo.Schema; public class SchemaFactory { public static Schema DEFAULT_SCHEMA = createDefault(); public static Schema createDefault() { var strField = new Field("col1", FieldType.nullable(new ArrowType.Utf8()), null); var intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null); return new Schema(List.of(strField, intField)); } public static Schema schemaWithChildren() { var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null); var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null); var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency)); return new Schema(List.of(itemField)); } public static Schema fromJson(String jsonString) { try { return Schema.fromJSON(jsonString); } catch (IOException e) { throw new ArrowExampleException(e); } } }
他們也有一個(gè)可解析的json表示形式:
{ "fields" : [ { "name" : "col1", "nullable" : true, "type" : { "name" : "utf8" }, "children" : [ ] }, { "name" : "col2", "nullable" : true, "type" : { "name" : "int", "bitWidth" : 32, "isSigned" : true }, "children" : [ ] } ] }
另外,就像Avro一樣,您可以在字段上設計復雜的架構和嵌入式值:
public static Schema schemaWithChildren() { var amount = new Field("amount", FieldType.nullable(new ArrowType.Decimal(19,4,128)), null); var currency = new Field("currency",FieldType.nullable(new ArrowType.Utf8()), null); var itemField = new Field("item", FieldType.nullable(new ArrowType.Utf8()), List.of(amount,currency)); return new Schema(List.of(itemField)); }
基于上面的的Schema,我們將為我們的類(lèi)創(chuàng )建一個(gè)DTO:
package com.gkatzioura.arrow; import lombok.Builder; import lombok.Data; @Data @Builder public class DefaultArrowEntry { private String col1; private Integer col2; }
我們的目標是將這些Java對象轉換為Arrow字節流。
1. 使用分配器創(chuàng )建
這些緩沖區是 的 。您確實(shí)需要釋放所使用的內存,但是對于庫用戶(hù)而言,這是通過(guò)在分配器上執行 操作來(lái)完成的。在我們的例子中,我們的類(lèi)將實(shí)現 ,該接口將執行分配器關(guān)閉操作。
通過(guò)使用流api,數據將被流傳輸到使用Arrow格式提交的OutPutStream:
package com.gkatzioura.arrow; import java.io.Closeable; import java.io.IOException; import java.nio.channels.WritableByteChannel; import java.util.List; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.arrow.vector.util.Text; import static com.gkatzioura.arrow.SchemaFactory.DEFAULT_SCHEMA; public class DefaultEntriesWriter implements Closeable { private final RootAllocator rootAllocator; private final VectorSchemaRoot vectorSchemaRoot;//向量分配器創(chuàng )建: public DefaultEntriesWriter() { rootAllocator = new RootAllocator(); vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator); } public void write(List<DefaultArrowEntry> defaultArrowEntries, int batchSize, WritableByteChannel out) { if (batchSize <= 0) { batchSize = defaultArrowEntries.size(); } DictionaryProvider.MapDictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider(); try(ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, out)) { writer.start(); VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0); IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1); childVector1.reset(); childVector2.reset(); boolean exactBatches = defaultArrowEntries.size()%batchSize == 0; int batchCounter = 0; for(int i=0; i < defaultArrowEntries.size(); i++) { childVector1.setSafe(batchCounter, new Text(defaultArrowEntries.get(i).getCol1())); childVector2.setSafe(batchCounter, defaultArrowEntries.get(i).getCol2()); batchCounter++; if(batchCounter == batchSize) { vectorSchemaRoot.setRowCount(batchSize); writer.writeBatch(); batchCounter = 0; } } if(!exactBatches) { vectorSchemaRoot.setRowCount(batchCounter); writer.writeBatch(); } writer.end(); } catch (IOException e) { throw new ArrowExampleException(e); } } @Override public void close() throws IOException { vectorSchemaRoot.close(); rootAllocator.close(); } }
為了在A(yíng)rrow上顯示批處理的支持,已在函數中實(shí)現了簡(jiǎn)單的批處理算法。對于我們的示例,只需考慮將數據分批寫(xiě)入。
讓我們深入了解上面代碼功能:
向量分配器創(chuàng )建:
public DefaultEntriesToBytesConverter() { rootAllocator = new RootAllocator(); vectorSchemaRoot = VectorSchemaRoot.create(DEFAULT_SCHEMA, rootAllocator); }
然后在寫(xiě)入流時(shí),實(shí)現并啟動(dòng)了Arrow流編寫(xiě)器
ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, dictProvider, Channels.newChannel(out)); writer.start();
我們將數據填充向量,然后還重置它們,但讓預分配的緩沖區 存在 :
VarCharVector childVector1 = (VarCharVector) vectorSchemaRoot.getVector(0); IntVector childVector2 = (IntVector) vectorSchemaRoot.getVector(1); childVector1.reset(); childVector2.reset();
寫(xiě)入數據時(shí),我們使用 setSafe 操作。如果需要分配更多的緩沖區,應采用這種方式。對于此示例,此操作在每次寫(xiě)入時(shí)都完成,但是在考慮了所需的操作和緩沖區大小后可以避免:
childVector1.setSafe(i, new Text(defaultArrowEntries.get(i).getCol1())); childVector2.setSafe(i, defaultArrowEntries.get(i).getCol2());
然后,將批處理寫(xiě)入流中:
vectorSchemaRoot.setRowCount(batchSize); writer.writeBatch();
最后但并非最不重要的一點(diǎn)是,我們關(guān)閉了writer:
@Override public void close() throws IOException { vectorSchemaRoot.close(); rootAllocator.close(); }
以上就是JVM上高性能數據格式庫包Apache Arrow入門(mén)和架構詳解(Gkatziouras)的詳細內容,更多關(guān)于A(yíng)pache Arrow入門(mén)的資料請關(guān)注腳本之家其它相關(guān)文章!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站