fofewyoung opened this issue 5 years ago (status: Open)
Hi, may I check whether you have read my question? I think I know the reason, but I have no simple way to fix it: in dotnet-etcd, each watch operation gets its own gRPC stream, but a cancel request has to be sent on the same stream that created the watch. Cancellation works after I hacked the code so that all watch* operations share a single stream. Could you please help me deal with it?
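For context, etcd's watch protocol multiplexes creates and cancels over a single bidirectional stream: a WatchCancelRequest only takes effect if it is written to the same request stream on which the corresponding WatchCreateRequest was sent. A minimal sketch with the generated Etcdserverpb client (illustrative only, not dotnet-etcd's public API):

```csharp
using System.Threading.Tasks;
using Etcdserverpb;
using Google.Protobuf;
using Grpc.Core;

public static class WatchCancelSketch
{
    // Create a watch and then cancel it on the SAME duplex stream.
    public static async Task CreateThenCancelAsync(Watch.WatchClient watchClient)
    {
        using var stream = watchClient.Watch();

        // 1. Create a watch on "foo"; the server answers with a Created response carrying WatchId.
        await stream.RequestStream.WriteAsync(new WatchRequest
        {
            CreateRequest = new WatchCreateRequest { Key = ByteString.CopyFromUtf8("foo") }
        });
        await stream.ResponseStream.MoveNext();
        long watchId = stream.ResponseStream.Current.WatchId;

        // 2. Cancel it. This works only because we reuse the same stream object;
        //    a CancelRequest written on a fresh Watch() call opens a new stream
        //    where this WatchId is unknown.
        await stream.RequestStream.WriteAsync(new WatchRequest
        {
            CancelRequest = new WatchCancelRequest { WatchId = watchId }
        });
    }
}
```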
Hi, I have read your question. Will get back once I reproduce the issue and if a problem exists, will fix it.
Do let me know about your findings and how you modified the code to get it working.
but a cancel request has to be sent on the same stream that created the watch.
Strange. What would be the use of watchId then? I need to dig a bit more into this.
Hi, this is my hack code. It is not thread safe and not graceful. Note that the line // await watcher.RequestStream.CompleteAsync(); is commented out on purpose: completing the request stream would close the shared stream.
```csharp
using System;
using System.Threading.Tasks;
using Etcdserverpb;
using Grpc.Core;

public partial class EtcdClient : IDisposable
{
    // The shared duplex stream used by all watch/unwatch calls.
    // NOTE: this lazy initialization is not thread safe.
    private AsyncDuplexStreamingCall<WatchRequest, WatchResponse> myWatchStreaming = null;

    private AsyncDuplexStreamingCall<WatchRequest, WatchResponse> GetWatchStreaming()
    {
        if (myWatchStreaming == null)
            myWatchStreaming = _balancer.GetConnection().watchClient.Watch(null);
        return myWatchStreaming;
    }

    // Unwatch operation: the cancel request is written to the shared stream,
    // so etcd can resolve the watch id against the stream that created it.
    public async void UnWatch(long wid)
    {
        WatchRequest req = new WatchRequest
        {
            CancelRequest = new WatchCancelRequest
            {
                WatchId = wid
            }
        };
        await GetWatchStreaming().RequestStream.WriteAsync(req);
    }

    #region Watch Key
    /// <summary>
    /// Watches a key according to the specified watch request and
    /// passes the watch response to the method provided.
    /// </summary>
    /// <param name="request">Watch Request containing key to be watched</param>
    /// <param name="method">Method to which watch response should be passed on</param>
    public async void Watch(WatchRequest request, Action<WatchResponse> method, Metadata headers = null)
    {
        bool success = false;
        int retryCount = 0;
        while (!success)
        {
            try
            {
                var watcher = GetWatchStreaming();

                // Pump responses from the shared stream and forward them to the callback.
                Task watcherTask = Task.Run(async () =>
                {
                    while (await watcher.ResponseStream.MoveNext())
                    {
                        WatchResponse update = watcher.ResponseStream.Current;
                        if (update.Canceled)
                            break;
                        method(update);
                    }
                });

                await watcher.RequestStream.WriteAsync(request);
                // await watcher.RequestStream.CompleteAsync(); // must stay commented out: completing it would close the shared stream
                await watcherTask;

                success = true;
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
            {
                retryCount++;
                if (retryCount >= _balancer._numNodes)
                {
                    throw; // rethrow without resetting the stack trace
                }
            }
        }
    }
    #endregion
}
```
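As a usage note for the hack above (my own sketch, not part of the posted patch): the watch id that UnWatch needs can be captured from the Created response inside the callback, assuming client is an EtcdClient built with this change.

```csharp
using System;
using Etcdserverpb;
using Google.Protobuf;

// Sketch only: capture the server-assigned watch id so it can be cancelled later.
long watchId = -1;

client.Watch(new WatchRequest
{
    CreateRequest = new WatchCreateRequest { Key = ByteString.CopyFromUtf8("foo") }
},
response =>
{
    if (response.Created)
        watchId = response.WatchId;          // id assigned by etcd for this watch
    foreach (var e in response.Events)
        Console.WriteLine($"{e.Type}: {e.Kv.Key.ToStringUtf8()} = {e.Kv.Value.ToStringUtf8()}");
});

// ... later, cancel over the same shared stream:
client.UnWatch(watchId);
```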
Thanks. I will look into this. A bit of a busy week, may not be able to come up with a solution soon enough.
Update: I was able to replicate the issue. Still working on the solution.
Let's take an example where we have an etcd cluster and an application using this client. Suppose I have 3 instances of my application behind a load balancer, and my application connects to etcd using this client. Now if I make an API call to my application via the load balancer to cancel a watch, I can never guarantee that the request lands on the same server where the watch was started, making it difficult to cancel the watch. I will have to look into why the watch ID is not being honored by etcd.
I will take this up with the etcd team.
The watchId exists to manage different watches on the same stream to the server and is only scoped to that stream. See this discussion here.
What I assume is that if you close the response stream (or maybe both the request and response streams; in this client the request stream is always closed after sending the initial requests), the watch is also cancelled, but I could not confirm this assumption yet.
So probably we will have to maintain request streams on our end. I will try to re-design the watch implementation here to persist request streams. The discussion also states that each stream may share the same watch ID, which becomes a bit difficult to manage on the client side.
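A rough sketch of what persisting the request stream could look like (my illustration, not the actual redesign; it assumes etcd 3.4+ protos, where WatchCreateRequest accepts a client-chosen watch_id): keep one duplex call alive, remember a callback per watch id, and dispatch everything from a single reader loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Etcdserverpb;
using Grpc.Core;

// Sketch only: one long-lived duplex call shared by all watches on this client.
public sealed class PersistentWatchStream
{
    private readonly AsyncDuplexStreamingCall<WatchRequest, WatchResponse> _call;
    private readonly ConcurrentDictionary<long, Action<WatchResponse>> _callbacks = new();

    public PersistentWatchStream(Watch.WatchClient client)
    {
        _call = client.Watch();
        _ = Task.Run(ReadLoopAsync); // single reader for the whole stream
    }

    private async Task ReadLoopAsync()
    {
        while (await _call.ResponseStream.MoveNext())
        {
            WatchResponse resp = _call.ResponseStream.Current;
            if (_callbacks.TryGetValue(resp.WatchId, out var callback))
                callback(resp);
            if (resp.Canceled)
                _callbacks.TryRemove(resp.WatchId, out _);
        }
    }

    // NOTE: a real implementation must serialize writes to RequestStream,
    // since gRPC allows only one pending write per stream.
    public Task CreateAsync(WatchCreateRequest create, long watchId, Action<WatchResponse> callback)
    {
        create.WatchId = watchId; // client-chosen id, scoped to this stream (etcd 3.4+)
        _callbacks[watchId] = callback;
        return _call.RequestStream.WriteAsync(new WatchRequest { CreateRequest = create });
    }

    public Task CancelAsync(long watchId) =>
        _call.RequestStream.WriteAsync(new WatchRequest
        {
            CancelRequest = new WatchCancelRequest { WatchId = watchId }
        });
}
```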
@shubhamranjan I'm working on a watch manager to use in our distributed locking client. I can share the code when I have progressed a bit more. You could get some inspiration from it or we could adjust it to be included in this library if you think it fits the scope of this library.
The basic idea is as follows (WM = Watch Manager, WMS = Watch Manager Subscription):
A developer can get a WatchManager that can issue multiple Subscriptions over a single duplex gRPC stream. A Subscription manages its own lifetime, and by disposing the Subscription object the watch is cancelled on the etcd server.
The WM will issue unique watchIds for all WMS issued by this WM.
I'm not sure whether a WMS needs to track the versions of the keys it receives, so that when the WM has to create a new stream and resubscribe it knows what start version it needs, making sure the WMS receives every version only once even if we need to create a new channel.
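To make that concrete, here is a rough sketch of the surface I have in mind (my illustration of the proposal, not the finished code; PersistentWatchStream is the hypothetical helper sketched a few comments up):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Etcdserverpb;

// Sketch only: the "WMS" owns its watch id and cancels the watch when disposed.
public sealed class WatchSubscription : IDisposable
{
    private readonly WatchManager _manager;
    public long WatchId { get; }

    internal WatchSubscription(WatchManager manager, long watchId)
    {
        _manager = manager;
        WatchId = watchId;
    }

    // Disposing sends the WatchCancelRequest on the manager's single stream.
    public void Dispose() => _manager.CancelAsync(WatchId).GetAwaiter().GetResult();
}

// Sketch only: the "WM" owns one stream and hands out increment-only watch ids.
public sealed class WatchManager
{
    private long _nextWatchId;                      // unique per manager (and therefore per stream)
    private readonly PersistentWatchStream _stream; // hypothetical helper from the earlier sketch

    public WatchManager(Watch.WatchClient client) => _stream = new PersistentWatchStream(client);

    public async Task<WatchSubscription> SubscribeAsync(WatchCreateRequest create, Action<WatchResponse> onEvent)
    {
        long id = Interlocked.Increment(ref _nextWatchId);
        await _stream.CreateAsync(create, id, onEvent);
        return new WatchSubscription(this, id);
    }

    internal Task CancelAsync(long watchId) => _stream.CancelAsync(watchId);
}
```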
I would like to see how you plan to achieve this when the same etcd watch ID(s) are issued in different streams, because in case of connection exceptions I believe the stream would be recreated and the etcd-issued watch ID(s) won't be valid anymore for the new stream.
Looks fine to me if it's included in this library. It would make the watch client better.
I plan on making the watchId of the WM increment only (the watchId is a long so I don't think we will run out). Because the WM manages both the stream to the etcd server and the watchId-counter it can guarantee that watchIds are unique for the specific stream that it manages. Because the etcd server scopes the watchIds to a specific stream we only need to guarantee the uniqueness of the watchId in combination with a specific stream to the etcd server.
The WMS will store the watch creation data inside its object so the WM can issue new WatchCreateRequests on behalf of the WMS to restore its watch after a stream has been recreated. The recreated stream will be a new stream to the etcd server with no watches (and therefore no watchIds) linked to it. Because the WM only ever increments the watchId, I can re-use the old watchIds in the new WatchCreateRequests to create new watches with the same watchId.
Note: 2 different WM instances can issue the same watchId but that is not a problem since they would also manage their own stream to the server, so the guarantee is only needed per WM instance.
Note 2: Because I keep the watch creation data in the WMS and will have to send new WatchCreateRequests when restoring the connection, I think the WMS will have to store the last seen update version so we can issue a correct start version in the new WatchCreateRequests and have the same behaviour as a stream that was not recreated, where all updates are delivered exactly once. For single keys this seems do-able since the WMS will receive every update for that watch from the WM but for ranged watches I'm not sure how that would work because I did not see an option to issue start versions for individual keys within a keyrange creation request.
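As an illustration of the restore step (my sketch; the proto fields WatchId, StartRevision, and Header.Revision are real, the surrounding type is hypothetical): after the stream is recreated, the stored creation data is re-sent with the start version set just past the last update this subscription already saw.

```csharp
using Etcdserverpb;

// Sketch only: the state a WMS could keep so its watch can be restored on a
// recreated stream (assumes etcd 3.4+ protos, where WatchCreateRequest
// accepts a client-chosen WatchId).
public sealed class WatchRestoreState
{
    private readonly WatchCreateRequest _creationData; // stored at subscription time
    private long _lastSeenRevision;                    // e.g. taken from WatchResponse.Header.Revision

    public WatchRestoreState(WatchCreateRequest creationData) => _creationData = creationData;

    public void RecordProgress(WatchResponse resp) => _lastSeenRevision = resp.Header.Revision;

    // Build the create request to replay on the new stream, resuming right
    // after the last revision already delivered to the subscriber.
    public WatchRequest BuildRestoreRequest(long watchId)
    {
        var create = _creationData.Clone();
        create.WatchId = watchId;
        create.StartRevision = _lastSeenRevision + 1;
        return new WatchRequest { CreateRequest = create };
    }
}
```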
For single keys this seems do-able since the WMS will receive every update for that watch from the WM but for ranged watches I'm not sure how that would work because I did not see an option to issue start versions for individual keys within a keyrange creation request.
Watch and WatchRange are internally the same API. The revision specified in the request should mean the same thing whether it's an individual key or a range. However, I will try these cases out over the weekend and check the behavior.
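For reference, both cases go through the same WatchCreateRequest; the only difference is whether RangeEnd is set, and the single StartRevision field applies to the whole request (a sketch with the raw proto types; the key names and revision are just example values):

```csharp
using Etcdserverpb;
using Google.Protobuf;

// Single key: watch exactly "foo" from revision 100 onwards.
var single = new WatchCreateRequest
{
    Key = ByteString.CopyFromUtf8("foo"),
    StartRevision = 100
};

// Range: watch every key with prefix "foo" by setting RangeEnd to the prefix's
// upper bound ("fop"); the same StartRevision applies to the whole range.
var ranged = new WatchCreateRequest
{
    Key = ByteString.CopyFromUtf8("foo"),
    RangeEnd = ByteString.CopyFromUtf8("fop"),
    StartRevision = 100
};
```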
That would be good news and should simplify rebuilding the channel. I will start by supporting just an exact key for the moment, and if that works I will look at WatchRange.
I have included my fork of this client as a submodule in our solution so I can test its behaviour before making the PR to this repository. It will go through manual testing for our use cases, and I'm hoping to add some integration tests. These tests would need a running etcd server, as asserting and mocking an etcd server at the gRPC network level is not something I'm planning to do in the foreseeable future.
Yeah, automation of test cases has been pending here for a long time. I should start on that soon.
Is there a solution for the cancellation problem yet?
Hi guys, what are the consequences if we watch variables without cancelling the watch? Also, what happens if we watch the same variable again with the same ID?
@lapinbleu007 - For now, nothing is handled at the client level; the requests are passed as-is to the etcd server, so each watch request is basically a new watch request.
Describe the bug: I try to cancel a watch with the code below, but it fails.
To Reproduce
Expected behavior: input 's', input 'u', the console prints "cancel: ..."
Screenshots
Additional context: dotnet-etcd 3.0.0, etcd 3.4.3
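The reporter's code did not survive in this thread, so here is only a guess at the kind of repro involved (purely illustrative; the EtcdClient constructor arguments are approximate, and the Watch(WatchRequest, Action<WatchResponse>) overload is the one shown earlier in this thread): pressing 's' starts a watch, pressing 'u' tries to cancel it by sending a WatchRequest with a CancelRequest through another Watch call, which fails in dotnet-etcd 3.0.0 because that call opens a separate gRPC stream where the watch id is unknown.

```csharp
// Illustrative repro sketch only; not the reporter's original code.
using System;
using dotnet_etcd;
using Etcdserverpb;
using Google.Protobuf;

var client = new EtcdClient("127.0.0.1", 2379); // constructor shape approximate for 3.0.0
long watchId = -1;

while (true)
{
    var key = Console.ReadKey(true).KeyChar;
    if (key == 's')
    {
        // Start a watch on "foo" and remember the id etcd assigns.
        client.Watch(new WatchRequest
        {
            CreateRequest = new WatchCreateRequest { Key = ByteString.CopyFromUtf8("foo") }
        },
        resp =>
        {
            if (resp.Created) watchId = resp.WatchId;
            if (resp.Canceled) Console.WriteLine("cancel: " + resp.WatchId);
        });
    }
    else if (key == 'u')
    {
        // Attempt to cancel: this goes out on a NEW stream, so etcd never
        // cancels the original watch and "cancel: ..." is never printed.
        client.Watch(new WatchRequest
        {
            CancelRequest = new WatchCancelRequest { WatchId = watchId }
        },
        resp => Console.WriteLine("cancel: " + resp.WatchId));
    }
}
```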