NVIDIA / aistore

AIStore: scalable storage for AI applications
https://aistore.nvidia.com
MIT License

ETL WebDataset connection timeout #136

Closed yingca1 closed 1 year ago

yingca1 commented 1 year ago

I tried out ETL by following this article: https://aiatscale.org/blog/2023/05/11/aisio-transforms-with-webdataset-pt-2

I keep getting a timeout when calling transform_object_inline: https://github.com/NVIDIA/aistore/blob/5c5a2a3ecabcaa8a10b7ccd7f5b9bc800481abd6/docs/examples/aisio_webdataset/etl_webdataset.py#L100

$ aistore/docs/examples/aisio_webdataset# python etl_webdataset.py                                           

{'Ais-Atime': '1687530497164671185', 'Ais-Bucket-Name': 'images', 'Ais-Bucket-Provider': 'ais', 'Ais-Checksum-Type': 'xxhash', 'Ais-Checksum-Value': 'a487f46d49561afd', 'Ais-Location': 't[rrViDbDG]:mp[/ais1, nvme0n2]', 'Ais-Mirror-Copies': '1', 'Ais-Mirror-Paths': '[/ais1]', 'Ais-Name': 'samples-00.tar', 'Ais-Present': 'true', 'Ais-Version': '2', 'Content-Length': '45895680', 'Date': 'Fri, 23 Jun 2023 14:34:36 GMT'}

http://<proxy-lb-public-address>/v1/objects/images/samples-00.tar?provider=ais&etl_name=wd-transform

Traceback (most recent call last):
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/connection.py", line 200, in _new_conn                                                
    sock = connection.create_connection(                                                                                                                     
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/util/connection.py", line 85, in create_connection
    raise err
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/util/connection.py", line 73, in create_connection                                    
    sock.connect(sa)                                                                                                                                         
TimeoutError: [Errno 110] Connection timed out                                                                                                               

The above exception was the direct cause of the following exception:

Traceback (most recent call last):                                            
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/connectionpool.py", line 790, in urlopen                                              
    response = self._make_request(                                                                                                                           
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/connectionpool.py", line 496, in _make_request 
    conn.request(                                                                                                                                            
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/connection.py", line 388, in request                                                  
    self.endheaders()                                                                                                                                        
  File "/root/miniconda3/lib/python3.9/http/client.py", line 1250, in endheaders                           
    self._send_output(message_body, encode_chunked=encode_chunked)                                                                                           
  File "/root/miniconda3/lib/python3.9/http/client.py", line 1010, in _send_output                                 
    self.send(msg)                                                                                                                                           
  File "/root/miniconda3/lib/python3.9/http/client.py", line 950, in send                                                                                    
    self.connect()                                                                                                                                           
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/connection.py", line 236, in connect                                                  
    self.sock = self._new_conn()                                              
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/connection.py", line 215, in _new_conn                                                
    raise NewConnectionError(                                                 
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPConnection object at 0x7f2253c3f730>: Failed to establish a new connection: [Errno 110] Connection timed out

The above exception was the direct cause of the following exception:                                                                                         

Traceback (most recent call last):                                                                                                                           
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/requests/adapters.py", line 486, in send                                                      
    resp = conn.urlopen(                                                                                                                                     
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/connectionpool.py", line 844, in urlopen
    retries = retries.increment(    
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/urllib3/util/retry.py", line 515, in increment                                                
    raise MaxRetryError(_pool, url, reason) from reason  # type: ignore[arg-type]                                                                            
urllib3.exceptions.MaxRetryError: HTTPConnectionPool(host='10.10.0.6', port=30186): Max retries exceeded with url: /ais%2F@%23%2Fimages%2Fsamples-00.tar (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f2253c3f730>: Failed to establish a new connection: [Errno 110] Connection timed out'))

During handling of the above exception, another exception occurred:           

Traceback (most recent call last):                                                                                                                           
  File "<path-to-workspace>/aistore/docs/examples/aisio_webdataset/etl_webdataset.py", line 131, in <module>        
    transform_object_inline()                                                                                                                                
  File "<path-to-workspace>/aistore/docs/examples/aisio_webdataset/etl_webdataset.py", line 108, in transform_object_inline                                
    processed_shard = single_object.get(etl_name=etl_name).read_all()                                                                                        
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/aistore/sdk/object.py", line 113, in get    
    resp = self._client.request(                                                                                                                             
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/aistore/sdk/request_client.py", line 91, in request 
    resp = self._session.request(                                                                                                                            
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/requests/sessions.py", line 589, in request                                                   
    resp = self.send(prep, **send_kwargs)                                                                                                                    
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/requests/sessions.py", line 725, in send                                                      
    history = [resp for resp in gen]                                          
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/requests/sessions.py", line 725, in <listcomp>                                                
    history = [resp for resp in gen]                                          
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/requests/sessions.py", line 266, in resolve_redirects                                         
    resp = self.send(                                                         
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/requests/sessions.py", line 703, in send                                                      
    r = adapter.send(request, **kwargs)                                                                                                                      
  File "<path-to-workspace>/venv/lib/python3.9/site-packages/requests/adapters.py", line 519, in send                                                      
    raise ConnectionError(e, request=request)                                                                                                                
requests.exceptions.ConnectionError: HTTPConnectionPool(host='10.10.0.6', port=30186): Max retries exceeded with url: /ais%2F@%23%2Fimages%2Fsamples-00.tar (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f2253c3f730>: Failed to establish a new connection: [Errno 110] Connection timed out'))

I think the request timed out during ETL processing. I'm puzzled why there is a request to host='10.10.0.6', port=30186 at all. https://github.com/NVIDIA/aistore/blob/5c5a2a3ecabcaa8a10b7ccd7f5b9bc800481abd6/python/aistore/sdk/object.py#L113

$ kubectl -n ais get po -owide
NAME                               READY   STATUS    RESTARTS   AGE    IP           NODE                          NOMINATED NODE   READINESS GATES
aistore-admin                      1/1     Running   0          18h    10.72.2.36   gke-ais1-ais1-bbc650cb-6hck   <none>           <none>
aistore1-proxy-0                   1/1     Running   0          18h    10.72.2.34   gke-ais1-ais1-bbc650cb-6hck   <none>           <none>
aistore1-proxy-1                   1/1     Running   0          18h    10.72.4.30   gke-ais1-ais1-bbc650cb-97zb   <none>           <none>
aistore1-proxy-2                   1/1     Running   0          18h    10.72.1.28   gke-ais1-ais1-bbc650cb-d8q1   <none>           <none>
aistore1-target-0                  1/1     Running   0          18h    10.72.4.31   gke-ais1-ais1-bbc650cb-97zb   <none>           <none>
aistore1-target-1                  1/1     Running   0          18h    10.72.2.35   gke-ais1-ais1-bbc650cb-6hck   <none>           <none>
aistore1-target-2                  1/1     Running   0          18h    10.72.1.29   gke-ais1-ais1-bbc650cb-d8q1   <none>           <none>
transform-images-iumkgvzt          1/1     Running   0          68m    10.72.1.39   gke-ais1-ais1-bbc650cb-d8q1   <none>           <none>
transform-images-rrvidbdg          1/1     Running   0          68m    10.72.4.41   gke-ais1-ais1-bbc650cb-97zb   <none>           <none>
transform-images-szsjmiqg          1/1     Running   0          68m    10.72.2.46   gke-ais1-ais1-bbc650cb-6hck   <none>           <none>
wd-transform-iumkgvzt              1/1     Running   0          42m    10.72.1.41   gke-ais1-ais1-bbc650cb-d8q1   <none>           <none>
wd-transform-redirect-1-iumkgvzt   1/1     Running   0          135m   10.72.1.36   gke-ais1-ais1-bbc650cb-d8q1   <none>           <none>
wd-transform-redirect-1-rrvidbdg   1/1     Running   0          135m   10.72.4.38   gke-ais1-ais1-bbc650cb-97zb   <none>           <none>
wd-transform-redirect-1-szsjmiqg   1/1     Running   0          135m   10.72.2.43   gke-ais1-ais1-bbc650cb-6hck   <none>           <none>
wd-transform-rrvidbdg              1/1     Running   0          42m    10.72.4.43   gke-ais1-ais1-bbc650cb-97zb   <none>           <none>
wd-transform-szsjmiqg              1/1     Running   0          42m    10.72.2.48   gke-ais1-ais1-bbc650cb-6hck   <none>           <none>
yingca1 commented 1 year ago

Update: why does it redirect from port 51081 to 30186?

root@aistore-admin:/# curl -vvv -L "http://<target-public-ip-address>:51081/v1/objects/images/samples-00.tar?provider=ais&etl_name=wd-transform"
*   Trying <target-public-ip-address>:51081...
* Connected to <target-public-ip-address> (<target-public-ip-address>) port 51081 (#0)
> GET /v1/objects/images/samples-00.tar?provider=ais&etl_name=wd-transform HTTP/1.1
> Host: <target-public-ip-address>:51081
> User-Agent: curl/7.81.0
> Accept: */*
> 
* Mark bundle as not supporting multiuse
< HTTP/1.1 307 Temporary Redirect
< Content-Type: text/html; charset=utf-8
< Location: http://10.10.0.6:30186/ais%2F@%23%2Fimages%2Fsamples-00.tar
< Date: Fri, 23 Jun 2023 18:25:59 GMT
< Content-Length: 95
< 
* Ignoring the response-body
* Connection #0 to host <target-public-ip-address> left intact
* Clear auth, redirects to port from 51081 to 30186
* Issue another request to this URL: 'http://10.10.0.6:30186/ais%2F@%23%2Fimages%2Fsamples-00.tar'
*   Trying 10.10.0.6:30186...
* Connected to 10.10.0.6 (10.10.0.6) port 30186 (#1)
> GET /ais%2F@%23%2Fimages%2Fsamples-00.tar HTTP/1.1
> Host: 10.10.0.6:30186
> User-Agent: curl/7.81.0
> Accept: */*
> 
* Empty reply from server
* Closing connection 1
curl: (52) Empty reply from server
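The 307 hop can also be inspected from Python without following it. Below is a self-contained, stdlib-only sketch: the local server is a stand-in that mimics the target's hpull redirect (the Location value is copied from the curl output above), and http.client never follows redirects, so the Location header can be read directly.

```python
import http.client
import http.server
import threading

class RedirectHandler(http.server.BaseHTTPRequestHandler):
    """Stand-in for the AIS target: answer every GET with a 307 to the ETL pod's NodePort."""

    def do_GET(self):
        self.send_response(307)
        self.send_header("Location", "http://10.10.0.6:30186/ais%2F@%23%2Fimages%2Fsamples-00.tar")
        self.end_headers()

    def log_message(self, *args):
        pass  # keep the demo output quiet

server = http.server.HTTPServer(("127.0.0.1", 0), RedirectHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# http.client does not follow redirects, so we can capture where the 307 points.
conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
conn.request("GET", "/v1/objects/images/samples-00.tar?provider=ais&etl_name=wd-transform")
resp = conn.getresponse()
print(resp.status, resp.getheader("Location"))
server.shutdown()
```

Pointing the same two http.client calls at the real target shows exactly which host:port the redirect hands back, without the client then hanging on an unreachable NodePort.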
aaronnw commented 1 year ago

Thanks for the feedback! I can't reproduce this locally with minikube, so I suspect it's something in your setup. We haven't done any testing on GKE, but it would be great to get it working. Nothing on the AIS side should return a 307 for a plain object GET; however, since we are using the hpull comm type here, it will redirect. I will look at how the port is determined.

aaronnw commented 1 year ago

When the ETL pod is initialized, its port is determined here: https://github.com/NVIDIA/aistore/blob/master/ext/etl/boot.go#L350-L367

Kubernetes will allocate a port for the new pod https://kubernetes.io/docs/concepts/services-networking/service/#type-nodeport so that's where 30186 comes from.

So it looks like that part is working properly: the cluster receives your GET request above, looks up the ETL specified by etl_name, finds its comm_type (hpull here), and then redirects to the new port (because it's hpull). https://github.com/NVIDIA/aistore/blob/c211ddf7f66a4a62e1823d64d679c6eabaa510f3/ext/etl/communicator.go#L351

That said, I'm not sure why you're seeing an empty reply or a connection refused. Perhaps there is a problem with AIS_TARGET_URL on the ETL pod. If you could look at the logs on the wd-transform pod itself, that might help.

yingca1 commented 1 year ago

@aaronnw Thanks a lot for your response! I now know how to troubleshoot the issue and will continue to do so.

Also, I have a question: I noticed that the ETL pod doesn't output Python print logs, only error logs. How can I enable debug logs in the ETL pod?

yingca1 commented 1 year ago

Update:

I got a new error:

rrViDbDG:
Starting HTTP server on 0.0.0.0:80
object_url: http://<target-public-ip-address>:51081/v1/etl/_object/ROuCnPJNjv/ais%2F@%23%2Fimages%2Fsamples-00.tar
XXX lineno: 60, opcode: 164
----------------------------------------
Exception happened during processing of request from ('10.72.4.1', 6619)
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/socketserver.py", line 683, in process_request_thread
    self.finish_request(request, client_address)
  File "/usr/local/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/lib/python3.8/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/lib/python3.8/http/server.py", line 435, in handle
    self.handle_one_request()
  File "/usr/local/lib/python3.8/http/server.py", line 423, in handle_one_request
    method()
  File "/server.py", line 92, in do_GET
    result = transform(query_path)
  File "<path-to-workspace>/aistore/docs/examples/aisio_webdataset/etl_webdataset.py", line 60, in wd_etl
SystemError: unknown opcode
----------------------------------------
aaronnw commented 1 year ago

We will need to add some better support for debug logs. As for the new error, it looks like it's due to a Python version mismatch. We currently only provide images for Python 3.8, 3.10, and 3.11 for use with the init_code option (defaulting to 3.8). Other versions require providing a pod spec manually and using init_spec.

yingca1 commented 1 year ago

@aaronnw Thanks a lot for your reply, this info looks really helpful!

  1. Does the Python version of the ETL task client need to match the ETL runtime version?
  2. Is there an easy way to check if an ETL job is running and see how it's doing?
aaronnw commented 1 year ago

@yingca1

  1. Yes, they need to match to work reliably. We use the cloudpickle library (https://github.com/cloudpipe/cloudpickle) to serialize the transform function, then deserialize and call it with the Python runtime inside the ETL container.
  2. Unfortunately we don't have anything beyond what Kubernetes provides. Logs written inside the transform function should be visible in the pod logs, but that's all for now.
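Since cloudpickle ships the function's bytecode, the client interpreter's (major, minor) version has to match the runtime image. Here is a small sketch of a pre-flight check; check_runtime_match is a hypothetical helper, not part of the SDK, and it assumes runtime tags shaped like "python3.10v2".

```python
import sys

def check_runtime_match(runtime: str) -> bool:
    """Return True if the local interpreter matches a runtime tag like 'python3.10v2'."""
    version = runtime[len("python"):].split("v")[0]  # "3.10v2" -> "3.10"
    major, minor = (int(part) for part in version.split("."))
    return (sys.version_info.major, sys.version_info.minor) == (major, minor)

# Example: warn before init_code ships bytecode the runtime can't execute.
if not check_runtime_match("python3.10v2"):
    print("warning: local Python does not match the ETL runtime")
```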
yingca1 commented 1 year ago

    client.etl(etl_name).init_code(
        transform=wd_etl,
        runtime="python3.10v2",
        preimported_modules=["torch"],
        dependencies=["webdataset", "pillow", "torch", "torchvision"],
        communication_type="hpull",
        transform_url=True
    )

After checking, I found that when communication_type is set to hpull and transform_url is True, ARG_TYPE should be set to url. https://github.com/NVIDIA/aistore/blob/c211ddf7f66a4a62e1823d64d679c6eabaa510f3/ext/etl/transform.go#L206

However, the pods created in Kubernetes do not have the ARG_TYPE environment variable set.

Checking the code in the ETL pod's /server.py:

#!/usr/bin/env python
import os
import importlib.util
from typing import Iterator
from inspect import signature
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn

import requests

host_target = os.environ["AIS_TARGET_URL"]
code_file = os.getenv("MOD_NAME")
arg_type = os.getenv("ARG_TYPE", "bytes")

....

    def do_GET(self):
        if self.path == "/health":
            self._set_headers()
            self.wfile.write(b"OK")
            return

        query_path = host_target + self.path

        if arg_type == "url":
            result = transform(query_path)
        else:
            input_bytes = requests.get(query_path).content
            result = transform(input_bytes)

        self._set_headers()
        self.wfile.write(result)

So the WebDataset transform has been receiving raw bytes all along, which is why it hasn't been working properly.
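The branch in server.py boils down to the following simplified restatement; dispatch and fetch are illustrative stand-ins (fetch playing the role of requests.get(query_path).content), not code from the repo.

```python
def dispatch(transform, arg_type, query_path, fetch):
    # Mirrors server.py: hand the transform a URL only when ARG_TYPE == "url";
    # otherwise fetch the object and hand it raw bytes.
    if arg_type == "url":
        return transform(query_path)
    return transform(fetch(query_path))

def wd_transform(object_url):
    # A URL-style transform like wd_etl expects a string URL, not bytes.
    assert isinstance(object_url, str), "got bytes: ARG_TYPE fell back to 'bytes'"
    return object_url

# With ARG_TYPE unset, the default "bytes" path feeds the transform raw bytes
# instead of the URL -- exactly the failure described above.
print(dispatch(wd_transform, "url", "http://target/obj", lambda u: b"raw"))
```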

yingca1 commented 1 year ago

https://sourcegraph.com/search?q=context:global+repo:%5Egithub%5C.com/NVIDIA/aistore%24%40v1.3.17+ARG_TYPE&patternType=standard&sm=1&groupBy=path

I checked the code for tag v1.3.17 and didn't find any code related to ARG_TYPE.

aaronnw commented 1 year ago

It is not in version 3.17, but it is on the master branch (and the aisnode:latest image on Docker Hub). I should have clarified this in the blog post, so thank you for bringing it up! Sorry for the confusion.

There is a workaround on version 3.17: make the ETL transform function accept bytes rather than a url (this also works with other comm_types). It is not nearly as clean, but here is an example that manually processes the object bytes using the io comm type rather than having the WebDataset library do the work:

def wd_etl():
    # Imports live inside the function so they resolve in the ETL container,
    # where the listed dependencies are installed.
    import io
    import sys
    import tarfile

    import torchvision
    from PIL import Image

    def img_to_bytes(img):
        buf = io.BytesIO()
        img = img.convert("RGB")
        img.save(buf, format="JPEG")
        return buf.getvalue()

    def process_trimap(trimap_bytes):
        image = Image.open(io.BytesIO(trimap_bytes))
        preprocessing = torchvision.transforms.Compose(
            [
                torchvision.transforms.CenterCrop(350),
                torchvision.transforms.Lambda(img_to_bytes)
            ]
        )
        return preprocessing(image)

    def process_image(image_bytes):
        image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        preprocessing = torchvision.transforms.Compose(
            [
                torchvision.transforms.CenterCrop(350),
                torchvision.transforms.ToTensor(),
                # Means and stds from ImageNet
                torchvision.transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
                torchvision.transforms.ToPILImage(),
                torchvision.transforms.Lambda(img_to_bytes),
            ]
        )
        return preprocessing(image)

    input_bytes = io.BytesIO(sys.stdin.buffer.read())
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode='w') as out_tar:
        with tarfile.open(fileobj=input_bytes, mode='r') as in_tar:
            # Loop through each member file in the tar and change the extension
            for member in in_tar.getmembers():
                orig_filename = member.name
                orig_ext = orig_filename.split('.')[-1]
                member_bytes = in_tar.extractfile(member).read()

                if orig_ext == "jpg":
                    processed_bytes = process_image(member_bytes)
                elif orig_ext == "png":
                    processed_bytes = process_trimap(member_bytes)
                else:
                    processed_bytes = member_bytes

                # Create a new TarInfo object with the new filename and write it to the new tar file
                new_member = tarfile.TarInfo(name=member.name)
                new_member.size = len(processed_bytes)
                out_tar.addfile(new_member, io.BytesIO(processed_bytes))

    # Return the new tar file as bytes to standard output
    sys.stdout.buffer.write(buffer.getvalue())

def create_wd_etl(client):
    client.etl(etl_name).init_code(
        transform=wd_etl,
        dependencies=["torchvision"],
        communication_type="io"
    )
yingca1 commented 1 year ago

After conducting some tests and investigations, I have identified the issue. Let me clarify:

  1. The ARG_TYPE environment variable used by the aistorage/runtime_python Docker image is not supported by aistorage/aisnode:3.17; we should use aistorage/aisnode:latest instead. If we want to use WebDataset, we must set ARG_TYPE to url. We hope that version 3.18 will be released soon.

  2. If we use inline ETL, the transformed result is served from the ETL pod. The aistore client must be able to reach the ETL pod's Kubernetes NodePort to download the processed result; if the client cannot reach the cluster node, it will not get the result.

  3. We need to ensure that the Python ETL runtime matches the local environment.

  4. If we want to print ETL logs, we can reset the logging configuration and set up customized logging as needed, like this:

def wd_etl(object_url):
    import logging
    # Drop any handlers configured by the runtime, then install our own.
    logging.root.handlers = []
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)-15s %(levelname)-8s %(message)s",
        handlers=[logging.StreamHandler(), logging.FileHandler("etl.log")],
    )
    logging.debug("processing %s", object_url)

So, are you considering addressing the issue that the Kubernetes NodePort cannot be accessed from outside the cluster, which makes it impossible to obtain results via inline ETL?

alex-aizman commented 1 year ago

agree, will look into it and get back

gaikwadabhishek commented 1 year ago

> @aaronnw Thanks a lot for your reply, this info looks really helpful!
>
>   1. Does the Python version of the ETL task client need to match the ETL runtime version?
>   2. Is there an easy way to check if an ETL job is running and see how it's doing?

Hey @yingca1, I have also struggled with setting up ETLs. My usual debugging approach: run the commands to set up the ETL, see which pods were spawned, then check the logs of each newly spawned pod for initialization issues. 90% of the time you can figure out the problem from those pods' logs.

aaronnw commented 1 year ago

@yingca1 Regarding your points 1-3, all that is correct.

As for logging, you should be able to view print output from the transform function without any problem; just run kubectl logs <pod_name> on the ETL pod. If you want more ETL setup logs on the cluster side, you can now enable extra verbosity with ais config cluster log.modules etl (config examples here).

> so, are you considering addressing the issue of being unable to access Kubernetes nodeport outside of the cluster and not being able to obtain results using inline ETL?

The NodePort for the ETL pod is not intended to be accessed from outside the cluster; it is only accessed directly by the AIS target node requesting transformation of its objects.

With a version of AIS that supports the url ARG_TYPE, and Python code matching the ETL runtime version, is there still any issue with the pod communication?

yingca1 commented 1 year ago

@aaronnw It's working fine now, except for the issue of not being able to retrieve results from outside the cluster.

I don't have any other questions now. Thanks for all your answers and help.