citusdata / citus

Distributed PostgreSQL as an extension
https://www.citusdata.com
GNU Affero General Public License v3.0
10.39k stars 661 forks source link

BUG: can't call parameterized distributed procedures outside an explicit transaction in Citus 12.1.3 (but not 12.1.0?) #7597

Open DS-AdamMilazzo opened 4 months ago

DS-AdamMilazzo commented 4 months ago

In Citus 12.1.3, when calling a distributed procedure in a batch of SQL statements, the CALL statement appears to execute in a different transaction from the rest of the statements in the batch. Also, it is generally impossible to call distributed procedures with batch parameters as arguments to the procedure. This behavior is not observed in Citus 12.1.0.

Repro steps

First, perform this setup:

CREATE TABLE t (p INT NOT NULL, i INT NOT NULL, PRIMARY KEY (p, i));
SELECT create_distributed_table('t', 'p');
CREATE PROCEDURE f(_p INT, _i INT) LANGUAGE plpgsql AS
    $$ BEGIN SELECT i / 0 FROM t WHERE p = _p AND i = _i; END; $$;
SELECT create_distributed_function('f(INT,INT)', distribution_arg_name := '_p', colocate_with := 't');

Second, call the procedure in a parameterized batch of SQL statements:

INSERT INTO t (p, i) VALUES (@0, @1);
CALL f(1, @1);

In this batch, I used parameters @0 = 1 and @1 = 2, but the specific values don't matter. The batch was sent with the Npgsql library, but any library that allows sending parameterized batches should be able to reproduce this bug. (Don't try putting the statements into a SQL procedure and calling it. That's not likely to reproduce the problem. Use a client library to send the parameterized batch.) Example Npgsql code:

NpgsqlCommand cmd = connection.CreateCommand();
cmd.CommandText = "INSERT INTO t (p, i) VALUES (@0, @1); CALL f(1, @1);";
cmd.Parameters.Add(new NpgsqlParameter("@0", 1));
cmd.Parameters.Add(new NpgsqlParameter("@1", 2));
cmd.ExecuteNonQuery();

Expected behavior:

  1. A tuple should be inserted into table t.
  2. Procedure f(1, 2) should be called, and fail with a division by zero error.
  3. The transaction should roll back, removing the inserted tuple.

Actual behavior:

  1. A tuple is inserted into table t.
  2. Procedure f is not called. Instead, an error occurs: "42P02: there is no parameter $1"
  3. The INSERT statement is still committed (i.e. the inserted tuple is not removed).

Additional Comments

emelsimsek commented 4 months ago

I have tried to reproduce this using the below python code. However on both 12.1.0 and 12.1.3, procedure f is called, it throws division by 0 exception and nothing is inserted to the table t. Is anything missing in this code?

    cur = conn.cursor()

    sql = "CREATE TABLE t (p INT NOT NULL, i INT NOT NULL, PRIMARY KEY (p, i));"
    cur.execute(sql);

    sql = "SELECT create_distributed_table('t', 'p');"
    cur.execute(sql);

    conn.commit();

    sql = """ CREATE PROCEDURE f(_p INT, _i INT) LANGUAGE plpgsql AS
    $$ BEGIN SELECT i / 0 FROM t WHERE p = _p AND i = _i; END; $$;"""

    cur.execute(sql);

    conn.commit();

    sql = "SELECT create_distributed_function('f(INT,INT)', distribution_arg_name := '_p', colocate_with := 't');"
    cur.execute(sql);

    conn.commit();

    sql = "INSERT INTO t (p, i) VALUES (%s, %s); CALL f(1, %s);"

    cur.execute(sql,(1,2,2))
    conn.commit();
DS-AdamMilazzo commented 4 months ago

I am not familiar with the python client, but I think there are two problems with that code.

  1. I think that by default, the python client wraps your commands in a transaction. You should switch it to autocommit mode. As mentioned in the post, the problem doesn't occur inside a transaction.
  2. Ideally you would use two parameters, as in INSERT INTO t (p, i) VALUES (@0, @1); CALL f(1, @1); rather than three. Using three may result in a different behavior, though I'm not sure.
DS-AdamMilazzo commented 4 months ago

Any luck reproducing this?

m3hm3t commented 3 weeks ago

I was able to replicate the issue with Npgsql on both Citus versions 12.1.3 and 12.1.0.

Here is the code I executed:

using System;
using Npgsql;

