Alogani opened 4 months ago
Some suggestions from a chat LLM (for whatever it's worth):
EventLoop and DispatcherLoop:
EventLoop Thread: The EventLoop should run in its own dedicated thread, separate from the worker threads. This ensures that the EventLoop can efficiently monitor events without being blocked by task execution.
Per-Thread Task Dispatchers:
Number of Threads:
What design should the DispatcherLoop have?
Other considerations:
```python
import heapq
import itertools
import queue
import selectors
import threading
import time


class BackpressureError(Exception):
    """Raised when the scheduler's task queue exceeds its limit."""


_STOP = object()  # sentinel used to ask a worker thread to exit


class Task:
    def __init__(self, coro, priority=0):
        self.coro = coro
        self.priority = priority
        self.cancelled = False


class EventLoop:
    def __init__(self, selector):
        self.selector = selector
        self.dispatcher = None                  # wired up after construction
        self.timers = []                        # heap of (deadline, timer)
        self.callbacks = []
        self.io_count = 0
        self.io_lock = threading.Lock()
        self.task_queue = queue.PriorityQueue()
        self.task_counter = itertools.count()   # tie-breaker for equal priorities
        self.task_limit = 1000                  # backpressure limit

    def add_task(self, task):
        # Check the limit *before* enqueueing so the queue never overflows.
        if self.task_queue.qsize() >= self.task_limit:
            raise BackpressureError("Task queue limit reached")
        self.task_queue.put((task.priority, next(self.task_counter), task))

    def cancel_task(self, task):
        task.cancelled = True

    def add_timer(self, timer):
        heapq.heappush(self.timers, (timer.deadline, timer))

    def add_callback(self, callback):
        self.callbacks.append(callback)

    def handle_io(self):
        with self.io_lock:
            self.io_count += 1

    def release_io(self):
        with self.io_lock:
            self.io_count -= 1

    def run(self):
        while True:
            # Check for I/O events; a short timeout keeps timers and tasks
            # running even when no fd is ready.
            events = self.selector.select(timeout=0.05)
            for key, mask in events:
                callback = key.data
                self.handle_io()
                try:
                    callback()
                finally:
                    self.release_io()

            # Process expired timers.
            now = time.time()
            while self.timers and self.timers[0][0] <= now:
                _, timer = heapq.heappop(self.timers)
                self.add_callback(timer.callback)

            # Process tasks: resume each coroutine one step, then requeue it.
            while not self.task_queue.empty():
                priority, _, task = self.task_queue.get()
                if not task.cancelled:
                    try:
                        task.coro.send(None)  # works for generators and coroutines
                        self.add_task(task)
                    except StopIteration:
                        pass

            # Process callbacks.
            for callback in self.callbacks:
                callback()
            self.callbacks.clear()

            # Adjust the number of worker threads based on I/O workload.
            self.adjust_worker_threads()
            # Batch and coalesce I/O operations.
            self.batch_and_coalesce_io()

    def adjust_worker_threads(self):
        # Crude heuristic: scale workers with the number of in-flight I/Os.
        if self.dispatcher is not None:
            target_threads = max(1, self.io_count * 2)
            self.dispatcher.adjust_threads(target_threads)

    def batch_and_coalesce_io(self):
        # Implement batching and coalescing of similar I/O operations.
        pass


class Dispatcher:
    def __init__(self, event_loop):
        self.event_loop = event_loop
        self.threads = []
        self.thread_queues = []
        self.thread_lock = threading.Lock()

    def add_thread(self):
        q = queue.Queue()
        thread = threading.Thread(target=self.thread_loop, args=(q,), daemon=True)
        with self.thread_lock:
            self.threads.append(thread)
            self.thread_queues.append(q)
        thread.start()

    def remove_thread(self):
        with self.thread_lock:
            if not self.threads:
                return
            thread = self.threads.pop()
            q = self.thread_queues.pop()
        q.put(_STOP)   # a plain join() would deadlock: workers loop forever
        thread.join()

    def adjust_threads(self, target_threads):
        current_threads = len(self.threads)
        if target_threads > current_threads:
            for _ in range(target_threads - current_threads):
                self.add_thread()
        elif target_threads < current_threads:
            for _ in range(current_threads - target_threads):
                self.remove_thread()

    def submit_task(self, task):
        # Submit the task to the least loaded thread queue.
        with self.thread_lock:
            least_loaded = min(self.thread_queues, key=lambda q: q.qsize())
        least_loaded.put(task)

    def steal_task(self):
        # Steal a task from the most loaded thread queue.
        with self.thread_lock:
            if not self.thread_queues:
                return None
            most_loaded = max(self.thread_queues, key=lambda q: q.qsize())
        try:
            task = most_loaded.get_nowait()
        except queue.Empty:
            return None
        if task is _STOP:
            most_loaded.put(task)  # don't swallow another worker's stop signal
            return None
        return task

    def run_task(self, task):
        if not task.cancelled:
            try:
                task.coro.send(None)
                self.event_loop.add_task(task)
            except StopIteration:
                pass

    def thread_loop(self, own_queue):
        while True:
            # Check the thread-local queue first.
            try:
                task = own_queue.get(timeout=0.1)
            except queue.Empty:
                # Nothing local: try to steal from the most loaded queue.
                task = self.steal_task()
                if task is None:
                    continue
            if task is _STOP:
                return
            self.run_task(task)


# Example usage
selector = selectors.DefaultSelector()
event_loop = EventLoop(selector)
dispatcher = Dispatcher(event_loop)
event_loop.dispatcher = dispatcher

# Run the event loop on its own dedicated thread.
threading.Thread(target=event_loop.run, daemon=True).start()

# Start with a minimum of 2 worker threads.
dispatcher.adjust_threads(2)

# Generator-based tasks: each `yield` hands control back to the scheduler.
# (The original `async def` body awaited asyncio.sleep, which only works
# under asyncio's own event loop, so a plain generator is used instead.)
def my_task(delay):
    deadline = time.time() + delay
    while time.time() < deadline:
        yield

# Submit tasks
dispatcher.submit_task(Task(my_task(1), priority=0))
dispatcher.submit_task(Task(my_task(2), priority=1))
dispatcher.submit_task(Task(my_task(3), priority=2))
```
```python
import collections

class EventLoop:
    # ... (other code)

    def batch_and_coalesce_io(self):
        # Group ready callbacks by their type so similar operations can be
        # coalesced into a single batch.
        io_operations = collections.defaultdict(list)
        for key, mask in self.selector.select(0):
            callback = key.data
            operation_type = type(callback).__name__
            io_operations[operation_type].append(callback)
        for operation_type, callbacks in io_operations.items():
            if len(callbacks) > 1:
                # Coalesce similar I/O operations.
                self.coalesce_io_operations(operation_type, callbacks)
            else:
                # Execute a single I/O operation directly.
                callbacks[0]()

    def coalesce_io_operations(self, operation_type, callbacks):
        # Dispatch on the callback class name.
        if operation_type == "FileReadCallback":
            self.coalesce_file_reads(callbacks)
        elif operation_type == "SocketReadCallback":
            self.coalesce_socket_reads(callbacks)
        elif operation_type == "FileWriteCallback":
            self.coalesce_file_writes(callbacks)
        elif operation_type == "SocketWriteCallback":
            self.coalesce_socket_writes(callbacks)
        else:
            # Unknown type: execute each callback individually.
            for callback in callbacks:
                callback()

    # The methods below assume callback objects expose `file_handle`/`socket`,
    # `read_data`/`write_data`, and a `set_result` method.

    def coalesce_file_reads(self, callbacks):
        # Concatenate the data read per file handle, then deliver the
        # combined result to every callback waiting on that handle.
        file_handles = set(cb.file_handle for cb in callbacks)
        for file_handle in file_handles:
            data = b"".join(cb.read_data for cb in callbacks
                            if cb.file_handle == file_handle)
            for cb in callbacks:
                if cb.file_handle == file_handle:
                    cb.set_result(data)

    def coalesce_socket_reads(self, callbacks):
        # Same batching as file reads, keyed by socket.
        sockets = set(cb.socket for cb in callbacks)
        for sock in sockets:
            data = b"".join(cb.read_data for cb in callbacks if cb.socket == sock)
            for cb in callbacks:
                if cb.socket == sock:
                    cb.set_result(data)

    def coalesce_file_writes(self, callbacks):
        # Combine all pending writes per file handle and report the total
        # number of bytes written to each callback.
        file_handles = set(cb.file_handle for cb in callbacks)
        for file_handle in file_handles:
            data = b"".join(cb.write_data for cb in callbacks
                            if cb.file_handle == file_handle)
            for cb in callbacks:
                if cb.file_handle == file_handle:
                    cb.set_result(len(data))

    def coalesce_socket_writes(self, callbacks):
        # Same batching as file writes, keyed by socket.
        sockets = set(cb.socket for cb in callbacks)
        for sock in sockets:
            data = b"".join(cb.write_data for cb in callbacks if cb.socket == sock)
            for cb in callbacks:
                if cb.socket == sock:
                    cb.set_result(len(data))
```
Most of your questions are actually opinionated design decisions that should be guided by:
Just be aware of analysis paralysis; sometimes the only way to make progress is to get started. When I learned about multithreaded runtimes, I blindly started to copy 6 different designs (https://github.com/mratsim/weave/tree/v0.1.0/experiments) to get better-informed decisions about where I wanted to go, and to hit the gotchas that you only realize during implementation.
Should have 61 OS threads (why?)
You misread. Go starts with as many threads as there are CPU cores, and it can grow the pool up to GOMAXPROCS. See the threadpool resizing logic: https://github.com/golang/go/blob/956f8a6/src/runtime/proc.go#L5663-L5812
The 61 comes from here: https://github.com/golang/go/blob/956f8a6/src/runtime/proc.go#L3295-L3305.
There are two levels of answers to the "why":
For example, imagine you have a task/actor A that checks an inbox and, depending on the message, spawns:
Now, why 61? 61 is a prime number, and prime numbers break patterns.
For example, compare 10 and 11:
2 is a factor of 10, so a step of 2 cannot generate all the numbers mod 10 (it only ever reaches the even residues), whereas 11 has no factors besides 1 and itself, so any step size reaches every residue.
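To make this concrete, here is a small illustrative snippet (not from the Go sources): a fixed step only covers part of the residues of a composite modulus, but all residues of a prime one. The proc.go code linked above applies the same idea by polling the global run queue whenever `schedtick % 61 == 0`.

```python
# Illustrative only: a fixed step covers all residues of a prime modulus,
# but only a subset when the step shares a factor with the modulus.
def residues(step, modulus):
    seen, x = set(), 0
    for _ in range(modulus):
        x = (x + step) % modulus
        seen.add(x)
    return sorted(seen)

print(residues(2, 10))  # [0, 2, 4, 6, 8]     -> only the even residues
print(residues(2, 11))  # [0, 1, 2, ..., 10]  -> every residue; 11 is prime
```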
Thanks mratsim, this is very interesting (clever, the modulo on a prime number!)
I agree those are opinionated questions. Because I intend NimGo to be a general-purpose library focused on simplicity of use rather than pure performance, none of those questions will have a definitive answer, and that's fine by me. General purpose is difficult because it needs a balance of everything, and we can't have everything.
Those questions are mostly there to identify tradeoffs and avoid bottlenecks. Having the fastest or most memory-sparing library is a non-goal. In fact, I think it will end up a much simpler design that may evolve in future refactors (but this still needs some thought, not a rush to code).
I hope Araq will have some time to "have fun" on this project by focusing on the multithreaded parts (DispatcherLoop) and will have an opinion on what balance of design could fit. But I have some work to do on this library first (mainly making it work on Windows, and resolving some minor issues). And if Araq won't join the party, no problem: a single-threaded library is enough for my needs :-) I am not in any rush either; the library is already usable for my other projects!
Because this is an open-source project and I rely on collaboration to transform it into M:N, these are no longer only my choices and opinions.
Please read the definitions I use here before responding to this post. If we don't agree on what we're talking about, we won't get far.
Questions
Proposed answer
If I read correctly the paper shared by @mratsim (https://assets.ctfassets.net/oxjq45e8ilak/48lwQdnyDJr2O64KUsUB5V/5d8343da0119045c4b26eb65a83e786f/100545_516729073_DMITRII_VIUKOV_Go_scheduler_Implementing_language_with_lightweight_concurrency.pdf), the answers could be:
However, I am wondering how well a single selector can scale. Having one selector per thread is trickier, even if the same fd can be registered in multiple dispatchers (with special consideration for thread safety, though this might be implementation-specific).
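To make the single-selector tradeoff concrete, here is a hypothetical sketch (illustrative names, Python's selectors module standing in for the platform API, not NimGo code): when every dispatcher thread has to go through one lock to touch the selector, the contention point is easy to see.

```python
import selectors
import threading

class SharedSelector:
    """Hypothetical: one selector shared by all dispatcher threads.

    The lock serializes registration and polling, which is exactly the
    contention point that makes single-selector scaling questionable.
    """

    def __init__(self):
        self._selector = selectors.DefaultSelector()
        self._lock = threading.Lock()

    def register(self, fileobj, events, data=None):
        with self._lock:
            self._selector.register(fileobj, events, data)

    def unregister(self, fileobj):
        with self._lock:
            self._selector.unregister(fileobj)

    def poll(self, timeout=0.0):
        # Only one dispatcher thread polls at a time; the rest block here.
        with self._lock:
            return self._selector.select(timeout)
```

Per-thread selectors remove that lock, but then the same fd registered in several epoll/kqueue instances can wake several threads for one event (spurious wakeups, double delivery), which is the thread-safety consideration mentioned above.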