dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License
10.09k stars 1.56k forks source link

SocketException: Connection reset by peer (OS Error: Connection reset by peer) #55776

Open leeflix opened 4 months ago

leeflix commented 4 months ago

I am using websockets in my dart application. My server occasionally crashes when a "peer resets the connection". I think its odd that an exception crashes the program. Is there any way to handle this?

mraleph commented 4 months ago

You would need to post a reproduction etc. Without code we could debug this is unactionable.

leeflix commented 4 months ago

@mraleph I am not doing anything fancy basically everything i do boils down to this:

    // start listening for ws connections there is no problem here

    SecurityContext? securityContext;
    var server;
    if (Env.certificateChainPath != null && Env.privateKeyPath != null) {
      securityContext = SecurityContext()
        ..usePrivateKeyBytes(File(Env.privateKeyPath!).readAsBytesSync())
        ..useCertificateChainBytes(File(Env.certificateChainPath!).readAsBytesSync());
      server = await HttpServer.bindSecure(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort, securityContext);
    } else {
      server = await HttpServer.bind(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort);
    }

    // the problem should be after here

    await for (var request in server) {
        if (WebSocketTransformer.isUpgradeRequest(request)) {
          WebSocket ws = await WebSocketTransformer.upgrade(request);
          ws.add(...); // send data
        }
    }
leeflix commented 4 months ago

I don't know where the error exactly occurs:

WhatsApp Image 2024-05-22 at 17 03 32

leeflix commented 3 months ago

can someone tell me a way to handle this exception?

leeflix commented 3 months ago

please help @mraleph i just need someone to tell me how to handle exceptions which are thrown in the lifecycle of the websocket

mraleph commented 3 months ago

@leeflix The challenge I am having here is that we need a way to reproduce this. Without reproduction it is hard to say what and where exactly is going wrong. I have tried some variations of trying to connect to WS server and then abruptly close connections - but I was not able to reproduce this sort of crash.

If this was a raw socket API, then I could just say that the code is using Socket.add and not catching errors from Socket.done / Socket.flush. But with HTTP/WebSockets these errors are handled internally.

So we really need a repro to be able to fix this.

leeflix commented 2 months ago

@mraleph any ideas why it says "SocketException" and not "WebSocketException". does that not indicate that the error is not caught on the Socket API level? also any ideas where this string "Connection reset by peer" comes from?

mraleph commented 2 months ago

@leeflix it's SocketException because it the low-level networking error, (web socket is just wrapping around lower level socket APIs). You get this sort of error if you have a client socket which you try to write into and the other side abruptly closes the connection. The string "Connection reset by peer" comes from the underlying OS networking stack.

leeflix commented 2 months ago

@mraleph so i still can't find the error. i added a lot of error handling so at least only the ws dies and not the whole application but still i don't get why the websockt closes just because a peer does something weird this should never happen by default. we cannot user live updates in out app if this is not fixed:

import 'dart:io';

import 'package:easywear_backend/controllers/read_controller.dart';
import 'package:easywear_backend/models/user.dart';
import 'package:easywear_backend/tokens/access_token.dart';
import 'package:easywear_backend/tokens/tokens.dart';
import 'package:easywear_backend/utils/env.dart';

class UserCreatedEvent extends Event {
  Map<String, dynamic> userJson;

  UserCreatedEvent(this.userJson);

  Map<String, dynamic> toJson() => {
        "type": "UserCreatedEvent",
        "user": userJson,
      };

  UserCreatedEvent.fromJson(Map<String, dynamic> json) : userJson = json["user"];
}

class RequestUpdatedEvent extends Event {
  Map<String, dynamic> requestJson;

  RequestUpdatedEvent(this.requestJson);

  Map<String, dynamic> toJson() => {
        "type": "RequestUpdatedEvent",
        "request": requestJson,
      };

  RequestUpdatedEvent.fromJson(Map<String, dynamic> json) : requestJson = json["request"];
}

class RequestCreatedEvent extends Event {
  Map<String, dynamic> requestJson;

  RequestCreatedEvent(this.requestJson);

  Map<String, dynamic> toJson() => {
        "type": "RequestCreatedEvent",
        "request": requestJson,
      };

  RequestCreatedEvent.fromJson(Map<String, dynamic> json) : requestJson = json["request"];
}

class CoinsOfUserUpdatedEvent extends Event {
  String userId;
  int coins;

  CoinsOfUserUpdatedEvent(this.userId, this.coins);

  Map<String, dynamic> toJson() => {
        "type": "CoinsOfUserUpdatedEvent",
        "userId": userId,
        "coins": coins,
      };

  CoinsOfUserUpdatedEvent.fromJson(Map<String, dynamic> json)
      : userId = json["userId"],
        coins = json["coins"];
}

class InitEvent extends Event {
  String accessToken;

  InitEvent(this.accessToken);

  Map<String, dynamic> toJson() => {
        "type": "InitEvent",
        "accessToken": accessToken,
      };

  InitEvent.fromJson(Map<String, dynamic> json) : accessToken = json["accessToken"];
}

abstract class Event {}

class EventChannel {
  static Map<String, List<WebSocket>> domainToAdminWebSockets = {};
  static Map<String, Map<String, List<WebSocket>>> domainToUserIdToWebSockets = {};

