dlang / project-ideas

Collection of impactful projects in the D ecosystem
36 stars 12 forks source link

Goroutines with wait-free communication #65

Open nin-jin opened 4 years ago

nin-jin commented 4 years ago

Description

std.cocurrency has fibers, threads, logical threads, sandboxes. But:

std.parallelism has tasks, thread pool. But:

vide-core reimplements it: suspendable tasks that runs on thread pool. But:

golang has native support of stackfull coroutines with autosizable stacks that run on thread poll and communication through channels. But:

I wrote go.d to make go-like API with gorouties that run on the thread pool and communicates with wait-free channels. It uses vibe-core so inherits all it's cons. I tried to reimplement it on std.parallelism, on std.concurrency and on both, but suffer defeat. It's hard for me. And I need help.

I think we should have std api's to do it:

  1. The simple way to run suspendable tasks on threadpool.
  2. Tasks should have a small stack size with automatic growth like in go lang.
  3. Locking should be reduced as more as possible.
  4. Tasks should use wait-free gc-free channels to send data (and other tasks too).
  5. Only immutable, shared and movable data should be transferrable through the channel.
  6. Implementation should be as easy as possible.

What are rough milestones of this project?

  1. Currently, there is a very simple code with remarks, tests and some diagrams.
  2. Remove vibe-core dependency from go.d and implement all required functionality based on std.concurrency.
  3. Update std to better support move semantic (ie we can't use sum with non-copiable ranges).
  4. Remove duplicated abstractions from std (tasks, logical thread, fibers) - all abstractions should be orthogonal and composable.
  5. Reimplement vibe-core API based on new std api.

How does this project help the D community?

It gives a standard simple way to make safe and fast multithreading apps.

Recommended skills

What can students expect to get out of doing this project?

Point of Contact

@nin-jin - tell me if I can help you to understand the ideas and join the project.

References

mw66 commented 4 years ago
  1. Tasks should use wait-free gc-free channels to send data (and other tasks too).

https://forum.dlang.org/post/utstivjnjifzelawurzu@forum.dlang.org

lock-free queue?

https://www.liblfds.org/mediawiki/index.php?title=r7.1.1:Queue_(unbounded,_many_producer,_many_consumer)

Java uses the same algorithm for ConcurrentLinkedQueue (in C implementation).

I tried some small examples with liblfds, got slightly better performance than Java. Maybe we don’t want to reinvent the wheels, esp the well tested ones.

try: https://github.com/mingwugmail/liblfdsd

received 100000000 messages in 4632 msec sum=4999999950000000 speed=21588 msg/msec
nin-jin commented 4 years ago

We should think about safety too. Not only performance. Ie my implementation guarantee that 1p1c queue really has at most 1 consumer and 1 provider.

mw66 commented 4 years ago

that liblfds queue is mpmc, i.e. it's correct even under mpmc requests.

nin-jin commented 4 years ago

If it's mpmc than it use locking. Isn't?

mw66 commented 4 years ago

No, it's mpmc and lock free.

liblfds stands for: lib-lock-free-data-structure.

https://www.liblfds.org/mediawiki/index.php?title=r7.1.1:Queue_(unbounded,_many_producer,_many_consumer)#Lock-free_Specific_Behaviour

nin-jin commented 4 years ago

I don't understand how it can be possible without spinlock and CAS.

mw66 commented 4 years ago

It has CAS, but not the 'lock' in the usual sense, the implementation is based on:

https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf

nin-jin commented 4 years ago

CAS based algorithms can produce "live locks" and "false sharing" on many consumers/providers. So I prefer wait-free instead of lock-free.

Geod24 commented 4 years ago
  1. Tasks should have a small stack size with automatic growth like in go lang.

That one is going to be a real pain. Go has compiler support for this, we don't (and can't). We could use the segfault handler but I don't know if we'd match the performance of Go.

mw66 commented 4 years ago
  1. Tasks should have a small stack size with automatic growth like in go lang.

That one is going to be a real pain. Go has compiler support for this, we don't (and can't). We could use the segfault handler but I don't know if we'd match the performance of Go.

Then at least we should come close, right now it's 2~4x time slower than Go. (And the plain message queue in D is ~4x time slower than in Java). It's hard to impress anyone with this kind of performance.

I'm wondering how much we can gain if the queue is faster. Any detailed profile info to show where is the bottle neck?

mw66 commented 4 years ago
  1. Tasks should have a small stack size with automatic growth like in go lang.

That one is going to be a real pain. Go has compiler support for this, we don't (and can't). We could use the segfault handler but I don't know if we'd match the performance of Go.

Actually, this to some extent, can be done in user space, using duff's device:

https://en.wikipedia.org/wiki/Duff%27s_device

https://www.chiark.greenend.org.uk/~sgtatham/coroutines.html

nin-jin commented 4 years ago

Go has compiler support for this, we don't (and can't).

Compiler could mark functions with attribute MaxStackSize depends calls inside.

Geod24 commented 4 years ago

I have been working on a similar problem for a while now, I think I might have mentioned this somewhere on the forum.