class Program
{
    static void Main()
    {
        // Update the connection string with the port number
        var connString = "Host=localhost;Port=9700;Database=citus";

        // Establish the connection
        using (var connection = new NpgsqlConnection(connString))
        {
            connection.Open();

            // Create a command
            using (var cmd = connection.CreateCommand())
            {
                // Command text
                cmd.CommandText = "INSERT INTO t (p, i) VALUES (@0, @1); CALL f(1, @1);";

                // Add parameters
                cmd.Parameters.Add(new NpgsqlParameter("@0", 1));
                cmd.Parameters.Add(new NpgsqlParameter("@1", 2));

                // Execute the command
                cmd.ExecuteNonQuery();
            }
        }
    }
}

Additionally, I attempted to simulate the same behavior using another library:

import psycopg2

# Establish the connection
connection = psycopg2.connect(
    dbname="citus", 
    host="localhost", 
    port="9700"
)

# Enable autocommit mode
connection.autocommit = True

# Execute the combined command with implicit transaction handling
try:
    with connection.cursor() as cursor:
        # Combined command: INSERT INTO t followed by CALL f
        cursor.execute(
            """
            INSERT INTO t (p, i) VALUES (%s, %s);
            CALL f(1, %s);
            """, 
            (1, 2, 2)
        )

except Exception as e:
    print(f"Error during execution: {e}")
finally:
    connection.close()
citus@0e40d9ee7522:/data$ python3 script.py
Error during execution: division by zero
CONTEXT:  while executing command on localhost:9702
SQL statement "SELECT i / 0 FROM t WHERE p = _p AND i = _i"
PL/pgSQL function f(integer,integer) line 1 at SQL statement

In this case, the behavior was as expected.

My experience with Npgsql is somewhat limited, but here’s my take on the issue. It seems like the INSERT works and gets committed right away because of autocommit mode. However, the CALL fails on the client side due to a parameter binding problem in Npgsql. This error happens before the distributed transaction can complete on the worker nodes, so Citus doesn’t roll back the INSERT.

DS-AdamMilazzo commented 3 weeks ago

