wlandau / crew

A distributed worker launcher
https://wlandau.github.io/crew/

Check for running workers using the sockets instead of the worker-specific API #31

Closed wlandau closed 1 year ago

wlandau commented 1 year ago

https://github.com/shikokuchuo/mirai/issues/33#issuecomment-1455133613

wlandau commented 1 year ago

Current thinking on logic for launching and auto-scaling:

wlandau commented 1 year ago

I like this because it does not require waiting for workers to start, and relies completely on the connection and the startup time to tell if a worker is running.

wlandau commented 1 year ago

On second thought, maximum startup time is not very reliable. It can guard against nightmare scenarios, but so many things can affect the startup time of a worker, and it is too hard to predict even in specific cases. I think crew needs to trust that a worker will start up and join (within an extremely generous startup time period). Here, a worker joins if:

  1. mirai::daemons() shows an active connection, or
  2. The worker completed a task since it was started.

crew could keep track of (2) by setting a CREW_SOCKET environment variable in the process that runs mirai::server(), and then crew_eval() could grab that environment variable and return it with the rest of the metadata. Then when a task is collected in the collect() method of the connector, we can log the socket in the launcher object.
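For illustration, a rough sketch of that idea (launch_command() and crew_eval_sketch() are hypothetical stand-ins, not crew's actual API):

# Launcher side: set CREW_SOCKET in the environment of the process that runs mirai::server().
launch_command <- function(socket) {
  sprintf(
    "CREW_SOCKET='%s' Rscript -e 'mirai::server(\"%s\")'",
    socket, socket
  )
}

# Task side: return the socket alongside the rest of the task metadata.
crew_eval_sketch <- function(expr) {
  list(
    result = eval(expr),
    socket = Sys.getenv("CREW_SOCKET", unset = NA_character_)
  )
}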

wlandau commented 1 year ago

c.f. https://github.com/wlandau/crew/issues/32#issuecomment-1458193820

shikokuchuo commented 1 year ago

On second thought, maximum startup time is not very reliable. It can guard against nightmare scenarios, but so many things can affect the startup time of a worker, and it is too hard to predict even in specific cases. I think crew needs to trust that a worker will start up and join (within an extremely generous startup time period). Here, a worker joins if:

1. `mirai::daemons()` shows an active connection, or

2. The worker completed a task since it was started.

crew could keep track of (2) by setting a CREW_SOCKET environment variable in the process that runs mirai::server(), and then crew_eval() could grab that environment variable and return it with the rest of the metadata. Then when a task is collected in the collect() method of the connector, we can log the socket in the launcher object.

Would it help you if I simply added a count to each server - tasks started and tasks completed for each node? Then you could just get this by calling daemons() along with the current connection status. Would this give you everything you need?

wlandau commented 1 year ago

That would help so much! Tasks started would let me really know how many workers can accept new tasks (if I also take into account the "expected" workers which are starting up but have not yet connected to the client). And tasks completed is such useful load balancing data. In the case of persistent workers, it could really help users figure out if e.g. they really need 50 workers or they can scale down to 25.
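A rough sketch of that bookkeeping (column names as they later appear in daemons()$nodes; expected is crew's own count of launched-but-not-yet-connected workers, an assumption for illustration):

nodes <- mirai::daemons()[["nodes"]]
in_flight <- nodes[, "tasks_assigned"] - nodes[, "tasks_complete"]  # tasks started but not finished
idle_connected <- sum(nodes[, "status_online"] == 1 & in_flight == 0)
capacity <- idle_connected + expected  # workers that could accept new tasks right now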

wlandau commented 1 year ago

As long as those counts are refreshed if the server() at the given socket restarts, this would be amazing.

shikokuchuo commented 1 year ago

Give shikokuchuo/mirai@ba5e84e (v0.7.2.9026) a try. Should give you everything you need. As a happy side effect, the active queue keeps getting more efficient even as we add more features.

wlandau commented 1 year ago

Fantastic! I will test as soon as I have another free moment. (Trying to juggle other projects too, so this one may take me a few days.)

shikokuchuo commented 1 year ago

Cool. You'll want to pick up shikokuchuo/mirai@3777610 (v0.7.2.9028) instead. Having gone through https://github.com/wlandau/crew/issues/32#issuecomment-1458193820 in more detail, I think this now has what you need.

wlandau commented 1 year ago

I tested the specific worker counters with https://github.com/shikokuchuo/mirai/commit/51f2f80c360d872437fd9a17def3ae20184edf79, and they work perfectly.

wlandau commented 1 year ago

I did more thinking about auto-scaling logic, and I care a lot about whether a mirai server is active or inactive. Here, a server is active if it deserves an opportunity to do tasks. Conversely, a worker is inactive if it is definitely broken, and we should force-terminate and relaunch it if needed.

To determine if a server is active, I need three more definitions:

  1. Connected: The server is connected to its websocket. status_online from mirai::daemons()$nodes is 1. This is not the same as active because a server can take a long time to start.
  2. Discovered: the server may or may not be connected right now, but at some point in its lifecycle, it dialed into the client and changed something in mirai::daemons()$nodes.
  3. Launching: the server process began within the last few minutes and may need more time to initialize and connect to the mirai client.

If the server is connected, then it is automatically active. If it is disconnected, then we should only consider it active if it is launching and not yet discovered.

[Screenshot 2023-03-09 3:37:40 PM: diagram of the active/inactive decision logic described above]
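A rough sketch of that rule (the inputs are illustrative booleans, not crew's actual fields):

# connected:  status_online is 1 for this socket in mirai::daemons()$nodes.
# discovered: the server changed something in $nodes at some point in its life.
# launching:  the launch happened within the (generous) startup window.
is_active <- function(connected, discovered, launching) {
  connected || (launching && !discovered)
}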

wlandau commented 1 year ago

Scenario for slow-to-launch transient workers:

  1. Start a server and note the start time.
  2. Push a task and auto-scale.
    • Notice that the server is disconnected and discovered. Now we know the worker is inactive, so it should be treated as not launching.
    • Force-terminate the worker.
    • Reset the start time to NA_real_ to force the worker into a not launching state.
    • Synchronize the crew launcher object with daemons()$nodes so the worker is not discovered. The worker is still dubbed inactive, and now crew is prepared to discover the next worker that dials into the same socket.
  3. Subsequent auto-scaling operations may or may not choose to relaunch another worker at this same socket, but we are confident the worker is inactive, and the re-launch can happen in a different method call than (2).
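A sketch of the bookkeeping in step 2 (field and helper names are hypothetical, for illustration only):

# One worker slot as crew might track it.
worker <- list(start = as.numeric(Sys.time()), discovered = FALSE)

# Step 2: the worker is disconnected and discovered, so force-terminate and
# reset the slot. terminate() stands in for the platform-specific call.
reset_worker <- function(worker, terminate) {
  terminate()
  worker$start <- NA_real_    # no longer considered launching
  worker$discovered <- FALSE  # ready to discover the next server at this socket
  worker
}
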
wlandau commented 1 year ago

Unfortunately, the counts in daemons()$nodes do not quite cover the scenario in https://github.com/wlandau/crew/issues/31#issuecomment-1462792199. I will post a feature request to mirai.

shikokuchuo commented 1 year ago

Re. your diagram, I am not sure you need to be so explicit. I believe the daemons() call should be sufficient.

If status is online, you have an active server.

Otherwise you know that a server has either never connected (zero tasks columns) or disconnected (non-zero tasks columns). In both cases you check when it was launched by crew and if past the 'expiry time' then you kill it and launch another if needed - and here you may also utilise the stats to determine this.

wlandau commented 1 year ago

Otherwise you know that a server has either never connected (zero tasks columns) or disconnected (non-zero tasks columns).

If the server ran and then disconnected, I would strongly prefer not to wait for the rest of the expiry time. The expiry time in crew needs to be large enough to accommodate circumstances like a randomly busy cluster or a long wait time for AWS to find a spot instance within the user's price threshold. So waiting for the remainder of that time would be costly for transient workers that exit almost as soon as they dial in.

Unfortunately, I cannot currently tell if this is happening. A lot of the time, I call daemons()$nodes and see these counts:

                    status_online status_busy tasks_assigned tasks_complete
ws://127.0.0.1:5000             0           0              1              1

If we are still inside the expiry time, then I cannot tell if the server already connected and disconnected, or if the worker is starting and these counts are left over from the previous server that used websocket ws://127.0.0.1:5000.

Does this make sense? Would it be possible to add a new websocket-specific counter in daemons()$nodes that increments every time a different server process dials into the socket? As per the logic in https://github.com/wlandau/crew/issues/31#issuecomment-1462792199, I can watch that counter for changes. This would allow crew to detect connected-then-disconnected servers at any time in the expiry window.

shikokuchuo commented 1 year ago

I think you have to look at this from the perspective that mirai is not trying to keep state - calling daemons() offers a snapshot that enables crew to do so.

  1. crew launches first server. Online status is zero - you know you are waiting for it to start. If successful then you observe it becomes 1 at some point, or you reach the expiry time and it is still zero, in which case you kill and repeat.

  2. After this you observe the status becomes zero again. You know the server has disconnected. Perhaps you look at the number of completed tasks vs other servers and use some rule to decide if you re-launch or not.

Once you have re-launched then you are effectively back to step 1. You should know your state at all times.

wlandau commented 1 year ago

  1. crew launches first server. Online status is zero - you know you are waiting for it to start. If successful then you observe it becomes 1

For a quick task and tasklimit = 1, the server online status may go from 0 to 1 to 0 too fast for crew to ever observe the 1. In other words, the task for the tasklimit = 1 worker may be too quick for me to ever observe a change in online status. crew has no daemons of its own to periodically poll for status_online = 1, and even if it did, the narrow window of the 1 would have to overlap with the polling interval, which is not guaranteed for short tasks.

Previously I thought of a workaround where I would make each task tell me which websocket it came from. This would solve some scenarios, but not a scenario where idletime is small and timerstart = 0. In this latter case, the worker could start and vanish before I notice, with no tasks assigned, and I would have to wait until the end of the long expiry time to find out what happened.

If state is an obstacle for mirai, what if each server could present some UUID or other ID which would differ from connection to connection at the websocket? Then mirai would not need to keep track of state for the counter, and crew would watch the UUID for changes.

shikokuchuo commented 1 year ago

  1. crew launches first server. Online status is zero - you know you are waiting for it to start. If successful then you observe it becomes 1

For a quick task and tasklimit = 1, the server online status may go from 0 to 1 to 0 too fast for crew to ever observe the 1. In other words, the task for the tasklimit = 1 worker may be too quick for me to ever observe a change in online status. crew has no daemons of its own to periodically poll for status_online = 1, and even if it did, the narrow window of the 1 would have to overlap with the polling interval, which is not guaranteed for short tasks.

Online status may go from 0 to 1 to 0 again, but the snapshot will also show non-zero tasks against the zero status. So you know the server has completed one task in this case and disconnected. You may be missing the fact that tasks only get zeroed when a new server connects. So if you don't choose to start up a new server to connect to this port, the stats will never change.

shikokuchuo commented 1 year ago

Previously I thought of a workaround where I would make each task tell me which websocket it came from. This would solve some scenarios, but not a scenario where idletime is small and timerstart = 0. In this latter case, the worker could start and vanish before I notice, with no tasks assigned, and I would have to wait until the end of the long expiry time to find out what happened.

This does not sound very plausible to me - you have something that takes potentially a very long time to spin up, stays online for a very short time, and exits without carrying out a task.

As per my last answer - at each point you want to start up a new server, you have access to all the stats you need. You don't have to poll outside of those points. You won't lose stats because somehow you weren't quick enough to catch them - only a new server connection clears out the previous stats.

shikokuchuo commented 1 year ago

If state is an obstacle for mirai, what if each server could present some UUID or other ID which would differ from connection to connection at the websocket? Then mirai would not need to keep track of state for the counter, and crew would watch the UUID for changes.

It is not at all an obstacle, but there must be some part of the crew workings that I am not getting, because I am not seeing the need.

wlandau commented 1 year ago

Online status may go from 0 to 1 to 0 again, but the snapshot will also show non-zero tasks against the zero status. So you know the server has completed one task in this case and disconnected. You may be missing the fact that tasks only get zeroed when a new server connects. So if you don't choose to start up a new server to connect to this port, the stats will never change.

This is exactly where I am struggling: as you say, the tasks only get zeroed when a new server starts. So if I start my second server at the websocket and observe status_online = 0 with tasks_completed = 1, are those counts left over from the first server, or did the second server run and then disconnect before I could check online status? The result is the same, so I cannot tell the difference. That means I do not know if the second server is starting up, which means I have to wait until the end of the expiry time. Does that make sense?

A major goal of crew is to provide a smooth continuum between persistent and transient workers, so even in the case with daemons, I am trying to make transient workers function as efficiently and responsively as possible.

This does not sound very plausible to me - you have something that takes potentially a very long time to spin up, stays online for a very short time, and exits without carrying out a task.

And I could definitely avoid it by requiring tasklimit >= 1 at the level of crew, but I think some users may want tasklimit = 0 with an extremely small idletime because resources are expensive. I plan to build crew into targets, and I have learned there is a wide variety of preferences and scenarios in the user base.

It is not at all an obstacle, but there must be some part of the crew workings that I am not getting, because I am not seeing the need.

I could just as easily be missing something; I am finding it challenging (and interesting) to wrap my head around this problem. Thank you for sticking with me on this. If the first part of my answer does not make sense, please let me know and I may be able to describe it a different way.

wlandau commented 1 year ago

Online status may go from 0 to 1 to 0 again, but the snapshot will also show non-zero tasks against the zero status. So you know the server has completed one task in this case and disconnected. You may be missing the fact that tasks only get zeroed when a new server connects. So if you don't choose to start up a new server to connect to this port, the stats will never change.

Just to be clear, this is totally fine for the first server that dials into a websocket. The delay due to leftover counts only happens when subsequent servers launch at the same websocket long after the first server disconnects. But this is important: crew cares about auto-scaling, so multiple servers could connect to the same websocket (one after the other) over the course of a pipeline.

Just to make sure crew's auto-scaling behavior is clear, the following is a likely scenario:

  1. The user needs to run tasks on a busy SLURM cluster. Given how busy it is, they set an expiry startup time of 30 minutes.
  2. The user submits enough tasks that crew decides to scale up the number of mirai servers.
  3. crew submits a SLURM job to run mirai server A.
  4. The SLURM job for mirai server A spends 5 minutes in the SLURM task queue.
  5. The SLURM job dequeues, and mirai server A dials into ws://192.168.0.2:5000/23.
  6. Most of the tasks complete, and few tasks are left in the queue. mirai server A exits and disconnects from ws://192.168.0.2:5000/23 because the idle time limit is reached. Tasks assigned and tasks completed both equal 2 for socket ws://192.168.0.2:5000/23 at this point.
  7. After several minutes, the user submits another load of tasks to the crew queue, and crew decides to scale up the mirai servers again. crew launches mirai server B to connect to the same socket at ws://192.168.0.2:5000/23.
  8. Tasks are distributed among multiple servers, and 5 minutes later, crew notices for ws://192.168.0.2:5000/23 that the online status is 0, tasks completed is 2, and tasks assigned is 2. Did mirai server B finish its tasks and idle out, or is it still trying to start and connect to ws://192.168.0.2:5000/23? The counts are exactly the same either way, and crew is not going to poll the SLURM job queue to find out because doing so on a regular basis would overburden squeue. (This kind of platform-specific polling would also be expensive and slow in the general case, e.g. AWS, where each one would have to be an HTTP/REST API call.)
  9. Wait an extra 25 minutes because we are not sure if mirai server B ever connected at all. (Recall the expiry startup time of 30 minutes from (1).)

wlandau commented 1 year ago

To quote @brendanf from https://github.com/wlandau/crew/issues/32#issuecomment-1458361871:

Using targets with clustermq on Slurm, I sometimes get extreme queue times for some of the workers, on the order of days.

So the posited 30-minute expiry time from https://github.com/wlandau/crew/issues/31#issuecomment-1463089721 may not be nearly enough for some users.

wlandau commented 1 year ago

You know what? I may be able to handle all this in crew without needing to burden mirai with it, and the end product may actually be cleaner and easier on my end. I could submit the crew worker with a known UUID that I keep track of, then send the UUID back when the mirai server is done. If I receive the same UUID that I sent the worker with, then I know the worker finished.

crew_worker <- function(socket, uuid, ...) {
  server_socket <- nanonext::socket(protocol = "req", dial = socket)
  on.exit(nanonext::send(con = server_socket, data = uuid)) # Tell the client when the server is done.
  mirai::server(url = socket, ...)
}

On the server process:

crew_worker("ws://192.168.0.2:5000/finished_servers", uuid = "MY_UUID", idletime = 100, tasklimit = 0)

On the client:

sock <- nanonext::socket(protocol = "rep", listen = "ws://192.168.0.2:5000/finished_servers")
# ... Do some other work, do not poll at regular intervals.
uuid <- nanonext::recv(sock) # Did any workers finish since last I checked? 
# ... Check the uuid against the known set of UUIDs I submitted workers with.

There is still a slim possibility that mirai::server() connects but the subsequent manual nanonext::send() fails, but I don't think that will come up as much, and we can rely on expiry time to detect those rare failures.

shikokuchuo commented 1 year ago

You know what? I may be able to handle all this in crew without needing to burden mirai with it, and the end product may actually be cleaner and easier on my end. I could submit the crew worker with a known UUID that I keep track of, then send the UUID back when the mirai server is done. If I receive the same UUID that I sent the worker with, then I know the worker finished.

crew_worker <- function(socket, uuid, ...) {
  server_socket <- nanonext::socket(protocol = "req", dial = socket)
  on.exit(nanonext::send(con = server_socket, data = uuid)) # Tell the client when the server is done.
  mirai::server(url = socket, ...)
}

On the server process:

crew_worker("ws://192.168.0.2:5000/finished_servers", uuid = "MY_UUID", idletime = 100, tasklimit = 0)

On the client:

sock <- nanonext::socket(protocol = "rep", listen = "ws://192.168.0.2:5000/finished_servers")
# ... Do some other work, do not poll at regular intervals.
uuid <- nanonext::recv(sock) # Did any workers finish since last I checked? 
# ... Check the uuid against the known set of UUIDs I submitted workers with.

There is still a slim possibility that mirai::server() connects but the subsequent manual nanonext::send() fails, but I don't think that will come up as much, and we can rely on expiry time to detect those rare failures.

Sorry I was just in the middle of a long reply when I saw this pop up! Let me send that out first and I'll have a look at the above.

shikokuchuo commented 1 year ago

Likewise, thanks for sticking with this subject and providing the descriptions. This is turning out to be much more interesting than I expected.

Let's first of all discount the zero tasks and exit case - I think I can probably give you something that allows you to distinguish that case.

However, this I think still leaves you with your other overarching issue - and I can see the problem as you described it. Please indulge me if I digress a bit, but hopefully this may help with the development of crew. My aim is also for crew to be the best it can be.

I see mirai performing the role of a fault-tolerant (thanks to NNG) switch (at the application level rather than the hardware level), call it a task dispatcher if you will. We have been calling this thing a queue, but I will have a think and perhaps move away from that wording, as I am finding it not particularly helpful or accurate. I have not coded a queue. The mirai process maintains the minimal amount of state to dispatch tasks to servers that are 'free' or else wait. That is all it does, and my objective is for it to do that as best it can. Messages are actually queued at the underlying NNG library level or the system socket level. That is where I am coming from when I find it unnatural for mirai to be providing state variables such as cumulative connections (although they are trivial to obtain).

crew, on the other hand, is a distributed task launcher - it controls all the tasks that are sent, and it also controls all the servers. Conceptually for me, crew should maintain at a minimum a state of how many tasks have been sent and how many servers have been launched and killed - and it should update this state as frequently as needed. I believe it is not the best design for it to query mirai to provide the ground truth for state. The issue you have in this respect is the use of the timeout mechanism in mirai, as that effectively causes you to lose track of state: you can only obtain updates as frequently as there are user interactions with crew.

Perhaps conceptually providing the number of connections is not so different to providing the number of tasks. Maybe there is no way around it, but I feel there could be a better solution.

shikokuchuo commented 1 year ago

You know what? I may be able to handle all this in crew without needing to burden mirai with it, and the end product may actually be cleaner and easier on my end. I could submit the crew worker with a known UUID that I keep track of, then send the UUID back when the mirai server is done. If I receive the same UUID that I sent the worker with, then I know the worker finished.

crew_worker <- function(socket, uuid, ...) {
  server_socket <- nanonext::socket(protocol = "req", dial = socket)
  on.exit(nanonext::send(con = server_socket, data = uuid)) # Tell the client when the server is done.
  mirai::server(url = socket, ...)
}

On the server process:

crew_worker("ws://192.168.0.2:5000/finished_servers", uuid = "MY_UUID", idletime = 100, tasklimit = 0)

On the client:

sock <- nanonext::socket(protocol = "rep", listen = "ws://192.168.0.2:5000/finished_servers")
# ... Do some other work, do not poll at regular intervals.
uuid <- nanonext::recv(sock) # Did any workers finish since last I checked? 
# ... Check the uuid against the known set of UUIDs I submitted workers with.

There is still a slim possibility that mirai::server() connects but the subsequent manual nanonext::send() fails, but I don't think that will come up as much, and we can rely on expiry time to detect those rare failures.

This works, and thanks for thinking about this problem. You do seem to need a sign-out when a server disconnects for the state at crew to get updated. Let me think about it a bit more.

But rest assured I won't subject you to actually needing the above (it is rather un-ergonomic)!

shikokuchuo commented 1 year ago

I think I have found another way for you to detect connected and then exited nodes, implemented in e2e879e.

I have added a state attribute to the node URLs, which starts as a vector of TRUE values, and whenever a node disconnects it simply flips the relevant flag.

So when you query daemons(), you simply need to retrieve its state vector by

attr(dimnames(daemons()[["nodes"]])[[1L]], "state")

Sorry, the above should be fast if unwieldy; I just did not want this to print as it's not terribly useful for end users.

Then next time you query, if you notice the state has flipped then you know that there has been a disconnect (i.e. server has connected then disconnected) in the meantime. That should cover both of the cases where you might not be able to distinguish, including the timerstart = 0L case.
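For example (a sketch, assuming crew stores the vector it saw at its previous query as previous_state):

current_state <- attr(dimnames(mirai::daemons()[["nodes"]])[[1L]], "state")
flipped <- current_state != previous_state  # TRUE wherever a server disconnected since the last check
previous_state <- current_state             # update crew's record for next time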

Please let me know if this works from your perspective. Thanks!

wlandau commented 1 year ago

Thank you so much for working on this, @shikokuchuo! I am testing https://github.com/shikokuchuo/mirai/commit/e2e879e18b2100a8dc5b6a7e26943480ec60cabb, and I think it does what I need in well-behaved cases. On the client, I start a local active server queue with 2 server nodes.

library(mirai)
daemons("ws://127.0.0.1:5000", nodes = 2)
attr(dimnames(daemons()[["nodes"]])[[1L]], "state")
#> [1] TRUE TRUE

I connect a server.

Rscript -e 'mirai::server("ws://127.0.0.1:5000/1")'

On the client, I see:

daemons()$nodes
#>                       status_online status_busy tasks_assigned tasks_complete
#> ws://127.0.0.1:5000/1             1           0              0              0
#> ws://127.0.0.1:5000/2             0           0              0              0
attr(dimnames(daemons()[["nodes"]])[[1L]], "state")
#> [1] TRUE TRUE

Then I disconnect the server and see on the client:

daemons()$nodes
#>                       status_online status_busy tasks_assigned tasks_complete
#> ws://127.0.0.1:5000/1             1           0              0              0
#> ws://127.0.0.1:5000/2             0           0              0              0
attr(dimnames(daemons()[["nodes"]])[[1L]], "state")
#> [1] FALSE TRUE

So far so good. I can record a state of FALSE in crew to match the one returned by daemons(). Next, I launch a second server at the same websocket, which may take time to start up.

Rscript -e 'mirai::server("ws://127.0.0.1:5000/1")'

While I am waiting for the next worker to start up, I see the same result as before, so there has not been any worker activity over the websocket.

attr(dimnames(daemons()[["nodes"]])[[1L]], "state")
#> [1] FALSE TRUE

Then the worker briefly touches the connection and exits quickly. After that, I see:

attr(dimnames(daemons()[["nodes"]])[[1L]], "state")
#> [1] TRUE TRUE

The new state of TRUE now disagrees with crew's state of FALSE, so I know the worker has generated activity on its end of the websocket connection. This is exactly what I need to know.

wlandau commented 1 year ago

We just need to assume the current worker is the only one dialing into this websocket. I think that is a reasonable assumption in most cases, but I do wonder about the race condition when:

  1. A mirai server is starting as an AWS Batch job.
  2. Before the server connects, the expiry time is reached.
  3. crew sends a REST API request to shut down the server. The request takes time to process.
  4. crew launches a second server at the socket.
  5. The mirai server from (1) connects to the client before the shutdown request can be processed.
  6. The shutdown API request is finally received, and the connected mirai server from (1) shuts down 10 seconds later.
  7. The mirai server from (4) finishes and disconnects.
  8. The "state" bit is the same as in (1) because (6) and (7) both flipped it, so crew waits the expiry time for a third worker to launch, even though there is no worker at the socket anymore.

Options to handle this:

A. Just wait the expiry startup time.
B. Allow no more than one mirai server to access a websocket at a given time. (Is this possible?)
C. In crew, poll the specific platform to make sure the SLURM job, AWS Batch job, etc. is really terminated.

shikokuchuo commented 1 year ago

OK - let me just throw open the alternative, especially as it seems you're now familiar enough with nanonext :)

To query the number of past connections over a listener really is as simple as

nanonext::stat(sock$listener[[1L]], "accept")

But purely from an efficiency perspective if there really are say 500 nodes, it feels excessive to query them all every time you call daemons().

None of the options are good of course :)

shikokuchuo commented 1 year ago

I realise now there are many things outside of your control, so I am not opposed to implementing the above.

wlandau commented 1 year ago

But purely from an efficiency perspective if there really are say 500 nodes, it feels excessive to query them all every time you call daemons().

How efficient or inefficient would that be exactly, compared to the current behavior and queries in daemons()?

wlandau commented 1 year ago

I think I like the crew-only approach from https://github.com/wlandau/crew/issues/31#issuecomment-1463698455 because the UUID gives an exact match with the actual worker I am waiting for. If I don't get that exact UUID back, then the correct decision is to wait the expiry time. Other UUIDs from stale workers may come in in the meantime, but crew will not react.

In the scenario from https://github.com/wlandau/crew/issues/31#issuecomment-1464013505, crew switches to the UUID at (4) and just focuses on that one. The UUID from (1) no longer matters and no longer has an effect. crew only needs to know that the shutdown signal from (1) was successfully sent (HTTP status 200 in the response header); it doesn't need to wait for the actual worker to shut down.
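In code, the exact-match rule might look like this (a sketch; expected_uuid and received are crew-side bookkeeping, and sock is the side-channel socket from the earlier snippet):

msg <- nanonext::recv(sock, block = FALSE)  # non-blocking: returns an errorValue if nothing arrived
if (!nanonext::is_error_value(msg)) received <- c(received, msg)
worker_done <- expected_uuid %in% received  # stale UUIDs from earlier workers are simply ignored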

shikokuchuo commented 1 year ago

But purely from an efficiency perspective if there really are say 500 nodes, it feels excessive to query them all every time you call daemons().

How efficient or inefficient would that be exactly, compared to the current behavior and queries in daemons()?

This is all relative you understand - the query time for each socket is perhaps 6 microseconds, whereas simply to append the state vector would be 5 microseconds in total. To be clear, I don't see this as an obstacle in itself!

wlandau commented 1 year ago

Re https://github.com/wlandau/crew/issues/31#issuecomment-1464040648, the only caveat is: what if mirai::server() connects but my custom NNG socket does not? As long as the package is working correctly, I think this is unlikely, and this is something I can test locally in crew's unit tests.

wlandau commented 1 year ago

This is all relative you understand - the query time for each socket is perhaps 6 microseconds, whereas simply to append the state vector would be 5 microseconds in total. To be clear, I don't see this as an obstacle in itself!

From https://github.com/wlandau/crew/issues/31#issuecomment-1464040648, I am starting to think the connection count in mirai will not be necessary on my end, as long as I can work directly with NNG successfully for the UUID workaround. I have not tested this yet, but I am optimistic.

shikokuchuo commented 1 year ago

This is all relative you understand - the query time for each socket is perhaps 6 microseconds, whereas simply to append the state vector would be 5 microseconds in total. To be clear, I don't see this as an obstacle in itself!

From #31 (comment), I am starting to think the connection count in mirai will not be necessary on my end, as long as I can work directly with NNG successfully for the UUID workaround. I have not tested this yet, but I am optimistic.

Unless you see other benefits to using a UUID? If it is just to solve this issue, then looking at it holistically it seems like too much work just to get a bit of state that you cannot otherwise get because you don't have a channel back from the server to crew. As I said, I'm not opposed to providing the count.

shikokuchuo commented 1 year ago

Sorry, I have been thinking in terms of flags... I realise I can just increment an integer counter: 626228e. I've called it node_instance and put it in the main $nodes matrix for easier access.

Quite interesting when I was testing it with the self-repairing local active queue!
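A sketch of how crew might consume that counter (assuming previous_instance is the vector crew recorded at its last query, and node_instance is a column of daemons()$nodes as described above):

nodes <- mirai::daemons()[["nodes"]]
new_connection <- nodes[, "node_instance"] > previous_instance  # a fresh server dialed in here
previous_instance <- nodes[, "node_instance"]                   # update crew's record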

wlandau commented 1 year ago

UUIDs appeal to me because it's extra assurance about the specific server process I am watching for. I am working with potentially long delays in crew, and so I would feel better if I implemented a blanket protection against race conditions in general. (I am not confident in my ability to anticipate all race conditions.) But node_instance will be useful too. I think I will learn more about load balancing from those counts.

shikokuchuo commented 1 year ago

Great, sounds sensible. As you say it is still useful, so I will leave node_instance in, as it's virtually no overhead.

wlandau commented 1 year ago

Would I need an additional TCP port for this? I see:

library(mirai)
library(nanonext)
daemons("ws://127.0.0.1:5000/1", nodes = 1)
#> [1] 1
connection_for_done_worker <- nanonext::socket(protocol = "rep", listen = "ws://127.0.0.1:5000/done")
#> Error in nanonext::socket(protocol = "rep", listen = "ws://127.0.0.1:5000/done") : 
#>   10 | Address in use

shikokuchuo commented 1 year ago

Shouldn't do. mirai has listeners on each of the ws addresses - no different.

wlandau commented 1 year ago

In that case, how would you recommend I listen to ws://127.0.0.1:5000/done using nanonext?

shikokuchuo commented 1 year ago

Well, the error message says the address is in use, so have you tried other random paths? No path? I don't see anything obviously wrong.

wlandau commented 1 year ago

I just tried with different paths/ports, both on the loopback address and with getip::getip(type = "local"), and I saw the same result. Example:

library(mirai)
library(nanonext)
daemons("ws://127.0.0.1:61903/path1", nodes = 1)
#> [1] 0
connection_for_done_worker <- socket(protocol = "rep", listen = "ws://127.0.0.1:61903/path2")
#> Error in socket(protocol = "rep", listen = "ws://127.0.0.1:61903/path2") : 
#>   10 | Address in use

shikokuchuo commented 1 year ago

Oh, because you are listening using mirai from another process. I guess that is why. In that case you need another port.

shikokuchuo commented 1 year ago

I will throw in this suggestion though as it will be less error-prone, and that is to just establish a connection and not actually send any messages.

First use the 'bus' protocol as that is the lightest:

connection_for_done_worker[[1L]] <- socket(protocol = "bus", listen = "ws://127.0.0.1:61903/UUID")

etc.

stat(connection_for_done_worker[[1L]]$listener[[1L]], "accept")

will give you the total number of connections accepted at the listener (1 if the server has dialled in).

stat(connection_for_done_worker[[1L]]$listener[[1L]], "pipes")

will give you the number of current connections. (0 if server has disconnected).

So a combination of 1 and 0 above means the server has dialled in and disconnected after presumably finishing its tasks.
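Putting the two checks together for one worker socket (a sketch, reusing connection_for_done_worker from above):

started  <- stat(connection_for_done_worker[[1L]]$listener[[1L]], "accept") > 0L  # ever dialled in?
running  <- stat(connection_for_done_worker[[1L]]$listener[[1L]], "pipes") > 0L   # still connected?
finished <- started && !running  # the server dialled in and has since disconnected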

wlandau commented 1 year ago

Wow! This is so much better than trying to catch messages! So much easier. Thank you so much!