apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++][Python] FileNotFound with Scality accessed through S3 APIs #30454

Closed: asfimport closed this issue 2 years ago

asfimport commented 2 years ago

When using dataset.Dataset with S3FileSystem against an S3-compatible object storage, I get a FileNotFoundError.

 

My code:

 

from pyarrow import fs
import pyarrow.dataset as ds

scality = fs.S3FileSystem(access_key='accessKey1', secret_key='verySecretKey1', endpoint_override="http://localhost:8000", region="")

data = ds.dataset("dasynth/parquet/taxies/2019_june/", format="parquet", partitioning="hive", filesystem=scality)

Environment: Linux + Python 3.8
Reporter: Luis Morales
Assignee: Antoine Pitrou / @pitrou

PRs and other links:

Note: This issue was originally created as ARROW-14930. Please see the migration documentation for further details.

asfimport commented 2 years ago

Luis Morales: dasynth -> bucket

parquet, taxies -> folders

2019_june -> root dir for partitioned parquet

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: A few questions to help diagnose the problem. Could you first check whether the filesystem object itself can find the directories/files (to see whether the problem lies there or in the dataset code)? For example, could you try:


scality.get_file_info("dasynth")
scality.get_file_info("dasynth/parquet/taxies")

If the parameters for the S3FileSystem are correct, it should normally be able to give some basic information about the bucket.
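A minimal sketch of that check, reusing the scality filesystem object from the report (the expected result types are an assumption about a working setup, not something confirmed yet):

for path in ["dasynth", "dasynth/parquet/taxies"]:
    info = scality.get_file_info(path)
    # A reachable bucket or prefix should come back as fs.FileType.Directory;
    # fs.FileType.NotFound here would point at the filesystem layer rather than
    # at the dataset discovery code.
    print(path, info.type)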

asfimport commented 2 years ago

Luis Morales: Tried three things:

 

(the bucket as a FileSelector object, recursively. OK)

scality.get_file_info(fs.FileSelector("dasynth/", recursive=True))

(the bucket as a path. OK)

scality.get_file_info("dasynth/")

(a folder under the bucket as a path. KO)

scality.get_file_info("dasynth/parquet/")

(a folder under the bucket as a FileSelector object. OK, returns [])

scality.get_file_info(fs.FileSelector("dasynth/parquet/"))

asfimport commented 2 years ago

Luis Morales: (they were four tests sorry, not three)

:)

asfimport commented 2 years ago

Luis Morales: Additional info. Logs from S3 compatible server (scality):

scality.get_file_info("dasynth/parquet/")


{"name":"S3","clientIP":"::ffff:172.17.0.1","clientPort":47954,"httpCode":200,"httpMethod":"HEAD","httpURL":"/dasynth/parquet","time":1638621462882,"req_id":"b01ebf5fc0409fbced02","level":"info","message":"received request","hostname":"33938e405ca2","pid":78}
{"name":"S3","bucketName":"dasynth","objectKey":"parquet","bytesReceived":0,"bodyLength":0,"service":"s3","action":"HeadObject","accountDisplayName":"Bart","accountName":"Bart","bytesSent":191,"clientIP":"::ffff:172.17.0.1","clientPort":47954,"httpCode":404,"httpMethod":"HEAD","httpURL":"/dasynth/parquet","time":1638621462887,"req_id":"b01ebf5fc0409fbced02","elapsed_ms":5.076007,"level":"info","message":"responded with error XML","hostname":"33938e405ca2","pid":78}
{"name":"S3","clientIP":"::ffff:172.17.0.1","clientPort":47954,"httpCode":200,"httpMethod":"HEAD","httpURL":"/dasynth/parquet","time":1638621462888,"req_id":"e02c3e47bd4d83c64b45","level":"info","message":"received request","hostname":"33938e405ca2","pid":78}
{"name":"S3","bucketName":"dasynth","objectKey":"parquet","bytesReceived":0,"bodyLength":0,"service":"s3","action":"HeadObject","accountDisplayName":"Bart","accountName":"Bart","bytesSent":191,"clientIP":"::ffff:172.17.0.1","clientPort":47954,"httpCode":404,"httpMethod":"HEAD","httpURL":"/dasynth/parquet","time":1638621462892,"req_id":"e02c3e47bd4d83c64b45","elapsed_ms":3.876694,"level":"info","message":"responded with error XML","hostname":"33938e405ca2","pid":78}
{"name":"S3","clientIP":"::ffff:172.17.0.1","clientPort":47954,"httpCode":200,"httpMethod":"GET","httpURL":"/dasynth?delimiter=%2F&list-type=2&max-keys=1&prefix=parquet%2F","time":1638621462893,"req_id":"44a5115f184a8ff86f54","level":"info","message":"received request","hostname":"33938e405ca2","pid":78}
{"name":"S3","bucketName":"dasynth","bytesReceived":0,"bodyLength":0,"service":"s3","action":"ListObjectsV2","accountDisplayName":"Bart","accountName":"Bart","bytesSent":361,"clientIP":"::ffff:172.17.0.1","clientPort":47954,"httpCode":200,"httpMethod":"GET","httpURL":"/dasynth?delimiter=%2F&list-type=2&max-keys=1&prefix=parquet%2F","time":1638621462913,"req_id":"44a5115f184a8ff86f54","elapsed_ms":20.228346,"level":"info","message":"responded with XML","hostname":"33938e405ca2","pid":78}

 

scality.get_file_info(fs.FileSelector("dasynth/parquet/"))


{"name":"S3","clientIP":"::ffff:172.17.0.1","clientPort":47956,"httpCode":200,"httpMethod":"GET","httpURL":"/dasynth?delimiter=%2F&list-type=2&max-keys=1000&prefix=parquet%2F","time":1638621506540,"req_id":"efc897c4c849927a7de2","level":"info","message":"received request","hostname":"33938e405ca2","pid":78}
{"name":"S3","bucketName":"dasynth","bytesReceived":0,"bodyLength":0,"service":"s3","action":"ListObjectsV2","accountDisplayName":"Bart","accountName":"Bart","bytesSent":364,"clientIP":"::ffff:172.17.0.1","clientPort":47956,"httpCode":200,"httpMethod":"GET","httpURL":"/dasynth?delimiter=%2F&list-type=2&max-keys=1000&prefix=parquet%2F","time":1638621506562,"req_id":"efc897c4c849927a7de2","elapsed_ms":22.359362,"level":"info","message":"responded with XML","hostname":"33938e405ca2","pid":78}

 

Those HEAD requests seem to be the problem: they are answered with a 404 HTTP code. Why aren't they issued in the fs.FileSelector case?

asfimport commented 2 years ago

Luis Morales: The same kind of problem arises when trying to build a dataset:

 

datos = ds.dataset("dasynth/parquet/taxies/2019/", filesystem=scality)

 

The logs are the same as in the scality.get_file_info("dasynth/parquet/") case. The HEAD operations appear again.
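That is consistent with how ds.dataset resolves a string path: before discovery it asks the filesystem for the path's FileInfo, which on S3 goes through the same string-path lookup (a rough sketch of the call pattern, not the exact internal code):

# Roughly what happens at the start of ds.dataset("dasynth/parquet/taxies/2019/", ...):
# the base path is looked up as a plain string, issuing the same HeadObject-style
# requests seen in the logs above, and a NotFound result surfaces as FileNotFoundError.
info = scality.get_file_info("dasynth/parquet/taxies/2019/")
if info.type == fs.FileType.NotFound:
    raise FileNotFoundError(info.path)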

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: Looking at the implementation of get_file_info with a string passed (https://github.com/apache/arrow/blob/e903a214525dd6dcd8e57b20958a65dc678be47d/cpp/src/arrow/filesystem/s3fs.cc#L2135-L2198), this does indeed do HEAD requests, while get_file_info with a FileSelector (https://github.com/apache/arrow/blob/e903a214525dd6dcd8e57b20958a65dc678be47d/cpp/src/arrow/filesystem/s3fs.cc#L2200-L2223) is implemented differently.
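A rough mapping of the two code paths, based on the linked s3fs.cc sections and the server logs above (a description in comments, not the C++ implementation):

# 1. Plain string path: a HeadObject request is tried first (and, per the logs
#    above, a ListObjectsV2 with max-keys=1 afterwards), so a key that only
#    exists as a prefix of other objects may be reported as missing.
scality.get_file_info("dasynth/parquet/")

# 2. FileSelector: goes straight to a ListObjectsV2 with the path as prefix,
#    so "directories" are discovered from the listing itself.
scality.get_file_info(fs.FileSelector("dasynth/parquet/"))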

Now, the fact that those HEAD requests fail, is that an issue on your side?

asfimport commented 2 years ago

Luis Morales: I would say there is no problem on the server side. My thoughts on this:

 

scality.get_file_info("dasynth/parquet/") 

This method, through HEAD operations, is asking for buckets or objects, but in this case dasynth/parquet is neither: it's just a prefix (or folder, or tag... call it whatever you want). That's why the server answers with "object not found".

 

When using a FileSelector it doesn't first ask whether the object exists; it just asks for the contents with a GET, and in that case it works properly.

 

Maybe a new parameter, something like object_type = [bucket, object, tag], with different logic for each case:

bucket, object -> HEAD methods

tag -> the same logic as with a FileSelector

would solve the problem (a rough sketch of this fallback idea is shown below).

Additionally, in the dataset() method things should be changed too according to this idea.
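A minimal sketch of that fallback at the application level, using only the public pyarrow.fs API; the helper name and the NotFound-then-list logic are an illustration of the proposal, not existing Arrow behaviour:

from pyarrow import fs

def get_file_info_prefix_aware(filesystem, path):
    """Hypothetical helper: treat a path that a HEAD-style lookup reports as
    missing, but that still has entries listed under it (a pure prefix),
    as a directory."""
    info = filesystem.get_file_info(path)
    if info.type != fs.FileType.NotFound:
        return info
    # Fall back to a listing, which only issues GET/ListObjectsV2 requests.
    children = filesystem.get_file_info(fs.FileSelector(path, allow_not_found=True))
    if children:
        return fs.FileInfo(path, fs.FileType.Directory)
    return info

For example, get_file_info_prefix_aware(scality, "dasynth/parquet") would then report a directory instead of a NotFound FileInfo.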

 

An additional example: if you use get_file_info with a full file path like this:

 

scality.get_file_info("dasynth/parquet/taxies/2019/month_year=2001-01/payment_type=1/9ccd9d4ae28a41e1acaf40ea594b61da.snappy.parquet")

 

it works, regardless of the intermediate folders parquet, taxies, 2019...

asfimport commented 2 years ago

Luis Morales: Just as a reference... awswrangler (AWS Data Wrangler) works perfectly, with methods like:

 

wr.s3.list_objects(path='s3://dasynth/parquet', boto3_session=sesion)

 

returning objects (parquet files)

 

for chunk in wr.s3.read_parquet('s3://dasynth/parquet/taxies/2019/', dataset=True,
                                boto3_session=sesion, use_threads=True, chunked=True):
    chunks += 1

 

my_second_filter = lambda x: True if x["payment_type"].startswith("2") and x["month_year"].startswith("2019-06") else False

for chunk in wr.s3.read_parquet(path="s3://dasynth/parquet/taxies/2019/", dataset=True,
                                partition_filter=my_second_filter,
                                boto3_session=sesion, use_threads=True, chunked=True):
    chunks += 1

 

working properly with filters...

 

This is what I want to get from pyarrow, as it would be a lighter library and not coupled to AWS open-source initiatives.
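For reference, a rough pyarrow equivalent of that filtered read, once dataset discovery works against this storage; the partition column names come from the paths shown earlier, but the exact value types (string vs integer) in the filter expression are an assumption:

import pyarrow.dataset as ds

dataset = ds.dataset("dasynth/parquet/taxies/2019/", format="parquet",
                     partitioning="hive", filesystem=scality)

# Push the partition filter down as an expression instead of a Python lambda.
expr = (ds.field("payment_type") == "2") & (ds.field("month_year") == "2019-06")
for batch in dataset.to_batches(filter=expr):
    ...  # process each record batch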

 

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: cc @pitrou

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: [~luis_morales] Is there some kind of public Scality test server that we can try testing against?

asfimport commented 2 years ago

Luis Morales: The product I'm using is Scality (scality.com). They have a public Docker image that is basically their S3 Connector (they call it Zenko CloudServer).

 

You can find the image here:

 

https://hub.docker.com/r/zenko/cloudserver/

 

I'm using this docker run command (with SSL activated; the container generates its own self-signed certificate and CA each time you run it):

 

docker run -v :/usr/src/app/localData -v :/usr/src/app/localMetadata -p 8000:8000 -e SSL=TRUE -e ENDPOINT=s3.scality.test -e LOG_LEVEL=trace -e REMOTE_MANAGEMENT_DISABLE=1 zenko/cloudserver

 

The endpoint is needed when using SSL.

I'll ask the Scality support guys to have a look at this too.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Hmm, I tried the in-memory backend:


$ docker run -d --name cloudserver -p 8000:8000 -e S3BACKEND=mem -e REMOTE_MANAGEMENT_DISABLE=1 zenko/cloudserver

And it seems to work properly with our S3 filesystem:


>>> from pyarrow import fs
>>> f = fs.S3FileSystem(scheme='http', access_key='accessKey1', secret_key='verySecretKey1', endpoint_override='localhost:8000')
>>> f.get_file_info('/')
<FileInfo for '/': type=FileType.Directory>
>>> f.get_file_info(fs.FileSelector('/', recursive=True))
[]
>>> f.create_dir('bucket')
>>> f.get_file_info(fs.FileSelector('/', recursive=True))
[<FileInfo for 'bucket': type=FileType.Directory>]
>>> f.create_dir('bucket/foo/bar')
>>> f.get_file_info(fs.FileSelector('/', recursive=True))
[<FileInfo for 'bucket': type=FileType.Directory>,
 <FileInfo for 'bucket/foo': type=FileType.Directory>,
 <FileInfo for 'bucket/foo/bar': type=FileType.Directory>]
>>> f.get_file_info(fs.FileSelector('bucket/', recursive=True))
[<FileInfo for 'bucket/foo': type=FileType.Directory>,
 <FileInfo for 'bucket/foo/bar': type=FileType.Directory>]
>>> f.get_file_info(fs.FileSelector('bucket/foo', recursive=True))
[<FileInfo for 'bucket/foo/bar': type=FileType.Directory>]
>>> f.get_file_info(fs.FileSelector('bucket/foo/bar', recursive=True))
[]
>>> f.get_file_info('bucket/')
<FileInfo for 'bucket/': type=FileType.Directory>
>>> f.get_file_info('bucket/foo')
<FileInfo for 'bucket/foo': type=FileType.Directory>
>>> f.get_file_info('bucket/foo/')
<FileInfo for 'bucket/foo/': type=FileType.Directory>
>>> f.get_file_info('bucket/foo/bar')
<FileInfo for 'bucket/foo/bar': type=FileType.Directory>
>>> f.get_file_info('bucket/foo/bar/')
<FileInfo for 'bucket/foo/bar/': type=FileType.Directory>
asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Should I try something else?

asfimport commented 2 years ago

Luis Morales: Try with a local filesystem, with the -v options, something like this:


docker run -v /cto/ring/data:/usr/src/app/localData -v /cto/ring/metadata:/usr/src/app/localMetadata -p 8000:8000 -e SSL=TRUE -e ENDPOINT=s3.scality.test -e LOG_LEVEL=trace -e REMOTE_MANAGEMENT_DISABLE=1 zenko/cloudserver

 

When running


f.create_dir('bucket')

I get this error:


OSError: When creating bucket 'bucket': AWS Error [code 100]: Unable to parse ExceptionName: MalformedXML Message: The XML you provided was not well-formed or did not validate against our published schema.
asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Hmm, but what should the "data directory" and "metadata directory" contain? Just empty directories?

asfimport commented 2 years ago

Luis Morales: Those folders are for Scality. It uses its own internal structure to store both the data and the metadata; it's the S3 layer that translates those structures into an S3-like structure of buckets and files...

 

In my use case I create the buckets and store the parquet files using s3cmd.

asfimport commented 2 years ago

Luis Morales: The thing is that I'm not using pyarrow to create directories as in your use case... in my tests I use s3cmd to create folders and move files; in our production use case I'll use parquet files generated with Apache Spark with Hive partitioning.

 

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: I tried reproducing by creating a single file in an empty bucket using s3cmd:


$ s3cmd --no-ssl --host=localhost:8000 --host-bucket="%(bucket)s.localhost:8000" --access_key=accessKey1 --secret_key=verySecretKey1 put setup.cfg s3://bucket/foo/bar.txt
upload: 'setup.cfg' -> 's3://bucket/foo/bar.txt'  [1 of 1]
 1075 of 1075   100% in    0s   109.75 kB/s  done

And it still works properly with PyArrow:


>>> f = fs.S3FileSystem(scheme='http', access_key='accessKey1', secret_key='verySecretKey1', endpoint_override='localhost:8000')
>>> f.get_file_info(fs.FileSelector('/', recursive=True))
[<FileInfo for 'bucket': type=FileType.Directory>,
 <FileInfo for 'bucket/foo': type=FileType.Directory>,
 <FileInfo for 'bucket/foo/bar.txt': type=FileType.File, size=1075>]
>>> f.get_file_info('bucket')
<FileInfo for 'bucket': type=FileType.Directory>
>>> f.get_file_info('bucket/foo')
<FileInfo for 'bucket/foo': type=FileType.Directory>
>>> f.get_file_info('bucket/foo/bar.txt')
<FileInfo for 'bucket/foo/bar.txt': type=FileType.File, size=1075>

even though s3cmd itself doesn't see the "directory":


$ s3cmd --no-ssl --host=localhost:8000 --host-bucket="%(bucket)s.localhost:8000" --access_key=accessKey1 --secret_key=verySecretKey1 info s3://bucket/foo
ERROR: S3 error: 404 (Not Found)
asfimport commented 2 years ago

Luis Morales: I've tried your tests but with local storage, not memory (and with SSL).

docker run -v /cto/ring/data:/usr/src/app/localData -v /cto/ring/metadata:/usr/src/app/localMetadata -p 8000:8000 -e SSL=TRUE -e ENDPOINT=s3.scality.test -e LOG_LEVEL=trace -e REMOTE_MANAGEMENT_DISABLE=1 zenko/cloudserver

I create bucket with s3cmd:

s3cmd mb s3://bucket

 

Then put the file with folders:

s3cmd put hola.txt s3://bucket/foo/bar.txt

 

Now with pyarrow:

scality = fs.S3FileSystem(access_key='accessKey1', secret_key='verySecretKey1', endpoint_override="https://s3.scality.test:8000", scheme='https')

scality.get_file_info(fs.FileSelector('/', recursive=True)). OK:

[<FileInfo for 'bucket': type=FileType.Directory>,
 <FileInfo for 'bucket/foo': type=FileType.Directory>,
 <FileInfo for 'bucket/foo/bar.txt': type=FileType.File, size=11>]

 

scality.get_file_info('bucket'). OK.

<FileInfo for 'bucket': type=FileType.Directory>

 

scality.get_file_info('bucket/foo'). OK.

<FileInfo for 'bucket/foo/bar': type=FileType.Directory>

 

I think this works because foo has a file inside. Let's try a more parquet-oriented use case (the first folder has no file inside, just the folders that correspond to partitions, Hive style):

 

s3cmd put hola.txt s3://bucket/foo/bar/hello.txt

scality.get_file_info(fs.FileSelector('bucket', recursive=True)). OK:

[<FileInfo for 'bucket/foo': type=FileType.Directory>,
 <FileInfo for 'bucket/foo/bar': type=FileType.Directory>,
 <FileInfo for 'bucket/foo/bar/hola.txt': type=FileType.File, size=11>]

 

scality.get_file_info('bucket/foo'). KO

<FileInfo for 'bucket/foo': type=FileType.NotFound>

 

scality.get_file_info('bucket/foo/bar'). OK

<FileInfo for 'bucket/foo/bar': type=FileType.Directory>

 

The point is that, from a parquet perspective, when I create a dataset I have to ask for the "foo" folder (the one that has all the partitions inside), not the "bar" one (which would be equivalent to one particular partition).
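Condensing those observations into one place (paths and results copied from the tests above, using the same scality filesystem object):

# A FileSelector-based listing sees every level:
scality.get_file_info(fs.FileSelector('bucket', recursive=True))
#   -> bucket/foo (Directory), bucket/foo/bar (Directory), bucket/foo/bar/... (File)

# A plain-path lookup fails on a prefix that has no file directly inside it...
scality.get_file_info('bucket/foo')       # -> FileType.NotFound
# ...but works one level down, where a file sits directly under the prefix.
scality.get_file_info('bucket/foo/bar')   # -> FileType.Directory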

 

 

 

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: What does -e ENDPOINT=s3.scality.test do?

asfimport commented 2 years ago

Luis Morales: If SSL is activated you need to use that kind of URL (it's what the Docker image uses to create the certificates and CAs). I think you can work without SSL for the tests we are doing.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Ok, I've managed to reproduce now and I think it's really a bug in CloudServer.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: It seems there's a workaround, though.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Issue resolved by pull request 11977 https://github.com/apache/arrow/pull/11977