apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Unable to "GROUP BY" udf result #5635

Closed bubbajoe closed 1 year ago

bubbajoe commented 1 year ago

Describe the bug: When I try to add a UDF (to_date), I am able to do the following:

But I am NOT able to do the following:

To Reproduce: an example updatedAt value looks like: "2023-03-05T05:01:15.274000+00:00"

The to_date UDF, using the chrono crate:

pub fn to_date(args: &[ArrayRef]) -> DFResult<ArrayRef> {
    if args.is_empty() || args.len() > 1 {
        return Err(DataFusionError::Internal(format!(
            "to_date was called with {} arguments. It requires only 1.",
            args.len()
        )));
    }
    let arg = &args[0].as_any().downcast_ref::<StringArray>().unwrap();
    let mut builder = Date32Builder::new();
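    // note: only the first value in the input array is converted here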
    let date_string: &str = arg.value(0);
    let date_time = match DateTime::parse_from_rfc3339(date_string) {
        Ok(dt) => dt,
        Err(e) => {
            return Result::Err(DataFusionError::Internal(e.to_string()));
        }
    };
    builder.append_value((date_time.timestamp() / 86400) as i32);
    Ok(Arc::new(builder.finish()))
}

Expected behavior: Expecting it to group the dates like this: SELECT u FROM (SELECT to_date(\"updatedAt\") as u from 'table') group by u;

bubbajoe commented 1 year ago

I updated from 15.0 to 20.0 and something really weird happened. When I run this: SELECT COUNT(*), T, SUM(COUNT(*)) Over (Order By T) From (SELECT *, to_date(\"updatedAt\") as T from 'table') Group BY T I get the error below. This query was working fine on version 15.0; it seems like record_batches_to_json_rows might be bugged?

I modified my UDF to process multiple values:

pub fn to_date(args: &[ArrayRef]) -> DFResult<ArrayRef> {
    if args.is_empty() || args.len() > 1 {
        return Err(DataFusionError::Internal(format!(
            "to_date was called with {} arguments. It requires only 1.",
            args.len()
        )));
    }
    let arg = &args[0].as_any().downcast_ref::<StringArray>().unwrap();
    let mut builder = Date32Builder::new();
    for date_string in arg.iter() {
        let date_time = match DateTime::parse_from_rfc3339(date_string.expect("date_string is null")) {
            Ok(dt) => dt,
            Err(e) => {
                return Result::Err(DataFusionError::Internal(e.to_string()));
            }
        };
        builder.append_value((date_time.timestamp() / 86400) as i32);
    }
    Ok(Arc::new(builder.finish()))
}

Error log:

thread 'tokio-runtime-worker' panicked at 'Trying to access an element at index 17 from a PrimitiveArray of length 17', /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-array-34.0.0/src/array/primitive_array.rs:337:9
stack backtrace:
   0: rust_begin_unwind
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs:575:5
   1: core::panicking::panic_fmt
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panicking.rs:64:14
   2: arrow_array::array::primitive_array::PrimitiveArray<T>::value
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-array-34.0.0/src/array/primitive_array.rs:337:9
   3: <&arrow_array::array::primitive_array::PrimitiveArray<T> as arrow_array::array::ArrayAccessor>::value
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-array-34.0.0/src/array/primitive_array.rs:711:9
   4: <&arrow_array::array::primitive_array::PrimitiveArray<arrow_array::types::Date32Type> as arrow_cast::display::DisplayIndexState>::write
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-cast-34.0.0/src/display.rs:505:29
   5: <arrow_cast::display::ArrayFormat<F> as arrow_cast::display::DisplayIndex>::write
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-cast-34.0.0/src/display.rs:361:9
   6: <arrow_cast::display::ValueFormatter as core::fmt::Display>::fmt
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-cast-34.0.0/src/display.rs:162:15
   7: <T as alloc::string::ToString>::to_string
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/alloc/src/string.rs:2536:9
   8: arrow_json::writer::set_column_for_json_rows::{{closure}}
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-json-34.0.0/src/writer.rs:309:25
   9: core::iter::traits::iterator::Iterator::for_each::call::{{closure}}
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/iter/traits/iterator.rs:828:29
  10: <core::iter::adapters::enumerate::Enumerate<I> as core::iter::traits::iterator::Iterator>::fold::enumerate::{{closure}}
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/iter/adapters/enumerate.rs:106:27
  11: core::iter::traits::iterator::Iterator::fold
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/iter/traits/iterator.rs:2414:21
  12: <core::iter::adapters::enumerate::Enumerate<I> as core::iter::traits::iterator::Iterator>::fold
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/iter/adapters/enumerate.rs:112:9
  13: core::iter::traits::iterator::Iterator::for_each
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/iter/traits/iterator.rs:831:9
  14: arrow_json::writer::set_column_for_json_rows
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-json-34.0.0/src/writer.rs:305:13
  15: arrow_json::writer::record_batches_to_json_rows
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-json-34.0.0/src/writer.rs:422:17
  16: local::routes::query::QueryResponse::new::{{closure}}
             at ./src/routes/query.rs:49:19
  17: local::routes::query::execute_query::{{closure}}
             at ./src/routes/query.rs:22:9
  18: <F as axum::handler::Handler<(M,T1,T2),S,B>>::call::{{closure}}
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0-rc.4/src/handler/mod.rs:209:52
  19: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
  20: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  21: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  22: <axum::handler::future::IntoServiceFuture<F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0-rc.4/src/macros.rs:42:17
  23: <F as futures_core::future::TryFuture>::try_poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  24: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  25: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  26: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  27: <futures_util::future::try_future::MapOk<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  28: <tower::util::map_response::MapResponseFuture<F,N> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/macros.rs:38:17
  29: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
  30: <tower::util::oneshot::Oneshot<S,Req> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/util/oneshot.rs:97:38
  31: <axum::routing::route::RouteFuture<B,E> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0-rc.4/src/routing/route.rs:144:61
  32: <tower_http::cors::ResponseFuture<F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-http-0.3.4/src/cors/mod.rs:662:56
  33: <F as futures_core::future::TryFuture>::try_poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  34: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  35: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  36: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  37: <futures_util::future::try_future::MapOk<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  38: <tower::util::map_response::MapResponseFuture<F,N> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/macros.rs:38:17
  39: <F as futures_core::future::TryFuture>::try_poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  40: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  41: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  42: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  43: <futures_util::future::try_future::MapErr<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  44: <tower::util::map_err::MapErrFuture<F,N> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/macros.rs:38:17
  45: <F as futures_core::future::TryFuture>::try_poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  46: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  47: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  48: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  49: <futures_util::future::try_future::MapOk<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  50: <tower::util::map_response::MapResponseFuture<F,N> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/macros.rs:38:17
  51: <F as futures_core::future::TryFuture>::try_poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  52: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  53: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  54: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  55: <futures_util::future::try_future::MapOk<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  56: <tower::util::map_response::MapResponseFuture<F,N> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/macros.rs:38:17
  57: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
  58: <tower::util::oneshot::Oneshot<S,Req> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/util/oneshot.rs:97:38
  59: <axum::routing::route::RouteFuture<B,E> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0-rc.4/src/routing/route.rs:144:61
  60: <F as futures_core::future::TryFuture>::try_poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-core-0.3.25/src/future.rs:82:9
  61: <futures_util::future::try_future::into_future::IntoFuture<Fut> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/try_future/into_future.rs:34:9
  62: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/future/future/map.rs:55:37
  63: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  64: <futures_util::future::try_future::MapOk<Fut,F> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-util-0.3.25/src/lib.rs:91:13
  65: <tower::util::map_response::MapResponseFuture<F,N> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/macros.rs:38:17
  66: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/future/future.rs:124:9
  67: <tower::util::oneshot::Oneshot<S,Req> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/util/oneshot.rs:97:38
  68: <axum::routing::route::RouteFuture<B,E> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/axum-0.6.0-rc.4/src/routing/route.rs:144:61
  69: <hyper::proto::h1::dispatch::Server<S,hyper::body::body::Body> as hyper::proto::h1::dispatch::Dispatch>::poll_msg
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/proto/h1/dispatch.rs:491:35
  70: hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_write
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/proto/h1/dispatch.rs:297:43
  71: hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_loop
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/proto/h1/dispatch.rs:161:21
  72: hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_inner
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/proto/h1/dispatch.rs:137:16
  73: hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T>::poll_catch
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/proto/h1/dispatch.rs:120:28
  74: <hyper::proto::h1::dispatch::Dispatcher<D,Bs,I,T> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/proto/h1/dispatch.rs:424:9
  75: <hyper::server::conn::ProtoServer<T,B,S,E> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/server/conn.rs:952:47
  76: <hyper::server::conn::upgrades::UpgradeableConnection<I,S,E> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/server/conn.rs:1012:30
  77: <hyper::server::server::new_svc::NewSvcTask<I,N,S,E,W> as core::future::future::Future>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/server/server.rs:728:36
  78: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/core.rs:223:17
  79: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/loom/std/unsafe_cell.rs:14:9
  80: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/core.rs:212:13
  81: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/harness.rs:476:19
  82: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/core/src/panic/unwind_safe.rs:271:9
  83: std::panicking::try::do_call
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs:483:40
  84: ___rust_try
  85: std::panicking::try
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs:447:19
  86: std::panic::catch_unwind
             at /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panic.rs:137:14
  87: tokio::runtime::task::harness::poll_future
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/harness.rs:464:18
  88: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/harness.rs:198:27
  89: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/harness.rs:152:15
  90: tokio::runtime::task::raw::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/raw.rs:255:5
  91: tokio::runtime::task::raw::RawTask::poll
             at /Users/joe/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.26.0/src/runtime/task/raw.rs:200:18
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

bubbajoe commented 1 year ago

Hey @jiangzhx thanks for your response! How hard is it to implement something like this?

I am not too familiar with Rust, let alone the code base. If you could give me some pointers, I wouldn't mind creating a PR.

jiangzhx commented 1 year ago

cc @BubbaJoe I wrote a to_date UDF myself, which seems to be correct.


use arrow::array::{Date32Array, Date32Builder, StringArray};
use chrono::DateTime;
use datafusion::from_slice::FromSlice;
use datafusion::prelude::*;
use datafusion::{
    arrow::{datatypes::DataType, record_batch::RecordBatch},
    logical_expr::Volatility,
};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;

// create local execution context with an in-memory table
fn create_context() -> Result<SessionContext> {
    use datafusion::arrow::datatypes::{Field, Schema};
    // define a schema.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

    // define data.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from_slice([
            "2023-03-04T05:01:14.274000+00:00",
            "2023-03-05T05:02:15.274000+00:00",
            "2023-03-06T05:03:16.274000+00:00",
        ]))],
    )?;

    // declare a new context. In the Spark API, this corresponds to a new Spark SQL session
    let ctx = SessionContext::new();

    // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
    ctx.register_batch("t", batch)?;
    Ok(ctx)
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = create_context()?;

    pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
        if args.is_empty() || args.len() > 1 {
            return Err(DataFusionError::Internal(format!(
                "to_date was called with {} arguments. It requires only 1.",
                args.len()
            )));
        }

        match &args[0] {
            ColumnarValue::Array(array) => {
                let args = downcast_value!(array, StringArray);
                let mut builder = Date32Builder::new();

                for arg in args {
                    let date_time = match DateTime::parse_from_rfc3339(arg.unwrap()) {
                        Ok(dt) => dt,
                        Err(e) => {
                            return Result::Err(DataFusionError::Internal(e.to_string()));
                        }
                    };
                    builder.append_value((date_time.timestamp() / 86400) as i32);
                }
                let date32 = Arc::new(Date32Array::from(builder.finish()));

                Ok(ColumnarValue::Array(date32))
            }
            ColumnarValue::Scalar(ScalarValue::Utf8(v)) => {
                let date_time =
                    match DateTime::parse_from_rfc3339((v.clone().unwrap()).as_str()) {
                        Ok(dt) => dt,
                        Err(e) => {
                            return Result::Err(DataFusionError::Internal(e.to_string()));
                        }
                    };
                let mut builder = Date32Builder::new();

                builder.append_value((date_time.timestamp() / 86400) as i32);
                let date32 = Arc::new(Date32Array::from(builder.finish()));

                Ok(ColumnarValue::Array(date32))
            }
            _ => {
                return Err(DataFusionError::Execution(
                    "array of `to_date` must be non-null scalar Utf8".to_string(),
                ));
            }
        }
    }

    // let to_date = make_scalar_function(to_date);
    let to_date = create_udf(
        "to_date",
        vec![DataType::Utf8],
        Arc::new(DataType::Date32),
        Volatility::Immutable,
        Arc::new(to_date),
    );

    ctx.register_udf(to_date.clone()); 

    let expr = to_date.call(vec![col("a")]);
    let df = ctx.table("t").await?;
    let df = df.select(vec![expr])?;
    df.show().await?;

    ctx.sql("SELECT to_date(a) as u from t group by u")
        .await?
        .show()
        .await?;

    ctx.sql("SELECT COUNT(*), T, SUM(COUNT(*)) Over (Order By T) From (SELECT *, to_date(a) as T from t) Group BY T")
        .await?
        .show()
        .await?;
    Ok(())
}

doki23 commented 1 year ago

The to_date UDF only returns 1 value per input array; is that expected?

jiangzhx commented 1 year ago

@BubbaJoe based on your code, I made some modifications. Now everything looks normal.

    pub fn to_date(args: &[ArrayRef]) -> Result<ArrayRef> {
        if args.is_empty() || args.len() > 1 {
            return Err(DataFusionError::Internal(format!(
                "to_date was called with {} arguments. It requires only 1.",
                args.len()
            )));
        }
        let arg = &args[0].as_any().downcast_ref::<StringArray>().unwrap();
        let mut builder = Date32Builder::new();
        for date_string in arg.iter() {
            let date_time = match DateTime::parse_from_rfc3339(
                date_string.expect("date_string is null"),
            ) {
                Ok(dt) => dt,
                Err(e) => {
                    return Result::Err(DataFusionError::Internal(e.to_string()));
                }
            };
            builder.append_value((date_time.timestamp() / 86400) as i32);
        }
        Ok(Arc::new(builder.finish()))
    }

    let to_date = make_scalar_function(to_date);
    let to_date = create_udf(
        "to_date",
        vec![DataType::Utf8],
        Arc::new(DataType::Date32),
        Volatility::Immutable,
        to_date,
    );

    ctx.register_udf(to_date.clone());

jiangzhx commented 1 year ago

The to_date UDF only returns 1 value per input array; is that expected? cc @BubbaJoe @doki23

The current error message is confusing; maybe we should add some checks here.

Maybe we can compare the lengths of group_values and batch_hashes and, if they are not equal, return a clear error. https://github.com/apache/arrow-datafusion/blob/3ccf1aebb6959fbc6bbbf74d2821522ddfd7d484/datafusion/core/src/physical_plan/aggregates/row_hash.rs#L330-L335
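
A rough sketch of such a check (the names group_values and batch_hashes follow the linked code, but the exact types and call site are assumptions, not the actual row_hash.rs source):

    use datafusion::arrow::array::{Array, ArrayRef};
    use datafusion::error::{DataFusionError, Result};

    // Hypothetical helper: every GROUP BY expression should evaluate to one value
    // per input row, so each group column's length should match the number of
    // per-row hashes computed for the batch.
    fn check_group_values_len(group_values: &[ArrayRef], batch_hashes: &[u64]) -> Result<()> {
        for col in group_values {
            if col.len() != batch_hashes.len() {
                return Err(DataFusionError::Internal(format!(
                    "GROUP BY expression produced {} rows but the input batch has {} rows; group expressions must return one value per input row",
                    col.len(),
                    batch_hashes.len()
                )));
            }
        }
        Ok(())
    }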

doki23 commented 1 year ago

Hmmm... or should we check the result size of the UDF? I'm not sure whether it's proper for the sizes of the input and result to differ. cc @alamb @mingmwang @tustvold

bubbajoe commented 1 year ago

Thanks @jiangzhx, the code worked for me, but I am still getting an error when calling record_batches_to_json_rows on the queries:

The error is the same as in the error log above:

thread 'tokio-runtime-worker' panicked at 'Trying to access an element at index 0 from a PrimitiveArray of length 0', /Users/user/.cargo/registry/src/github.com-1ecc6299db9ec823/arrow-array-34.0.0/src/array/primitive_array.rs:337:9
...

alamb commented 1 year ago

Hmmm... or should we check the result size of the UDF? I'm not sure whether it's proper for the sizes of the input and result to differ. cc @alamb @mingmwang @tustvold

I think this is a great idea @jiangzhx. For a UDF (rather than a user-defined aggregate, for example), the number of output rows should be the same as the number of input rows; if that is not the case, producing a clearer error would be 👍
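
For illustration, such a check could look roughly like the helper below (a minimal sketch with assumed names; it is not DataFusion's actual code, and the same idea could also be applied inside a user's own UDF wrapper):

    use datafusion::arrow::array::{Array, ArrayRef};
    use datafusion::error::{DataFusionError, Result};

    // Hypothetical helper: after evaluating a scalar UDF, verify that it produced
    // exactly one output row per input row, and fail with a clear message otherwise.
    fn check_udf_row_count(name: &str, input_rows: usize, output: &ArrayRef) -> Result<()> {
        if output.len() != input_rows {
            return Err(DataFusionError::Internal(format!(
                "scalar UDF '{}' returned {} rows for {} input rows; a scalar function must produce one value per input row",
                name,
                output.len(),
                input_rows
            )));
        }
        Ok(())
    }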

alamb commented 1 year ago

Error is the same as the above error log:

@BubbaJoe would it be possible to share a self-contained reproducer?

jiangzhx commented 1 year ago

@BubbaJoe I am still working on your "getting an error when calling record_batches_to_json_rows on the queries" issue.

@alamb @mingmwang @tustvold If you have time, I would appreciate some guidance. Something very weird is going on.

    // with order by: works fine
    // let sql = "SELECT to_date(a) as u from t group by u order by u";        //fine
    // let sql = "SELECT to_timestamp(a) as u from t group by u order by u";   //fine

    // without order by: got error
    // let sql = "SELECT to_date(a) as u from t group by u";        //got error
    // let sql = "SELECT to_timestamp(a) as u from t group by u";   //got error

    let batches = ctx.sql(sql).await?.collect().await.unwrap();
    let list: Vec<_> = record_batches_to_json_rows(&batches)?;    //got error here

I also wrote a test case for record_batches_to_json_rows; everything looks fine there.

#[cfg(test)]
mod tests {
    use arrow::array::Date32Builder;
    use arrow::json::writer::record_batches_to_json_rows;
    use arrow::json::LineDelimitedWriter;
    use arrow::record_batch::RecordBatch;
    use arrow_schema::{DataType, Field, Schema};
    use chrono::DateTime;
    use serde_json::Value;
    use std::sync::Arc;

    #[test]
    fn work_well() {
        let mut builder = Date32Builder::new();

        builder.append_value(
            i32::try_from(
                DateTime::parse_from_rfc3339("2021-02-04T05:01:14.274000+00:00")
                    .unwrap()
                    .timestamp()
                    / (60 * 60 * 24),
            )
            .unwrap(),
        );
        builder.append_value(
            i32::try_from(
                DateTime::parse_from_rfc3339("2023-03-05T05:01:14.274000+00:00")
                    .unwrap()
                    .timestamp()
                    / (60 * 60 * 24),
            )
            .unwrap(),
        );

        let arr_date32 = builder.finish();

        let schema = Schema::new(vec![Field::new("date32", DataType::Date32, true)]);
        let schema = Arc::new(schema);

        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr_date32)]).unwrap();

        let mut buf = Vec::new();
        {
            let mut writer = LineDelimitedWriter::new(&mut buf);
            writer.write_batches(&[batch.clone()]).unwrap();
        }

        let actual: Vec<Option<Value>> = buf
            .split(|b| *b == b'\n')
            .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap()))
            .collect();

        println!("{:?}", actual);

        // converting the batch to JSON rows succeeds as well
        let rows = record_batches_to_json_rows(&[batch]).unwrap();
        println!("{:?}", rows);
    }
}

bubbajoe commented 1 year ago

main.rs

use datafusion::error::Result;
use datafusion::{
    arrow::{
        array::{
            ArrayRef,
            StringArray,
            Date32Builder,
            Array,
        },
        datatypes::DataType,
    },
    physical_plan::functions::make_scalar_function,
    logical_expr::Volatility,
    prelude::{
        create_udf,
        SessionContext,
        NdJsonReadOptions,
    },
    error::DataFusionError,
};
use std::sync::Arc;
use chrono::DateTime;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // let testdata = datafusion::test_util::parquet_test_data();

    // register the JSON lines file with the execution context
    ctx.register_json(
        "users",
        "./data.jsonl",
        NdJsonReadOptions::default().file_extension(".jsonl"),
    ).await?;

    let to_date = make_scalar_function(to_date);
    let to_date = create_udf(
        "to_date",
        vec![DataType::Utf8],
        Arc::new(DataType::Date32),
        Volatility::Immutable,
        to_date,
    );

    ctx.register_udf(to_date.clone());

    let batches = {
        let df = ctx.sql(
            "SELECT to_date(\"date\") as a from \
            'users' group by a;").await?;
        df.collect().await?
    };
    let list: Vec<_> = datafusion::arrow::json::writer::
        record_batches_to_json_rows(&batches[..])?;

    println!("{:?}", list);

    Ok(())
}

fn to_date(args: &[ArrayRef]) -> Result<ArrayRef> {
    if args.is_empty() || args.len() > 1 {
        return Err(DataFusionError::Internal(format!(
            "to_date was called with {} arguments. It requires only 1.",
            args.len()
        )));
    }
    let arg = &args[0].as_any().downcast_ref::<StringArray>().unwrap();
    let mut builder = Date32Builder::new();
    for date_string in arg.iter() {
        let date_time = match DateTime::parse_from_rfc3339(
            date_string.expect("date_string is null"),
        ) {
            Ok(dt) => dt,
            Err(e) => {
                // builder.append_value((date_time.timestamp() / 86400) as i32);
                return Result::Err(DataFusionError::Internal(e.to_string()));
            }
        };
        builder.append_value((date_time.timestamp() / 86400) as i32);
    }
    Ok(Arc::new(builder.finish()))
}

Cargo.toml

[package]
name = "df-test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.0", features = ["full"] }
datafusion = { version = "20.0"}
chrono = "0.4.24"

data.jsonl

{"date":"2012-04-23T18:25:43.511Z","id":0}
{"date":"2012-04-23T18:25:43.511Z","id":1}
{"date":"2012-04-23T18:25:43.511Z","id":2}
{"date":"2012-04-23T18:25:43.511Z","id":3}
{"date":"2012-04-23T18:25:43.511Z","id":4}
{"date":"2012-04-23T18:25:43.511Z","id":5}
{"date":"2012-04-23T18:25:43.511Z","id":6}
{"date":"2012-04-23T18:25:43.511Z","id":7}
{"date":"2012-04-23T18:25:43.511Z","id":8}
{"date":"2012-04-23T18:25:43.511Z","id":9}

bubbajoe commented 1 year ago

@alamb @jiangzhx I am getting the same error here, please check!

doki23 commented 1 year ago

It's because to_date is not an aggregate function. I notice that the size of the batches is 10 (9 nulls + '2012-04-23'); that's the reason. You should use a UDAF for GROUP BY, or maybe a UDF with DISTINCT, @BubbaJoe.

bubbajoe commented 1 year ago

Hmm, this doesn't seem very intuitive. Why would a to_date() function need to be an aggregate function? Seems more like a bug, to be honest.

alamb commented 1 year ago

Hmm, this doesn't seem very intuitive. Why would a to_date() function need to be an aggregate function? Seems more like a bug, to be honest.

I think @doki23 means that this query doesn't make sense because it needs to produce a single output per distinct value of u (not a), so to_date would need to be an aggregate function (one that takes in multiple rows and produces a single output row):

SELECT to_date(a) as u from t group by u

What I think is super confusing is that DataFusion does run this query (when there is an ORDER BY):

SELECT to_date(a) as u from t group by u order by u

IOx users also recently hit this confusing behavior:

https://github.com/influxdata/docs-v2/issues/4812#issuecomment-1478395592

I will write a ticket to fix it (basically proposing to make SELECT to_date(a) as u from t group by u order by u an error).

doki23 commented 1 year ago

Sorry for the horrible description. Actually, my suggestion is:

select distinct to_date(...) from t 

In my opinion, GROUP BY is usually accompanied by an aggregate function. If not, it's equivalent to DISTINCT.

bubbajoe commented 1 year ago

@doki23 Actually, I just tried using DISTINCT and that also doesn't work. Weren't you able to reproduce it on your side?

doki23 commented 1 year ago

@BubbaJoe It's a bug in the JSON writer when batches have temporal fields; I'm trying to fix it.

bubbajoe commented 1 year ago

Thanks @doki23

But I am curious about the batching format: why do the record batches look like this?

batch 1: []
batch 2: []
batch 3: []
batch 4: []
batch 5: []
batch 6: []
batch 7: [2012-04-27]
batch 8: [2012-04-23, 2012-04-24]
batch 9: [2012-04-25]
batch 10: [2012-04-26]

You would expect them to look something like this, right?

batch 1: [2012-04-27]
batch 2: [2012-04-23]
batch 3: [2012-04-24]
batch 4: [2012-04-25]
batch 5: [2012-04-26]

I am very new to DataFusion, but it seems like fixing/cleaning up the result batch format would also mitigate these issues.

doki23 commented 1 year ago

I have no idea; maybe it's the default partitioning? I'll dive into this.

bubbajoe commented 1 year ago

I would like to test this! When is the next release?

doki23 commented 1 year ago

But I am curious about the batching format: why do the record batches look like this?

It's because of 'target partitions', whose default value is determined by the number of CPU cores. Setting target partitions to 1 will make the final batches vector contain only 1 batch, @BubbaJoe:

    let mut session_cfg = SessionConfig::new();
    session_cfg = session_cfg.with_target_partitions(1);
    let ctx = SessionContext::with_config(session_cfg);
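
As a side note, if the goal is just to hand record_batches_to_json_rows a single batch regardless of partitioning, the collected batches can also be merged first. A minimal sketch, assuming the concat_batches kernel re-exported by the arrow crate bundled with DataFusion:

    use datafusion::arrow::compute::concat_batches;
    use datafusion::arrow::error::ArrowError;
    use datafusion::arrow::record_batch::RecordBatch;

    // Merge all collected batches (e.g. from `df.collect().await?`) into a single
    // batch with the same schema. Assumes `batches` is non-empty.
    fn merge_batches(batches: &[RecordBatch]) -> Result<RecordBatch, ArrowError> {
        concat_batches(&batches[0].schema(), batches)
    }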

alamb commented 1 year ago

I would like to test this! When is the next release?

It is currently out for a vote and should be released early next week: https://lists.apache.org/thread/fnnmrn83spnb2y2l2vdw2v1hd54pfjl7

jiangzhx commented 1 year ago

I would like to test this! When is the next release?

@BubbaJoe you can try the master branch with a dependency like this: arrow-json = { git="https://github.com/apache/arrow-rs.git", branch="master" }

bubbajoe commented 1 year ago

@jiangzhx Can you check this example code here?

I am still getting the error: https://github.com/BubbaJoe/datafusion-arrow-test

jiangzhx commented 1 year ago

@jiangzhx Can you check this example code here?

I am still getting the error: https://github.com/BubbaJoe/datafusion-arrow-test

There are some steps that need to be done before you can test the master version of arrow-rs:

  1. Use the arrow-rs master version in arrow-datafusion.
  2. In your project, reference your arrow-datafusion.

Or, you can try my arrow-datafusion repo: datafusion = { git="https://github.com/jiangzhx/arrow-datafusion/", branch="test_arrow_master", features = ["avro"] }

Do not use my arrow-datafusion branch in production. It is recommended that you wait for the latest version of arrow-datafusion to be released.

bubbajoe commented 1 year ago

Thanks @jiangzhx, it's working!