CyCoreSystems / ari-proxy

NATS or RabbitMQ message bus Asterisk REST Interface proxy system implemented in Go

Best practice regarding subscriptions #26

Open mtryfoss opened 4 years ago

mtryfoss commented 4 years ago

Hello, and thanks for a really nice lib!

I've got a question regarding subscriptions to channel events. It seems like a normal channelhandle.Subscribe() adds a subscription to "ari.event.." against NATS.

On a heavily used Asterisk node, that will cause a lot of duplicate messages from NATS, which are then discarded by the client (as not belonging to it).

Is it possible to make ari-proxy publish channel-messages on a subject for that channel only?

Ulexus commented 4 years ago

It is true that there will be duplicate messages, but that is not nearly as heavy as you might think at first glance.

First, we never copy the data for the duplicates; they are always just references.

Next, we only decode each message once and unless otherwise required, we only decode it enough to determine the type of Event.

Between these two things, it is actually very little overhead to have a large number of duplicates.

Ulexus commented 4 years ago

Please do let me know if you find a bottleneck here, but we have run some quite large installations without any constraints of this type.

mtryfoss commented 4 years ago

The current setup is about 700 concurrent calls (many long-lasting), and there's some indication that CPU usage grows exponentially with the number of calls. Six processes run the app, using about 80% of one CPU core. However, there might be other parts of the code causing the load.

However, I see you're stating "we only decode it enough to determine the type of Event". I have one subscription to "ari.Events.All" and filter what I need in a switch. That might be bad.

I'll have to dig more into it. Just wanted a quick feedback :)

Ulexus commented 4 years ago

I definitely would avoid using ari.Events.All. Asterisk produces a lot of events, many of them closely clustered. Subscribe() takes multiple events as an option, and I'd use them. Also, because subscriptions on handles are scoped to those handles, I can often get away with not decoding the complete events at all: just use the channel as a trigger.
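A narrowed handle subscription might look like the sketch below. This is illustrative, not the project's canonical example: `handleChannel` is a hypothetical per-call handler, the chosen event names are arbitrary, and the import path may differ by library version.

```go
package main

import (
	"context"

	"github.com/CyCoreSystems/ari/v5" // import path may differ by version
)

// handleChannel subscribes only to the events it actually switches on,
// instead of ari.Events.All. The subscription is scoped to this handle.
func handleChannel(ctx context.Context, h *ari.ChannelHandle) {
	sub := h.Subscribe(ari.Events.StasisEnd, ari.Events.ChannelDtmfReceived)
	defer sub.Cancel()

	for {
		select {
		case <-ctx.Done():
			return
		case e := <-sub.Events():
			switch e.GetType() {
			case ari.Events.StasisEnd:
				return
			case ari.Events.ChannelDtmfReceived:
				// react to DTMF here...
			}
		}
	}
}
```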

Another common issue is goroutine leaks. There are various strategies to reduce the likelihood of these, but the first thing, really, is to figure out where your load is coming from.

Are you familiar with pprof? That's a great tool for profiling your app and seeing where it is spending most of its time.

https://golang.org/pkg/net/http/pprof/ https://blog.golang.org/profiling-go-programs

Ulexus commented 4 years ago

I should also mention that we do support a concept called "dialogs." The naming comes from a previous intention of retaining compatibility with nvisibleinc's ari proxy. In our case, a dialog is really a tag which is applied to certain resources which causes the server to send events for those resources to a different NATS subject. This will have the result of reducing client traffic load on heavily-loaded machines. To use them, just set the Dialog parameter (arbitrary string) on any resource key before sending that key to the proxy server. The server will automatically register it and tag events related to it. The client will automatically switch its subscription over to the NATS subject for that dialog.

I have never actually had to use this in production, but the feature is there, and it is basically what you asked at the beginning. I'd still profile your app before diving into dialogs (which are much less tested than anything else in the package).
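Per the description above, tagging a key might look like this sketch, where `h` is an existing channel handle and "call-1234" is an arbitrary dialog name chosen for the example:

```go
// Sketch: set the Dialog field on the channel's resource key. Operations
// performed with the tagged key are associated with the dialog, and the
// proxy server publishes that resource's events on a dialog-specific
// NATS subject, which the client switches its subscription to.
key := h.Key()
key.Dialog = "call-1234" // arbitrary string, unique per call here
```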

mtryfoss commented 4 years ago

I'm pretty sure goroutines are under control. Did a lot of QA on this subject during development.

Ran pprof now, but while the system is relatively idle.

Top 10:
      flat  flat%   sum%        cum   cum%
     910ms 11.43% 11.43%      960ms 12.06%  encoding/json.stateInString
     650ms  8.17% 19.60%     1530ms 19.22%  encoding/json.checkValid
     530ms  6.66% 26.26%     1020ms 12.81%  encoding/json.(*decodeState).scanWhile
     330ms  4.15% 30.40%      450ms  5.65%  runtime.scanobject
     270ms  3.39% 33.79%      610ms  7.66%  encoding/json.(*decodeState).skip
     250ms  3.14% 36.93%     4460ms 56.03%  encoding/json.(*decodeState).object
     240ms  3.02% 39.95%      240ms  3.02%  encoding/json.unquoteBytes
     200ms  2.51% 42.46%      260ms  3.27%  encoding/json.stateEndValue
     170ms  2.14% 44.60%      910ms 11.43%  runtime.mallocgc
     160ms  2.01% 46.61%      160ms  2.01%  runtime.futex

So, it seems like narrowing the subscriptions is probably the first step.

I will do some more testing and try an improved version in production during the coming week.

I'm using SetVariable() in several places during each call, mainly to manipulate SIP headers. I guess this will wait for a ChannelVarset event before returning? It seems like this is one of the causes of the massive event generation.

Ulexus commented 4 years ago

It being mostly decoding load, I would definitely trim the subscription scopes, yes. Right, ChannelVarset is the event you want.

I'm curious, though, are you actually needing something in a different goroutine or program to wait for something else to set the variable, or are you using the ChannelVarset as a confirmation of success? channel.SetVar() does provide (functional) failure indication (which is, admittedly, not universal amongst Asterisk's ARI routines).

mtryfoss commented 4 years ago

I guess I was a bit unclear. I do not need a specific confirmation in another goroutine.

I just wondered about the SetVariable() function. Is the ChannelVarset event used to determine success or not? If not, I cannot currently see any case where I actually need to get a notification every time a variable is changed.

I already use a heavily modified version of Asterisk, so those events could also be removed to possibly reduce some load if changing subscriptions does not work.

Ulexus commented 4 years ago

No, you do not need to watch for the event to confirm success. Asterisk provides confirmation to the underlying REST call, which means we set the error if it fails.

The event is useful when you have one thing seeing a variable and another unrelated thing looking for a change.
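In other words, the returned error is sufficient. A sketch, assuming `h` is a `*ari.ChannelHandle` and that the PJSIP header function (with a hypothetical header name) is what's being set:

```go
// SetVariable reports failure through its error return, so no
// ChannelVarset subscription is needed just to confirm success.
if err := h.SetVariable("PJSIP_HEADER(add,X-Call-Tag)", "example"); err != nil {
	log.Printf("setting variable failed: %v", err)
}
```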

mtryfoss commented 4 years ago

Ah, thank you!

The confirmation from the REST call drowned in all the other data. I assume this is that feedback, from the proxy to NATS/the client:

PUB _INBOX.ZAbjZi4QmaQzzIkwCcLsWF.QAh7Wfs3 12. {"error":""}.

mtryfoss commented 4 years ago

..added a simple "return" in the beginning of ast_channel_publish_varset(). Reduced a lot of traffic in my case.

Will first try running an adjusted subscription during higher load tomorrow and see how that works. Otherwise, we could probably also add some sort of filter in ari-proxy to achieve the same effect as modifying Asterisk.

Ulexus commented 4 years ago

That's it, yes (assuming that it is the response to a SetVar request, anyway).

Are you using a different client or reading the NATS data directly? If you are using the Go client in this repo, you shouldn't be needing to read the NATS data; it's interpreted by the client and returned as a nil error.

mtryfoss commented 4 years ago

I was just ngrep'ing to have a better understanding of how the communication between proxy and client works.

A normal call will set around 10 variables. Combined with all the native stuff Asterisk sets during call setup, that means a lot of events to be handled in total when call volumes go up.

I'm just thinking out loud... 500 concurrent calls. Each means at least two call legs, and each leg has one subscription, for a total of 1000 subscriptions. Between NATS and the Go client, that would be something like 10,000-15,000 events published from NATS to Go for every new call. Combine that with me decoding all of them :)

Ulexus commented 4 years ago

Decoding them is likely the problem. The rest should really be fine. NATS is quite efficient. We have systems using this with dozens of nodes running thousands of calls, also setting lots of variables.

Ulexus commented 4 years ago

One of these days, I'm going to convert the encoding over to a more efficient one, like protobuffers (at least optionally). In most cases, nowadays, I'm sending a lot of ARI stuff over gRPC anyway... after being adapted from the JSON from ari-proxy, that is. That's why you'll find protobuf definitions for the most critical components (such as resource keys) in the main ari package.

mtryfoss commented 4 years ago

Yes, I'll see how the change behaves tomorrow. Doesn't seem to make any difference now, but that is probably because of very few calls.

Btw, there's currently no way to subscribe to a set of events for multiple channels in one single subscription?

Thanks for all the help so far!

Ulexus commented 4 years ago

Sort of. You have only a single resource key per subscription, but that resource key can be anything. So you could have a subscription to events on a particular node, a particular dialog, etc. Also, common uses, such as bridge subscriptions, include association with any number of channels (the channels who are joined to the bridge).

In general, though, it is one resource per subscription, yes.

Ulexus commented 4 years ago

Just for your reference, in case you didn't look into the structure of an ari.Key:

message Key {
    // Kind indicates the type of resource the Key points to.  e.g., "channel",
    // "bridge", etc.
    string kind = 1;

    // ID indicates the unique identifier of the resource
    string id = 2 [(gogoproto.customname) = "ID"];

    // Node indicates the unique identifier of the Asterisk node on which the
    // resource exists or will be created
    string node = 3;

    // Dialog indicates a named scope of the resource, for receiving events
    string dialog = 4;

    // App indicates the ARI application that this key is bound to.
    string app = 5;
}

Ulexus commented 4 years ago

I really wouldn't go around trying to optimize subscription count unless you know for certain that that is the culprit... and if it is, I'd like to know about it, so that I can fix the issue structurally.

mtryfoss commented 4 years ago

Initially I tried some dialog related stuff, but ended up with keeping it as simple as possible.

I will do some more debugging, and let you know if I think there might be some room for improvement.

mtryfoss commented 4 years ago

Changing the subscription type did not seem to help with CPU usage. I compared a patched and an unpatched process during higher load.

Did some ngrep'ing and found that over a few seconds, around 300k events were received. Of those, around 200k were ChannelVarset.

I'm investigating a bit more. Just wanted to let you know.

Ulexus commented 4 years ago

That seems like an absurdly high percentage of ChannelVarset events. That's for approximately what call-per-second rate?

mtryfoss commented 4 years ago

13899 StasisStart (not unique) and 192975 ChannelVarset in the dump. 20 seconds capture.

A single event was received 67 times (distributed by NATS to every goroutine listening to "something"). I've found a couple of places where there are two subscriptions per channel, but even if you divide the numbers by two, it's relatively high.

Can't really see how this is avoidable as long as the NATS subject contains only the node ID and no other identifier. I know it's probably difficult to change this because of backward compatibility.

Ulexus commented 4 years ago

I wonder... are you creating a new from-scratch client connection for each call? Generally, you would create a root client and then call client.New() from there to create subsequently-scoped derived clients. Also, you are using subscriptions derived from the channel handles, right?

mtryfoss commented 4 years ago

Nope. Only one client.New() for the whole project. All messages from NATS are delivered through the same connection, confirmed by trace.

It was based on one of your examples, like this:

    func channelHandler(h ari.ChannelHandle, startEvent ari.StasisStart) {
        ...
        subs := h.Subscribe(ari.Events.StasisEnd)

However, have there been any major bugs fixed here in the last few months? I might be using an older version.

Ulexus commented 4 years ago

I can't recall any bugs; mostly new features.

I don't see how what you are saying reconciles with what you are measuring. That is, I'm missing something obvious. It shouldn't matter how many subscriptions you have locally, you should not get duplicated messages over NATS visible over the network or to the NATS trace logs.

Ulexus commented 4 years ago

Except... I am apparently lying. We are creating new NATS subscriptions for each. That's going to be a problem for you, yes!

Ulexus commented 4 years ago

Okay, let me see if I can get this fixed.

mtryfoss commented 4 years ago

I really appreciate your help :) And hope you'll also benefit from this.

Ulexus commented 4 years ago

@mtryfoss When you get a chance, can you try building off of the issue-26 branch to see if this alleviates your problem?

mtryfoss commented 4 years ago

Thank you! Will try tomorrow morning.

mtryfoss commented 4 years ago

I experienced an application crash when doing the first Subscribe():

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x83190c]

goroutine 33 [running]:
github.com/nats-io/nats%2ego.(*EncodedConn).subscribe(0x0, 0xc000084a50, 0x22, 0x0, 0x0, 0xb6c240, 0xc00034a220, 0xbe3880, 0x1, 0xc00034a220)
        /root/go/src/github.com/nats-io/nats.go/enc.go:237 +0x1bc
github.com/nats-io/nats%2ego.(*EncodedConn).Subscribe(0x0, 0xc000084a50, 0x22, 0xb6c240, 0xc00034a220, 0x0, 0x11, 0xc0000bd4a0)
        /root/go/src/github.com/nats-io/nats.go/enc.go:174 +0x62

The nc pointer is nil in the newBusWrapper() function in bus.go.

Executing c.core.Start() before bus.New() in client.New() seems to fix the issue for me, but I don't know if that is problematic.

I suspect this might be related to using "WithURI" instead of "WithNATS" for a new connection: client.New(ctx, client.WithApplication(ariApp), client.WithURI(natsuri))

Ulexus commented 4 years ago

That's what I get for making a last-minute clean-up. Should be fixed.

mtryfoss commented 4 years ago

Reverted my changes and pulled yours. Still crashing.

Ulexus commented 4 years ago

Oh, sorry. Missed the core bus, too. I don't know how I managed to reverse those.

mtryfoss commented 4 years ago

No problem :) Seems to work now.

ngrep shows that there are two subscriptions receiving events on the same subject, but the count seems to be constant regardless of concurrent calls.

Will do some more extended testing and get back to you.

mtryfoss commented 4 years ago

It's been running on a low-volume node since Friday. No issues so far. Will wait a few more days before testing it with higher volume.

mtryfoss commented 4 years ago

Hi again!

Seems like there's a new memory leak introduced:

          flat  flat%   sum%        cum   cum%
      261.41MB 88.10% 88.10%   261.41MB 88.10%  github.com/CyCoreSystems/ari/stdbus.newSubscription (inline)

Total: 296.72MB
ROUTINE ======================== github.com/CyCoreSystems/ari/stdbus.newSubscription in /root/go/src/github.com/CyCoreSystems/ari/stdbus/bus.go
  261.41MB   261.41MB (flat, cum) 88.10% of Total
         .          .    124:    }
         .          .    125:}
         .          .    126:
         .          .    127:// Events returns the events channel
         .          .    128:func (s subscription) Events() <-chan ari.Event {
  261.41MB   261.41MB    129:    return s.C
         .          .    130:}
         .          .    131:
         .          .    132:// Cancel cancels the subscription and removes it from
         .          .    133:// the event bus.
         .          .    134:func (s subscription) Cancel() {

Ulexus commented 4 years ago

I'm not surprised. I knew it was going to be a bit leaky in the current implementation. I mainly wanted to see if that fixed your scale-out problem. I'll see about getting the leaks (the ones I know about, anyway) fixed.

mtryfoss commented 4 years ago

CPU usage seems to be fine now. The updated version uses only 5-10% compared to the unpatched one. And that is after I temporarily removed some unused events from Asterisk.

mtryfoss commented 4 years ago

Hi again!

I completely forgot about this case. Did you get time to have a look at the leaks, or at least give me some hint on what to check?

Ulexus commented 4 years ago

I'm afraid I did, as well. The cleanup issue (commented in the code) remains, and I don't have a solution to it right away. Probably some sort of subbus tracking, but I don't know right now.

mtryfoss commented 4 years ago

I've been looking a bit at this one myself.

It seems like you're using stdbus from the ari package for subscriptions. Because of that, the Cancel() in the stdbus package is used, and there is no way to remove the entry appended on this line (107 in bus.go): b.subs = append(b.subs, sub)

To me it seems like the only way would be to extract what is needed from stdbus and implement that code locally, with a specific Cancel() function that does the necessary cleanup?

edit: Is it safe to append to "b.subs" without locking?

Ulexus commented 4 years ago

Yeah, that's one of the leaks I was referring to above. The current PR is definitely more a proof of concept than a good solution, which is why it hasn't been merged.

It should be safe to append to b.subs without locking because: