apache / arrow-cookbook

Apache Arrow Cookbook
https://arrow.apache.org/
Apache License 2.0

Write Arrow Objects to Parquet (Java) #315

Open pronzato opened 1 year ago

pronzato commented 1 year ago

Apologies if this is not the correct forum to ask but what's the best way to write Arrow Objects to Parquet via Java?

pronzato commented 1 year ago

Specifically I'm looking to have clients send data to Arrow Flight Server (in Java) and have it persist the data to parquet files.

lidavidm commented 1 year ago

You're best off using the JNI bindings: https://github.com/apache/arrow/blob/main/java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java

CC @davisusanibar there's no cookbook example for this
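
For orientation, here is a minimal sketch of what a DatasetFileWriter call looks like once the data is available through any ArrowReader (the reader here is an Arrow IPC file reader and the paths are hypothetical):

import java.io.FileInputStream;

import org.apache.arrow.dataset.file.DatasetFileWriter;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;

public class WriteReaderToParquet {
    public static void main(String[] args) throws Exception {
        try (
            BufferAllocator allocator = new RootAllocator();
            // Any ArrowReader can serve as the source; here an existing Arrow IPC file is read back.
            ArrowFileReader reader = new ArrowFileReader(
                    new FileInputStream("/tmp/input.arrow").getChannel(), allocator)
        ) {
            // The URI points at a destination directory; every batch from the reader is written as Parquet.
            DatasetFileWriter.write(allocator, reader, FileFormat.PARQUET, "file:///tmp/parquet-out");
        }
    }
}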

pronzato commented 1 year ago

Thank you David, your help is greatly appreciated. I'm getting closer but not fully there yet.

The specific use case I'm trying to implement is to use the Arrow JDBC adapter to convert a ResultSet to Arrow and then also persist that data to a parquet file.

So I have:

ArrowVectorIterator i = JdbcToArrow.sqlToArrowVectorIterator(resultSet, allocator);

But then I don't know how to create the Arrow reader needed by the DatasetFileWriter:

DatasetFileWriter.write(allocator, ?, FileFormat.PARQUET, "/tmp/foo.parquet");

lidavidm commented 1 year ago

You'd have to subclass ArrowReader and implement a facade over the iterator yourself, in this case
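
Roughly, such a facade over the JDBC adapter's ArrowVectorIterator could look like the sketch below (the class name is made up; complete, working versions are posted later in this thread):

import java.io.IOException;

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;

// Adapts the JDBC adapter's ArrowVectorIterator to the ArrowReader API that DatasetFileWriter expects.
class IteratorBackedReader extends ArrowReader {
    private final ArrowVectorIterator iterator;
    private final Schema schema;

    IteratorBackedReader(BufferAllocator allocator, ArrowVectorIterator iterator, Schema schema) {
        super(allocator);
        this.iterator = iterator;
        this.schema = schema;
    }

    @Override
    public boolean loadNextBatch() throws IOException {
        if (!iterator.hasNext()) {
            return false;
        }
        // Copy the next batch produced by the iterator into this reader's own VectorSchemaRoot.
        try (VectorSchemaRoot batch = iterator.next();
             ArrowRecordBatch recordBatch = new VectorUnloader(batch).getRecordBatch()) {
            new VectorLoader(getVectorSchemaRoot()).load(recordBatch);
        }
        return true;
    }

    @Override
    public long bytesRead() {
        return 0; // not tracked for an in-memory source
    }

    @Override
    protected void closeReadSource() throws IOException {
        // the iterator is owned and closed by the caller
    }

    @Override
    protected Schema readSchema() {
        return schema;
    }
}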

pronzato commented 1 year ago

Thank you David, I managed to get it working with your guidance - looks great.

Now I'm onto the next task which is to read Parquet files remotely from HDFS but I'm getting the following exception:

java.lang.RuntimeException: Got HDFS URI but Arrow compiled without HDFS support
    at org.apache.arrow.dataset.file.JniWrapper.makeFileSystemDatasetFactory(Native Method)
    at org.apache.arrow.dataset.file.FileSystemDatasetFactory.createNative(FileSystemDatasetFactory.java:35)
    at org.apache.arrow.dataset.file.FileSystemDatasetFactory.<init>(FileSystemDatasetFactory.java:31)

but the latest 12.0.1 online docs say "HDFS support is included in the official Apache Arrow Java package releases and can be used directly without re-building the source code":

https://arrow.apache.org/docs/java/dataset.html#read-data-from-hdfs

Am I missing a step, is the documentation incorrect and I need to rebuild the libraries, or is there another place to download libraries with HDFS support?

lidavidm commented 1 year ago

CC @davisusanibar

pronzato commented 1 year ago

Would you be able to confirm if we're missing a step or if the documentation is incorrect and we need to rebuild the libraries with HDFS support?

davisusanibar commented 1 year ago

Hi @pronzato, let me investigate this request to ensure everything is working as expected

davisusanibar commented 1 year ago

Hi @pronzato. I appreciate you pointing out the HDFS issue. With this PR, HDFS will be enabled by default in the Java Dataset module: https://github.com/apache/arrow/pull/36704

As a result of the new changes, I am now able to read HDFS parquet files, but the program will not shut down for some reason.

import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;

public class ReadHdfsParquet {
    public static void main(String[] args) {
        // Requires the HADOOP_HOME environment variable, e.g. HADOOP_HOME=/Users/dsusanibar/hadoop-3.3.2;
        // the native library lib/native/libhdfs.dylib is looked up underneath it.
        String uri = "hdfs://localhost:9000/Users/dsusanibar/data4_2rg_gzip.parquet";
        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
        try (
            BufferAllocator allocator = new RootAllocator();
            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
            Dataset dataset = datasetFactory.finish();
            Scanner scanner = dataset.newScan(options);
            ArrowReader reader = scanner.scanBatches()
        ) {
            Schema schema = scanner.schema();
            System.out.println(schema);
            while (reader.loadNextBatch()) {
                System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
                System.out.println("RowCount: " + reader.getVectorSchemaRoot().getRowCount());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
[screenshot: https://user-images.githubusercontent.com/4554485/253766626-8e3d4f72-de1c-471f-93a7-fdbd8d9060ed.png]
pronzato commented 1 year ago

Thank you David - will try this over the weekend.

Is there any possibility this would make it into 13.0.0?

davisusanibar commented 1 year ago

Hi @pronzato, the 13.0.0 release is currently under vote ([VOTE] Release Apache Arrow 13.0.0 - RC0). Let's push this into the next version, 14.0.0.

I managed to get it working with your guidance - looks great.

@pronzato Would it be possible to share your code so we can also turn it into a recipe?


I am also working on a recipe that uses the JDBC Java module to read MySQL data and then load it into an ArrowReader for persistence into a Parquet file.

Parquet files are being created without issues, but I am encountering errors such as:

  • Closed with outstanding buffers allocated ...
  • RefCnt has gone negative ...

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.dataset.file.DatasetFileWriter;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.ibatis.jdbc.ScriptRunner;
​
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
​
public class WriteArrowObjectsToParquet {
    public static void main(String[] args) throws Exception {
        String uri = "file:///Users/dsusanibar/ddsastorage/localarrowrepository/jarfromgithub/tmp/tocookbooks/src/main/resources/write/test3";
        try (
            final BufferAllocator allocator = new RootAllocator();
            final BufferAllocator allocatorJDBC = allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE);
            final BufferAllocator allocatorReader = allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE);
            final BufferAllocator allocatorParquetWrite = allocator.newChildAllocator("allocatorParquetWrite", 0, Long.MAX_VALUE);
            final Connection connection = DriverManager.getConnection(
                    "jdbc:h2:mem:h2-jdbc-adapter")
        ) {
            ScriptRunner runnerDDLDML = new ScriptRunner(connection);
            runnerDDLDML.setLogWriter(null);
            runnerDDLDML.runScript(new BufferedReader(
                    new FileReader("./src/main/resources/h2-ddl.sql")));
            runnerDDLDML.runScript(new BufferedReader(
                    new FileReader("./src/main/resources/h2-dml.sql")));
            JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC,
                    JdbcToArrowUtils.getUtcCalendar())
                    .setTargetBatchSize(2)
                    .setArraySubTypeByColumnNameMap(
                            new HashMap() {{
                                put("LIST_FIELD19",
                                        new JdbcFieldInfo(Types.INTEGER));
                            }}
                    )
                    .build();
            String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1";
            try (
                final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query);
                final ResultSet resultSetForSchema = connection.createStatement().executeQuery(query);
                final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator(
                        resultSetConvertToParquet, config);
            ) {
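                // Note: this extra iterator and the VectorSchemaRoot returned by next() are used only to obtain
                // the schema and are never closed, so their buffers remain allocated on allocatorJDBC.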
                Schema schema = JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config).next().getSchema();
                try (
                    final JDBCReader arrowReader = new JDBCReader(allocatorReader, arrowVectorIterator, schema)
                ) {
                    DatasetFileWriter.write(allocatorParquetWrite, arrowReader, FileFormat.PARQUET, uri);
                }
            }
            runnerDDLDML.closeConnection();
        } catch (SQLException | IOException e) {
            e.printStackTrace();
        }
    }
}
​
class JDBCReader extends ArrowReader {
    private final ArrowVectorIterator iter;
    private final Schema schema;
    public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, Schema schema) {
        super(allocator);
        this.iter = iter;
        this.schema = schema;
    }
​
    @Override
    public boolean loadNextBatch() throws IOException {
        while (iter.hasNext()) {
            try (VectorSchemaRoot rootTmp = iter.next()) {
                if ( rootTmp.getRowCount() > 0 ) {
                    VectorUnloader unloader = new VectorUnloader(rootTmp);
                    VectorLoader loader = new VectorLoader(super.getVectorSchemaRoot());
                    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
                        loader.load(recordBatch);
                    }
                    return true;
                } else {
                    return false;
                }
            }
        }
        return false;
    }
​
    @Override
    public long bytesRead() {
        return 0;
    }
​
    @Override
    protected void closeReadSource() throws IOException {
    }
​
    @Override
    protected Schema readSchema() {
        return schema;
    }
}

Error messages:

Exception in thread "Thread-8" java.lang.IllegalStateException: RefCnt has gone negative
...
Suppressed: java.lang.IllegalStateException: Allocator[allocatorParquetWrite] closed with outstanding buffers allocated (12).
Allocator(allocatorParquetWrite) 0/17746/51748/9223372036854775807 (res/actual/peak/limit)
  child allocators: 0
...
Suppressed: java.lang.IllegalStateException: Allocator[allocatorJDBC] closed with outstanding buffers allocated (8).
Allocator(allocatorJDBC) 0/49760/99536/9223372036854775807 (res/actual/peak/limit)
  child allocators: 0
...
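
One detail that may be related: the second sqlToArrowVectorIterator call above is used only to obtain the schema, and neither that iterator nor the VectorSchemaRoot returned by next() is ever closed, so their buffers remain allocated on allocatorJDBC. An untested sketch of how that line could be adjusted:

Schema schema;
try (ArrowVectorIterator schemaIterator = JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config);
     VectorSchemaRoot schemaRoot = schemaIterator.next()) {
    schema = schemaRoot.getSchema();
}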
pronzato commented 1 year ago

Hi David,

I'm not super familiar with the memory-management side yet, but this is the very simple POC I cobbled together. It probably misses some memory-management steps, but I was able to convert a ResultSet into a number of Parquet files, and the ResultSet row count matched the number of records in the resulting Parquet files.

If I remember rightly, the main issue was that the schema was taken from the first VectorSchemaRoot, which advances the iterator, so I had to play around with that a little to get it working correctly.

Again, this was just a simple POC so I'd be really interested in seeing your final version.

public static void write(BufferAllocator allocator, ResultSet rs, FileFormat fileFormat, String path,
                         String[] partitions, int maxPartitions, String baseNameTemplate) throws Exception {
    ArrowVectorIterator i = null;
    try {
        i = JdbcToArrow.sqlToArrowVectorIterator(rs, allocator);
        ArrowSqlWriterFacade reader = new ArrowSqlWriterFacade(i, allocator);
        DatasetFileWriter.write(allocator, reader, fileFormat, "file://" + path, partitions, maxPartitions, baseNameTemplate);
    } finally {
        if (i != null) {
            i.close();
        }
    }
}
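
For illustration, a hypothetical invocation of this helper (connection string, query, output path, and basename template are made up):

try (BufferAllocator allocator = new RootAllocator();
     Connection connection = DriverManager.getConnection("jdbc:h2:mem:example");
     ResultSet rs = connection.createStatement().executeQuery("SELECT * FROM TABLE1")) {
    // No partition columns, a cap of 1024 partitions, and file names generated from the "data_{i}" template.
    write(allocator, rs, FileFormat.PARQUET, "/tmp/parquet-out", new String[0], 1024, "data_{i}");
}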

public class ArrowSqlWriterFacade extends ArrowReader {

    private ArrowVectorIterator i;
    private VectorSchemaRoot root;
    private boolean first = true;

    public ArrowSqlWriterFacade(ArrowVectorIterator i, BufferAllocator allocator) {
        super(allocator);
        this.i = i;
    }

    public ArrowSqlWriterFacade(ArrowVectorIterator i, BufferAllocator allocator, CompressionCodec.Factory factory) {
        super(allocator, factory);
        this.i = i;
    }

    @Override
    public boolean loadNextBatch() throws IOException {
        // The first batch is pulled lazily by getVectorSchemaRoot()/readSchema(), so the first call
        // only reports that a batch is already loaded.
        if (first) {
            first = false;
            return true;
        } else {
            boolean loadNext = i.hasNext();
            if (loadNext) {
                root = i.next();
            }
            return loadNext;
        }
    }

    @Override
    public VectorSchemaRoot getVectorSchemaRoot() {
        if (root == null) {
            root = i.next();
        }
        return root;
    }

    @Override
    public long bytesRead() {
        return 0;
    }

    @Override
    public void closeReadSource() throws IOException {
    }

    @Override
    protected Schema readSchema() throws IOException {
        VectorSchemaRoot root = getVectorSchemaRoot();
        return root.getSchema();
    }
}
