elixir-ecto / myxql

MySQL 5.5+ driver for Elixir
Apache License 2.0

MyXQL FunctionClauseError error when running query through Ecto #199

Open beerlington opened 10 hours ago

beerlington commented 10 hours ago

I'm attempting to get Oban's new MySQL engine running on PlanetScale and am hitting an issue that does not seem to be specific to Oban. Our MySQL version on PlanetScale is 8.0.34-Vitess.

One of the underlying queries that Oban is running to find jobs is this:

SELECT
  *
FROM
  oban_jobs
WHERE
  state = 'available'
  AND queue = 'event_bus'
  AND attempt < max_attempts
ORDER BY
  priority ASC,
  scheduled_at ASC,
  id ASC
LIMIT
  100 FOR
UPDATE
  SKIP LOCKED;

When I run this query directly on PlanetScale, I get the expected results:

[Image: Screenshot 2024-11-22 at 12 20 21 PM]

However, when I try to run the exact same query on PlanetScale via Ecto, I get this error:

iex(2)> MyApp.Repo.query!("""
...(2)> SELECT
...(2)>   *
...(2)> FROM
...(2)>   oban_jobs
...(2)> WHERE
...(2)>   state = 'available'
...(2)>   AND queue = 'event_bus'
...(2)>   AND attempt < max_attempts
...(2)> ORDER BY
...(2)>   priority ASC,
...(2)>   scheduled_at ASC,
...(2)>   id ASC
...(2)> LIMIT
...(2)>   100 FOR
...(2)> UPDATE
...(2)>   SKIP LOCKED;
...(2)> """)
[error] ** State machine #PID<0.1874.0> terminating
** Reason for termination ==
** (DBConnection.ConnectionError) client #PID<0.2281.0> stopped: ** (FunctionClauseError) no function clause matching in MyXQL.Protocol.decode_com_stmt_prepare_response/3
    (myxql 0.7.1) lib/myxql/protocol.ex:342: MyXQL.Protocol.decode_com_stmt_prepare_response(<<255, 81, 4, 35, 72, 89, 48, 48, 48, 117, 110, 107, 110, 111, 119, 110, 32, 101, 114, 114, 111, 114, 58, 32, 115, 121, 110, 116, 97, 120, 32, 101, 114, 114, 111, 114, 32, 97, 116, 32, 112, 111, 115, 105, 116, 105, 111, 110, 32, 49, ...>>, <<12, 0, 0, 2, 0, 1, 0, 0, 0, 18, 0, 0, 0, 0, 0, 0, 56, 0, 0, 3, 3, 100, 101, 102, 12, 115, 101, 97, 116, 101, 100, 45, 115, 116, 111, 110, 101, 9, 111, 98, 97, 110, 95, 106, 111, 98, 115, 9, 111, 98, ...>>, :initial)
    (myxql 0.7.1) lib/myxql/client.ex:257: MyXQL.Client.recv_packets/7
    (myxql 0.7.1) lib/myxql/connection.ex:459: MyXQL.Connection.prepare/2
    (myxql 0.7.1) lib/myxql/connection.ex:72: MyXQL.Connection.handle_prepare/3
    (db_connection 2.7.0) lib/db_connection/holder.ex:354: DBConnection.Holder.holder_apply/4
    (db_connection 2.7.0) lib/db_connection.ex:1543: DBConnection.prepare/4
    (db_connection 2.7.0) lib/db_connection.ex:1536: DBConnection.run_prepare/4
    (db_connection 2.7.0) lib/db_connection.ex:1548: DBConnection.run_prepare_execute/5
    (db_connection 2.7.0) lib/db_connection.ex:1653: DBConnection.run/6
    (db_connection 2.7.0) lib/db_connection.ex:772: DBConnection.parsed_prepare_execute/5
    (db_connection 2.7.0) lib/db_connection.ex:764: DBConnection.prepare_execute/4
    (myxql 0.7.1) lib/myxql.ex:354: MyXQL.do_query/4
    (ecto_sql 3.12.1) lib/ecto/adapters/sql.ex:547: Ecto.Adapters.SQL.query!/4
    (elixir 1.15.4) src/elixir.erl:376: anonymous fn/4 in :elixir.eval_external_handler/1
    (stdlib 4.3) erl_eval.erl:748: :erl_eval.do_apply/7
    (elixir 1.15.4) src/elixir.erl:361: :elixir.eval_forms/4
    (elixir 1.15.4) lib/module/parallel_checker.ex:112: Module.ParallelChecker.verify/1
    (iex 1.15.4) lib/iex/evaluator.ex:331: IEx.Evaluator.eval_and_inspect/3
    (iex 1.15.4) lib/iex/evaluator.ex:305: IEx.Evaluator.eval_and_inspect_parsed/3
    (iex 1.15.4) lib/iex/evaluator.ex:294: IEx.Evaluator.parse_eval_inspect/4

[debug] QUERY ERROR db=10.4ms queue=37.4ms idle=315.9ms
SELECT
  *
FROM
  oban_jobs
WHERE
  state = 'available'
  AND queue = 'event_bus'
  AND attempt < max_attempts
ORDER BY
  priority ASC,
  scheduled_at ASC,
  id ASC
LIMIT
  100 FOR
UPDATE
  SKIP LOCKED;
 []
** (FunctionClauseError) no function clause matching in MyXQL.Protocol.decode_com_stmt_prepare_response/3    

    The following arguments were given to MyXQL.Protocol.decode_com_stmt_prepare_response/3:

        # 1
        <<255, 81, 4, 35, 72, 89, 48, 48, 48, 117, 110, 107, 110, 111, 119, 110, 32,
          101, 114, 114, 111, 114, 58, 32, 115, 121, 110, 116, 97, 120, 32, 101, 114,
          114, 111, 114, 32, 97, 116, 32, 112, 111, 115, 105, 116, 105, 111, 110, 32,
          49, ...>>

        # 2
        <<12, 0, 0, 2, 0, 1, 0, 0, 0, 18, 0, 0, 0, 0, 0, 0, 56, 0, 0, 3, 3, 100, 101,
          102, 12, 115, 101, 97, 116, 101, 100, 45, 115, 116, 111, 110, 101, 9, 111, 98,
          97, 110, 95, 106, 111, 98, 115, 9, 111, 98, ...>>

        # 3
        :initial

    Attempted function clauses (showing 4 out of 4):

        def decode_com_stmt_prepare_response(<<0, statement_id::integer-little-size(32), num_columns::integer-little-size(16),
      num_params::integer-little-size(16), 0, num_warnings::integer-little-size(16)>>, next_data, :initial)
        def decode_com_stmt_prepare_response(<<rest::binary>>, "", :initial)
        def decode_com_stmt_prepare_response(payload, _next_data, {com_stmt_prepare_ok, :params, num_params, num_columns})
        def decode_com_stmt_prepare_response(payload, next_data, {com_stmt_prepare_ok, :columns, num_columns})

    (myxql 0.7.1) lib/myxql/protocol.ex:342: MyXQL.Protocol.decode_com_stmt_prepare_response/3
    (myxql 0.7.1) lib/myxql/client.ex:257: MyXQL.Client.recv_packets/7
    (myxql 0.7.1) lib/myxql/connection.ex:459: MyXQL.Connection.prepare/2
    (myxql 0.7.1) lib/myxql/connection.ex:72: MyXQL.Connection.handle_prepare/3
    (db_connection 2.7.0) lib/db_connection/holder.ex:354: DBConnection.Holder.holder_apply/4
    (db_connection 2.7.0) lib/db_connection.ex:1543: DBConnection.prepare/4
    (db_connection 2.7.0) lib/db_connection.ex:1536: DBConnection.run_prepare/4
    (db_connection 2.7.0) lib/db_connection.ex:1548: DBConnection.run_prepare_execute/5
    (db_connection 2.7.0) lib/db_connection.ex:1653: DBConnection.run/6
    (db_connection 2.7.0) lib/db_connection.ex:772: DBConnection.parsed_prepare_execute/5
    (db_connection 2.7.0) lib/db_connection.ex:764: DBConnection.prepare_execute/4
    (myxql 0.7.1) lib/myxql.ex:354: MyXQL.do_query/4
    (ecto_sql 3.12.1) lib/ecto/adapters/sql.ex:547: Ecto.Adapters.SQL.query!/4
    iex:2: (file)
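
For reference, the first argument in the trace above can be read by hand. The sketch below assumes the standard MySQL ERR packet layout (a 0xFF header, a 2-byte little-endian error code, a "#" marker, a 5-byte SQLSTATE, then the message) and uses only the 50 bytes the log prints before it truncates. It is an illustration, not MyXQL's own decoder.

```elixir
# First 50 bytes of argument #1 from the FunctionClauseError above.
# The log elides the remainder with "...", so the message is cut short.
payload =
  <<255, 81, 4, 35, 72, 89, 48, 48, 48, 117, 110, 107, 110, 111, 119, 110, 32,
    101, 114, 114, 111, 114, 58, 32, 115, 121, 110, 116, 97, 120, 32, 101, 114,
    114, 111, 114, 32, 97, 116, 32, 112, 111, 115, 105, 116, 105, 111, 110, 32,
    49>>

# Assumed ERR packet layout: header, error code, "#", SQLSTATE, message.
<<0xFF, code::little-integer-size(16), "#", sql_state::binary-size(5), message::binary>> =
  payload

IO.inspect({code, sql_state, message})
# => {1105, "HY000", "unknown error: syntax error at position 1"}
```

So the server did answer the prepare request with an error (code 1105). The second argument, next_data, is not empty, which would explain why the existing clause that matches `(<<rest::binary>>, "", :initial)` was not selected.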

If I add this function to lib/myxql/protocol:

def decode_com_stmt_prepare_response(<<rest::binary>>, _next, :initial) do
  {:halt, decode_generic_response(rest)}
end

The error becomes:

iex(3)> MyApp.Repo.query!("""
...(3)> SELECT
...(3)>   *
...(3)> FROM
...(3)>   oban_jobs
...(3)> WHERE
...(3)>   state = 'available'
...(3)>   AND queue = 'event_bus'
...(3)>   AND attempt < max_attempts
...(3)> ORDER BY
...(3)>   priority ASC,
...(3)>   scheduled_at ASC,
...(3)>   id ASC
...(3)> LIMIT
...(3)>   100 FOR
...(3)> UPDATE
...(3)>   SKIP LOCKED;
...(3)> """)
[debug] QUERY ERROR db=0.0ms queue=37.4ms idle=12.9ms
SELECT
  *
FROM
  oban_jobs
WHERE
  state = 'available'
  AND queue = 'event_bus'
  AND attempt < max_attempts
ORDER BY
  priority ASC,
  scheduled_at ASC,
  id ASC
LIMIT
  100 FOR
UPDATE
  SKIP LOCKED;
 []
** (MyXQL.Error) (1105) unknown error: syntax error at position 195 near 'SKIP'

    query: SELECT
  *
FROM
  oban_jobs
WHERE
  state = 'available'
  AND queue = 'event_bus'
  AND attempt < max_attempts
ORDER BY
  priority ASC,
  scheduled_at ASC,
  id ASC
LIMIT
  100 FOR
UPDATE
  SKIP LOCKED;

    (ecto_sql 3.12.1) lib/ecto/adapters/sql.ex:1096: Ecto.Adapters.SQL.raise_sql_call_error/1
    iex:3: (file)

Why might I be getting an error through Ecto for a query that runs without error when executed directly on PlanetScale? Any ideas on how I might debug this further?

beerlington commented 10 hours ago

I forgot to add that if I remove the FOR UPDATE SKIP LOCKED portion of the query, it does run:

iex(4)> MyApp.Repo.query!("""
...(4)> SELECT
...(4)>   *
...(4)> FROM
...(4)>   oban_jobs
...(4)> WHERE
...(4)>   state = 'available'
...(4)>   AND queue = 'event_bus'
...(4)>   AND attempt < max_attempts
...(4)> ORDER BY
...(4)>   priority ASC,
...(4)>   scheduled_at ASC,
...(4)>   id ASC
...(4)> LIMIT
...(4)>   100
...(4)> """)
[debug] QUERY OK db=30.8ms queue=37.3ms idle=1077.8ms
SELECT
  *
FROM
  oban_jobs
WHERE
  state = 'available'
  AND queue = 'event_bus'
  AND attempt < max_attempts
ORDER BY
  priority ASC,
  scheduled_at ASC,
  id ASC
LIMIT
  100
 []
%MyXQL.Result{
  columns: ["id", "state", "queue", "worker", "args", "meta", "tags",
   "attempted_by", "errors", "attempt", "max_attempts", "priority",
   "inserted_at", "scheduled_at", "attempted_at", "cancelled_at",
   "completed_at", "discarded_at"],
  connection_id: 616749671,
  last_insert_id: nil,
  num_rows: 1,
  rows: [
    [
      15,
      "available",
      "event_bus",
      "MyApp.EventBus",
      %{
        "data" => %{
          "attempt" => nil,
          "id" => "a344b3f3-8ed3-41e5-a7ec-10f8aea4cfad"
        },
        "event_type" => "Elixir.MyApp.EventBus.Events.ThemePreferenceUpdated"
      },
      %{},
      [],
      [],
      [],
      0,
      20,
      0,
      ~N[2024-11-21 22:13:46.114637],
      ~N[2024-11-21 22:13:46.114637],
      nil,
      nil,
      nil,
      nil
    ]
  ],
  num_warnings: 0
}

If it was simply a syntax error or an unsupported feature though, I would expect the query to fail when connecting directly to PlanetScale, but it does not.

greg-rychlewski commented 8 hours ago

Hi @beerlington ,

Just want to make sure: you are querying the same database server on both Ecto and the CLI you showed that returned results?

If so then maybe one of the flags we are setting when we connect is rejecting that syntax. If you can confirm the above it will help narrow the search. Thank you!

wojtekmach commented 8 hours ago

Thank you for the report. I was able to reproduce this by connecting to a planetscale instance.

@greg-rychlewski yeah it sounds like exactly that but no luck yet.

beerlington commented 8 hours ago

Yes, it's the same database and it sounds like @wojtekmach is able to reproduce it. Let me know if there's anything else I can provide. If you have any questions about PlanetScale or Vitess, I can probably get those answered as well. Thanks!

wojtekmach commented 8 hours ago

OK, I know what this is. I was able to fix this issue by using the text protocol instead of the default binary protocol. In other words:

MyXQL.query!(pid, sql, [], query_type: :text)

wojtekmach commented 7 hours ago

If Oban is calling Repo.query!(sql, params, options), then setting options to query_type: :text should fix this issue. If it's using Repo.all, Repo.one, etc., then at the moment we can't pass that option down from Ecto all the way to MyXQL.
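
A minimal sketch of the Repo.query! route, assuming the repo module exposes query!/3 as Ecto SQL repos do. The TextProtocolQuery module and its run/3 function are hypothetical names used only for illustration; they are not part of Oban, Ecto, or MyXQL.

```elixir
defmodule TextProtocolQuery do
  # Hypothetical helper: runs raw SQL through the given repo and asks
  # MyXQL to use the text protocol instead of a prepared statement.
  def run(repo, sql, params \\ []) do
    repo.query!(sql, params, query_type: :text)
  end
end
```

Usage would look like TextProtocolQuery.run(MyApp.Repo, "SELECT 1 FOR UPDATE SKIP LOCKED"). This only helps for raw SQL, since queries built with Repo.all or Repo.one do not pass the option through.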

wojtekmach commented 7 hours ago

This seems like a PlanetScale bug, maybe?

Running this against Planetscale has the same issue:

MyXQL.query!(pid, "SELECT 1 FOR UPDATE SKIP LOCKED", [], [])

However, against a stock MySQL 8.0.40 server it works (docker run --rm -it -p 3306:3306 -e MYSQL_ALLOW_EMPTY_PASSWORD=1 mysql:8.0).

beerlington commented 7 hours ago

For reference, here is the Ecto query that Oban is using where this error comes up https://github.com/oban-bg/oban/blob/main/lib/oban/engines/dolphin.ex#L105-L114

fetch_query =
  Job
  |> where([j], j.state == "available")
  |> where([j], j.queue == ^meta.queue)
  |> where([j], j.attempt < j.max_attempts)
  |> order_by(asc: :priority, asc: :scheduled_at, asc: :id)
  |> limit(^demand)
  |> lock("FOR UPDATE SKIP LOCKED")

jobs = Repo.all(conf, fetch_query)

In terms of being a PlanetScale bug, I'm not sure. We've been running our production application on it and this is the first query we've hit that has had this issue. Is it possible that my MySQL client is handling binary vs text automatically so I don't notice the difference?

[Image: Screenshot 2024-11-22 at 2 35 14 PM]

wojtekmach commented 7 hours ago

Yeah, most clients, including the CLI client, tend to use the text protocol for one-off queries.
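
To make the text versus binary distinction concrete, here is a rough sketch of the first packet a client sends in each case. It assumes the MySQL client/server protocol framing (a 3-byte little-endian payload length, a 1-byte sequence id, then the payload) and the command bytes 0x03 for COM_QUERY and 0x16 for COM_STMT_PREPARE. The WireSketch module is a hypothetical name and is not how MyXQL builds its packets.

```elixir
defmodule WireSketch do
  # Text protocol: the SQL is sent in a single COM_QUERY command.
  def com_query(sql), do: packet(<<0x03, sql::binary>>)

  # Binary protocol: the SQL is first sent in COM_STMT_PREPARE, and the
  # server must accept the statement before it can be executed.
  def com_stmt_prepare(sql), do: packet(<<0x16, sql::binary>>)

  # Frame a payload: 3-byte little-endian length, sequence id 0, payload.
  defp packet(payload) do
    <<byte_size(payload)::little-integer-size(24), 0, payload::binary>>
  end
end

IO.inspect(WireSketch.com_query("SELECT 1"))
IO.inspect(WireSketch.com_stmt_prepare("SELECT 1"))
```

The same SQL travels in both cases. Only the command byte differs, so a server can accept a statement as a plain query and still reject it at the prepare step, which is consistent with the behavior reported in this thread.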