
Java binding to Apache Arrow DataFusion
Apache License 2.0

datafusion-java



Status

This project is still a work in progress. It currently works with Arrow 14.0 and DataFusion 25.0, and it is built and verified in CI against Java 11 and 21. The Docker instructions below use the Java 21 jshell to run it interactively.

How to use in your code

The artifacts are published to Maven Central, so you can depend on datafusion-java like any other Java library. With the Gradle Kotlin DSL:

```kotlin
dependencies {
    implementation(
        group = "io.github.datafusion-contrib",
        name = "datafusion-java",
        version = "0.16.0" // or the latest version; see https://github.com/datafusion-contrib/datafusion-java/releases
    )
}
```

To try it out, use the following demo code:

DataFusionDemo.java

```java
package com.me;

import org.apache.arrow.datafusion.DataFrame;
import org.apache.arrow.datafusion.SessionContext;
import org.apache.arrow.datafusion.SessionContexts;

public class DataFusionDemo {

    public static void main(String[] args) throws Exception {
        try (SessionContext sessionContext = SessionContexts.create()) {
            sessionContext.sql("select sqrt(65536)").thenCompose(DataFrame::show).join();
        }
    }
}
```
build.gradle.kts

```kotlin
plugins {
    java
    application
}

repositories {
    mavenCentral()
    google()
}

application {
    mainClass.set("com.me.DataFusionDemo")
}

dependencies {
    implementation(
        group = "io.github.datafusion-contrib",
        name = "datafusion-java",
        version = "0.16.0"
    )
}
```
Run result

```
$ ./gradlew run
...
> Task :compileKotlin UP-TO-DATE
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE

> Task :run
successfully created tokio runtime
+--------------------+
| sqrt(Int64(65536)) |
+--------------------+
| 256                |
+--------------------+
successfully shutdown tokio runtime

BUILD SUCCESSFUL in 2s
3 actionable tasks: 1 executed, 2 up-to-date
16:43:34: Execution finished 'run'.
```
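The demo chains two asynchronous steps: `sql(...)` returns a `CompletableFuture<DataFrame>`, and `DataFrame::show` returns another future, so `thenCompose` flattens them into a single future that `join()` waits on. The stdlib-only sketch below mimics that shape so it runs without the native library. `FakeDataFrame`, `fakeSql`, and `SHOWN` are hypothetical stand-ins for illustration, not datafusion-java APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ComposeSketch {

    // Hypothetical: records which queries were "shown", so callers can check the chain ran.
    public static final List<String> SHOWN = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for org.apache.arrow.datafusion.DataFrame;
    // only the shape of show() (it returns a future) mirrors the demo.
    static final class FakeDataFrame {
        private final String query;

        FakeDataFrame(String query) {
            this.query = query;
        }

        CompletableFuture<Void> show() {
            return CompletableFuture.runAsync(() -> {
                SHOWN.add(query);
                System.out.println("showing result of: " + query);
            });
        }
    }

    // Hypothetical stand-in for SessionContext.sql(String): produces the DataFrame asynchronously.
    static CompletableFuture<FakeDataFrame> fakeSql(String query) {
        return CompletableFuture.supplyAsync(() -> new FakeDataFrame(query));
    }

    // Same composition as the demo: sql(...).thenCompose(DataFrame::show).join()
    public static void runDemo(String query) {
        fakeSql(query)
                .thenCompose(FakeDataFrame::show) // flatten the future-of-future into one future
                .join();                          // block until show() has finished
    }

    public static void main(String[] args) {
        runDemo("select sqrt(65536)");
    }
}
```

Using `thenApply` instead of `thenCompose` here would produce a `CompletableFuture<CompletableFuture<Void>>`, and `join()` would then return as soon as the DataFrame exists rather than after it has been shown.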

How to run the interactive demo

1. Run using Docker (with jshell)

First, build the Docker image:

docker build -t datafusion-example .

Then you can run the example program using Docker:

docker run --rm -it datafusion-example

Or start an interactive jshell session:

docker run --rm -it datafusion-example jshell
Example jshell session

```text
Jan 11, 2024 1:49:28 AM java.util.prefs.FileSystemPreferences$1 run
INFO: Created user preferences directory.
| Welcome to JShell -- Version 21
| For an introduction type: /help intro

jshell> import org.apache.arrow.datafusion.*

jshell> var context = SessionContexts.create()
01:41:05.586 [main] DEBUG org.apache.arrow.datafusion.JNILoader -- successfully loaded datafusion_jni from library path
01:41:05.589 [main] DEBUG org.apache.arrow.datafusion.JNILoader -- datafusion_jni already loaded, returning
01:41:05.590 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining DefaultSessionContext@7f58383b8db0
01:41:05.591 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining TokioRuntime@7f58383ce110
context ==> org.apache.arrow.datafusion.DefaultSessionContext@2d209079

jshell> var df = context.sql("select 1.1 + cos(2.0)").join()
01:41:10.961 [main] DEBUG org.apache.arrow.datafusion.AbstractProxy -- Obtaining DefaultDataFrame@7f5838209100
df ==> org.apache.arrow.datafusion.DefaultDataFrame@34ce8af7

jshell> import org.apache.arrow.memory.*

jshell> var allocator = new RootAllocator()
01:41:22.521 [main] INFO org.apache.arrow.memory.BaseAllocator -- Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
01:41:22.525 [main] INFO org.apache.arrow.memory.DefaultAllocationManagerOption -- allocation manager type not specified, using netty as the default type
01:41:22.525 [main] INFO org.apache.arrow.memory.CheckAllocator -- Using DefaultAllocationManager at memory-unsafe-14.0.2.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
01:41:22.531 [main] DEBUG org.apache.arrow.memory.util.MemoryUtil -- Constructor for direct buffer found and made accessible
01:41:22.536 [main] DEBUG org.apache.arrow.memory.util.MemoryUtil -- direct buffer constructor: available
01:41:22.537 [main] DEBUG org.apache.arrow.memory.rounding.DefaultRoundingPolicy -- -Dorg.apache.memory.allocator.pageSize: 8192
01:41:22.537 [main] DEBUG org.apache.arrow.memory.rounding.DefaultRoundingPolicy -- -Dorg.apache.memory.allocator.maxOrder: 11
allocator ==> Allocator(ROOT) 0/0/0/9223372036854775807 (res/actual/peak/limit)

jshell> var r = df.collect(allocator).join()
01:41:29.635 [main] INFO org.apache.arrow.datafusion.DefaultDataFrame -- successfully completed with arr length=610
r ==> org.apache.arrow.vector.ipc.ArrowFileReader@7ac7a4e4

jshell> var root = r.getVectorSchemaRoot()
01:41:34.658 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 10
01:41:34.661 [main] DEBUG org.apache.arrow.vector.ipc.ArrowFileReader -- Footer starts at 416, length: 184
01:41:34.661 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 184
root ==> org.apache.arrow.vector.VectorSchemaRoot@6cd28fa7

jshell> r.loadNextBatch()
01:41:39.421 [main] DEBUG org.apache.arrow.vector.ipc.ArrowFileReader -- RecordBatch at 200, metadata: 192, body: 16
01:41:39.423 [main] DEBUG org.apache.arrow.vector.ipc.ReadChannel -- Reading buffer with size: 208
01:41:39.424 [main] DEBUG org.apache.arrow.vector.ipc.message.ArrowRecordBatch -- Buffer in RecordBatch at 0, length: 1
01:41:39.425 [main] DEBUG org.apache.arrow.vector.ipc.message.ArrowRecordBatch -- Buffer in RecordBatch at 8, length: 8
$8 ==> true

jshell> var v = root.getVector(0)
v ==> [0.6838531634528577]
```
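In the session above, `collect` hands back an `ArrowReader` (an `ArrowFileReader` here). The reader owns a single `VectorSchemaRoot`: each `loadNextBatch()` call refills that same root in place and returns `false` once no batches remain, which is why `root.getVector(0)` only shows data after `loadNextBatch()` has returned `true`. The stdlib-only sketch below models that contract with a hypothetical `FakeReader` whose reusable `root` list is overwritten on every load; it illustrates the pattern and is not Arrow's API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchLoopSketch {

    // Hypothetical stand-in for an Arrow reader: one reusable "root" refilled per batch.
    static final class FakeReader {
        private final Iterator<double[]> batches;
        // Like VectorSchemaRoot, this object is reused; its contents change on each load.
        private final List<Double> root = new ArrayList<>();

        FakeReader(List<double[]> batches) {
            this.batches = batches.iterator();
        }

        List<Double> getRoot() {
            return root;
        }

        // Mirrors loadNextBatch(): true if a batch was loaded, false at end of stream.
        boolean loadNextBatch() {
            root.clear();
            if (!batches.hasNext()) {
                return false;
            }
            for (double value : batches.next()) {
                root.add(value);
            }
            return true;
        }
    }

    // Copies every value out of the reusable root before the next load overwrites it.
    public static List<Double> readAll(List<double[]> batches) {
        FakeReader reader = new FakeReader(batches);
        List<Double> root = reader.getRoot(); // fetched once, like getVectorSchemaRoot()
        List<Double> out = new ArrayList<>();
        while (reader.loadNextBatch()) {
            out.addAll(root);                 // copy now; the next load clears root
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readAll(List.of(new double[] {1.0, 2.0}, new double[] {3.0})));
    }
}
```

In the real API the vectors are Arrow buffers rather than a `List`, but the rule carries over: read or copy values from the root before calling `loadNextBatch()` again.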

2. Build from source

Note: you must have a local Rust and Java development environment set up.

Run the example in one line:

./gradlew run

Or roll your own test example:

```java
import org.apache.arrow.datafusion.DataFrame;
import org.apache.arrow.datafusion.SessionContext;
import org.apache.arrow.datafusion.SessionContexts;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class ExampleMain {

    private static final Logger logger = LoggerFactory.getLogger(ExampleMain.class);

    public static void main(String[] args) throws Exception {
        // Resources close in reverse order: allocator first, then sessionContext
        try (SessionContext sessionContext = SessionContexts.create(); BufferAllocator allocator = new RootAllocator()) {
            DataFrame dataFrame = sessionContext.sql("select 1.5 + sqrt(2.0)").get();
            // Block until the callback has consumed and closed the reader,
            // so no Arrow buffers are still open when the allocator closes
            dataFrame.collect(allocator).thenAccept(ExampleMain::onReaderResult).get();
        }
    }

    private static void onReaderResult(ArrowReader reader) {
        // try-with-resources (Java 9+) releases the reader's buffers even if reading fails
        try (reader) {
            VectorSchemaRoot root = reader.getVectorSchemaRoot();
            // Each loadNextBatch() refills the same root; it returns false when no batches remain
            while (reader.loadNextBatch()) {
                Float8Vector vector = (Float8Vector) root.getVector(0);
                for (int i = 0; i < root.getRowCount(); i++) {
                    logger.info("value {}={}", i, vector.getValueAsDouble(i));
                }
            }
        } catch (IOException e) {
            logger.warn("got IO Exception", e);
        }
    }
}
```
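The example relies on try-with-resources closing resources in reverse declaration order: when the try block in `main` exits, `allocator` is closed before `sessionContext`, and the reader has already been closed inside `onReaderResult` because `.get()` waits for the callback. The order matters because Arrow's `RootAllocator` fails with a memory-leak error on `close()` if buffers allocated from it are still open. The stdlib-only sketch below makes the ordering visible with a hypothetical `Tracked` resource that records when it is closed; `Tracked`, `CLOSED`, and `run` are illustrative names, not part of datafusion-java or Arrow.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {

    // Hypothetical: records the order in which resources are closed.
    public static final List<String> CLOSED = new ArrayList<>();

    // Hypothetical resource standing in for SessionContext, RootAllocator, or ArrowReader.
    static final class Tracked implements AutoCloseable {
        private final String name;

        Tracked(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            CLOSED.add(name);
        }
    }

    // Same nesting as ExampleMain: two resources in one try, the reader closed inside the body.
    public static List<String> run() {
        CLOSED.clear();
        try (Tracked sessionContext = new Tracked("sessionContext");
             Tracked allocator = new Tracked("allocator")) {
            try (Tracked reader = new Tracked("reader")) {
                // batches would be consumed here
            }
        }
        return new ArrayList<>(CLOSED);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the reader were instead handed off and closed later, after the outer try block, the allocator would be closed while the reader's buffers were still live.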

To build the library:

./gradlew build