MKuranowski / aiocsv

Python: Asynchronous CSV reading/writing
https://pypi.org/project/aiocsv/
MIT License

Error when decoding csv using sftp client of asyncssh #15

Closed. djmarsel closed this issue 1 year ago.

djmarsel commented 1 year ago

Hello. I get an error when reading a CSV file with aiocsv over asyncssh. If the file is larger than a certain size (about 10 KB), I get this error:

Traceback (most recent call last):
  File "test.py", line 45, in <module>
    asyncio.run(main())
  File ".pyenv/versions/3.10.11/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File ".pyenv/versions/3.10.11/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "test.py", line 37, in main
    async for row in aiocsv.AsyncReader(f):
  File ".venv/lib/python3.10/site-packages/aiocsv/readers.py", line 37, in __anext__
    return await self._parser.__anext__()
  File "aiocsv/_parser.pyx", line 216, in parser
  File ".venv/lib/python3.10/site-packages/asyncssh/sftp.py", line 3086, in read
    return cast(AnyStr, data.decode(self._encoding, self._errors))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 2047: unexpected end of data

My code looked like this:

    async with asyncssh.connect(
            sftp_host,
            port=sftp_port,
            username=sftp_username,
            password=sftp_password,
            client_keys=sftp_client_keys,
    ) as conn:  # type: asyncssh.SSHClientConnection
        async with conn.start_sftp_client() as sftp:  # type: asyncssh.SFTPClient
            async with sftp.open(
                sftp_path / filename,
                pflags_or_mode='r',
            ) as f:  # type: asyncssh.SFTPClientFile
                async for row in aiocsv.AsyncReader(f):
                    print(row)

Does anyone have any idea what is wrong here? Thanks a lot!

MKuranowski commented 1 year ago

This looks like an error in asyncssh. You can't decode an arbitrarily sliced UTF-8 byte stream, as the slice boundary might fall in the middle of a multi-byte sequence. asyncssh needs to use a codecs.IncrementalDecoder.
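
For illustration, a minimal snippet (separate from asyncssh's actual read path) showing why a chunk boundary inside a multi-byte sequence breaks a plain .decode(), and how an incremental decoder handles it:

    import codecs

    # "ż" encodes to two bytes in UTF-8; cutting the stream between them
    # reproduces the "unexpected end of data" error from the traceback above.
    data = "ża".encode("utf-8")

    try:
        data[:1].decode("utf-8")  # slice ends mid-sequence
    except UnicodeDecodeError as e:
        print(e)  # 'utf-8' codec can't decode byte 0xc5 in position 0: unexpected end of data

    # An incremental decoder buffers the partial sequence instead of failing:
    decoder = codecs.getincrementaldecoder("utf-8")("strict")
    print(repr(decoder.decode(data[:1])))  # '' - the lone byte is buffered
    print(repr(decoder.decode(data[1:])))  # 'ża' - completed by the next chunk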

djmarsel commented 12 months ago

Thank you for the explanation. I solved the problem with an additional proxy class that runs each chunk through a codecs.IncrementalDecoder before passing it to the aiocsv reader.

I will post here an example of code that may be useful to someone to solve a similar problem:

from codecs import getincrementaldecoder, IncrementalDecoder

import aiocsv
import asyncssh
from asyncssh.sftp import SFTPClientFile

class AsyncSftpFileReader:
    """
    This class uses IncrementalDecoder to correctly decode file split by chunks
    where chunk bound could split UTF8 multibyte char.

    Note! SFTPClientFile must be opened in binary mode.
    """
    __slots__ = ('_decoder', '_sftp_file')

    def __init__(self, sftp_file: SFTPClientFile, encoding='utf-8', errors='strict'):
        self._decoder: IncrementalDecoder = getincrementaldecoder(encoding)(errors)
        self._sftp_file = sftp_file

    async def __aenter__(self):
        return self

    async def read(self, size: int = -1) -> str:
        resp = await self._sftp_file.read(size)
        return self._decoder.decode(resp)

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            self._decoder.decode(b'', final=True)

async with asyncssh.connect(
        sftp_host,
        port=sftp_port,
        username=sftp_username,
        password=sftp_password,
        client_keys=sftp_client_keys,
) as conn:  # type: asyncssh.SSHClientConnection
    async with conn.start_sftp_client() as sftp:  # type: asyncssh.SFTPClient
        async with sftp.open(
            sftp_path / filename,
            pflags_or_mode='rb',
        ) as f:  # type: asyncssh.SFTPClientFile
            async with AsyncSftpFileReader(f) as async_file:
                async for row in aiocsv.AsyncReader(async_file):
                    print(row)

MKuranowski commented 12 months ago

self._decoder.decode(b'', final=True) might still return further bytes. I have already posted an async equivalent of io.TextIOWrapper in #2:

import codecs
from typing import AnyStr, Protocol

class SupportsAsyncRead(Protocol[AnyStr]):
    async def read(self, size: int = ...) -> AnyStr: ...

class AsyncTextReaderWrapper:
    def __init__(self, obj: SupportsAsyncRead[bytes], encoding: str, errors: str = "strict") -> None:
        self.obj = obj

        decoder_factory = codecs.getincrementaldecoder(encoding)
        self.decoder = decoder_factory(errors)

    async def read(self, size: int = -1) -> str:
        raw_data = await self.obj.read(size)

        if not raw_data:
            return self.decoder.decode(b"", final=True)

        return self.decoder.decode(raw_data, final=False)
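
For completeness, a rough sketch (untested) of how this wrapper could be plugged into the SFTP snippet from the original post, reusing the same connection variables and opening the file in binary mode so the wrapper receives raw bytes:

    async with asyncssh.connect(
            sftp_host,
            port=sftp_port,
            username=sftp_username,
            password=sftp_password,
            client_keys=sftp_client_keys,
    ) as conn:
        async with conn.start_sftp_client() as sftp:
            # Binary mode: the wrapper does the decoding, not asyncssh.
            async with sftp.open(sftp_path / filename, pflags_or_mode='rb') as f:
                reader = aiocsv.AsyncReader(AsyncTextReaderWrapper(f, "utf-8"))
                async for row in reader:
                    print(row)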

vitaly-burovoy commented 12 months ago

Hello!

@MKuranowski, your solution seems better!

The only question I have after reading your message is:

self._decoder.decode(b'', final=True) might still return further bytes.

why do you think it is possible? Could you prove it?

Digging into the CPython source code, I found that (at least for UTF-8) the decode function consumes as many bytes as possible:
https://github.com/python/cpython/blob/3.12/Lib/codecs.py#L319
https://github.com/python/cpython/blob/3.12/Objects/unicodeobject.c#L4669
https://github.com/python/cpython/blob/3.12/Objects/unicodeobject.c#L4687

Therefore passing an empty byte string (even with final=True) cannot "finish" even a single Unicode character to be returned.

Did I miss something?

MKuranowski commented 12 months ago

Sorry, it's not "return further bytes", it should be "detect partial multi-byte sequences".

>>> import codecs
>>> decoder = codecs.getincrementaldecoder("utf-8")("strict")
>>> str_without_last_byte = "Zażółć gęślą jaźń".encode("utf-8")[:-1]
>>> decoder.decode(str_without_last_byte)
'Zażółć gęślą jaź'
>>> decoder.decode(b"", final=True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<frozen codecs>", line 322, in decode
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc5 in position 0: unexpected end of data

MKuranowski commented 12 months ago

Update: it can return more bytes with errors="replace" and other non-strict error handling. Actually, it would be interesting to see if there is an encoding scheme which relies on EOF to distinguish between characters.

>>> import codecs
>>> decoder = codecs.getincrementaldecoder("utf-8")("replace")
>>> str_without_last_byte = "Zażółć gęślą jaźń".encode("utf-8")[:-1]
>>> decoder.decode(str_without_last_byte)
'Zażółć gęślą jaź'
>>> decoder.decode(b"", final=True)
'�'

vitaly-burovoy commented 11 months ago

Wow! Great!

I've never used errors="replace" (though I knew about the feature) and hadn't thought of it that way.

Thank you very much for the proof!