mslosarz / nextrtc-signaling-server

NextRTC is a simple WebRTC signaling server written in Java. It provides signal exchange and an easy-to-integrate API.
http://nextrtc.org
MIT License

Signals stop working #27

Closed jacobhub closed 6 years ago

jacobhub commented 6 years ago

Hi, I am using version v1.0.0-RC1, and sometimes the signals stop responding. I do not know why or what causes it. It's difficult to reproduce, but it happens occasionally. Are you familiar with the issue? Please advise how to debug the cause and see what is going on in the server. Thanks.

mslosarz commented 6 years ago

I'm not aware of this issue. Could you give me more details? Does "signals stop responding" mean that the whole server is down, or that there is a problem setting up connections for a while? I'll write some performance tests and then check memory, but first I'd like to have a starting point.

jacobhub commented 6 years ago

It happened again, and now it seems to be blocking in ConversationRepository findBy. I have a custom SignalHandler, CustomJoinConversation:

package app.services.webrtc;

import org.eclipse.jetty.util.log.Log;
import org.nextrtc.signalingserver.cases.CreateConversation;
import org.nextrtc.signalingserver.cases.SignalHandler;
import org.nextrtc.signalingserver.domain.Conversation;
import org.nextrtc.signalingserver.domain.InternalMessage;
import org.nextrtc.signalingserver.domain.Signals;
import org.nextrtc.signalingserver.exception.Exceptions;
import org.nextrtc.signalingserver.exception.SignalingException;
import org.nextrtc.signalingserver.repository.ConversationRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.inject.Inject;
import java.util.Optional;

import static org.nextrtc.signalingserver.exception.Exceptions.CONVERSATION_NOT_FOUND;
import static org.nextrtc.signalingserver.exception.Exceptions.MEMBER_IN_OTHER_CONVERSATION;

@Component("joinwithreject")
public class CustomJoinConversation implements SignalHandler {
    static Logger log = LoggerFactory.getLogger(CustomJoinConversation.class);
    public ConversationRepository conversations;
    private CreateConversation createConversation;

    @Inject
    public CustomJoinConversation(ConversationRepository conversations,
                                  CreateConversation createConversation) {
        this.conversations = conversations;
        this.createConversation = createConversation;
    }

    public void execute(InternalMessage context) {

        log.debug("joinwithreject start: " + context.toString());

        conversations.findBy(context.getFrom())
                .map(Conversation::getId)
                .map(MEMBER_IN_OTHER_CONVERSATION::exception)
                .ifPresent(SignalingException::throwException);

        log.debug("joinwithreject: " + context.toString());
        Optional<Conversation> conversation = findConversationToJoin(context);
        if (conversation.isPresent()) {
            log.debug("joinwithreject: found conversation and joining");
            conversation.get().join(context.getFrom());
        } else {
            log.debug("joinwithreject: conversation not found " + context.getContent());

            CustomJoinSignalingException customJoinSignalingException = new CustomJoinSignalingException(CONVERSATION_NOT_FOUND);
            customJoinSignalingException.setConversation(context.getContent());
            throw customJoinSignalingException;
        }
    }

    private Optional<Conversation> findConversationToJoin(InternalMessage message) {
        return conversations.findBy(message.getContent());
    }

}

And the logs show that it reached the line log.debug("joinwithreject start: " + context.toString()); but not the line log.debug("joinwithreject: " + context.toString());

Maybe findBy is blocking?

mslosarz commented 6 years ago

Thanks for the details. That might be the reason. I have to check the happens-before relationships. Thanks for your involvement.

jacobhub commented 6 years ago

Any news on this? Do you have any idea how I can debug this and find the cause?

jacobhub commented 6 years ago

It seems to be blocked here: return member != null && members.contains(member); (MeshConversation.class, line 63)
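A thread dump is one way to confirm which monitor a stuck thread is waiting for and which thread owns it. Running `jstack <pid>` against the server process prints this; the same information is available in-process through the standard `java.lang.management` API. Below is a minimal sketch; the class name `BlockedThreadReport` is made up for illustration and is not part of nextrtc.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of nextrtc.
public class BlockedThreadReport {

    /** Returns one line per thread that is BLOCKED on a monitor, naming the lock and its owner. */
    public static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // Arguments: include locked monitors, include locked ownable synchronizers.
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (info.getThreadState() == Thread.State.BLOCKED) {
                report.add(info.getThreadName()
                        + " waits for " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        return report;
    }

    public static void main(String[] args) {
        blockedThreads().forEach(System.out::println);
    }
}
```

If the owner thread reported here is itself sitting in a network write, the lookup is probably not slow in itself; it is waiting for a lock that is held across I/O.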

mslosarz commented 6 years ago

First, please use version 1.0.0-RC2. I tried with 8 concurrent clients: they joined a conversation, exchanged SDPs, and left the conversation. I couldn't reproduce the issue using 1.0.0-RC2.

jacobhub commented 6 years ago

I also tested with 1.0.0-RC3-SNAPSHOT: same issue. It seems that for some reason the ConcurrentHashSet 'contains' is blocking. I have written a Python script to test this, and I can easily reproduce the issue.

`#!/usr/bin/env python

import asyncio
import json

import websockets


async def hello(i, uri, isCreate):
    # Open one websocket connection and send a single create or join signal.
    async with websockets.connect(uri) as websocket:
        if isCreate:
            x = {
                'from': '',
                'to': '',
                'signal': 'create',
                'content': str(i)
            }
        else:
            x = {
                'from': '',
                'to': '',
                'signal': 'join',
                'content': str(i)
            }

        s = json.dumps(x)
        await websocket.send(s)


loop = asyncio.get_event_loop()

# For each of 200 conversations, schedule one creator and one joiner concurrently.
tasks = []
for i in range(200):
    t = asyncio.ensure_future(hello(i, 'ws://localhost:8080/signaling', True))
    tasks.append(t)
    t = asyncio.ensure_future(hello(i, 'ws://localhost:8080/signaling', False))
    tasks.append(t)

loop.run_until_complete(asyncio.wait(tasks))
loop.close()
`

mslosarz commented 6 years ago

Great I'll run it.

jacobhub commented 6 years ago

Can you think of a reason for that line to block?

jacobhub commented 6 years ago

OK, I have some more clues. I think what is blocking is the entire method, since it is synchronized on the object, and there is another thread in the 'join' method that is blocked on the RemoteEndpoint flushBatch call in 'sendJoinedFrom' (Conversation.class, line 58). If I remove 'getRemotePeer().flushBatch();' (line 51) from InternalMessage, the deadlock doesn't happen. Do you see any reason not to remove it?
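The pattern described here (a synchronized lookup waiting behind another synchronized method that is stuck in network I/O) can be avoided by holding the monitor only while touching shared state and doing the slow send after releasing it. The sketch below illustrates that idea only; `LockScopeSketch` and `blockingSend` are hypothetical stand-ins, not nextrtc's actual classes, and members are plain strings to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical illustration, not nextrtc code.
public class LockScopeSketch {
    private final Set<String> members = new LinkedHashSet<>();
    // Stand-in for "send a message to this member and flush", which may block.
    private final Consumer<String> blockingSend;

    public LockScopeSketch(Consumer<String> blockingSend) {
        this.blockingSend = blockingSend;
    }

    /** Cheap lookup: it only ever waits for other short critical sections. */
    public synchronized boolean has(String member) {
        return member != null && members.contains(member);
    }

    public void join(String member) {
        List<String> toNotify;
        synchronized (this) {
            // Under the lock: snapshot the current members and record the new one.
            toNotify = new ArrayList<>(members);
            members.add(member);
        }
        // Outside the lock: a slow or stalled peer no longer blocks has().
        for (String other : toNotify) {
            blockingSend.accept(other);
        }
    }
}
```

The trade-off is that a notification may go to a member that left between the snapshot and the send, so the sending side has to tolerate a closed session.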

mslosarz commented 6 years ago

There was an issue: messages sometimes weren't sent to the client at all because they stayed in the buffer. There is an issue for that (please see the closed issues). I have to read a little more about it... :/

jacobhub commented 6 years ago

Yes, I remember that issue. What if we run the flush from a thread pool? Something like this:

public void send() {
    if (signal != Signal.PING) {
        log.info("Outgoing: " + toString());
    }
    getRemotePeer().sendObject(transformToExternalMessage());

    // Flush on a worker thread instead of on the caller's thread.
    threadExecutor.execute(new FlushRunnable(this));
//    try {
//        getRemotePeer().flushBatch();
//    } catch (IOException e) {
//        log.debug("Unable to send message: " + transformToExternalMessage() + " error on flush!");
//    }
}

This also seems to solve the deadlock.

(How can I pass the ExecutorService to InternalMessage? For testing I used a new local CachedThreadPool.)
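One possible shape for the shared executor asked about above is a small component that owns a single worker thread and is handed to whatever builds or sends messages. This is only a sketch under that assumption: `AsyncFlusher` and `flushLater` are hypothetical names, and how such an object would be wired into InternalMessage is left open. A single-threaded executor is used here instead of a cached pool so that flushes run in the order they were submitted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of nextrtc.
public class AsyncFlusher implements AutoCloseable {
    // One worker thread: queued flushes run one at a time, in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Queues a flush and returns immediately, so the caller never blocks on the network. */
    public void flushLater(Runnable flush) {
        executor.execute(() -> {
            try {
                flush.run();
            } catch (RuntimeException e) {
                // Log and continue, so one failed flush does not surface as an uncaught error.
                System.err.println("flush failed: " + e);
            }
        });
    }

    /** Stops accepting work and waits up to five seconds for queued flushes to finish. */
    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With one worker, a stalled peer delays every later flush in the queue, so a per-session executor or a write timeout may be needed on top of this.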

mslosarz commented 6 years ago

It'll be hard to pass it just like that... Hopefully I'll figure something out tomorrow.

mslosarz commented 6 years ago

I've read about the websocket session, and I'll use the basic endpoint instead of the async one. In that configuration the send method must be synchronized, and some retry mechanism would be nice to have. I'll prepare a branch with those changes. It's a good point to introduce performance tests (Gatling).
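The plan above (a blocking send that is synchronized and retried) could look roughly like the following. This is a sketch under assumptions, not the code from the branch: `RetryingSender` is a made-up name, and `BlockingEndpoint` is a hypothetical stand-in for a blocking websocket endpoint so that the example compiles without a websocket dependency.

```java
import java.io.IOException;

// Hypothetical illustration, not nextrtc code.
public class RetryingSender {

    /** Stand-in for a blocking (basic) websocket endpoint. */
    public interface BlockingEndpoint {
        void sendText(String text) throws IOException;
    }

    private final BlockingEndpoint endpoint;
    private final int maxAttempts;

    public RetryingSender(BlockingEndpoint endpoint, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.endpoint = endpoint;
        this.maxAttempts = maxAttempts;
    }

    /**
     * Sends the text, retrying on IOException up to maxAttempts times.
     * Synchronized so that only one thread writes to this endpoint at a time.
     * Returns true if an attempt succeeded, false if all attempts failed.
     */
    public synchronized boolean send(String text) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                endpoint.sendText(text);
                return true;
            } catch (IOException e) {
                // Fall through to the next attempt.
            }
        }
        return false;
    }
}
```

Because the write itself still blocks, a slow client can hold up every other thread waiting to send through the same sender, so this serializes writes but does not remove the stall.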

mslosarz commented 6 years ago

Could you please check whether this works for you? https://github.com/mslosarz/nextrtc-signaling-server/tree/fixed_signals_stop_working_issue_27

jacobhub commented 6 years ago

This also hangs, on 'sendObject'. Why not send on a new thread? That seems to work (you just need to use the same ExecutorService for all operations).

mslosarz commented 6 years ago

I've checked it. I wrote some code to test the library in a real environment, and I found the issue you described. But after the modification I wasn't able to reproduce it. I'll finish the performance tests and release version RC3. I suspect that you used the old version during your second testing session.

jacobhub commented 6 years ago

I retested. I am using the new version (commit b8d2e2ac) and I am still getting the deadlock. It is blocking on

this.getRemotePeer().sendObject(this.transformToExternalMessage()); ( InternalMessage.java line 49)

jacobhub commented 6 years ago

By the way, I am testing with Tomcat 8. Maybe that's the difference?

mslosarz commented 6 years ago

If you can, please give me the full Tomcat version and the configuration in which you are using nextrtc (standalone or with Spring). I'm working on the branch, and I've found a lot of small issues (mainly connected with race conditions in joining and with overwhelmed websockets).

jacobhub commented 6 years ago

Tomcat 8.0.39, nextrtc with Spring.

mslosarz commented 6 years ago

I've released version 1.0.0-RC3 with a lot of changes related to your issue. Could you please test it in your environment? (I've introduced PerformanceTest, which you can use to test your local server setup.)

jacobhub commented 6 years ago

1.0.0-RC3 looks great so far. I will continue testing.

jacobhub commented 5 years ago

Hi, I know this is an old issue, but it still seems to be a problem in RC6.