plausible / ch

HTTP ClickHouse client
MIT License

mix ecto.ch.schema fails for columns with type `AggregateFunction` #90

Open Zarathustra2 opened 1 year ago

Zarathustra2 commented 1 year ago

Getting this error:

** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
    lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(argMin, Decimal(18, 4), DateTime)", [])
    lib/ch/types.ex:328: Ch.Types.decode/1
    lib/mix/tasks/schema.ex:77: Mix.Tasks.Ecto.Ch.Schema.build_field/2
    lib/mix/tasks/schema.ex:46: anonymous fn/1 in Mix.Tasks.Ecto.Ch.Schema.run/1
    (elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
    (elixir 1.14.4) lib/enum.ex:1658: Enum."-map/2-lists^map/1-0-"/2
    lib/mix/tasks/schema.ex:46: Mix.Tasks.Ecto.Ch.Schema.run/1
    (mix 1.14.4) lib/mix/task.ex:421: anonymous fn/3 in Mix.Task.run_task/4

when running mix ecto.ch.schema default.<TABLE>

The table has the columns:

`foo` AggregateFunction(argMin, Decimal(18, 4), DateTime),
`bar` AggregateFunction(argMax, Decimal(18, 4), DateTime),

Zarathustra2 commented 1 year ago

How would I define a schema for those fields as well?

** (ArgumentError) failed to decode "AggregateFunction(argMin, Decimal(18, 4), DateTime)" as ClickHouse type (no function clause matching in Ch.Types.decode/3)
    (ch 0.1.11) lib/ch/types.ex:338: Ch.Types.decode([:type], "AggregateFunction(avgWeighted, Decimal(18, 4), Int64)", [])
    (ch 0.1.11) lib/ch/types.ex:328: Ch.Types.decode/1
    (ecto 3.10.1) lib/ecto/parameterized_type.ex:190: Ecto.ParameterizedType.init/2
    (ecto 3.10.1) lib/ecto/schema.ex:1916: Ecto.Schema.__field__/4
    lib/core/clickhouse/schemas/chain_aggregates.ex:34: (module)

and just using Decimal(18, 4) won't work, because then on insert we get:

 Type of 'foo' must be AggregateFunction(argMin, Decimal(18, 4), DateTime), not Decimal(18, 4)

ruslandoga commented 1 year ago

I haven't used AggregateFunction types, so I'm not sure how to decode them properly (for some reason, many adapters don't support them). However, we could add something like an :as option:

# uses `AggregateFunction(argMin, Decimal(18, 4), DateTime)` in the header
# uses `Decimal(18, 4)` for encoding and decoding
field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"

@Zarathustra2 what do you think?

It would also make it possible to replace a custom type in Plausible:

field :is_bounce, Ch, type: "UInt8", as: :boolean

PR: https://github.com/plausible/ch/pull/91

ruslandoga commented 1 year ago

Or maybe it could be a :name, with the logic reversed:

field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
field :is_bounce, Ch, type: :boolean, name: "UInt8"

ukutaht commented 1 year ago

It's funny, for me the intuitive naming convention would be the other way around:

field :is_bounce, Ch, type: :boolean, as: "UInt8"

I suppose it depends on which side you approach it from. Looking at it from the ClickHouse side, it's a UInt8 which is treated as a boolean in the app. But looking from the app side, it is a boolean which is serialized as a UInt8 to ClickHouse.

Since Ecto lives in the context of our application, it's more intuitive to me that the type would represent how we treat it in-app.

Zarathustra2 commented 1 year ago

@ruslandoga can you also add a test for inserting the data? I am wondering if we will run into the same issue as https://github.com/ClickHouse/clickhouse-java/issues/1232

I personally prefer type & name, or maybe even go with ch_name instead of name.

ruslandoga commented 1 year ago

@Zarathustra2 what kind of test do you have in mind?

Something like this?

  test "insert AggregateFunction", %{conn: conn} do
    Ch.query!(conn, """
    CREATE TABLE test_insert_aggregate_function (
      uid Int16,
      updated SimpleAggregateFunction(max, DateTime),
      name AggregateFunction(argMax, String, DateTime)
    ) ENGINE AggregatingMergeTree ORDER BY uid
    """)

    rows = [
      [1, ~N[2020-01-02 00:00:00], "b"],
      [1, ~N[2020-01-01 00:00:00], "a"]
    ]

    assert %{num_rows: 2} =
             Ch.query!(
               conn,
               """
               INSERT INTO test_insert_aggregate_function
                 SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
                 FROM input('uid Int16, updated DateTime, name String')
                 FORMAT RowBinary\
               """,
               rows,
               types: ["Int16", "DateTime", "String"]
             )

    assert Ch.query!(conn, """
           SELECT uid, max(updated) AS updated, argMaxMerge(name)
           FROM test_insert_aggregate_function
           GROUP BY uid
           """).rows == [[1, ~N[2020-01-02 00:00:00], "b"]]
  end

But note that insert into ... select from input(...) format ... won't be supported automatically in an Ecto adapter, and it doesn't test inserting AggregateFunction types directly.

Zarathustra2 commented 1 year ago

Sorry, bad phrasing; more like: how would I even insert into this column with Ecto?

defmodule SomeSchema do

  use Ecto.Schema

  @primary_key false
  schema "table" do
     field :foo, Ch, type: "Decimal(18, 4)", name: "AggregateFunction(argMin, Decimal(18, 4), DateTime)"
  end

end

How would I insert into this column, since I need to insert not a Decimal(18, 4) but an argMinState(Decimal(18, 4), DateTime)?

ruslandoga commented 1 year ago

How would you insert into that column with clickhouse-client?

Zarathustra2 commented 1 year ago

for my example above?

ruslandoga commented 1 year ago

Yes, since I haven't used AggregateFunction myself I don't know how inserting would work, but I can help you translate a clickhouse-client query into ecto_ch :)

Zarathustra2 commented 1 year ago

create table table_test_agg (foo AggregateFunction(argMin, UInt8, DateTime)) Engine = Memory;
insert into table_test_agg (foo) Values (arrayReduce('argMinState', [1], [now()]));

on a different note: AggregateFunction & SimpleAggregateFunction are super cool, I use them a lot! :)

ruslandoga commented 1 year ago

This query won't work in "streaming" formats like Native or RowBinary, since they don't do any evaluation of the rows.

You can use the same query with Repo.query or Ch.query though.
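For completeness, the same VALUES-based insert can be issued from Elixir. A minimal sketch, assuming a locally running ClickHouse server and the table_test_agg table from the example above:

```elixir
# Sketch only: assumes ClickHouse is reachable on the default port
# and the `table_test_agg` table from the previous comment exists.
{:ok, conn} = Ch.start_link(database: "default")

# VALUES clauses are parsed and evaluated server-side, which is why
# arrayReduce/argMinState works here but not in RowBinary streaming inserts.
Ch.query!(conn, """
INSERT INTO table_test_agg (foo)
VALUES (arrayReduce('argMinState', [1], [now()]))
""")
```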

Zarathustra2 commented 1 year ago

I am wondering whether the data can be encoded in a different way, because the Java issue states:

On the other hand, in order to support more AggregateFunction types, since there's no document about the data structures for read and write, we have to dig into ClickHouse code to figure it out one by one, which is going to take a while. Would be great if someone from the server team can document all the details, so that not only Java but all other clients will benefit from that.

So it seems serializing the state of an aggregate function is possible but not documented? Maybe we have to ping someone on Slack?

ruslandoga commented 1 year ago

We probably won't be supporting undocumented APIs in this driver. However, it would be possible to support them in a separate library since Ch has a very liberal API:

encoded = YourRowBinaryEncoder.encode_rows(...)
Ch.query(conn, "insert into ... format RowBinary", encoded, encode: false)

ruslandoga commented 1 year ago

It seems to me right now that we won't be needing the :as option in Ch, since it doesn't accomplish much.

field :is_bounce, Ch, type: :boolean, as: "UInt8"

can already be covered with a custom Ecto type, which is more typing but more conventional and possibly easier to understand.
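To make the comparison concrete, such a custom Ecto type might look roughly like this (a sketch only; the module name is hypothetical, and the callbacks are the standard Ecto.Type ones):

```elixir
# Hypothetical custom Ecto type: booleans in the app,
# 0/1 integers (UInt8) on the ClickHouse side.
defmodule MyApp.Types.BoolUInt8 do
  use Ecto.Type

  # Underlying storage type as seen by Ecto.
  @impl true
  def type, do: :integer

  # Accept only real booleans from user input / changesets.
  @impl true
  def cast(value) when is_boolean(value), do: {:ok, value}
  def cast(_), do: :error

  # Convert to the wire representation on insert.
  @impl true
  def dump(true), do: {:ok, 1}
  def dump(false), do: {:ok, 0}
  def dump(_), do: :error

  # Convert back to a boolean when reading.
  @impl true
  def load(1), do: {:ok, true}
  def load(0), do: {:ok, false}
  def load(_), do: :error
end
```

It would then be used as field :is_bounce, MyApp.Types.BoolUInt8, with Ecto handling the boolean-to-integer conversion on dump/load.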

And

field :foo, Ch, type: "AggregateFunction(argMin, Decimal(18, 4), DateTime)", as: "Decimal(18, 4)"

doesn't seem to be possible, since we can't just take a decimal and insert it into an aggregate function type.

Zarathustra2 commented 1 year ago

Yeah, I don't think :as is needed right now.


Do you think creating insert queries such as

"""
 INSERT INTO test_insert_aggregate_function
                 SELECT uid, updated, arrayReduce('argMaxState', [name], [updated])
                 FROM input('uid Int16, updated DateTime, name String')
                 FORMAT RowBinary
"""

would be possible in ecto_ch with a macro or something based on a given schema?

EDIT: Actually this is kinda hard, as we will never catch all edge cases (some may insert states built from values that are not fields of the table).

ruslandoga commented 1 year ago

I'll add tests that use the approaches shown in https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/. It seems like all of them can already be supported with the current functionality.

Zarathustra2 commented 1 year ago

yeah all of them can be used via arrayReduce + input

ruslandoga commented 1 year ago

would be possible in ecto_ch with a macro or something based on a given schema?

That SQL statement can be constructed without macros. One sec.

ruslandoga commented 1 year ago

Something like this:

defmodule Schema do
  use Ecto.Schema

  @primary_key false
  schema "test_insert_aggregate_function" do
    field :uid, Ch, type: "Int16"
    field :updated, :naive_datetime
    field :name, :string
  end
end

table = Schema.__schema__(:source)
fields = Schema.__schema__(:fields)
types = Enum.map(fields, fn field -> Schema.__schema__(:type, field) |> Ecto.Type.type() end)
structure = Enum.zip(fields, types) |> Enum.map(fn {f, t} -> "#{f} #{t}" end) |> Enum.join(", ")

select = from i in fragment("input(?)", literal(^structure)),
  select:  %{uid: i.uid, updated: i.updated, name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)}

{select, _no_params = []} = Repo.to_sql(:all, select)

Repo.query!(["insert into ", table, ?\s, select, " format RowBinary"], rows, types: types)

Zarathustra2 commented 1 year ago

Oh that is sick, thanks @ruslandoga <3


Should this maybe be added to the README of ecto_ch? I bet someone else will run into this as well :D

ruslandoga commented 1 year ago

I'll try to find a way to "automate" fragment("input(?)", ...) in the tests for https://kb.altinity.com/altinity-kb-schema-design/ingestion-aggregate-function/.


I'm thinking about doing something like

import Ecto.Adapters.ClickHouse.API, only: [input: 1]

input =
  from i in input(uid: "Int16", updated: "DateTime", name: "String"), # or input(Schema)
    select: %{
      uid: i.uid,
      updated: i.updated,
      name: fragment("arrayReduce('argMaxState', [?], [?])", i.name, i.updated)
    }

rows = [
  [uid: 1231, updated: ~N[2020-01-02 00:00:00], name: "Jane"],
  [uid: 1231, updated: ~N[2020-01-01 00:00:00], name: "John"]
]

TestRepo.insert_all("users", rows, input: input)

WIP: https://github.com/plausible/ecto_ch/pull/79

ruslandoga commented 1 year ago

@Zarathustra2 I've merged https://github.com/plausible/ecto_ch/pull/79 and I wonder if it solves your use-case.

def deps do
  [
-   {:ecto_ch, "~> 0.1.0"},
+   {:ecto_ch, github: "plausible/ecto_ch"}
  ]
end

I'm not releasing it yet since it might need some more work depending on your test-drive.

Zarathustra2 commented 1 year ago

Oh that is sick!!! Let me test that today/tomorrow, I will ping you, but just from reading over the code it looks incredibly easy to work with! <3

Zarathustra2 commented 1 year ago

Likely not getting to it today, but I should get to it tomorrow, sorry about that.

Zarathustra2 commented 1 year ago

Currently getting:

** (Ch.Error) Code: 477. DB::Exception: FORMAT must be specified for function input(). (INVALID_USAGE_OF_INPUT) (version 23.2.4.12 (official build))

Let me see if I can debug this one :D (can't really share the schema due to it being confidential :( )

ruslandoga commented 1 year ago

You might be able to use the tests as a guide: https://github.com/plausible/ecto_ch/blob/fb995abad0207b1751898c237a9d89c93a2154e7/test/ecto/integration/aggregate_function_type_test.exs#L78-L152

Zarathustra2 commented 1 year ago

Works now! I had TestRepo.insert_all("users", input, input: rows) instead of TestRepo.insert_all("users", rows, input: input) lol

Super awesome! @ruslandoga <3

ruslandoga commented 1 year ago

Ah, yes. I still don't know what the API should be. I considered both of your approaches (and picked the current one since we are still inserting rows, but "via" an input). And I also thought about Repo.insert_input(table, input_query, rows, opts)

Zarathustra2 commented 1 year ago

I mean, I made a silly mistake, so that is that :D

insert_input doesn't sound too bad IMO; you could then add dedicated docs for that function, which may make it easier for other users to spot on HexDocs when reading through the different functions.