MatrikMoon / TournamentAssistant

A program designed to make it easier to coordinate tournaments for the VR rhythm game Beat Saber
MIT License
39 stars, 25 forks

Websocket lacks synchronization #52

Closed: nanikit closed this issue 1 year ago

nanikit commented 1 year ago

What I did

Sent real-time score update events simultaneously from two clients.

Expected behavior

No problem

Actual behavior

System.InvalidOperationException: There is already one outstanding 'SendAsync' call for this WebSocket instance. ReceiveAsync and SendAsync can be called simultaneously, but at most one outstanding operation for each of them is allowed at the same time.
   at System.Net.WebSockets.WebSocketBase.SendAsyncCore(ArraySegment`1 buffer, WebSocketMessageType messageType, Boolean endOfMessage, CancellationToken cancellationToken)
   at TournamentAssistantCore.Sockets.Server.Send(ConnectedUser connectedUser, PacketWrapper packet) in TournamentAssistant\TournamentAssistantCore\Sockets\Server.cs:line 349
Client Disconnected!
Sending data: (Event) (user_left_event)
Received data: (ForwardingPacket) (Push)
System.Net.WebSockets.WebSocketException (0x80004005): The 'System.Net.WebSockets.ServerWebSocket' instance cannot be used for communication because it has been transitioned into the 'Aborted' state.
 ---> System.Net.WebSockets.WebSocketException (0x80004005): The 'System.Net.WebSockets.ServerWebSocket' instance cannot be used for communication because it has been transitioned into the 'Aborted' state.
   at System.Net.WebSockets.WebSocketProtocolComponent.ThrowIfSessionHandleClosed(WebSocketBase webSocket)
   at System.Net.WebSockets.WebSocketProtocolComponent.WebSocketGetAction(WebSocketBase webSocket, ActionQueue actionQueue, Buffer[] dataBuffers, UInt32& dataBufferCount, Action& action, BufferType& bufferType, IntPtr& actionContext)
   at System.Net.WebSockets.WebSocketBase.WebSocketOperation.Process(Nullable`1 buffer, CancellationToken cancellationToken)
   at System.Net.WebSockets.WebSocketBase.SendAsyncCore(ArraySegment`1 buffer, WebSocketMessageType messageType, Boolean endOfMessage, CancellationToken cancellationToken)
   at System.Net.WebSockets.WebSocketBase.ThrowIfAborted(Boolean aborted, Exception innerException)
   at System.Net.WebSockets.WebSocketBase.ThrowIfConvertibleException(String methodName, Exception exception, CancellationToken cancellationToken, Boolean aborted)
   at System.Net.WebSockets.WebSocketBase.SendAsyncCore(ArraySegment`1 buffer, WebSocketMessageType messageType, Boolean endOfMessage, CancellationToken cancellationToken)
   at TournamentAssistantCore.Sockets.Server.Send(ConnectedUser connectedUser, PacketWrapper packet) in TournamentAssistant\TournamentAssistantCore\Sockets\Server.cs:line 349

I think the exception is self-explanatory.

MatrikMoon commented 1 year ago

This is certainly something to address, though I wonder if it's not "simultaneously by two clients," and is instead "simultaneously by one client."

If you take a look in that function, you can see that each instance of a user has their own instance of a WebSocket, and as that error indicates, "There is already one outstanding 'SendAsync' call for this WebSocket instance."

So, I wager this is actually just a double-send to one client. What was your setup when receiving this error? Can you reproduce it?
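To make the "double-send to one client" idea concrete, here is a rough TypeScript model of the constraint. It is only a sketch, not the server's C# code: `OneSendAtATimeSocket` and `forward` are hypothetical names, and the class merely imitates the rule that a socket rejects a second send while one is still outstanding.

```typescript
// Hypothetical stand-in for a socket that allows only one outstanding send.
// It imitates the constraint from the error message; it is not a real WebSocket.
class OneSendAtATimeSocket {
  private sending = false;
  readonly delivered: string[] = [];

  async send(data: string): Promise<void> {
    if (this.sending) {
      throw new Error('There is already one outstanding send for this socket');
    }
    this.sending = true;
    await Promise.resolve(); // stand-in for asynchronous network I/O
    this.delivered.push(data);
    this.sending = false;
  }
}

// Forward one packet to every recipient without waiting for earlier sends.
function forward(
  recipients: OneSendAtATimeSocket[],
  data: string
): Promise<void>[] {
  return recipients.map((socket) => socket.send(data));
}
```

In this model, forwarding one packet to two different recipients is fine, because each recipient has its own socket. Forwarding two packets to the same recipient back to back makes the second send fail, regardless of which client each packet originally came from.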

nanikit commented 1 year ago

I wonder if it's not "simultaneously by two clients," and is instead "simultaneously by one client."

Even in that case I think there can still be problems. Messages are received sequentially, but there's no guarantee about the order in which the server processes them. So if one client suddenly sends a lot of messages, and the server delays them and then processes them in a batch, the same exception can occur without synchronization.

One thing I do know is that there's no need for a global lock. As you say, this serial send/receive constraint applies per WebSocket instance.
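The per-instance idea can be sketched as follows. This is a minimal illustration in TypeScript, not the server's C# code and not the eventual patch: `SerialSender`, `SendFn`, and `makeStrictSocket` are hypothetical names, and the strict socket only imitates the "one outstanding send" rule. Each connection gets its own promise chain, so sends to one connection run one after another while other connections are not blocked.

```typescript
// Hypothetical shape of an asynchronous send operation on one connection.
type SendFn = (data: string) => Promise<void>;

// Serializes sends for a single connection by chaining them on a promise.
class SerialSender {
  // Settles when the most recently queued send has finished (or failed).
  private tail: Promise<void> = Promise.resolve();
  private readonly rawSend: SendFn;

  constructor(rawSend: SendFn) {
    this.rawSend = rawSend;
  }

  // Queue a send; it starts only after all earlier sends on this connection settle.
  send(data: string): Promise<void> {
    const result = this.tail.then(() => this.rawSend(data));
    // Swallow the failure on the chain only, so later sends still run;
    // the caller still sees the rejection through `result`.
    this.tail = result.catch(() => {});
    return result;
  }
}

// Hypothetical socket stand-in that throws if two sends overlap.
function makeStrictSocket(log: string[]): SendFn {
  let busy = false;
  return async (data: string): Promise<void> => {
    if (busy) {
      throw new Error('overlapping send');
    }
    busy = true;
    await Promise.resolve(); // stand-in for asynchronous network I/O
    log.push(data);
    busy = false;
  };
}
```

With one `SerialSender` per connected user, a burst such as `sender.send('a'); sender.send('b');` no longer overlaps on that user's socket, and there is no lock shared between users.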

Can you reproduce it?

I can't share the whole thing because my work is at a very early stage, but here is the problematic script.

import WebSocket from 'ws';
import { models, packet } from '../lib/protos/index.js';

const server = 'ws://127.0.0.1:2053';
const messages = {
  nanikit: [] as packet.Packet[],
  suisensei: [] as packet.Packet[],
  sier: [] as packet.Packet[],
};

main().catch(console.error);

async function main() {
  const { socket: p1 } = await createConnection({
    name: 'nanikit',
    userId: '76561198159100356',
    clientType: models.User.ClientTypes.Player,
    modList: [
      'yt-dlp',
      'BSIPA',
      'BeatSaberMarkupLanguage',
      'SiraUtil',
      'Ini Parser',
      'NoItalics',
      'Arcgap',
      'SoundReplacer',
      'TournamentAssistant',
    ],
  });
  const { socket: p2 } = await createConnection({
    name: 'suisensei',
    userId: '76561198357821968',
    clientType: models.User.ClientTypes.Player,
    modList: [
      'yt-dlp',
      'BSIPA',
      'BeatSaberMarkupLanguage',
      'SiraUtil',
      'Ini Parser',
      'NoItalics',
      'Arcgap',
      'SoundReplacer',
      'TournamentAssistant',
    ],
  });
  const { socket: coordinator, added } = await createConnection({
    name: 'sier',
    userId: '0',
    clientType: models.User.ClientTypes.Coordinator,
  });

  while (true) {
    const users = messages.sier
      .map((x) => x.response?.connect?.state?.users)
      .find((x) => x);
    if (!users) {
      await timeout(300);
      continue;
    }
    coordinator.send(
      packet.Packet.encode({
        event: {
          matchCreatedEvent: {
            match: {
              leader: added.event?.userAddedEvent?.user?.guid,
              associatedUsers: users?.map((x) => x.guid!),
            },
          },
        },
      }).finish()
    );
    break;
  }

  let audience = [] as string[];
  p1.on('message', (message: Buffer) => {
    const pkt = packet.Packet.decode(new Uint8Array(message));
    const { match } =
      pkt.event?.matchCreatedEvent ?? pkt.event?.matchUpdatedEvent ?? {};
    if (match) {
      audience = match.associatedUsers as string[];
    }
  });

  await timeout(100);
  const p1Guid = messages.nanikit
    .map((x) => x.response?.connect?.selfGuid)
    .find((x) => x);
  const p2Guid = messages.suisensei
    .map((x) => x.response?.connect?.selfGuid)
    .find((x) => x);
  while (true) {
    await timeout(300);
    const s = {
      from: p1Guid,
      forwardingPacket: {
        forwardTo: audience,
        packet: {
          push: {
            realtimeScore: {
              userGuid: p1Guid,
              accuracy: Math.random() * 100,
              scoreTracker: { notesMissed: 0 },
            },
          },
        },
      },
    } as packet.IPacket;
    p1.send(packet.Packet.encode(s).finish());
    await timeout(10);
    p2.send(
      packet.Packet.encode({
        from: p2Guid,
        forwardingPacket: {
          forwardTo: audience,
          packet: {
            push: {
              realtimeScore: {
                userGuid: p2Guid,
                accuracy: Math.random() * 100,
                scoreTracker: { notesMissed: 0 },
              },
            },
          },
        },
      }).finish()
    );
  }
}

function timeout(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function createConnection(user: models.IUser) {
  const socket = new WebSocket(server);
  socket.on('message', (message) => {
    messages[user.name as 'nanikit'].push(
      packet.Packet.decode(new Uint8Array(message as Buffer))
    );
  });
  socket.on('error', (err) => {
    console.error(`${user.name} error: ${err}`);
  });
  socket.on('close', (err) => {
    console.error(`${user.name} close: ${err}`);
  });
  await new Promise((resolve, reject) => {
    socket.once('error', reject);
    socket.once('open', resolve);
  });
  sendHello(socket, { user });
  const message = (await new Promise((resolve, reject) => {
    socket.once('error', reject);
    socket.once('message', resolve);
  })) as Buffer;
  const added = packet.Packet.decode(new Uint8Array(message));
  return { socket, added };
}

function sendHello(socket: WebSocket, { user }: { user: models.IUser }) {
  socket.send(
    packet.Packet.encode(
      packet.Packet.create({
        request: {
          connect: {
            user: models.User.create(user),
            clientVersion: 67,
          },
        },
      })
    ).finish()
  );
}

MatrikMoon commented 1 year ago

Yup! I agree there's still a problem here. Thanks for the tip! I'll look into it at my next availability. And thanks for the code snippet!

nanikit commented 1 year ago

I have a patch for this that also includes other updates, but it has many changes, so I'm not sure it can be merged. I'd like to request comments on it.

MatrikMoon commented 1 year ago

Glad you got it sorted for yourself! And wow, I see, that is quite a lot of changes 😆

If you're not already in my Discord server, I'd love to have a chat about some of these and see what we can get merged in!