crossbario / autobahn-java

WebSocket & WAMP in Java for Android and Java 8
https://crossbar.io/autobahn
MIT License

Add docs rgd threads and thread-safety #536

Open ampeixoto opened 2 years ago

ampeixoto commented 2 years ago

I detected an issue when trying to make multiple calls concurrently.

Sometimes, when I make 4 concurrent calls to Session.call(), I only get 3 responses and, worse, sometimes one of the responses carries the payload of another procedure...

So I dug into the autobahn-java code and noticed the following. In Session.java you have this:

    private <T> CompletableFuture<T> reallyCall(
            String procedure,
            List<Object> args, Map<String, Object> kwargs,
            CallOptions options,
            TypeReference<T> resultTypeReference,
            Class<T> resultTypeClass) {
        throwIfNotConnected();

        CompletableFuture<T> future = new CompletableFuture<>();

        long requestID = mIDGenerator.next();

        mCallRequests.put(requestID, new CallRequest(requestID, procedure, future, options,
                resultTypeReference, resultTypeClass));

        if (options == null) {
            send(new Call(requestID, procedure, args, kwargs, 0));
        } else {
            send(new Call(requestID, procedure, args, kwargs, options.timeout));
        }
        return future;
    }

And in IDGenerator.java this:

public class IDGenerator {
    private long mNext;

    public long next() {
        mNext += 1;
        if (mNext > 9007199254740992L) {
            mNext = 1;
        }
        return mNext;
    }
}

As you can see, this is not thread-safe: neither mNext nor mCallRequests can safely be modified concurrently.
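
To make the race concrete: `mNext += 1` is a read, an add, and a write, and two threads can interleave between those steps. The sketch below replays one such interleaving by hand on a single thread so that it is deterministic. `LostUpdateDemo` and its members are hypothetical and only mimic the quoted IDGenerator; this is an illustration, not library code.

```java
public class LostUpdateDemo {
    // Stand-in for IDGenerator.mNext (hypothetical; not the library class).
    static long mNext = 0;

    // Replays, step by step, what two threads may do inside next().
    static long[] interleavedNext() {
        long readByA = mNext;   // thread A loads mNext (0)
        long readByB = mNext;   // thread B loads mNext (still 0)
        mNext = readByA + 1;    // thread A stores 1
        long idA = mNext;       // thread A returns 1
        mNext = readByB + 1;    // thread B also stores 1
        long idB = mNext;       // thread B returns 1 as well
        return new long[] {idA, idB};
    }

    public static void main(String[] args) {
        long[] ids = interleavedNext();
        System.out.println("A got " + ids[0] + ", B got " + ids[1]); // prints "A got 1, B got 1"
    }
}
```

If two calls do end up with the same request ID, the second `mCallRequests.put(...)` replaces the first entry, which would be consistent with the missing and mismatched responses described above.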

And to prove it, I created a small snippet in Kotlin:

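// Requires kotlinx-coroutines-core and the IDGenerator class quoted above.
import kotlinx.coroutines.*

fun main() = runBlocking {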

    val scope = CoroutineScope(Job() + Dispatchers.IO)
    println("Start generating ids")
    val results = (1..50).map {
        generateIdsConcurrently(scope)
    }
    println("Results: $results")
    println("All successful: ${results.all { it }}")
}

private suspend fun generateIdsConcurrently(scope: CoroutineScope): Boolean {
    val tasks = mutableListOf<Job>()
    val idsMap = HashMap<Int, Int>()
    val numberOfIdsExpected = 10
    val idGenerator = IDGenerator()
    (1..numberOfIdsExpected).onEach { index ->
        val childJob = scope.launch {
            //this delay forces more failures
            delay(100)
            val id = idGenerator.next()
            idsMap[id.toInt()] = index
        }
        tasks.add(childJob)
    }

    tasks.joinAll()

    val expectedIds = idsMap.values.sorted()
    val generatedIds = idsMap.keys.sorted()
    return expectedIds == generatedIds
}

Running this code almost always produces at least one failure (I run 50 trials to make failures more likely), so the generated IDs aren't always sequential.

Similar issues also affect Session.subscribe() (and potentially other methods).

SOLUTION

Make the next() method synchronized:

public class IDGenerator {
    private long mNext;

    public synchronized long next() {
        mNext += 1;
        if (mNext > 9007199254740992L) {
            mNext = 1;
        }
        return mNext;
    }
}

This improved things quite a lot, but it was still failing sometimes.

Replace the HashMap with a ConcurrentHashMap. With this, the test passes 100% of the time.
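
As an alternative to `synchronized`, the ID generator alone could be made safe without a lock by using `java.util.concurrent.atomic.AtomicLong`. The following is a minimal sketch under that assumption; `AtomicIdGenerator` and `drawConcurrently` are hypothetical names, not part of autobahn-java, and the sketch covers only ID generation, not `mCallRequests` or the rest of Session.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicIdGenerator {
    // Same upper bound as the quoted IDGenerator (2^53).
    private static final long MAX_ID = 9007199254740992L;
    private final AtomicLong mNext = new AtomicLong();

    // Atomically increments and wraps back to 1 after MAX_ID, like the original next().
    public long next() {
        return mNext.updateAndGet(current -> current >= MAX_ID ? 1 : current + 1);
    }

    // Draws `total` IDs from `threads` worker threads and returns the distinct IDs seen.
    static Set<Long> drawConcurrently(int threads, int total) {
        AtomicIdGenerator generator = new AtomicIdGenerator();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(total);
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                seen.add(generator.next());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        Set<Long> ids = drawConcurrently(8, 10_000);
        System.out.println("distinct ids: " + ids.size());
    }
}
```

Because every call to `next()` is a single atomic update, no two callers can observe the same value, so the number of distinct IDs equals the number of calls.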

QUESTIONS

  • Is my analysis correct or I am making some mistake?
  • Is there any hidden reason for why this is not thread safe?
  • Was this already detected before? I didn't find anything about it...
  • Is the caller of autobahn expected to externally synchronize the calls for some reason?

om26er commented 2 years ago

Are you using threads with the autobahn API? The Session class is designed to be single-threaded. Do you have a real reproducer for that issue?

om26er commented 2 years ago

For more info please take a look at this issue as well https://github.com/crossbario/autobahn-java/issues/329

oberstet commented 2 years ago

Is there any hidden reason for why this is not thread safe?

yes, the library was designed for async style (pls see the discussion in the issue Omer linked), and all objects of a given WAMP session must only be accessed from the same thread.

note that this doesn't mean your program has to be single threaded, and you can still use WAMP objects running on an IO thread from different threads (eg UI) using means provided by the run-time (Android) or generally thread-safe queues.
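
One way to follow this advice in plain Java is to confine the non-thread-safe object to a single-threaded executor and let other threads submit work to it. The sketch below assumes that approach; `ThreadConfinementDemo` and `UnsafeIds` are hypothetical stand-ins for an application and a WAMP session object, not part of autobahn-java.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadConfinementDemo {
    // Hypothetical stand-in for a non-thread-safe object such as a WAMP session.
    static class UnsafeIds {
        private long next;
        long next() { return ++next; }
    }

    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final UnsafeIds ids = new UnsafeIds(); // touched only on ioThread

    // Safe to call from any thread: the work itself runs on the single IO thread.
    CompletableFuture<Long> nextId() {
        return CompletableFuture.supplyAsync(ids::next, ioThread);
    }

    void shutdown() { ioThread.shutdown(); }

    // Requests `total` IDs from `callers` threads; returns how many distinct IDs came back.
    static int run(int callers, int total) {
        ThreadConfinementDemo demo = new ThreadConfinementDemo();
        ExecutorService callerPool = Executors.newFixedThreadPool(callers);
        List<CompletableFuture<Long>> pending = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            pending.add(CompletableFuture.supplyAsync(() -> demo.nextId().join(), callerPool));
        }
        long distinct = pending.stream().map(CompletableFuture::join).distinct().count();
        callerPool.shutdown();
        demo.shutdown();
        return (int) distinct;
    }

    public static void main(String[] args) {
        System.out.println("distinct ids: " + run(4, 1000));
    }
}
```

The executor's internal queue is the only shared, thread-safe structure; the confined object itself needs no locks. On Android, a `Handler` bound to a dedicated thread's `Looper` could play the same role.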

note also, that this design will generally lead to faster code, since unnecessary locking is not paid for.


Is my analysis correct or I am making some mistake?

yes, it demonstrates that AutobahnJava library objects are not thread-safe

Is there any hidden reason for why this is not thread safe?

yes, pls see above

Was this already detected before? I didn't find anything about it...

yes, it was deliberately designed like this (it works as expected)

Is the caller of autobahn expected to externally synchronize the calls for some reason?

yes, the only valid reason to use threads is to make use of multiple CPU cores for parallel processing.

for concurrent processing, you don't need threads and an event-driven/lockless design like AutobahnJava is sufficient and more efficient

ampeixoto commented 2 years ago

Thanks for the responses @om26er @oberstet 👍

I have already implemented the change I mentioned on my fork, and so far it seems to work fine in a multithreaded environment (I'm using Kotlin coroutines).

Do you see any other downside about my solution other than making things a bit slower due to locking? Is there the chance of missing events or something like that?

Also, I would like to suggest adding that info (about using autobahn from a single thread) explicitly to the README... Otherwise we only learn about it by reading the implementation. And as you can see, I am not the first one to have that doubt.

oberstet commented 2 years ago

Do you see any other downside about my solution other than making things a bit slower due to locking? Is there the chance of missing events or something like that?

yes, any locking incurs deadlock risks, which you must handle somehow. regarding missed events: I haven't read your fork's code, so I'm not sure. maybe not.

Also, I would like to suggest adding that info (about using autobahn from a single thread) explicitly to the README... Otherwise we only learn about it by reading the implementation. And as you can see, I am not the first one to have that doubt.

yes, I totally agree. this is actually a pretty important design decision taken in the code. it is opinionated. it must be mentioned in the readme.

oberstet commented 2 years ago

btw, another subtle aspect:

check out https://github.com/crossbario/autobahn-java/blob/455c1c5eae70a83db1362ef848383868a676c135/autobahn/src/main/java/io/crossbar/autobahn/websocket/WebSocketConnection.java#L748

ABJ can run on Android and Netty. on Netty, things work differently: it uses the native Netty WebSocket stuff

https://github.com/crossbario/autobahn-java/tree/master/autobahn/src/main/java/io/crossbar/autobahn/wamp/transports