ballerina-platform / ballerina-library

The Ballerina Library
https://ballerina.io/learn/api-docs/ballerina/
Apache License 2.0

Getting two string[] for some CSV row when accessing through byte[] stream #5034

Open Chuhaa opened 2 years ago

Chuhaa commented 2 years ago

Description:

Currently, we get two string[] values for some CSV rows when accessing the file through a byte[] stream.

Describe your problem(s)

We need to access a CSV file through a byte[] stream and convert it into a stream of string[] to run a query. When we do this, some CSV rows are split into two string[] values because of the byte[] chunk boundaries. As a result, we get 1048 string[] values for 1000 CSV rows and cannot process them.

Sample code:

stream<byte[], io:Error?> movieCatalogResponse = check amazonS3Client->getObject("library-catalog",
    "movies-tv-shows.csv");
record {|byte[] value;|}|io:Error? next = movieCatalogResponse.next();
while next is record {|byte[] value;|} {
    // A new CSV channel is created for every byte[] chunk, so a row that is
    // split across two chunks is returned as two separate string[] values.
    io:ReadableCharacterChannel readableCharacterChannel = new (check io:createReadableChannel(next.value),
        "UTF-8");
    io:ReadableCSVChannel csvChannel = new (readableCharacterChannel, io:COMMA);
    while csvChannel.hasNext() {
        string[]? cells = check csvChannel.getNext();
        if cells is string[] {
            io:println(cells);
        }
    }
    next = movieCatalogResponse.next();
}

Describe your solution(s)

Provide support to get each CSV row as a single string[].

BuddhiWathsala commented 2 years ago

I think this happens because the byte chunk you receive from amazonS3Client->getObject() contains a partial CSV row. Could you check whether you can increase the size of the byte chunk that you receive from the following code?

stream<byte[], io:Error?> movieCatalogResponse = check amazonS3Client->getObject("library-catalog", "movies-tv-shows.csv");
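
If increasing the chunk size is not possible, a minimal workaround sketch, assuming the whole object fits in memory, is to accumulate every chunk of the byte[] stream into a single byte[] and create one CSV channel over it, so no row is split at a chunk boundary:

import ballerina/io;

// Workaround sketch: buffer the full payload, then parse the CSV once.
function printCsvRows(stream<byte[], io:Error?> byteStream) returns error? {
    byte[] allBytes = [];
    record {|byte[] value;|}|io:Error? chunk = byteStream.next();
    while chunk is record {|byte[] value;|} {
        allBytes.push(...chunk.value);
        chunk = byteStream.next();
    }
    if chunk is io:Error {
        return chunk;
    }
    io:ReadableCharacterChannel charChannel =
        new (check io:createReadableChannel(allBytes), "UTF-8");
    io:ReadableCSVChannel csvChannel = new (charChannel, io:COMMA);
    while csvChannel.hasNext() {
        string[]? row = check csvChannel.getNext();
        if row is string[] {
            io:println(row);
        }
    }
    check csvChannel.close();
}
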
miyurud commented 2 years ago

Based on a discussion we had with the connector team members (@Chuhaa, @indikasampath2000, @LakshanSS, and @abeykoon), the getObject() function is supposed to download a file from S3 as well as upload a file to S3. Ideally, I would like to see the getObject() function name changed to getStreamFromObject(), because what we receive from the getObject() invocation is a stream of byte arrays.

The solution we came up with in the meeting was to write the complete stream of bytes received from S3 into a temporary file and then, once the complete file has been received, read it back so that we do not face the issue of partial data.
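
A minimal sketch of that temporary-file approach, using the io module's fileWriteBlocksFromStream and fileReadCsvAsStream functions; the local path is a placeholder:

import ballerina/io;

// Sketch: persist the byte[] stream to disk, then read the complete file
// back as a lazy stream of CSV rows.
function processViaTempFile(stream<byte[], io:Error?> byteStream) returns error? {
    string tempPath = "./movies-tv-shows.csv"; // placeholder path
    check io:fileWriteBlocksFromStream(tempPath, byteStream);
    stream<string[], io:Error?> rows = check io:fileReadCsvAsStream(tempPath);
    check rows.forEach(function(string[] row) {
        io:println(row);
    });
}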

However, this is not an efficient solution in this scenario because there is no requirement to store the data from S3 in a file for this use case. Also, we are not reusing the file content even if we write it to a file. The processing we do here can be done on each individual row, hence there is no need for access to the complete data set locally. Furthermore, the approach of saving a file will consume additional CPU and memory resources, and it will also increase the latency of the application's execution. While the CSV used in the above sample is a small one, we cannot assume the approach of downloading a complete file (for example, of gigabyte scale) from S3 and then reading it in a second pass to do the processing. In such a scenario, reading row by row is the practical and efficient approach to follow.

Whether the data we read from S3 is small or large, the io:ReadableCharacterChannel and io:ReadableCSVChannel should be able to handle the delimiters properly. For example, if the programmer knows that the CSV being processed has 25 columns, they should be able to specify that fact as an input parameter to the above channel instantiation code. Those channels should then make sure they do not return any CSV data until the 25-column requirement is satisfied.

BuddhiWathsala commented 2 years ago

This does not seem to be a bug in the existing implementation. It should be an improvement that we have to do after checking the feasibility.

Firstly, this occurs because you create a CSV channel for each byte chunk, which you should remove from the implementation. What you are requesting is some kind of blocking buffer (or Go-style pipe) implementation that we would have to provide, which waits until the entire CSV row arrives. I did an initial check on other languages, and the only library that comes close to such a requirement is Python's pandas.
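
For illustration, a rough sketch of such a buffering step, written against the existing byte[] stream rather than any new library support: it decodes each chunk, emits only complete lines, and carries the trailing partial line over to the next chunk. Quoted fields containing newlines, and multi-byte characters split across chunk boundaries, are not handled here.

import ballerina/io;

// Buffering sketch: only complete lines (terminated by "\n") are emitted;
// a trailing partial line is carried over and prepended to the next chunk.
function printCompleteLines(stream<byte[], io:Error?> byteStream) returns error? {
    string carryOver = "";
    record {|byte[] value;|}|io:Error? chunk = byteStream.next();
    while chunk is record {|byte[] value;|} {
        // Assumes chunk boundaries do not split multi-byte UTF-8 characters.
        string text = carryOver + check string:fromBytes(chunk.value);
        int? newlineIndex = text.indexOf("\n");
        while newlineIndex is int {
            string line = text.substring(0, newlineIndex);
            io:println(line); // a complete CSV row
            text = text.substring(newlineIndex + 1);
            newlineIndex = text.indexOf("\n");
        }
        carryOver = text; // partial row, if any
        chunk = byteStream.next();
    }
    if carryOver.length() > 0 {
        io:println(carryOver); // last row without a trailing newline
    }
    if chunk is io:Error {
        return chunk;
    }
}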

@Maninda is there any plan to provide this kind of functionality in the upcoming data analytics module under CSV loading?

daneshk commented 2 years ago

Even in the Python pandas module, we need to provide a source from which it can read the full content. It can be a file path, a URL (Amazon S3), or an in-memory byte array holding the full content.

From the IO library, we can only return an error if the source doesn't contain the full CSV content.

My suggestion is to add a new remote function to the Amazon S3 connector to retrieve CSV directly from the S3 client. Something like:

stream<string[], Error?> response = amazonS3Client->getCsvAsStream("library-catalog", "movies-tv-shows.csv");

Inside the client, we handle it internally and return the CSV rows as a stream.
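
Hypothetical usage only, since getCsvAsStream does not exist yet: once rows arrive as a stream<string[], io:Error?>, a query expression can filter them lazily without materialising the whole file. The helper name, the column index, and the "TV Show" value below are made-up assumptions about the file's layout.

import ballerina/io;

// Hypothetical consumer of the proposed row stream.
function selectTvShows(stream<string[], io:Error?> csvRows) returns string[][]|error {
    return from string[] row in csvRows
        where row.length() > 1 && row[1] == "TV Show" // assumed column layout
        select row;
}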

indikasampath2000 commented 2 years ago

I am +1 to providing what @daneshk suggested. But we cannot limit it to only CSV; we should have getXmlAsStream and getJsonAsStream as well.

BuddhiWathsala commented 2 years ago

@indikasampath2000, reading a CSV as a stream is understandable since we might need to process a CSV line by line. However, reading XML and JSON as streams is confusing because we need the entire JSON or XML document as a whole to process it.

Maninda commented 2 years ago

@BuddhiWathsala, in the Pandas module, CSV data loading is still in its design stage, and it depends on the io module's functionality ATM.

BuddhiWathsala commented 2 years ago

Transferring this issue since the discussion above concluded that this feature should be in the AWS S3 client.

abeykoon commented 2 years ago

@daneshk

stream<string[], Error?> response = amazonS3Client->getCsvAsStream("library-catalog", "movies-tv-shows.csv");

What did you mean here by string[]: is it whatever content is in the chunk we received from S3, as a text array, or is it the text representing a single line of the CSV?

IMO, bringing a CSV-specific operation into the S3 connector is not correct. It is the responsibility of the CSV module to consume the byte stream in the way it needs.

We may see a newline in the middle of a chunk. A better way is to dump the whole content to a file and then process it back. Then we have the whole CSV file there, no matter whether it is small or large.

abeykoon commented 2 years ago

Currently, we have implemented it in a way that waits for the whole CSV file to accumulate into a single byte[]. But in a use case where we need to process CSVs of big sizes, how are we going to handle it? The question raised by https://github.com/wso2-enterprise/choreo/issues/8589 still remains, doesn't it?
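
For the large-file case, a small sketch assuming the object has already been persisted locally (for example via io:fileWriteBlocksFromStream): io:fileReadCsvAsStream pulls rows lazily, so memory stays bounded even for a gigabyte-scale CSV. The countRows helper is a made-up example.

import ballerina/io;

// Rows are pulled one at a time from the file; no single byte[] of the whole
// content is ever built in memory.
function countRows(string csvPath) returns int|error {
    stream<string[], io:Error?> rows = check io:fileReadCsvAsStream(csvPath);
    int count = 0;
    record {|string[] value;|}|io:Error? row = rows.next();
    while row is record {|string[] value;|} {
        count += 1;
        row = rows.next();
    }
    if row is io:Error {
        return row;
    }
    return count;
}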