I don't know what

        cursor.execute(
            """
            INSERT INTO t (p, i) VALUES (%s, %s);
            CALL f(1, %s);
            """, 
            (1, 2, 2)

does exactly, e.g. whether it's going to use actual SQL-level parameters for the values 1, 2, 2. The documentation is not clear on this point, either. It looks like it may well be passing literals and not using SQL-level parameters.

Anyway, if it was due to a client-side problem in Npgsql, that would raise the question of why the problem doesn't occur if you don't call create_distributed_function. I.e. the problem only occurs with Citus distributed functions, but Npgsql doesn't know whether the function has been distributed or not.

m3hm3t commented 3 weeks ago

that would raise the question of why the problem doesn't occur if you don't call create_distributed_function. I.e. the problem only occurs with Citus distributed functions, but Npgsql doesn't know whether the function has been distributed or not.

You're correct; when the function isn't distributed, it runs like a regular PostgreSQL function, resulting in a typical PostgreSQL error (22012: division by zero).

  Exception data:
    Severity: ERROR
    SqlState: 22012
    MessageText: division by zero
    Where: while executing command on localhost:9702
SQL statement "SELECT i / 0 FROM t WHERE p = _p AND i = _i"
PL/pgSQL function f(integer,integer) line 1 at SQL statement
    File: remote_commands.c
    Line: 318
    Routine: ReportResultError

To help identify whether this is a bug in Citus or a client-side issue, tried executing the function directly using psql without involving the client driver (Npgsql or psycopg2).

SELECT f(1, 2);

[local] citus@citus:9700-6010=# CALL f(1, 2);
ERROR:  division by zero
CONTEXT:  SQL statement "SELECT i / 0 FROM t WHERE p = _p AND i = _i"
PL/pgSQL function f(integer,integer) line 1 at SQL statement
while executing command on localhost:9702
Time: 18.283 ms

It points more towards a client-side issue.

Tried to simplify the query in the client code to ensure the parameters are passed in a basic form and check if the same issue occurs.

            using (var cmd = connection.CreateCommand())
            {
                cmd.CommandText = "CALL f(1, 2);"; 

                cmd.ExecuteNonQuery();
            }
              Exception data:
    Severity: ERROR
    SqlState: 22012
    MessageText: division by zero
    Where: SQL statement "SELECT i / 0 FROM t WHERE p = _p AND i = _i"
PL/pgSQL function f(integer,integer) line 1 at SQL statement
while executing command on localhost:9702
    File: remote_commands.c
    Line: 318
    Routine: ReportResultError

It could indicate that the issue arises when binding parameters dynamically rather than being an inherent issue with Citus.

I encountered the same error when I tried both of these.

cmd.CommandText = "CALL f(1, @i);";
cmd.Parameters.Add(new NpgsqlParameter("i", 2)); 

cmd.CommandText = "CALL f($1, $2);";
cmd.Parameters.Add(new NpgsqlParameter("$1", 1));
cmd.Parameters.Add(new NpgsqlParameter("$2", 2));
Exception data:
    Severity: ERROR
    SqlState: 42P02
    MessageText: there is no parameter $1
    Where: while executing command on localhost:9702
    File: remote_commands.c
    Line: 318
    Routine: ReportResultError

It seems likely that this error happens because of a mismatch between how the client sends parameters and how the server expects them, particularly with prepared statements.

Since the function works fine when run directly on the server, the issue might be related to how the client interacts with the distributed Citus setup. The way positional parameter binding works in Npgsql may differ from how PostgreSQL handles those placeholders, so that could be part of the problem.

DS-AdamMilazzo commented 3 weeks ago

I don't agree that it's a client-side problem, but it only happens when specifying parameters at the SQL (prepared statement) level. Clients that transform parameters into literals will not reproduce the problem. Running the function without parameters will not reproduce the problem. Parameter binding works in Npgsql for this function but only if you didn't previously call create_distributed_function on it. Npgsql will not know whether the function has been distributed. The same request, which works, only starts to fail after calling create_distributed_function.

According to my original comments:

There was a similar bug reported previously - #7242 - which was also related to parameterized distributed functions called outside explicit transactions. It may be related, and it was a server-side problem.

m3hm3t commented 3 weeks ago

@DS-AdamMilazzo

Do you already have logs set with log_min_messages and citus.work_min_messages at DEBUG1 (or possibly DEBUG4)?

My end I received these logs. Do you have similar one?

2024-08-28 08:55:03.967 UTC [11560] DEBUG:  parse <unnamed>: INSERT INTO citus.t_102041 (p, i) VALUES (1, 2)
2024-08-28 08:55:03.967 UTC [11560] DEBUG:  bind <unnamed> to <unnamed>
2024-08-28 08:55:03.969 UTC [11560] ERROR:  there is no parameter $1 at character 17
2024-08-28 08:55:03.969 UTC [11560] STATEMENT:  CALL citus.f(1, $1)
2024-08-28 08:55:03.970 UTC [11368] DEBUG:  forked new backend, pid=11561 socket=11
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  processing cancel request: sending SIGINT to process 11560
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  shmem_exit(0): 0 before_shmem_exit callbacks to make
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  shmem_exit(0): 0 on_shmem_exit callbacks to make
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  proc_exit(0): 1 callbacks to make
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  exit(0)
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  shmem_exit(-1): 0 before_shmem_exit callbacks to make
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  shmem_exit(-1): 0 on_shmem_exit callbacks to make
2024-08-28 08:55:03.970 UTC [11561] DEBUG:  proc_exit(-1): 0 callbacks to make
2024-08-28 08:55:03.972 UTC [11368] DEBUG:  reaping dead processes
2024-08-28 08:55:03.972 UTC [11368] DEBUG:  server process (PID 11561) exited with exit code 0
2024-08-28 08:55:03.972 UTC [11560] DEBUG:  shmem_exit(0): 5 before_shmem_exit callbacks to make
2024-08-28 08:55:03.972 UTC [11560] DEBUG:  shmem_exit(0): 6 on_shmem_exit callbacks to make
2024-08-28 08:55:03.972 UTC [11560] DEBUG:  proc_exit(0): 2 callbacks to make
2024-08-28 08:55:03.972 UTC [11560] DEBUG:  exit(0)
2024-08-28 08:55:03.972 UTC [11560] DEBUG:  shmem_exit(-1): 0 before_shmem_exit callbacks to make
2024-08-28 08:55:03.972 UTC [11560] DEBUG:  shmem_exit(-1): 0 on_shmem_exit callbacks to make
2024-08-28 08:55:03.972 UTC [11560] DEBUG:  proc_exit(-1): 0 callbacks to make
2024-08-28 08:55:03.974 UTC [11368] DEBUG:  reaping dead processes
2024-08-28 08:55:03.974 UTC [11368] DEBUG:  server process (PID 11560) exited with exit code 0
2024-08-28 08:55:12.754 UTC [11371] DEBUG:  snapshot of 0+0 running transaction ids (lsn 0/1DBD9F0 oldest xid 780 l
atest complete 779 next xid 780)