  static Future<void> sendCoinsOfUserUpdatedEvent(String domain, String userId, int coins) async {
    CoinsOfUserUpdatedEvent coinsOfUserUpdatedEvent = CoinsOfUserUpdatedEvent(userId, coins);
    await sendEventToAdmins(domain: domain, event: coinsOfUserUpdatedEvent);
    await sendEventToUser(domain: domain, userId: userId, event: coinsOfUserUpdatedEvent);
  }

  static Future<void> sendUserCreatedEvent(String domain, Map<String, dynamic> userJson) async {
    UserCreatedEvent userCreatedEvent = UserCreatedEvent(userJson);
    await sendEventToAdmins(domain: domain, event: userCreatedEvent);
    // await sendEventToUser(domain: domain, userId: requestJson["userId"], event: userCreatedEvent);
  }

  static Future<void> sendRequestUpdatedEvent(String domain, Map<String, dynamic> requestJson) async {
    RequestUpdatedEvent requestUpdatedEvent = RequestUpdatedEvent(requestJson);
    await sendEventToAdmins(domain: domain, event: requestUpdatedEvent);
    await sendEventToUser(domain: domain, userId: requestJson["userId"], event: requestUpdatedEvent);
  }

  static Future<void> sendRequestCreatedEvent({required String domain, required Map<String, dynamic> requestJson}) async {
    RequestCreatedEvent requestCreatedEvent = RequestCreatedEvent(requestJson);
    await sendEventToAdmins(domain: domain, event: requestCreatedEvent);
    await sendEventToUser(domain: domain, userId: requestJson["userId"], event: requestCreatedEvent);
  }

  static Future<void> sendEventToAdmins({required String domain, required Event event}) async {
    try {
      List<WebSocket> adminWebSockets = domainToAdminWebSockets[domain] ?? [];
      for (int i = 0; i < adminWebSockets.length; i++) {
        if (adminWebSockets[i].closeCode != null) {
          adminWebSockets.removeAt(i);
        } else {
          adminWebSockets[i].add(jsonEncode(event));
        }
      }
    } catch (err) {
      print("Caught error during sending event to admins continuing ... 44: ${err}");
    }
  }

  static Future<void> sendEventToUser({required String domain, required String userId, required Event event}) async {
    try {
      List<WebSocket> userWebSockets = domainToUserIdToWebSockets[domain]?[userId] ?? [];
      for (int i = 0; i < userWebSockets.length; i++) {
        if (userWebSockets[i].closeCode != null) {
          userWebSockets.removeAt(i);
        } else {
          userWebSockets[i].add(jsonEncode(event));
        }
      }
    } catch (err) {
      print("Caught error during sending event to user continuing ... 43: ${err}");
    }
  }

  static Future<void> init() async {
    SecurityContext? securityContext;
    var server;
    if (Env.certificateChainPath != null && Env.privateKeyPath != null) {
      securityContext = SecurityContext()
        ..usePrivateKeyBytes(File(Env.privateKeyPath!).readAsBytesSync())
        ..useCertificateChainBytes(File(Env.certificateChainPath!).readAsBytesSync());
      server = await HttpServer.bindSecure(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort, securityContext);
      print("Server running");
    } else {
      server = await HttpServer.bind(Env.local ? "localhost" : InternetAddress.anyIPv6, Env.wsPort);
      print("Server running");
    }

    try {
      // Listen for incoming connections
      for (var request in server) {
        // Check if the request is a WebSocket upgrade request
        try {
          if (WebSocketTransformer.isUpgradeRequest(request)) {
            // Upgrade the connection to a WebSocket connection
            WebSocket ws = await WebSocketTransformer.upgrade(request);
            ws.done.catchError((error) => print('On Error1: $error'));

            InitEvent initEvent = InitEvent.fromJson(jsonDecode(await ws.handleError((error) => print('On Error2: $error')).first));

            AccessToken decodedAccessToken = Tokens.decodeAccessToken(initEvent.accessToken);

            User user = (await ReadController.read<User>(domain: decodedAccessToken.domain, id: decodedAccessToken.userId))!;

            if (user.isAdmin) {
              if (domainToAdminWebSockets[decodedAccessToken.domain] == null) {
                domainToAdminWebSockets[decodedAccessToken.domain] = [ws];
              } else {
                domainToAdminWebSockets[decodedAccessToken.domain]!.add(ws);
              }
            } else {
              if (domainToUserIdToWebSockets[decodedAccessToken.domain] == null) {
                domainToUserIdToWebSockets[decodedAccessToken.domain] = {
                  decodedAccessToken.userId: [ws]
                };
              } else {
                if (domainToUserIdToWebSockets[decodedAccessToken.domain]![decodedAccessToken.userId] == null) {
                  domainToUserIdToWebSockets[decodedAccessToken.domain]![decodedAccessToken.userId] = [ws];
                } else {
                  domainToUserIdToWebSockets[decodedAccessToken.domain]![decodedAccessToken.userId]!.add(ws);
                }
              }
            }

          }
        } catch (err) {
          print("Caught WebSocket Error continuing ... 42: ${err}");
        }

      }
    } catch (err) {
        print("Error3: ${err}");
    }
  }
}
leeflix commented 1 month ago

switched to sse