This is on me and I'll be looking into this for 3.0.
@ahsonkhan @steveharter as well since this relates to Json. Ask here is for a conceptual doc with samples.
@davidfowl I would seriously appreciate it. Right now I feel lost based on basically ONLY function-level documentation, which seriously is not really easy to understand because there are a TON of concepts hidden in there. Thanks a ton. You guys made a ton of amazing new stuff here (also because .net core 3.0 has some amazing additional new functionality like Memory and Span, which allows a lot of nice functions) but this really is a cloud of "wtf are they doing here". As a simple example - I am totally lost on how to actually call into the reading part. Do I start the reading part myself as a task? There is a scheduler, there are old retired callbacks, and ZERO overview of how to use this.
One of the better sources of information I had was https://blog.marcgravell.com/2018/07/pipe-dreams-part-1.html and the follow-up posts on the topic; that, combined with some questions here, got me a client implementation using pipelines. Json I didn't touch.
Thanks, will read it. I will give you an idea of where I am stuck.
There is no example about streaming. I came up with this loop:
async Task Read() {
    var stream = pipeline.Reader.AsStream();
    while (!token.IsCancellationRequested)
    {
        var json = await JsonSerializer.DeserializeAsync<StreamData>(stream, cancellationToken: token);
        Console.WriteLine(json);
        Callback?.Invoke(this, json);
    }
}
but it totally gets stuck at the DeserializeAsync call. I can only assume this is because for some reason the Flush in the receive loop:
const Int32 minBufferSize = 512;
var writer = pipeline.Writer;
while (true)
{
    try {
        Memory<Byte> memory = writer.GetMemory(minBufferSize);
        var readResult = await wssClient.ReceiveAsync(memory, token);
        if (readResult.Count == 0)
        {
            break;
        }
        //await writer.WriteAsync(memory);
        writer.Advance(readResult.Count);
        var flushResult = await writer.FlushAsync();
        if (flushResult.IsCompleted)
        {
            break;
        }
    }
    catch (Exception ex) {
        Console.WriteLine(ex);
    }
}
does not see a delimiter - but there is neither proper debugging support (which is really tricky given that the payload from the server may be crapping out and I need to debug this) nor any sensible documentation. How am I supposed to debug a stuck stream because of a bad delimiter setting? How does that work if the delimiter is CR - which happens every line? Would the reader stream get stuck - but would the data processing still push new data into the pipeline? How the heck can I see what is in the buffers without destroying them, preferably with SOME help - ANY help - in Visual Studio? In my world I do quite some stream programming and I am pretty much used to seeing the incoming "raw" data to fix special cases because hey, the server is not really following the standard ;(
So far I am trying to follow method by method calls, but that leads to frustration without a page describing the higher level concepts I mentioned. Scheduling is mentioned as optimized - am I supposed to read the source to see how to start the tasks? And how to debug this?
Also, a client-side, possibly VERY standard approach. How is multi-threaded writing going to work MOST efficiently? Info: most streams I work with want a regular ping if no activity happens. Am I supposed to wrap every write somehow in a lock just because I have a task running that checks every x seconds whether y seconds have passed (like checking every 5 seconds whether 45 seconds have passed), and then how do I coordinate writing this ping in case a separate request happens at JUST the same time? I do high performance streaming - but 99.9% of the stream is incoming. That said, I send pings very regularly, but when data is to be sent - optimizing the threading is sort of critical but nowhere described.
It shows how to reduce the loops into two functions, actually. It does not go into any of the control flow. Particularly - do I start the tasks for both loops manually?
Not sure what you mean. The task starts when you call the function. Depending on what you're trying to implement, you would just call the 2 methods and wait on them to finish at some point (if you were interested in knowing when it was cleaned up).
How does the pipeline scheduler get involved here?
The scheduler is invoked whenever an asynchronous callback is triggered. When you call FlushAsync, that will invoke the schedule method so the awaiting ReadAsync call on the other side can resume.
Given the performance sensitive area here, a higher level overview is needed. So far I managed to get SOME result by starting 2 tasks for both methods manually.
What do you mean exactly? What sort of details are you looking for (this will feed into the doc I'm writing).
There is no example about streaming. I came up with this loop:
The loop would be fine if you're parsing a streaming JSON payload. That would mean you're sending payloads that look like this on the wire:
{"name":"Foo"}{"name":"Bar"}
I'm not sure if our serializer supports that. cc @ahsonkhan
JSON.NET has an explicit mode for streaming json (e.g. https://stackoverflow.com/questions/26601594/what-is-the-correct-way-to-use-json-net-to-parse-stream-of-json-objects)
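For reference, the JSON.NET pattern from that answer looks roughly like this (just a sketch; stream and StreamData are the names from the loop above, and this needs the Newtonsoft.Json package):

var serializer = new Newtonsoft.Json.JsonSerializer();
using var jsonReader = new Newtonsoft.Json.JsonTextReader(new StreamReader(stream))
{
    // Allow multiple top-level JSON values in one stream.
    SupportMultipleContent = true
};
while (jsonReader.Read())
{
    if (jsonReader.TokenType == Newtonsoft.Json.JsonToken.StartObject)
    {
        var item = serializer.Deserialize<StreamData>(jsonReader);
        Console.WriteLine(item);
    }
}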
but it totally gets stuck at the DeserializeAsync call. I can only assume this is because for some reason the Flush in the receive loop:
Maybe? It's hard to tell; it might be getting stuck because the JSON payload wasn't sent? It looks like you're doing websockets, so maybe an end to end sample would help here.
How am I supposed to debug a stuck stream because of a bad delimiter setting? How does that work if the delimiter is CR - which happens every line? Would the reader stream get stuck - but would the data processing still push new data into the pipeline? How the heck can I see what is in the buffers without destroying them, preferably with SOME help - ANY help - in Visual Studio? In my world I do quite some stream programming and I am pretty much used to seeing the incoming "raw" data to fix special cases because hey, the server is not really following the standard ;(
Debugging a hanging task is pretty hard today but something we're working on to improve in the tools. As for why everything is stuck, if you can provide a running sample I can help you debug it.
Also, a client-side, possibly VERY standard approach. How is multi-threaded writing going to work MOST efficiently? Info: most streams I work with want a regular ping if no activity happens. Am I supposed to wrap every write somehow in a lock just because I have a task running that checks every x seconds whether y seconds have passed (like checking every 5 seconds whether 45 seconds have passed), and then how do I coordinate writing this ping in case a separate request happens at JUST the same time? I do high performance streaming - but 99.9% of the stream is incoming. That said, I send pings very regularly, but when data is to be sent - optimizing the threading is sort of critical but nowhere described.
I'm not sure what you're asking here.
Ok, let's go:
Calling methods. I was then getting confused by the fact that there is a scheduler somewhere, the functionality of which is never really explained. What I do right now is:
public async Task Connect(Uri uri)
{
    await wssClient.ConnectAsync(uri, token);
    _ = Task.Factory.StartNew(() => ProcessIncoming());
    _ = Task.Factory.StartNew(() => Read());
}
which seems to be the way this is intended to be used then. This was never clear.
High performance, server side - imagine multiple feeds getting high call numbers, possibly significantly high. Pinning processing to individual CPU cores may be advantageous (cache coherency). What else is the scheduler needed for, other than this?
They are not. I never get the call back from Read.
The read loop is blocking on the awaited JsonSerializer.DeserializeAsync call. This may be something about a delimiter, it may be something about anything - I simply have no idea. I cannot even see the payload easily. I am also not sure the source is sending minified data in one line - it may send readable data, which is allowed under JSON.
I definitely get a return from ReceiveAsync in ProcessIncoming. The ReadResult is:
Count | 28 | int
EndOfMessage | true | bool
MessageType | Text | System.Net.WebSockets.WebSocketMessageType
which indicates the initial payload. The ending of the byte array - position 27 - is ASCII 125, which would indicate }. EndOfMessage is given, too.
I then do:
//await writer.WriteAsync(memory);
writer.Advance(readResult.Count);
var flushResult = await writer.FlushAsync();
but I never get the call in the read loop. No visualization is supported for the Memory, making sensible debugging quite complex.
If I replace the read loop with simple debugging info:
var stream = pipeline.Reader.AsStream();
while (!token.IsCancellationRequested)
{
    var result = await pipeline.Reader.ReadAsync(token);
    Console.WriteLine(result);
}
then actually I DO get a result that has 28 bytes - so the pipeline forwarding here works.
How am I supposed to use AsStream then? But yeah, it looks like pipelines is working. Time for another ticket for the Json possibly, though I see those often connected.
I'm not sure what you're asking here.
Imagine the sending pipeline. I am supposed to send a ping to the server every X seconds (let's say 60) if I have not sent anything else. Given the high performance, multi-threaded nature and the ultimately not thread-safe send method, I would more or less use a lock to make sure only one element is ever sent at a time, so the ping never mixes with any other sending. To my knowledge this is pretty standard, particularly with web sockets, because they lack a decent way to see that the other side is still there ;)
I really wish there was a visualizer for UTF-8 and UTF-16 encoding on any sort of array / memory / span.
Just as a note: the 28 bytes I am getting are, decoded with
var stream = pipeline.Reader.AsStream();
while (!token.IsCancellationRequested)
{
    var result = await pipeline.Reader.ReadAsync(token);
    var array = System.Buffers.BuffersExtensions.ToArray(result.Buffer);
    var ss = Encoding.UTF8.GetString(array);
    Console.WriteLine(ss);
}
{"event":"info","version":1}
Valid JSON, no CR or CRLF or LF at the end. Deserializer never returns.
You don't have to call Task.Factory.StartNew, you just take the two tasks returned, combine them into one with Task.WhenAll and await (or return) it 😊
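In other words, something like this sketch (reusing the field and method names from your snippet; WaitForShutdown is just a made-up name):

private Task running;

public async Task Connect(Uri uri)
{
    await wssClient.ConnectAsync(uri, token);
    // Calling the async methods starts them; no Task.Factory.StartNew needed.
    running = Task.WhenAll(ProcessIncoming(), Read());
}

public Task WaitForShutdown()
{
    // Await (or return) this when you want to observe both loops finishing,
    // after cancellation has been signalled elsewhere.
    return running ?? Task.CompletedTask;
}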
Calling methods. I was then getting confused by the fact that there is a scheduler somewhere, the functionality of which is never really explained. What I do right now is:
As mentioned above, the scheduler is used to trigger asynchronous callbacks. When you call FlushAsync on the writer, it will schedule the reader using the pipe scheduler. Similarly, when there's back pressure because the writer is outpacing the reader, the FlushAsync task will be incomplete; when it resumes, it will be scheduled by the scheduler. By default it uses the thread pool; for more advanced scenarios you can plug in your own scheduler, but that's extremely rare and advanced.
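Concretely, the schedulers are picked when the Pipe is constructed; this sketch just spells out the defaults and the rare alternative:

// Default: ReadAsync/FlushAsync continuations are dispatched to the thread pool.
var pipeline = new Pipe(new PipeOptions(
    readerScheduler: PipeScheduler.ThreadPool,
    writerScheduler: PipeScheduler.ThreadPool));

// Rare/advanced: PipeScheduler.Inline runs the continuation on the thread that
// completed the operation instead of dispatching to the pool.
var inlinePipe = new Pipe(new PipeOptions(
    readerScheduler: PipeScheduler.Inline,
    writerScheduler: PipeScheduler.Inline));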
which seems to be the way this is intended to be used then. This was never clear.
Here's an example of what a websocket -> pipe adapter looks like: https://github.com/aspnet/AspNetCore/blob/f676c249d25eb438ffc282edb551d86eea1d9709/src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs. There's actually a type that does what you want, used by SignalR to implement its protocol as part of Microsoft.AspNetCore.Http.Connections.Client, but there are no docs for that either 🤣
I'd recommend using Task.Run instead:
public async Task Connect(Uri uri)
{
    await wssClient.ConnectAsync(uri, token);
    _ = Task.Run(() => ProcessIncoming());
    _ = Task.Run(() => Read());
}
You can also keep track of those tasks to make sure things shut down and unwind properly. See the code I linked to above, which has a StartAsync and StopAsync. The tasks are stored in StartAsync and StopAsync makes sure that they run to completion.
High performance, server side - imagine multiple feeds getting high call numbers, possibly significantly high. Pinning processing to individual CPU cores may be advantageous (cache coherency). What else is the scheduler needed for, other than this?
This is the wrong level of abstraction. If you're talking about managing your own threads then you can absolutely do that, but it really has nothing to do with pipelines directly. All of the APIs you're using schedule on the thread pool, which you can't configure. The most you can do is control where the ReadAsync and FlushAsync callbacks run, which should be fine for most cases. I wouldn't even be concerned with custom schedulers until you figure out why the thing isn't working to begin with.
The read loop is blocking on the awaited JsonSerializer.DeserializeAsync call. This may be something about a delimiter, it may be something about anything - I simply have no idea. I cannot even see the payload easily. I am also not sure the source is sending minified data in one line - it may send readable data, which is allowed under JSON.
I see, it's hard to tell since that loop is inside the serializer call itself. One way to see the bytes would be to create a wrapping Stream that logs the bytes being read. That'll help you understand whether a delimiter is missing (or something else unexpected is happening).
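For example, a minimal pass-through stream along these lines (a sketch, not a built-in type) can be wrapped around pipeline.Reader.AsStream() before handing it to DeserializeAsync:

using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class LoggingStream : Stream
{
    private readonly Stream _inner;
    public LoggingStream(Stream inner) => _inner = inner;

    public override int Read(byte[] buffer, int offset, int count)
    {
        int read = _inner.Read(buffer, offset, count);
        Console.WriteLine($"Read {read} bytes: {Encoding.UTF8.GetString(buffer, offset, read)}");
        return read;
    }

    // Stream's default Memory<byte>-based ReadAsync routes to this overload,
    // so the serializer's reads end up logged here as well.
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        int read = await _inner.ReadAsync(buffer, offset, count, cancellationToken);
        Console.WriteLine($"Read {read} bytes: {Encoding.UTF8.GetString(buffer, offset, read)}");
        return read;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

// Usage:
var json = await JsonSerializer.DeserializeAsync<StreamData>(
    new LoggingStream(pipeline.Reader.AsStream()), cancellationToken: token);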
but I never get the call in the read loop. No visualization is supported for the Memory, making sensible debugging quite complex.
You may be able to look at the .Span property in the debugger but we should make sure we have a debugger visualization for Memory<byte> (I thought we did) cc @stephentoub @ahsonkhan.
How am I supposed to use AsStream then? But yeah, it looks like pipelines is working. Time for another ticket for the Json possibly, though I see those often connected.
AsStream is just an adapter for using APIs that take Stream when you have a pipe (like the JSON serializer).
Imagine the sending pipeline. I am supposed to send a ping to the server every X seconds (let's say 60) if I have not sent anything else. Given the high performance, multi-threaded nature and the ultimately not thread-safe send method, I would more or less use a lock to make sure only one element is ever sent at a time, so the ping never mixes with any other sending. To my knowledge this is pretty standard, particularly with web sockets, because they lack a decent way to see that the other side is still there ;)
That's up to you and your application protocol. Pipelines are single writer/reader so you need to synchronize access to the writer if you have different threads writing (pings and data).
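A sketch of that synchronization (names here are illustrative, not an official API): funnel every send, whether it comes from the ping timer or a data producer, through one async lock.

private readonly SemaphoreSlim writeLock = new SemaphoreSlim(1, 1);

async Task SendAsync(ReadOnlyMemory<byte> payload, CancellationToken token)
{
    // Only one writer at a time, so a ping can never interleave with a data frame.
    await writeLock.WaitAsync(token);
    try
    {
        await wssClient.SendAsync(payload, WebSocketMessageType.Text, endOfMessage: true, token);
    }
    finally
    {
        writeLock.Release();
    }
}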
PS: SignalR basically implements everything you're trying to do here BTW.
I really wish there was a visualizer for UTF-8 and UTF-16 encoding on any sort of array / memory / span.
This is a good suggestion and worth filing another bug for.
I just noticed, the JSON serializer can't work here because it assumes the entire stream is a payload. There's no way to read a stream of JSON this way. The caller is responsible for knowing when a frame starts and ends and passing those bytes to the JsonSerializer. You may be better off not using pipelines here, since websockets already have framing and you can use EndOfMessage to know when the payload is finished.
To make this work with the serializer, we'd need to add a new method like DeserializeSingleAsync or have something akin to JSON.NET's SupportMultipleContent (cc @JamesNK).
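For instance, something like this sketch leans on the websocket's own framing and hands each complete message straight to the serializer (ArrayBufferWriter<byte> lives in System.Buffers; StreamData and Callback are the names from your code):

var buffer = new ArrayBufferWriter<byte>();
while (!token.IsCancellationRequested)
{
    // Accumulate one websocket message; a message can span multiple receives.
    ValueWebSocketReceiveResult result;
    do
    {
        Memory<byte> memory = buffer.GetMemory(512);
        result = await wssClient.ReceiveAsync(memory, token);
        buffer.Advance(result.Count);
    }
    while (!result.EndOfMessage);

    if (result.MessageType == WebSocketMessageType.Close)
    {
        break;
    }

    // EndOfMessage marks the frame boundary, so this is exactly one JSON payload.
    var json = JsonSerializer.Deserialize<StreamData>(buffer.WrittenSpan);
    Callback?.Invoke(this, json);
    buffer.Clear();
}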
Yeah, looks like it. It would be appreciated, because there are quite some advantages to using a high performance JSON parser.
@davidfowl Sorry, I sort of need pipelines. As per the WSS client documentation, a message can be split over multiple frames ;( That said, I do not need the pipelines mechanism to FIND the end, I can push that in (i.e. Advance(result.Count, messageComplete: true) as an overload to avoid the search for the end). The WebSocket receiver basically returns this: https://docs.microsoft.com/en-us/dotnet/api/system.net.websockets.websocketreceiveresult?view=netframework-4.8
Then you just need to have your own delimiter that you find, then pass each buffer to the Deserializer separately
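A sketch of that, assuming '\n' as the delimiter (swap in whatever your protocol actually uses):

async Task ReadFrames(PipeReader reader, CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        ReadResult result = await reader.ReadAsync(token);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Slice out every complete frame currently sitting in the buffer.
        while (TrySliceFrame(ref buffer, out ReadOnlySequence<byte> frame))
        {
            StreamData json = DeserializeFrame(frame);
            Callback?.Invoke(this, json);
        }

        // Consumed up to buffer.Start, examined up to buffer.End, so the pipe
        // waits for more data before waking us up again.
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }
}

static bool TrySliceFrame(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> frame)
{
    SequencePosition? delimiter = buffer.PositionOf((byte)'\n');
    if (delimiter == null)
    {
        frame = default;
        return false;
    }

    frame = buffer.Slice(0, delimiter.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, delimiter.Value));
    return true;
}

static StreamData DeserializeFrame(ReadOnlySequence<byte> frame)
{
    // Utf8JsonReader is a ref struct, so keep it out of the async method.
    var reader = new Utf8JsonReader(frame);
    return JsonSerializer.Deserialize<StreamData>(ref reader);
}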
Why? I mean, I am putting buffers in - I should be able to say that a specific buffer completes a message in an overload, including telling Pipelines that "no, do not look for a delimiter, I handle this and basically use your PIPELINE functionality".
Why? I mean, I am putting buffers in - I should be able to say that a specific buffer completes a message in an overload, including telling Pipelines that "no, do not look for a delimiter, I handle this and basically use your PIPELINE functionality".
Pipelines is just for streaming data, not framed data; buffers are merged together when you write them to the PipeWriter, so when you read from the PipeReader on the other side the lengths may not match up. You need to treat the PipeReader like an arbitrary Stream of data.
If you'd like to preserve the frames it may be better to use a Channel<T> where each frame is written to the channel.
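Roughly (a sketch; the method names are made up and this uses System.Threading.Channels):

private readonly Channel<byte[]> frames = Channel.CreateUnbounded<byte[]>();

// Producer side: one complete websocket message per channel item, so the frame
// boundary survives end to end.
async Task PublishFrame(byte[] completeMessage, CancellationToken token)
{
    await frames.Writer.WriteAsync(completeMessage, token);
}

// Consumer side: every item is exactly one frame, ready for the serializer.
async Task ConsumeFrames(CancellationToken token)
{
    await foreach (byte[] frame in frames.Reader.ReadAllAsync(token))
    {
        var json = JsonSerializer.Deserialize<StreamData>(frame);
        Callback?.Invoke(this, json);
    }
}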
@davidfowl This assumes that frames are preserved. Data may be split into multiple logical reads, so from the reader I may get multiple buffers filled. A frame will never CROSS a read, but a frame may consist of multiple reads.
i.e. you get a large frame, which you read in multiple reads (i.e. from a socket). See, tcp packets are limited (talking raw sockets) to around 1500 bytes on the internet (no jumbo packets there). So, if you send a large data payload it may get split into ethernet packets, which may all get returned separately by the reader.
BUT: an ethernet packet will never contain data for 2 frames.
WebSocket uses something similar ;)
So, there is no "in data" delimiter, but the reading framework tells me a frame is complete.
Right now I have to insert a fake delimiter (which is quite doable, either ASCII 0 or ASCII 13) to allow the parsing. This particular case could actually be handled without parsing the payload for the delimiter ;)
WebSockets are framed, so yes, you can use those frames as your frames. If you ignore those frames then you need framing within the data, since you're throwing away the websocket frames.
So, there is no "in data" delimiter, but the reading framework tells me a frame is complete.
Right now I have to insert a fake delimiter (which is quite doable, either ASCII 0 or ASCII 13) to allow the parsing. This particular case could actually be handled without parsing the payload for the delimiter ;)
This is what SignalR does, though it isn't fake, it's just a delimiter that works across transports (websockets, tcp, http etc).
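Concretely, SignalR's JSON protocol terminates every message with the ASCII record separator (0x1E). A sketch of the sending side (the receive side is the same delimiter scan as above, just with 0x1E instead of a newline):

const byte RecordSeparator = 0x1E;

async Task SendFramed(ReadOnlyMemory<byte> jsonPayload, CancellationToken token)
{
    // Payload followed by the delimiter, sent as one websocket message.
    var framed = new byte[jsonPayload.Length + 1];
    jsonPayload.CopyTo(framed);
    framed[jsonPayload.Length] = RecordSeparator;
    await wssClient.SendAsync(new ReadOnlyMemory<byte>(framed), WebSocketMessageType.Text, endOfMessage: true, token);
}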
I just noticed, the JSON serializer can't work here because it assumes the entire stream is a payload. There's no way to read a stream of JSON this way
Yep. That's the current behavior. We'd need to add a feature to support this in vNext.
The loop would be fine if you're parsing a streaming JSON payload. That would mean you're sending payloads that look like this on the wire:
{"name":"Foo"}{"name":"Bar"}
I'm not sure if our serializer supports that.
The serializer doesn't support reading more than a single JSON value (when reading from a stream). Here's a related issue: https://github.com/dotnet/corefx/issues/39843
The caller would have to use the low-level Utf8JsonReader to figure out when a single JSON value ends, and pass only that data to the serializer. If the caller can buffer the data from the stream / guarantee that more than one semantic JSON value has been read from the stream, then you could create a reader around that data and pass it to the serializer.
The serializer API that accepts a Utf8JsonReader only reads one JSON value:
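// For reference, Temp is assumed to be something like:
// class Temp { public string name { get; set; } }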
byte[] json = Encoding.UTF8.GetBytes("{\"name\":\"Foo\"}{\"name\":\"Bar\"}");
MemoryStream stream = new MemoryStream(json);
// Some buffering mechanism (could include reading from the stream in chunks)
// Using ToArray() just as a short-hand
byte[] buffer = stream.ToArray();
var reader = new Utf8JsonReader(buffer);
// Stops reading after the first JSON value
Temp obj1 = JsonSerializer.Deserialize<Temp>(ref reader);
Assert.Equal("Foo", obj1.name);
Assert.Equal(14, reader.BytesConsumed);
// Or keep reading more data to get the next chunk
reader = new Utf8JsonReader(buffer.AsSpan((int)reader.BytesConsumed));
// Now you can read the next JSON value
Temp obj2 = JsonSerializer.Deserialize<Temp>(ref reader);
Assert.Equal("Bar", obj2.name);
Assert.Equal(14, reader.BytesConsumed);
but I never get the call in the read loop. No visualization is supported for the Memory, making sensible debugging quite complex.
You may be able to look at the .Span property in the debugger but we should make sure we have a debugger visualization for Memory<byte> (I thought we did)
We only have it for Memory<char> and potentially for char8/utf8string. We don't have a debugger view for Memory<byte>, primarily because there is no indication the text is UTF-8 encoded.
https://github.com/dotnet/corefx/blob/e33622a69ca6eb408b61115281d14e6cd6a193a5/src/Common/src/CoreLib/System/Memory.cs#L223-L238
I really wish there was a visualizer for UTF8 and UTF 16 encoding on any sort of array / memory / span;
Yes, let's split that off as a separate issue/discussion. If there was some indication from the T of what the encoding of the data is, then that would be doable. For example, we end up using char to indicate UTF-16. The concern is that using byte for UTF-8 may not work, since you could just have arbitrary binary data (rather than text), and then what would be displayed for that?
Time for another ticket for the Json possibly, though I see those often connected.
Yes, please. If you could provide a simplified sample where you observe an issue, that would be really helpful, particularly if you think it's a bug.
Yes, let's split that off as a separate issue/discussion. If there was some indication from the T of what the encoding of the data is, then that would be doable. For example, we end up using char to indicate UTF-16. The concern is that using byte for UTF-8 may not work, since you could just have arbitrary binary data (rather than text), and then what would be displayed for that?
Is it possible to have multiple visualizers for a single type?
Yes, please. If you could provide a simplified sample where you observe an issue, that would be really helpful, particularly if you think it's a bug.
I filed something similar https://github.com/dotnet/corefx/issues/38581
I don't think so, looking at how the DebuggerDisplayAttribute works. It looks like we only have the ability to control the one string that shows up. We could try to treat it as UTF-8 encoded bytes, and if it isn't valid, fall back to the binary array view. I wanted to avoid things like replacement characters showing up for binary data (that isn't valid UTF-8 text).
We can do something with DebuggerTypeProxy. We could consider adding multiple "views" in the debugger display over the same data (say, have properties for utf8 text, string, raw bytes, hex view, etc.).
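Something along these lines, purely as an illustration of the shape (the type and property names are made up, and it would be attached with [DebuggerTypeProxy(typeof(ByteMemoryDebugView))]):

using System;
using System.Text;

internal sealed class ByteMemoryDebugView
{
    private readonly ReadOnlyMemory<byte> _memory;

    public ByteMemoryDebugView(ReadOnlyMemory<byte> memory) => _memory = memory;

    // Multiple views over the same bytes: pick whichever one makes sense
    // for the payload being inspected.
    public byte[] Raw => _memory.ToArray();
    public string Utf8 => Encoding.UTF8.GetString(_memory.Span);
    public string Hex => BitConverter.ToString(_memory.ToArray());
}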
We can do something with DebuggerTypeProxy. We could consider adding multiple "views" in the debugger display over the same data (say, have properties for utf8 text, string, raw bytes, hex view, etc.).
Yes, please, that would be awesome. It's probably best to remove the existing DebuggerBrowsableState.RootHidden attribute and just keep it as a separate property though.
Also, what do you show for non-byte types? It would be nice if you could have more specific debugger type proxies for closed generic types.
Moving to 5.0. We still want to get these docs added but don't need to track as an issue for 3.0.
This is done now https://docs.microsoft.com/en-us/dotnet/standard/io/pipelines. If you'd like to see more please file an issue on the docs.
and particularly lacking in regards to the new Json library in System.Text.Json. I am stumbling around for an hour now trying to get it to work, and all I actually find is the namespace's method-by-method documentation - not a single linked example or background info page. This is particularly bad as I am writing a CLIENT (not a server, as most examples seem to assume) and I possibly need to adjust some parameters for the json stream.
I am NOT asking for help here - I understand that this is what you prefer to happen on Stack Overflow. I just point out that method-by-method documentation without any basic example in there is not enough to get those 2 libraries working without background information. Going into the source also does not work - that is what documentation is for.
Please add more documentation on this. Particularly focus on the json interaction AND on customization for various json streams. I understand that in the classic ASP.NET scenario not everyone will have a need for that - you just use the deserialized objects. But some of us have to make it work on a client that uses WebSockets (i.e. my dotnet core desktop client connects to a websocket feed from a third party), and then the documentation is lacking.