sunng87 / pgwire

PostgreSQL wire protocol implemented as a rust library.
Apache License 2.0
531 stars 40 forks source link

`ToSqlText` for arrays does not follow quoting rules #187

Closed elmaxxo closed 2 months ago

elmaxxo commented 4 months ago

The current implementation of ToSqlText for arrays does not adhere to the specified rules for quoting array elements. According to the PostgreSQL documentation:

> The array output routine will put double quotes around element values if they are empty strings, contain curly braces, delimiter characters, double quotes, backslashes, or white space, or match the word NULL. Double quotes and backslashes embedded in element values will be backslash-escaped. For numeric data types it is safe to assume that double quotes will never appear, but for textual data types one should be prepared to cope with either the presence or absence of quotes.

Following these rules is crucial for proper parsing on the client side.

Steps to Reproduce:

  1. Server:
use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{stream, Sink, SinkExt, StreamExt};
use tokio::net::TcpListener;

use pgwire::api::auth::noop::NoopStartupHandler;
use pgwire::api::copy::NoopCopyHandler;
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo, QueryResponse, Response, Tag};
use pgwire::api::{ClientInfo, PgWireHandlerFactory, Type};
use pgwire::error::ErrorInfo;
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::messages::response::NoticeResponse;
use pgwire::messages::PgWireBackendMessage;
use pgwire::tokio::process_socket;

/// Field-less handler type for this reproduction server; it implements
/// `SimpleQueryHandler` and is shared through an `Arc` by the handler factory.
pub struct DummyProcessor;

// Simple-query handling for the reproduction: every query is acknowledged with
// a notice, and `SELECT` queries get a single row holding a text array.
#[async_trait]
impl SimpleQueryHandler for DummyProcessor {
    /// Answers one simple query.
    ///
    /// A notice built from `"NOTICE"`, `"01000"` and the echoed query text is
    /// sent to the client first. Queries starting with `SELECT` then receive a
    /// one-column (`text_arr`, `TEXT_ARRAY`, text format) result with one row;
    /// anything else receives an `OK` execution tag with a row count of 1.
    ///
    /// # Errors
    ///
    /// Propagates the error from sending the notice; row encoding errors are
    /// yielded as items of the returned row stream.
    async fn do_query<'a, C>(
        &self,
        client: &mut C,
        query: &'a str,
    ) -> PgWireResult<Vec<Response<'a>>>
    where
        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
        C::Error: Debug,
        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
    {
        let notice = NoticeResponse::from(ErrorInfo::new(
            "NOTICE".to_owned(),
            "01000".to_owned(),
            format!("Query received {}", query),
        ));
        client
            .send(PgWireBackendMessage::NoticeResponse(notice))
            .await?;

        // Everything that is not a SELECT is acknowledged without a result set.
        if !query.starts_with("SELECT") {
            return Ok(vec![Response::Execution(Tag::new("OK").with_rows(1))]);
        }

        let schema = Arc::new(vec![FieldInfo::new(
            "text_arr".into(),
            None,
            None,
            Type::TEXT_ARRAY,
            FieldFormat::Text,
        )]);

        // Elements chosen to contain quotes, commas, braces and whitespace:
        // the characters the issue says must trigger quoting/escaping.
        let rows = vec![vec!["\"", ",{", "    ", "\"},", ",\""]];

        let row_schema = schema.clone();
        let data_row_stream = stream::iter(rows).map(move |row| {
            let mut encoder = DataRowEncoder::new(row_schema.clone());
            encoder.encode_field(&row)?;
            encoder.finish()
        });

        let response = Response::Query(QueryResponse::new(schema, data_row_stream));
        Ok(vec![response])
    }
}

/// Handler factory handed to `process_socket` for every accepted connection.
struct DummyProcessorFactory {
    /// Shared query handler; the `Arc` is cloned each time a simple-query
    /// handler is requested.
    handler: Arc<DummyProcessor>,
}

// Only the simple-query path is backed by `DummyProcessor`; startup, extended
// query and copy use the `Noop*` / `Placeholder*` handlers imported from pgwire.
impl PgWireHandlerFactory for DummyProcessorFactory {
    type StartupHandler = NoopStartupHandler;
    type SimpleQueryHandler = DummyProcessor;
    type ExtendedQueryHandler = PlaceholderExtendedQueryHandler;
    type CopyHandler = NoopCopyHandler;

    /// Returns a newly allocated `NoopStartupHandler`.
    fn startup_handler(&self) -> Arc<Self::StartupHandler> {
        Arc::new(NoopStartupHandler)
    }

    /// Returns another reference to the shared `DummyProcessor`.
    fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
        Arc::clone(&self.handler)
    }

    /// Returns a newly allocated `PlaceholderExtendedQueryHandler`.
    fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
        Arc::new(PlaceholderExtendedQueryHandler)
    }

    /// Returns a newly allocated `NoopCopyHandler`.
    fn copy_handler(&self) -> Arc<Self::CopyHandler> {
        Arc::new(NoopCopyHandler)
    }
}

/// Binds a TCP listener on `127.0.0.1:5434` and spawns one task per accepted
/// connection, each running `process_socket` with a clone of the shared factory.
///
/// # Panics
///
/// Panics if binding the listener or accepting a connection fails.
#[tokio::main]
pub async fn main() {
    let handler = Arc::new(DummyProcessor);
    let factory = Arc::new(DummyProcessorFactory { handler });

    let server_addr = "127.0.0.1:5434";
    let listener = TcpListener::bind(server_addr).await.unwrap();
    println!("Listening to {}", server_addr);

    loop {
        // The peer address is not needed; only the stream is passed on.
        let (socket, _peer_addr) = listener.accept().await.unwrap();
        let factory_ref = Arc::clone(&factory);
        tokio::spawn(async move { process_socket(socket, None, factory_ref).await });
    }
}
  2. Client:
    
    from psycopg import connect, ClientCursor

    # Connect to the reproduction server, which listens on 127.0.0.1:5434.
    conn = connect(
        "user=postgres host=127.0.0.1 port=5434",
        cursor_factory=ClientCursor,
        autocommit=True,
    )
    cur = conn.cursor()
    # Any statement starting with "SELECT" makes the server return the text-array row.
    cur.execute("SELECT * FROM some_table")
    # Fetching the rows is the step that raises (see the traceback in step 3).
    rows = cur.fetchall()


3. **Run the server and execute the Python script.** `cur.fetchall()` will fail with the following backtrace:
```python
>>> cur.fetchall()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/max/.local/lib/python3.10/site-packages/psycopg/cursor.py", line 864, in fetchall
    records = self._tx.load_rows(self._pos, self.pgresult.ntuples, self._make_row)
  File "psycopg_binary/_psycopg/transform.pyx", line 464, in psycopg_binary._psycopg.Transformer.load_rows
  File "psycopg_binary/types/array.pyx", line 52, in psycopg_binary._psycopg.ArrayLoader.cload
  File "psycopg_binary/types/array.pyx", line 123, in psycopg_binary._psycopg._array_load_text
  File "psycopg_binary/types/array.pyx", line 160, in psycopg_binary._psycopg._parse_token
psycopg.DataError: malformed array: hit the end of the buffer
```

sunng87 commented 4 months ago

@elmaxxo Thank you for reporting. Will you create a pull request to fix this?

elmaxxo commented 3 months ago

> Will you create a pull request to fix this?

Maybe later, if no one does it before me.

sunng87 commented 2 months ago

I just created #198 to address this.