The gist of it is:

Different communications primitives

Go and std.concurrency have a different model. Go has channels, which are essentially two-way pipes. You can have a single channel per Goroutine, or you can have many. On the other hand, std.concurrency has a single MessageBox per "task". It was originally conceived as one MessageBox per thread, but when fiber support was added, it was extended. If you have the Go-style primitives, you can do the std.concurrency style. However the other way around is not possible, obviously. As a result, you do not have the ability to listen to multiple MessageBox at the moment (select in Go). That is a critical missing piece of functionality.

C/C++ interop

The main blocker, IMO, is that D is not a managed language like Go. D program can (and will) do everything: you have inline assembler, call to C or C++ functions, direct interactions with the OS. On the other hand, Go has a much more integrated environment and it much harder (on purpose) to do such things. In particular, Go has a global scheduler which handles everything. It can suspend your Goroutine, move it, etc.. It has compiler support. D cannot have compiler support, simply because it might call into C / C++ code (Go also does but the runtime has a bit of magic in it).

Scheduler

In order to be efficient, our channels cannot block the thread, they need to cooperate. So the scheduler is definitely needed. The way I approached it in my project is to have a scheduler that manages fibers at the thread level, a queue for message passing, and a class that binds them together.

Hand-off semantic / D specificity

D is a bit different (and I believe, has an advantage) because of shared. If your channel is used for cross-thread communication, we should use one of the queue you mentioned, and the data needs to be shared. If intra-thread communication is used, we can use direct hand off semantic and avoid a costly yield to the scheduler, but if possible we'd like to avoid to require the use of shared.

TL;DR

I am working on a per thread scheduler, a thread-safe message queue, and a class that binds those together. Go has made some very good documented technical choices and we should draw from it. The page-size part is tricky and I believe can be implemented almost independently.

Cyroxin commented 4 years ago

CAS based algorithms can produce "live locks" and "false sharing" on many consumers/providers. So I prefer wait-free instead of lock-free.

If a single wait free producer&consumer communication is all you need, then you can look into how I did message passing in elembuf. You can close the other thread or give it new orders/tasks. This way you get all the pros of std.parallelism without the cons. You could even make a threadpool out of these threads that can be ordered to do any task.

Elembuf that is using this system is currently about 35x faster at message passing than the benchmark version you posted called go.d. You can make a dependency on the threaded version of Elembuf in your projects, which should make things much easier than reimplementing the wheel. There are still some optimizations left to do, which I haven't gotten into doing yet, you can check these out on the repo's project page for 1.2.X.

elembuf ```` D #!/usr/bin/env dub /+ dub.sdl: name "app" dependency "elembuf" version="~>1.1.5" dflags "-release" "-m64" "-boundscheck=off" "-O" platform="dmd" dflags "-O4" "--release" "--boundscheck=off" platform="ldc2" +/ // dmd.exe -release -m64 -boundscheck=off -O buffer.d // ldc2 -O4 --release --boundscheck=off buffer.d import buffer; import std.stdio; import std.datetime.stopwatch; import core.atomic; import core.thread, std.concurrency; const n=1_000_000_000; //;_000 enum amount = n; void main() //line 22 { Buffer!(size_t, true) buffer = Buffer!(size_t,true)(); size_t srci = 0; // Source index size_t consi = 0; // Consumer index size_t sum = 0; auto src = (size_t[] x) // Tell background thread about source //Line 30 { const needToFill = amount - srci; if( x.length >= needToFill ) // Final fill! { foreach(i;0..needToFill){ x[i] = srci; srci++; } return needToFill; } else // Long way to go still... { foreach(ref i;x){ i = srci; srci++; } return x.length; } }; buffer.fill = src; // START! StopWatch sw; sw.start(); while(consi < amount) { buffer.fill(); foreach(elem; buffer) sum += elem; consi += buffer.length; buffer = buffer[$..$]; } sw.stop(); writeln("finished receiving"); writefln("received %d messages in %d msec sum=%d speed=%d msg/msec", n, sw.peek().total!("msecs"), sum, n/sw.peek().total!("msecs")); } ````
go ```` D #!/usr/bin/env dub /+ dub.sdl: name "app" dependency "jin-go" version="~>2.0.0" +/ import std.datetime.stopwatch; import std.stdio; import jin.go; const int n = 100_000_000; void threadProducer(Output!int queue) { foreach (int i; 0..n) { queue.put(i); } } void main() { Input!int queue; jin.go.go.go!threadProducer(queue.pair); StopWatch sw; sw.start(); long sum = 0; foreach (p; queue) { sum += p; } sw.stop(); writefln("received %d messages in %d msec sum=%d speed=%d msg/msec", n, sw.peek.total!"msecs", sum, n / sw.peek.total!"msecs"); } ````
elembuf.d ldc $dub
received 1000000000 messages in 28237 msec sum=499999999500000000 speed=35414 msg/msec
go.d ldc $dub
received 100000000 messages in 124417 msec sum=4999999950000000 speed=803 msg/msec