NVIDIA / aistore

AIStore: scalable storage for AI applications
https://aistore.nvidia.com
MIT License

ais etl error: ConnectionResetError: [Errno 104] Connection reset by peer #146

Closed yingca1 closed 9 months ago

yingca1 commented 1 year ago
Exception occurred during processing of request from ('10.232.8.1', 64979)
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/socketserver.py", line 683, in process_request_thread
    self.finish_request(request, client_address)
  File "/usr/local/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/lib/python3.10/http/server.py", line 433, in handle
    self.handle_one_request()
  File "/usr/local/lib/python3.10/http/server.py", line 421, in handle_one_request
    method()
  File "/code/server.py", line 101, in do_GET
    self.wfile.write(processed_data)
  File "/usr/local/lib/python3.10/socketserver.py", line 826, in write
    self._sock.sendall(b)
ConnectionResetError: [Errno 104] Connection reset by peer

I often encounter the above errors during the ais etl testing process, probably at a rate of 10% to 15%.

Where should I look to troubleshoot this issue?

gaikwadabhishek commented 1 year ago

Hello @yingca1!

Could you describe the operation you are performing, and share the communication type and parameters you used to initialize the ETL?

I have encountered similar errors in the past when switching between networks or using a VPN.

To assist with debugging, you can follow these steps:

  1. Print all the logs on the container pod using the following command:
    $ ais etl view-logs <etl-name>

By reviewing the logs, we can gain a better understanding of the issue and work towards resolving it effectively. Thank you!

yingca1 commented 1 year ago

Actually, besides the business logs that I print myself, the ETL pod doesn't have any other log outputs. It processes 100 tar files, and around 15 of them will throw this error message ConnectionResetError: [Errno 104] Connection reset by peer.

Based on the error, it seems like there was an issue when processing the task and writing it back to aistore.
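One way to narrow this down is to catch the reset at the point of the write and log how large the payload was and how long the request had been running. The following is a minimal sketch, not part of the original server.py: the helper name `write_response` and the logger name are hypothetical, and it assumes the handler records a start time with `time.monotonic()` before calling the transform.

```python
import logging
import time

logger = logging.getLogger("etl-server")  # hypothetical logger name


def write_response(wfile, payload: bytes, started: float) -> bool:
    """Write payload to the response stream.

    Returns True on success. If the peer has already closed the socket,
    logs the payload size and elapsed time and returns False instead of raising.
    """
    try:
        wfile.write(payload)
        return True
    except (ConnectionResetError, BrokenPipeError) as err:
        # The caller closed the connection before the response was fully sent.
        elapsed = time.monotonic() - started
        logger.warning(
            "peer closed connection after %.1fs while sending %d bytes: %s",
            elapsed, len(payload), err,
        )
        return False
```

In `do_GET`, this would replace the bare `self.wfile.write(processed_data)` call, for example `write_response(self.wfile, processed_data, started)`. If the logged elapsed times cluster near one of the configured timeouts, that would support the timeout hypothesis; if they do not, the cause is more likely elsewhere.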

I'm wondering if this issue is caused by a timeout configuration being set too low in the cluster.

ais show cluster config time
PROPERTY                         VALUE
timeout.cplane_operation         2s
timeout.max_keepalive            4s
timeout.max_host_busy            20m
timeout.startup_time             1m
timeout.join_startup_time        0s
timeout.send_file_time           5m

ais show cluster config client
PROPERTY                         VALUE
client.client_timeout            1m
client.client_long_timeout       30m
client.list_timeout              10m

ais config cluster | grep time
log.flush_time                   0s
log.stats_time                   0s
periodic.stats_time              10s
periodic.retry_sync_time         2s
periodic.notif_time              30s
timeout.cplane_operation         2s
timeout.max_keepalive            4s
timeout.max_host_busy            20m
timeout.startup_time             1m
timeout.join_startup_time        0s
timeout.send_file_time           5m
client.client_timeout            1m
client.client_long_timeout       30m
client.list_timeout              10m
lru.dont_evict_time              2h0m
lru.capacity_upd_time            10m
disk.iostat_time_long            2s
disk.iostat_time_short           100ms
rebalance.dest_retry_time        2m
downloader.timeout               1h0m
distributed_sort.call_timeout    10m
memsys.hk_time                   1m30s
lastupdate_time                  2023-07-28 10:57:26.719073338 +0000 UTC m=+26.070443747

Here's some context about my work.

I use bucket ETL operations.

ais etl init spec --name 'transformer-etl' --from-file './pod.yml' -comm-type=hpull://

ais etl bucket transformer-etl ais://dataset1 ais://out1

server.py

#!/usr/bin/env python
import argparse
import requests
import os
from inspect import signature
from typing import Iterator
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn

from aistore.sdk.const import HEADER_CONTENT_TYPE, STATUS_OK

host_target = os.environ['AIS_TARGET_URL']
arg_type = os.getenv("ARG_TYPE", "url") # url or bytes

try:
    CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", 0))
except Exception:
    CHUNK_SIZE = 0

from etl import wd_etl

def _assert_validations():
    transform_params = len(signature(wd_etl).parameters)
    if CHUNK_SIZE > 0 and transform_params < 2:
        raise ValueError("Required to pass context as a parameter to transform if CHUNK_SIZE > 0")

class StreamWrapper:
    def __init__(self, rfile, content_length, chunk_size):
        self._rfile = rfile
        self._content_length = content_length
        self._chunk_size = chunk_size
        self._remaining_length = content_length

    def read(self) -> bytes:
        # next() requires an iterator; this class only defines __iter__, so wrap it first
        return next(iter(self))

    def read_all(self) -> bytes:
        return self._rfile.read(self._remaining_length)

    def __iter__(self) -> Iterator[bytes]:
        while self._remaining_length > 0:
            read_buffer = self._chunk_size if self._remaining_length >= self._chunk_size else self._remaining_length
            self._remaining_length -= read_buffer
            yield self._rfile.read(read_buffer)

class Handler(BaseHTTPRequestHandler):
    # Overriding log_request to not log successful requests
    def log_request(self, code='-', size='-'):
        pass

    # Set standard headers for responses
    def _set_headers(self):
        self.send_response(STATUS_OK)
        self.send_header(HEADER_CONTENT_TYPE, "application/octet-stream")
        self.end_headers()

    # PUT handler supports `hpush` operation
    def do_PUT(self):
        content_length = int(self.headers['Content-Length'])
        # post_data = self.rfile.read(content_length)
        # processed_data = self.process_data(post_data)
        # if processed_data is not None:
        #     self._set_headers()
        #     self.wfile.write(processed_data)
        # else:
        #     self.send_response(500)
        #     self.end_headers()
        #     self.wfile.write(b"Data processing failed")
        reader = StreamWrapper(self.rfile, content_length, CHUNK_SIZE)
        if CHUNK_SIZE == 0:
            result = wd_etl(reader.read_all())
            self._set_headers()
            self.wfile.write(result)
            return

        # TODO: validate if transform takes writer as input
        # NOTE: for streaming transforms the writer is expected to write bytes into response as stream.
        self._set_headers()
        wd_etl(reader, self.wfile)

    # GET handler supports `hpull` operation
    def do_GET(self):
        if self.path == "/health":
            self._set_headers()
            self.wfile.write(b"OK")
            return

        query_path = host_target + self.path

        if arg_type == "url":
            processed_data = wd_etl(query_path)
        else:
            input_bytes = requests.get(query_path).content
            processed_data = wd_etl(input_bytes)

        if processed_data is not None:
            self._set_headers()
            self.wfile.write(processed_data)
        else:
            self.send_response(500)
            self.end_headers()
            self.wfile.write(b"Data processing failed")

class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread."""

def run(addr, port):
    server = ThreadedHTTPServer((addr, port), Handler)
    print(f"Starting HTTP server on {addr}:{port}")
    _assert_validations()
    server.serve_forever()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run a simple HTTP server")
    parser.add_argument(
        "-l", "--listen",
        help="Specify the IP address on which the server listens",
    )
    parser.add_argument(
        "-p", "--port",
        type=int,
        help="Specify the port on which the server listens",
    )
    parser.add_argument(
        "--compression",
        default="gzip",
        help="Specify the compression algorithm to use (e.g. gzip, bz2)",
    )
    parser.add_argument(
        "--mode",
        default="compress",
        help="Specify the data processing mode to use (e.g. 'compress' or 'decompress')",
    )
    args = parser.parse_args()
    run(addr=args.listen, port=args.port)

pod.yml

apiVersion: v1
kind: Pod
metadata:
  namespace: ais
  name: transformer-etl
  annotations:
    # Values `communication_type` can take are ["hpull://", "hrev://", "hpush://", "io://"].
    # Visit https://github.com/NVIDIA/aistore/blob/master/docs/etl.md#communication-mechanisms 
    communication_type: ${COMMUNICATION_TYPE:-"\"hpull://\""}
    wait_timeout: 5m
spec:
  imagePullSecrets: 
    - name: docker-registry-auth
  containers:
    - name: server
      image: <our-private-docker-registry>/transformer_etl:latest
      imagePullPolicy: Always
      ports:
        - name: default
          containerPort: 80
      env:
        - name: COMM_TYPE
          value: hpull://
        - name: ARG_TYPE
          value: url
        - name: CHUNK_SIZE
        - name: FLAGS
      command: ['/code/server.py', '--listen', '0.0.0.0', '--port', '80']
      readinessProbe:
        httpGet:
          path: /health
          port: default

yingca1 commented 1 year ago

ais config cluster timeout.send_file_time=10m
ais config cluster timeout.max_keepalive=30s
ais config cluster client.client_timeout=2m

After applying these settings, about 95% of the objects succeed, but a few still fail with the same error.

I will check the ETL task consumed time.

alex-aizman commented 1 year ago

To put send_file_time in perspective: sending a 10GB payload takes about 14m over a 100Mbps network, including overhead but not including disk read times. On the other hand, if your network is 100Gbps, then it takes less than one second. And so here we go: the initial config used for a deployment cannot be a simple copy of the default template we provide. It won't work, or rather, won't always work.

We could, of course, have send_file_time = 10h or remove it altogether... but there's a price: missing or masking a bigger potential problem, etc.
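The estimate above can be reproduced with a few lines of arithmetic. This is an illustrative sketch only: the function name `transfer_seconds` and the `efficiency` parameter (the fraction of link bandwidth left after protocol overhead) are assumptions, not AIStore settings.

```python
def transfer_seconds(payload_bytes: float, link_bits_per_second: float,
                     efficiency: float = 1.0) -> float:
    """Time to push payload_bytes over a link, ignoring disk reads and latency."""
    return payload_bytes * 8 / (link_bits_per_second * efficiency)


GB = 10**9

# 10 GB over 100 Mbps with no overhead: 800 s, a little over 13 minutes.
print(transfer_seconds(10 * GB, 100e6) / 60)

# The same payload with an assumed 95% efficiency: roughly 14 minutes.
print(transfer_seconds(10 * GB, 100e6, efficiency=0.95) / 60)

# 10 GB over 100 Gbps: under one second.
print(transfer_seconds(10 * GB, 100e9))
```

Comparing the result for the largest expected object against timeout.send_file_time gives a quick sanity check on whether the configured value fits the network.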

yingca1 commented 1 year ago

update:

There is still an issue, but I've observed that it happens less often on bigger clusters. A 150-node aistore cluster has considerably fewer occurrences compared to a 30-node cluster.

gaikwadabhishek commented 1 year ago

Hey @yingca1, we are looking into it. We are trying out different web servers to see whether any of them can handle the load. We have experimented with flask + gunicorn; it was slightly better, but we still see some issues under very high workloads.

gaikwadabhishek commented 1 year ago

Hey @yingca1, I have some updates. I've been working on this issue and trying out different ways of running transformers - https://github.com/NVIDIA/ais-etl/tree/master/bench.

        if arg_type == "url":
            processed_data = wd_etl(query_path)
        else:
            input_bytes = requests.get(query_path).content
            processed_data = wd_etl(input_bytes)

Inside wd_etl, a GET call is made to fetch the object, and in the case of offline transforms many such GET calls are made simultaneously. These calls read files on the target, and if the machine cannot handle the number of requests, some of the earlier connections are closed to accommodate new ones, which might lead to this error.
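If simultaneous GET calls are indeed the cause, one possible mitigation is to cap how many transforms run at once inside the ETL server. The sketch below is an assumption for illustration, not the fix the maintainers adopted: `make_limiter`, `fetch_limited`, and the limit of 8 are hypothetical.

```python
import threading
from typing import Any, Callable


def make_limiter(max_inflight: int) -> Callable[..., Any]:
    """Return a call wrapper that lets at most max_inflight calls run at once."""
    slots = threading.BoundedSemaphore(max_inflight)

    def limited(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        # Blocks here while max_inflight calls are already in progress.
        with slots:
            return fn(*args, **kwargs)

    return limited


# Hypothetical limit; tune it against what the target can actually serve.
fetch_limited = make_limiter(8)
```

In `do_GET`, the call `wd_etl(query_path)` would become `fetch_limited(wd_etl, query_path)`. Requests beyond the limit wait inside the handler thread rather than opening more connections to the target, so the limit trades throughput for fewer dropped connections.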

Can you try running the whole thing once without WebDataset? Take a look at the examples under https://github.com/NVIDIA/ais-etl/tree/master/transformers. They include transformers running on different web servers, along with all the files needed to build them.

We are still working on creating a one-stop solution for these kinds of problems. I will get back to you once we have a fully working solution.

gaikwadabhishek commented 1 year ago

Hello there, @yingca1!

I've recently made improvements to several of the transformers that speed them up and optimize request handling. Please model your server.py on the server used by our hello world transformer or our Keras transformer. In my benchmarks, the servers and requests ran without throttling, which improved overall performance and shortened execution time.

Feel free to reach out if you require any assistance. Kindly keep me informed about the outcome once you've had the chance to implement these changes.

-Abhishek

gaikwadabhishek commented 9 months ago

This issue is being closed because there has been no activity for over 90 days.