Closed amamh closed 8 years ago
To be clear, the code you have now (below) does not work?
from orleans-PipeStreamProvider/Client/Program.cs
// Method of class TestObserver. Note: the grain interface and key were
// lost in the original formatting; ISomeGrain/0 below are placeholders.
public async Task Subscribe()
{
    var ccGrain = GrainClient.GrainFactory.GetGrain<ISomeGrain>(0);
    var details = await ccGrain.GetStreamDetails();
    var providerName = details.Item1;
    var nameSpace = details.Item2;
    var guid = details.Item3;
    var provider = GrainClient.GetStreamProvider(providerName);
    var stream = provider.GetStream<int>(guid, nameSpace);
    await stream.SubscribeAsync(this, null);
}
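For context, an object passed to stream.SubscribeAsync(this, null) from the client must implement IAsyncObserver&lt;T&gt;. A minimal sketch of such an observer, assuming the Orleans 1.x streaming API (the class name and logging are illustrative, not from the original code):

```csharp
using System;
using System.Threading.Tasks;
using Orleans.Streams;

// Minimal client-side stream observer sketch; names are illustrative.
public class TestObserver : IAsyncObserver<int>
{
    // Called once per stream event delivered to this consumer.
    public Task OnNextAsync(int item, StreamSequenceToken token = null)
    {
        Console.WriteLine($"Received {item} (token: {token})");
        return Task.CompletedTask; // Orleans 1.x samples use TaskDone.Done
    }

    // Called when the producer signals completion.
    public Task OnCompletedAsync() => Task.CompletedTask;

    // Called when delivery to this consumer fails.
    public Task OnErrorAsync(Exception ex)
    {
        Console.WriteLine($"Stream error: {ex}");
        return Task.CompletedTask;
    }
}
```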
@jason-bragg
Yes.
This is the third time I've raised this request. Please add tracing or debug information to your adapter to ensure that data is actually flowing through your adapter.
Unless I'm missing something, ZeroMQ is not a persistent queue. It's a message queue library built over sockets, and your adapter has hard-coded socket addresses. Can you verify that, using your adapter layer only, you can send messages between client and host?
In general, if you believe you've found an issue in the streaming framework, it should be reproducible with the Azure or SMS stream providers. What would really help is if you could submit a test to our streaming tests that reproduces the issue you are having.
If there is an issue with zmq, how come it works fine when I use an observer grain instead of a normal object?
When debugging, I can see that the messages reach the server fine and it can read them in the receiver, but they never reach the consumer app.
"or sms stream providers"
I thought the SMS stream provider doesn't use the same mechanism as the persistent stream providers? Am I mistaken?
I'm going to replace zmq with Redis and add logging everywhere as you suggested.
"If there is an issue with zmq, how come it works fine when I use an observer grain instead of a normal object?"
I'm not saying there is anything wrong with zmq. I was questioning the hard coded address. Those would make more sense if you were talking to some persistent queue service. They make less sense in a distributed technology like Orleans where you have no control over which silo is responsible for processing events from a queue.
"I thought the sms stream provider doesn't using the same mechanism as the persistent stream provider? am I mistaken?"
They share a lot of code, especially regarding the pub-sub system and grain extensions. However, you are correct, there are significant differences, like the use of queues and the pulling agent.
"When debugging, I can see that the messages reach the server fine and it can read them in the receiver, but they never reach the consumer app."
Good info. So you can send data, receive it on the silo, then something goes wrong in the handshake. Because we retry the GetSequenceToken calls (the call that goes to the consumer) we lose any specific exceptions. That's a bit of a bummer.
I think I've enough info to reproduce this. If I can't, I'll have more questions.
@jason-bragg
Great. I'm also redoing it using Redis instead, so at least I'm using a "real" persistent queue that I can debug independently, and for fun...
Hmm. I was unable to repro.
Please take a look at: Added tests to subscribe from streams (SMS and AQ) from client. #1023
Does that accurately reproduce your scenario?
@jason-bragg
Ok, I will. Also, I've just replaced zmq with Redis, and I got the exact same result, i.e. using an observer grain works but an observer client doesn't.
@jason-bragg
You once mentioned that I cannot serialise the stream. This was when I was trying to get the stream from the producer grain: var stream = await dataGrain.GetStream(). What would I need to do to make it serialisable? I want to try that instead of using the provider on the client side.
To be clear, the stream 'is' serializable, but we have no tests around using the serialized stream. It 'should' work, but due to lack of testing, my confidence is not high.
We have done testing around using streams on the client, using the stream acquired from the provider (though not on GitHub, hence my adding them) so I suggested doing it that way. If it had worked it would indicate that there is something wrong with using serialized streams, and then we could look into that. Since it didn't resolve the issue, the cause is still unknown.
Once we've gotten to the root of the client subscription issue you are having, you can try using the serialized stream again, and see if it works. If it does not, we may investigate that, but at this point it is unclear if that is a desirable requirement of the streaming API.
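For reference, the "serialized stream" approach being discussed would look roughly like this. This is a sketch assuming the Orleans 1.x client API; the grain interface, key, and method names are hypothetical, standing in for the dataGrain.GetStream() call mentioned above:

```csharp
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

// Hypothetical grain interface: the grain hands its stream handle to the caller.
public interface IDataGrain : IGrainWithIntegerKey
{
    Task<IAsyncStream<int>> GetStream();
}

public static class SerializedStreamExample
{
    // Client side: the stream handle crosses the grain/client boundary
    // serialized, and the client subscribes on it directly instead of
    // looking the stream up through GetStreamProvider.
    public static async Task SubscribeViaSerializedStream(IAsyncObserver<int> observer)
    {
        var dataGrain = GrainClient.GrainFactory.GetGrain<IDataGrain>(0);
        IAsyncStream<int> stream = await dataGrain.GetStream();
        await stream.SubscribeAsync(observer);
    }
}
```

As the discussion notes, this path is serializable in principle but untested, so the provider-lookup route is the safer baseline while debugging.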
As for the client streaming issue: Unless you find a deviation in the added tests, I think I'll need silo and client logs to track this further. You've asserted that the adapter is performing as expected. The tests indicate that subscribing to streams using Orleans clients works for persistent stream providers. Those two facts should be sufficient to expect the end-to-end solution to work. More information is needed to track what is going wrong.
@jason-bragg
I see.
Wouldn't it be easier to simply run the sample I have in my repo than to go through logs? If not, I'll produce the logs you need and post them.
I don't really have the bandwidth to build, deploy, and run/debug others' apps. I'm familiar with the logs Orleans generates, so it's far more efficient for me to just review your logs. In general, you'll need sufficient application logging and monitoring for your service anyway, so setting up service logs should not be wasted work on your part.
@jason-bragg
Here we go: Client: http://pastebin.com/knzzAkAi Producer: http://pastebin.com/k0dd83xe Server: http://pastebin.com/GS1Zp4s2
Also, source: https://github.com/amamh/orleans-PipeStreamProvider/tree/redis
You can see that the server is somehow reporting message delivery, even though the client doesn't receive anything.
[2015-11-14 00:34:52.650 GMT 13 VERBOSE -1 PipeStreamProvider 0.0.0.0:11111] MessagesDeliveredAsync: Delivered 1, last one has token [EventSequenceToken: SeqNum=18, EventIndex=0]
Producer and consumer are on the same machine and using the same port?
@jason-bragg
What do you mean by using the same port? They are on the same machine, indeed.
Client:
<ClientConfiguration xmlns="urn:orleans">
<GatewayProvider ProviderType="Config"/>
<Gateway Address="localhost" Port="30000"/>
<Statistics MetricsTableWriteInterval="30s" PerfCounterWriteInterval="30s" LogWriteInterval="300s" WriteLogStatisticsToTable="true"/>
<Tracing DefaultTraceLevel="Verbose" TraceToConsole="true" TraceToFile="{0}-{1}.log">
<!--<TraceLevelOverride LogPrefix="Application" TraceLevel="Info" />-->
</Tracing>
<StreamProviders>
<Provider Type="Orleans.Providers.Streams.SimpleMessageStream.SimpleMessageStreamProvider" Name="SMSProvider"/>
<Provider Type="PipeStreamProvider.PipeStreamProvider" Name="PSProvider"/>
</StreamProviders>
</ClientConfiguration>
Producer:
<ClientConfiguration xmlns="urn:orleans">
<GatewayProvider ProviderType="Config"/>
<Gateway Address="localhost" Port="30000"/>
<Statistics MetricsTableWriteInterval="30s" PerfCounterWriteInterval="30s" LogWriteInterval="300s" WriteLogStatisticsToTable="true"/>
<Tracing DefaultTraceLevel="Verbose" TraceToConsole="true" TraceToFile="{0}-{1}.log">
<!--<TraceLevelOverride LogPrefix="Application" TraceLevel="Info" />-->
</Tracing>
</ClientConfiguration>
The documentation doesn't mention the client port:
http://dotnet.github.io/orleans/Orleans-Configuration-Guide/Client-Configuration
It only covers the Orleans server's port.
Nm, connections are bidirectional for clients...
This shows up in both the client and producer logs. I wonder if they're both fighting with each other. Still investigating.
[2015-11-14 00:34:35.262 GMT 4 VERBOSE 100906 Messaging.ProxiedMessageCenter 127.0.0.1:0] Rejecting message: Request *cli/b4eb05a6@7c14068c->S127.0.0.1:30000:0TypeManagerId@S00000011 #2: Orleans.Runtime.ITypeManager:GetTypeCodeMap(). Reason = Target silo S127.0.0.1:30000:0 is unavailable
[2015-11-14 00:34:35.262 GMT 4 VERBOSE 100000 Message 127.0.0.1:0] Creating Unrecoverable rejection with info 'Target silo S127.0.0.1:30000:0 is unavailable' for Request *cli/b4eb05a6@7c14068c->S127.0.0.1:30000:0TypeManagerId@S00000011 #2: Orleans.Runtime.ITypeManager:GetTypeCodeMap() at: at Orleans.Runtime.Message.CreateRejectionResponse(RejectionTypes type, String info) at Orleans.Messaging.ProxiedMessageCenter.RejectMessage(Message msg, String reasonFormat, Object[] reasonParams) at Orleans.Messaging.ProxiedMessageCenter.RejectOrResend(Message msg) at Orleans.Messaging.ProxiedMessageCenter.SendMessage(Message msg) ...
Just a note: at startup, before the server is up, both of them will keep trying to connect every 0.5 seconds like this:
while (true)
{
    try
    {
        GrainClient.Initialize();
        break;
    }
    catch (Exception)
    {
        Task.Delay(500).Wait();
    }
}
I have this bit in both the client and producer code.
It's not a sure thing, but none of our test scenarios use 0.0.0.0 for the gateway or silo address. Can you replace these in the cluster config with 'localhost' and rerun the tests?
Unfortunately I don't have a lot of experience with the silo/client configurations, so I'm mainly narrowing the disparity between your scenario and what we've tested in our test cases.
@jason-bragg
That was it...
Why would that cause a problem?
0.0.0.0 is translated to IPAddress.Any during the configuration process, whereas localhost, loopback, or 127.0.0.1 all get mapped to IPAddress.Loopback. That appears to cause routing confusion.
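For anyone hitting the same issue, the fix amounts to avoiding 0.0.0.0 in the silo (cluster) configuration. A sketch of the relevant fragment, assuming the standard Orleans 1.x OrleansConfiguration schema (exact element placement may differ in your setup):

```xml
<OrleansConfiguration xmlns="urn:orleans">
  <Defaults>
    <!-- Use localhost (mapped to IPAddress.Loopback) rather than 0.0.0.0
         (mapped to IPAddress.Any), which caused the routing confusion here -->
    <Networking Address="localhost" Port="11111"/>
    <ProxyingGateway Address="localhost" Port="30000"/>
  </Defaults>
</OrleansConfiguration>
```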
@jason-bragg
I thought 0.0.0.0 should bind to all available network interfaces including loopback
Quite a useful discussion!
@amamh Please refer to ClusterConfiguration.cs (I think that is what @jason-bragg is referring to):
if (addrOrHost.Equals("loopback", StringComparison.OrdinalIgnoreCase) ||
    addrOrHost.Equals("localhost", StringComparison.OrdinalIgnoreCase) ||
    addrOrHost.Equals("127.0.0.1", StringComparison.OrdinalIgnoreCase))
{
    return loopback;
}
else if (addrOrHost == "0.0.0.0")
{
    return IPAddress.Any;
}
In the Internet Protocol Version 4, the address 0.0.0.0 is a non-routable meta-address used to designate an invalid, unknown or non-applicable target.
Version: 1.0.10 -- latest stable
I made a PersistentStreamProvider with a custom adapter that retains old messages in a cache (inside IQueueCache) and can replay them for new clients.
My problem is that I can't subscribe to it from normal non-grain objects.
My example is as follows:
It works fine if I call dataGrain.SubscribeMe(this), where this is the consumer object. This means that the subscription happens on the server side instead of the client side. When I debug the source, I can see that it hangs in PersistentStreamPullingAgent.DoHandshakeWithConsumer.
The source code for this sample project is here: https://github.com/amamh/orleans-PipeStreamProvider/tree/netmq