nim-lang / Nim

Nim is a statically typed compiled systems programming language. It combines successful concepts from mature languages like Python, Ada and Modula. Its design focuses on efficiency, expressiveness, and elegance (in that order of priority).
https://nim-lang.org
Other
16.56k stars 1.47k forks source link

Threads don't work in combination with async await on Windows 8 / 10 #13415

Open sveri opened 4 years ago

sveri commented 4 years ago

When I try to spawn multiple threads that each call an async function on Windows, the same thread is used every time (as can be seen in the output). The same code on Linux spawns and uses multiple threads as expected.

Example

import asyncdispatch, asyncnet

from osproc import countProcessors

type
  Server = ref object
    ## Server state: holds the socket the server listens on.
    socket: AsyncSocket ## Listening socket; assigned inside `loop`.

proc newServer*(): Server =
  ## Allocates a fresh `Server`; its `socket` field starts out as nil.
  result = Server()

proc close*(server: Server) =
  ## Closes the socket held by `server`.
  close(server.socket)

proc sendResp*(socket: AsyncSocket, code: string) {.async.} =
  ## Sends a minimal HTTP/1.1 response over `socket`.
  ##
  ## `code` is used both as the status text and as the body. The body is
  ## `code` followed by CRLF, hence the `+ 2` in the Content-Length value.
  let
    statusLine = "HTTP/1.1 " & code & "\c\L"
    lengthHeader = "Content-Length: " & $(code.len + 2) & "\c\L\c\L"
    body = code & "\c\L"
  await socket.send(statusLine & lengthHeader & body)

proc processMessage(server: Server, csocket: AsyncSocket) {.async.} =
  ## Serves one accepted client connection.
  ##
  ## Prints the id of the thread running the handler, then answers every
  ## received line with a "200 OK" response until `recvLine` returns an empty
  ## string or an I/O error occurs. The client socket is closed on both exit
  ## paths.
  ##
  ## `server` is not used by the body; it is kept so existing callers compile.

  echo "threadid: " & $getThreadId()
  while not csocket.isClosed():
    try:
      let line = await csocket.recvLine()

      if line.len == 0:
        # An empty result is treated as "peer went away": stop serving.
        csocket.close()
        break

      await csocket.sendResp("200 OK")
    except CatchableError as e:
      # Only recoverable errors are handled here; a Defect (programming bug)
      # is left to propagate instead of being silently swallowed.
      echo "Failed processClient(): " & e.msg
      # Release the descriptor rather than leaking it on the error path.
      if not csocket.isClosed():
        csocket.close()
      break

proc loop(server: Server, port = 5000) {.async.} =
  ## Sets up the listening socket on `port` and accepts clients forever.
  ##
  ## The socket is created unbuffered, with address and port reuse enabled,
  ## and stored in `server.socket`. Each accepted client is handed to
  ## `processMessage` via `asyncCheck`, so the accept loop does not wait for
  ## the handler to finish.
  server.socket = newAsyncSocket(buffered = false)
  let listener = server.socket  # same ref object, shorter name
  listener.setSockOpt(OptReuseAddr, true)
  listener.setSockOpt(OptReusePort, true)
  listener.bindAddr(Port(port))
  listener.listen()

  while true:
    let accepted = await listener.acceptAddr()
    echo "accepted connection from " & accepted.address
    asyncCheck processMessage(server, accepted.client)

proc runServer() =
  ## Thread entry point: creates a server, drives its accept loop to
  ## completion, then closes it.
  let instance = newServer()
  waitFor instance.loop()
  instance.close()

proc start*() =
  ## Starts one `runServer` thread per processor reported by
  ## `countProcessors`, prints the count, and waits for all threads to end.
  let workerCount = countProcessors()
  echo workerCount
  var workers = newSeq[Thread[void]](workerCount)
  for worker in workers.mitems:
    createThread[void](worker, runServer)
  echo("Listening on port 5000") # This line is used in the tester to signal readiness.
  joinThreads(workers)

# Script entry point: launch the server threads.
start()

Current Output

8
Listening on port 5000
accepted connection from 127.0.0.1
threadid: 10908
accepted connection from 127.0.0.1
threadid: 10908
accepted connection from 127.0.0.1
threadid: 10908
accepted connection from 127.0.0.1
threadid: 10908
accepted connection from 127.0.0.1
threadid: 10908

Expected Output

8
Listening on port 5000
accepted connection from 93.205.239.72
threadid: 27367
accepted connection from 93.205.239.72
threadid: 27365
accepted connection from 93.205.239.72
threadid: 27368
accepted connection from 93.205.239.72
threadid: 27370
accepted connection from 93.205.239.72
threadid: 27370
accepted connection from 93.

Possible Solution

Additional Information

dom96 commented 4 years ago

Are you sure that OptReusePort is supported on Windows? It's a fairly new feature even on Linux.

sveri commented 4 years ago

That's a good question. I didn't even think about it, to be honest. I just read through this extensive answer: https://stackoverflow.com/questions/14388706/how-do-so-reuseaddr-and-so-reuseport-differ and it seems it is even considered a security problem if sockets can be hijacked. My knowledge in this area is pretty slim; I just assumed that since it worked on Linux it should also work on Windows, but that does not seem to be the case. Maybe someone with more knowledge can chime in?

ringabout commented 4 years ago

I encountered similar problems when porting httpbeast to Windows (my fork). Enabling multithreading on Windows makes Prologue run slower than with a single thread. https://github.com/planety/prologue

ringabout commented 4 years ago

It is not specific to async await. It also happens with synchronous net.

import net, os

type
  Server = ref object
    ## Server state: holds the socket the server listens on.
    socket: Socket ## Listening socket; assigned inside `loop`.

proc newServer*(): Server =
  ## Allocates a fresh `Server`; its `socket` field starts out as nil.
  result = Server()

proc close*(server: Server) =
  ## Closes the socket held by `server`.
  close(server.socket)

proc sendResp*(socket: Socket, code: string) =
  ## Sends a minimal HTTP/1.1 response over `socket`.
  ##
  ## `code` is used both as the status text and as the body. The body is
  ## `code` followed by CRLF, hence the `+ 2` in the Content-Length value.
  var response = "HTTP/1.1 " & code & "\c\L"
  response.add "Content-Length: " & $(code.len + 2) & "\c\L\c\L"
  response.add code & "\c\L"
  socket.send(response)

proc processMessage(server: Server, csocket: Socket) =
  ## Handles a single request line from an accepted client.
  ##
  ## Prints the id of the handling thread, reads one line, and replies with
  ## "200 OK". If the line is empty the socket is closed and nothing is sent.
  ## Recoverable errors are reported on stdout together with the last OS
  ## error code.
  ##
  ## `server` is not used by the body; it is kept so existing callers compile.

  echo "threadid: " & $getThreadId()

  try:
    let line = csocket.recvLine()

    if line.len == 0:
      csocket.close()
      # Stop here: replying on the socket we just closed would always fail
      # and log a spurious error.
      return

    csocket.sendResp("200 OK")
  except CatchableError as e:
    # Only recoverable errors are handled here; a Defect (programming bug)
    # is left to propagate instead of being silently swallowed.
    echo osLastError()
    echo "Failed processClient(): " & e.msg

proc loop(server: Server, port = 5000) =
  ## Sets up the listening socket on `port` and serves clients forever.
  ##
  ## The socket is created unbuffered, with address and port reuse enabled,
  ## and stored in `server.socket`. Clients are handled one at a time: the
  ## next accept happens only after `processMessage` returns.
  server.socket = newSocket(buffered = false)
  let listener = server.socket  # same ref object, shorter name
  listener.setSockOpt(OptReuseAddr, true)
  listener.setSockOpt(OptReusePort, true)
  listener.bindAddr(Port(port))
  listener.listen()

  # Declared outside the loop so the same variables are passed to every
  # acceptAddr call.
  var
    peerAddr: string
    peer: Socket
  while true:
    listener.acceptAddr(peer, peerAddr)
    echo "accepted connection from " & peerAddr
    processMessage(server, peer)

proc runServer() =
  ## Thread entry point: creates a server, runs its accept loop, then
  ## closes it.
  let instance = newServer()
  instance.loop()
  instance.close()

proc start*() =
  ## Starts four `runServer` threads and waits for all of them to end.
  const workerCount = 4
  var workers = newSeq[Thread[void]](workerCount)
  for worker in workers.mitems:
    createThread[void](worker, runServer)
  echo("Listening on port 5000") # This line is used in the tester to signal readiness.
  joinThreads(workers)

# Script entry point: launch the server threads.
start()