confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

1.0 API Feedback #614

Closed mhowlett closed 5 years ago

mhowlett commented 6 years ago

This issue is to track community feedback on the new 1.0 API and document changes that are likely to be made before its release. We'll continue to update this first comment to reflect open changes currently under consideration & progress.

For problems, please open another issue.

Comments / suggestions? Add them below!

Current changes under consideration for 1.0:

Review TODOs:

xqrzd commented 6 years ago

Super exciting to see it getting close to completion! Speaking of Span<T> serializers, I still haven't come up with something I'm happy with, but recently I prototyped using IMemoryOwner<byte> for the return type of ISerializer<T> here https://github.com/xqrzd/confluent-kafka-dotnet/commit/0686dcd58188bc0d8ae4233a053222ece422be50. The serializer would be responsible for allocating an IMemoryOwner<byte>, and would transfer ownership to Confluent.Kafka, according to this rule, https://gist.github.com/GrabYourPitchforks/4c3e1935fd4d9fa2831dbfcab35dffc6#rule-7-if-you-have-an-imemoryownert-reference-you-must-at-some-point-dispose-of-it-or-transfer-ownership-but-not-both and Confluent.Kafka would be responsible for disposing the IMemoryOwner<byte>.

It gives a lot of flexibility, to quote this blog post, https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-performance-io-in-net/

This provides an extensibility point that lets you plug in more advanced allocation strategies as well as control how buffers are managed (for e.g. provide pre-pinned buffers instead of purely managed arrays).

but since IMemoryOwner<byte> is an interface, it doesn't completely avoid allocations. Even if the interface was implemented on a struct, it would get boxed when it's returned from the serializer.
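
To make the ownership-transfer idea concrete, here is a minimal sketch. IOwnedSerializer<T>, Utf8OwnedSerializer and OwnershipDemo are hypothetical names made up for illustration; they are not part of Confluent.Kafka and not the code in the linked commit. Only MemoryPool<byte>, IMemoryOwner<byte> and Encoding are real framework types.

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical interface (an assumption, not the Confluent.Kafka API):
// the serializer rents a buffer and transfers ownership to the caller.
public interface IOwnedSerializer<T>
{
    // A rented buffer may be larger than requested, so the number of
    // bytes actually written is reported separately.
    IMemoryOwner<byte> Serialize(T data, out int length);
}

public sealed class Utf8OwnedSerializer : IOwnedSerializer<string>
{
    public IMemoryOwner<byte> Serialize(string data, out int length)
    {
        int max = Encoding.UTF8.GetMaxByteCount(data.Length);
        IMemoryOwner<byte> owner = MemoryPool<byte>.Shared.Rent(max);
        length = Encoding.UTF8.GetBytes(data.AsSpan(), owner.Memory.Span);
        return owner; // ownership now belongs to the caller
    }
}

public static class OwnershipDemo
{
    // Plays the role of the client library: uses the bytes, then disposes
    // the owner exactly once, which returns the buffer to the pool.
    public static int SendAndRelease(string value)
    {
        var serializer = new Utf8OwnedSerializer();
        using (IMemoryOwner<byte> owner = serializer.Serialize(value, out int length))
        {
            ReadOnlySpan<byte> payload = owner.Memory.Span.Slice(0, length);
            return payload.Length; // a real client would copy or send the payload here
        }
    }
}
```

The IMemoryOwner<byte> returned here is still a heap object, which is the allocation mentioned above.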

ChrisProjectRepo commented 6 years ago

I updated my software today to the 1.0Experimental-2 version because I needed the AdminClient. Everything works like a charm except for the Consumer, because I don't like the while(true) loop for reception. Why did you remove the OnMessage event? Was it moved, or is it no longer usable? Is there a workaround to get events when receiving messages?

Thanks in advance

mhowlett commented 6 years ago

@xXCiccioXx - first of all, check out 1.0-experimental-12, where the consume method is in its final form and more convenient than 1.0-experimental-2. It's my favorite of all the variants we trialled.

Reasons for this change:

mhowlett commented 6 years ago

@xqrzd - thanks for your input here - you're pointing out some important things I've yet to research :-). Updating the serializer interface is in scope for 1.0 because it's a breaking change to the API, but it's a little bit down the list because it's fairly well hidden (most people won't touch this part of the API) and there are more visible things to implement / bugs to fix first. So in summary, thanks for your work, it's useful! and sorry for the delay in getting to this (but we will).

vchekan commented 6 years ago

Testing nuget 1.0.0-experimental-12

  1. I am quite skeptical about the idea of dropping the raw Consumer and Producer. I think it is the generic one which should be dropped, along with ISerializer. In my opinion, the idea of configuring a serializer and invoking it from the client's guts is an echo from OOP-only times, when it was too cumbersome to define yet another class to compose your serializer into a GetMessage call. Nowadays, when we have Linq in C#, Streams in Java and list comprehensions in Python, it is more convenient to call the deserializer from a Linq expression, because I need zero time to answer "how to invoke a deserializer function from Linq".

ErrorCode.Local_ValueDeserialization is a manifestation of this problem. There are 2 ways to handle errors: exceptions and error codes. ErrorCode.Local_ValueDeserialization combines the 2. You have an exception with an error code :) So now I cannot apply the standard exception handling pattern catch (DeserializationException) {...} catch (ConsumeException) {...} catch (Exception) {...}. If deserialization happened as part of the application processing pipeline, it would be the app's responsibility to handle it, and there would be no need to introduce ErrorCode.Local_ValueDeserialization. Of course, I can set ByteArray serializers and do my own deserialization in the app, but now error processing becomes unreliable because I need to know which error codes are possible and require handling and which are not possible. Or even now, I cannot be sure: can ByteArrayDeserializer throw ErrorCode.Local_ValueDeserialization or not? I have to look at the implementation and cross my fingers.

So my point is, forcing IDeserializer does not solve any problem in a Linq-enabled world and brings design challenges. Exposing IEnumerable and letting the user deal with deserialization is arguably easier and definitely more flexible.

I understand that "no generic" is too aggressive a move, so maybe adding Raw[Consumer|Producer] and String[Consumer|Producer] will allow a zero-config solution for the majority of cases?

  1. IDeserializer. IDeserializer should be given headers because, for example, a header might contain the message type, which directs which deserializer to invoke. This is the case when a message bus is used, where messages of all types are published to a single topic. Or if an application is doing a rolling upgrade, where messages of 2 versions will be present for some time.

IDeserializer inherits from IDisposable. It means that the kafka client demands that the serializer be disposable. But why? I do not see any use of serializer.Dispose from the client. The serializer object is created outside of the client, and the client is provided with an instance of the serializer. So it is the application's responsibility to call Dispose. The client has no knowledge of when it is safe to call Dispose, nor does it have any knowledge of whether IDisposable makes sense. If the client knows neither whether it is required nor when to call it, why does it impose such a requirement at all? If you remove IDisposable from I[De]Serializer, then app code can implement IDisposable only for cases when it is needed, thus simplifying 99% of serializer classes.

I[De]Serializer.Configure. Similar note. Both the configuration and the serializer instances are provided to the client from application code. So, if the serializer needs configuration, the application already has all things in place to do it. The client's invocation of serializer.Config(config) provides no added value. While adding nothing, it takes away composability. Without the Configure member, the interface is reduced to a single call, [De]Serialize, which is a good opportunity to convert it to a plain Func<Message,T> without any interfaces.

  1. Consumer.Consume is a loop. It would be much more composable to expose it as IEnumerable. If you did so, it could be used in Linq and Reactive Extensions out of the box, which would make my point in (1) more apparent.

  2. ByteArrayDeserializer converts ReadOnlySpan into byte[]. I would carry on ReadOnlySpan as ByteArrayDeserializer : IDeserializer<ReadOnlySpan<byte>>, or add another deserializer, IReadOnlySpanDeserializer, with a note in ByteArrayDeserializer advising its usage.

  3. "Callbacks are now specified as Action<DeliveryReport<TKey, TValue>> delegates." Nice, I like this.

  4. "You should use Consumer or ConsumerAsync instead." What is ConsumerAsync? Did you mean Consumer.ConsumeAsync?

  5. "The Consumer.OnPartitionEOF event has been removed." I can still see it invoked.

  6. "Log handlers are now specified via configuration properties, not OnLog handlers. This allows logging to be enabled during client construction." Hate this one. I do not understand what this means: "This allows logging to be enabled during client construction". Configuration accepts object as a value, with no type safety nor any IDE hints on what parameter is expected. Why turn C# into Visual Basic? A regression, IMHO. No code in the examples folder shows how to do logging. It should not require research on the "how do I log" question. If you want to make it a property of the configuration, why not make it a strongly typed property of the configuration? The idea of configuration being Dictionary<string,object> is a regression. Instead of hitting ctrl+space in my IDE and starting to type a keyword to get hints on property names and types, I now have to google, copy-paste and then run the program to make sure it actually works. In the F# wrapper we always undo this and have a nice config like this:

    use producer =
        Config.Producer.safe
        |> Config.bootstrapServers host
        |> Config.clientId "test-client"
        |> Config.Producer.batchNumMessages batchMessagesCount
        |> Config.debug [DebugFlag.Broker; DebugFlag.Cgrp]
        |> Producer.create

No googling "is timeout int or int64" anymore and one less runtime exception.
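
As an illustration of points (1) and (2) in this comment, here is a rough sketch of deserializing inside a Linq pipeline and picking the deserializer from a header. RawMessage, its TypeHeader field and FakeRawMessages are hypothetical stand-ins invented for this example; the client does not expose messages as IEnumerable in this form.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical raw message: a header naming the payload type plus the bytes.
public sealed class RawMessage
{
    public string TypeHeader;
    public byte[] Value;
}

public static class RawPipeline
{
    // Stand-in for a raw consumer exposed as a sequence of messages.
    public static IEnumerable<RawMessage> FakeRawMessages()
    {
        yield return new RawMessage { TypeHeader = "text", Value = Encoding.UTF8.GetBytes("hello") };
        yield return new RawMessage { TypeHeader = "int32", Value = BitConverter.GetBytes(42) };
    }

    // The application, not the client, chooses the deserializer per message,
    // so a failure here would surface as an ordinary application exception.
    public static List<string> Describe(IEnumerable<RawMessage> raw)
    {
        return raw
            .Select(m => m.TypeHeader == "int32"
                ? "int:" + BitConverter.ToInt32(m.Value, 0)
                : "text:" + Encoding.UTF8.GetString(m.Value))
            .ToList();
    }
}
```

Calling RawPipeline.Describe(RawPipeline.FakeRawMessages()) yields the two entries "text:hello" and "int:42".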

awhittier-metabolon commented 5 years ago

Is there a reason that the callback passed in the 'error_cb' setting is inconsistent between Producers (where it has to be an Action<Error>) and Consumers (where it has to be an Action<ErrorEvent>)?

mhowlett commented 5 years ago

@awhittier-metabolon - thanks! that's an error (fixed to be consistent in #623). Notes: 1. that PR still requires more work including addressing comment by @qed-. 2. the exact delegate type to use here is actually still under consideration.

mhowlett commented 5 years ago

great feedback @vchekan - I've now actioned a lot of it. I'm replacing an old comment with this more succinct one for the benefit of anyone reading along.

Decision to drop the raw (non-generic) consumer and producer.

Exceptions for every librdkafka error

ISerializer interface vs functions.

Provide headers to deserializer

I(de)Serializer implement IDisposable

Configure method on serializers.

Consumer.Consume method

Configuration

AndyPook commented 5 years ago

De/Serializers... There are serdes for most of the dotnet intrinsic types. But the Producer/Consumer classes only recognize a couple from the P/C generic type def.

Maybe add a Get<T> to the Generators classes (returning null if not recognized). This would make the P/C ctors more straightforward and perhaps make it easier to keep these "default" serdes aligned with the "factory" mechanism?

mhowlett commented 5 years ago

thanks for the input @AndyPook!

I'm currently working on some changes to make serdes much more in line with what @vchekan suggests (completely decouple producers/consumers from serdes), which would make generators irrelevant. the best path forward is still unclear though, stay tuned.

kimbell commented 5 years ago

In the release note for 1.0.0-beta,

The Consumer.Poll method and corresponding OnMessage event have been removed. You should use Consumer or ConsumerAsync instead.

I'm not able to find any reference to ConsumerAsync in the codebase. Are there plans for async consumers? I'm hosting Kafka using the IHostedService infrastructure that is part of ASP.NET Core 2.0. In that model async is king, and the more I can get for free from other libraries, the easier my job gets :)

mhowlett commented 5 years ago

Previously, I was going to have a completely async api, but the reality is it's not going to be possible to implement this 'properly' in the foreseeable future due to the way librdkafka works. I could have faked it using Task.Run, but you can do that just as well outside of the library and I decided the sync interface was better (we can add async variants in the future). So, the release notes are in error, there is no ConsumeAsync, it is Consume. Yep, I totally understand the use-case. I intend to write an example of how to fit into that.
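
For readers wondering what "faking it with Task.Run outside of the library" could look like, here is a minimal sketch. BlockingConsume is a hypothetical stand-in for a blocking consume call and is not the library's method; Task.Run and CancellationToken are the only real APIs used.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingToAsync
{
    // Hypothetical stand-in for a blocking consume call: waits briefly,
    // honors cancellation, then returns a value.
    public static string BlockingConsume(CancellationToken ct)
    {
        ct.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(50));
        ct.ThrowIfCancellationRequested();
        return "message";
    }

    // Application-side wrapper: the blocking call occupies a thread-pool
    // thread, so the awaiting caller (e.g. a hosted service) is not blocked.
    public static Task<string> ConsumeOnWorkerAsync(CancellationToken ct)
    {
        return Task.Run(() => BlockingConsume(ct), ct);
    }
}
```

This does not make the consume non-blocking; it only moves the blocking onto a different thread.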

kimbell commented 5 years ago

Excellent, looking forward to that.

qed- commented 5 years ago

Is there a way to tell from the return value of Consume() if we are at partition EOF?

mhowlett commented 5 years ago

@qed- no, but i'll re-consider it.

mhowlett commented 5 years ago

@qed- currently consume can return one of two things - a message or null. if that is expanded to be three things (partition eof event), it makes writing the consume loop messier - even for people not concerned with partition eof. for that reason i exposed this as an event. a flag on message indicating this is the last message is another possibility, but as @edenhill just pointed out to me, that won't tell you you're at partition eof if there is no message.

can you just write something like?:

bool eof = false;
consumer.OnPartitionEOF += (_, e)
    => eof = true;

while (true)
{
    var consumeResult = consumer.Consume<Ignore, string>();
    if (eof)
    {
        // do eof stuff
        eof = false;
        continue;
    }

    // do consume message stuff.
}

qed- commented 5 years ago

What I see in library version 1.0.0-beta2 is two overloads of Consume().

The one that takes a timeout will return null in the case of both no message and partition EOF (also firing the event for partition EOF). The one that takes a cancellation token will block in both of those cases so you'd never see null returned from it. This seems a bit inconsistent. Your example code would work for the former case but not the latter, you'd need to put any code that wants to handle EOF directly in the event handler.

Another option would be to add a partition EOF boolean to ConsumeResult. This wouldn't require people not interested in this flag to do anything different, but would allow clients that do care to know the difference between no message and EOF.

qed- commented 5 years ago

To make my use case more concrete: I am using a consumer to persist the latest value per key in a compacted topic to a database. The persistence is triggered by reaching EOF, so it will only write the "current" value. With the beta2 API I'd have to put logic to update the dictionary in the polling loop, and logic to write to database in the EOF event handler. This is fine as the EOF handler gets invoked on the polling thread but it doesn't seem as neat as it could be.

AndyPook commented 5 years ago

What about returning a sentinel value?

    var msg = consumer.Consume...
    if (msg == Message.Eof) { ...handle eof... }
    else if (msg != null) { ...process message... }


mhowlett commented 5 years ago

thanks for input on this - you're right, it's not quite right. my additional thoughts:

@qed- - i actually think it's fine (good) that the timeout variant can return null and the cancellation token variant doesn't.

@AndyPook - thanks for adding another alternative into the mix. i think it doesn't help though since it still adds a requirement for additional checking even in the case where people don't care.

here's another idea:

  1. add an additional consumer configuration property EnablePartitionEofNotification. By default it's false. if set to true, ConsumeResults will be emitted at partition eof with Offset set to Offset.End or a new value EOF (need to think through which). Alternatively, this behavior could be controlled via an additional parameter on the consume method (my first thought), but i think a config property is better since that gets a bit awkward.
  2. remove the PartitionEOF event.

quite liking that idea.

mhowlett commented 5 years ago

update to the above (which was written in haste as i was about to board a plane): librdkafka already has the enable.partition.eof config property (though it's set to true by default, not false as i was thinking was best). also the offset is meaningful in the case of eof (it's the next message to be consumed), so can't be repurposed as a flag. still thinking about this.

qed- commented 5 years ago

If you can target language version 7+ you could do this with pattern matching:

    public abstract class ConsumeResult { }

    public class Message : ConsumeResult { TKey Key {get;} TValue Value {get;} }

    public class PartitionEOF : ConsumeResult { Offset Offset {get;} }

    var result = consumer.Consume();

    switch (result)
    {
        case Message message:
            ....
            break;

        case PartitionEOF eof:
            ....
            break;
    }

AndreSteenbergen commented 5 years ago

What's the current status on this EOF idea? Testing-wise I would really like to work with the interface, so mocking is possible. But working with the interface at this moment doesn't allow any EOF detection. (working with beta2)

mhowlett commented 5 years ago

Update on partition EOFs: in librdkafka v1.0, the default value for enable.partition.eof is going to be false. the implication for .net is that we'll be able to provide the EOF notification via ConsumeResult AND keep the required consumer control loop logic terse by default. If the user turns on the partition EOF feature, only then will their control logic need to consider EOFs. for this reason, I expect we'll get rid of OnPartitionEOF in favor of providing the information via the ConsumeResult since it's more straightforward.

I like the subclass idea (@qed- ), except in practice the generic parameters get in the way. You'd end up with PartitionEOF<K,V>, or require dynamic casting (i.e. we ideally want class PartitionEOF : Message<TKey, TValue>, but this is not possible, unless i'm missing something important ...)

I like the message sentinel idea (@AndyPook ) except what is it? null, null, null, Timestamp.NotAvailable? is this completely justifiable?

A boolean flag on ConsumeResult - IsPartitionEOF seems the most straightforward (the tersest) to use, and also conceptually justifiable I think. If it's true, then having the message field present and be null doesn't seem stupid (models what's happening adequately), and the same conversely.

edenhill commented 5 years ago

A boolean flag on ConsumeResult - IsPartitionEOF 👍

AndreSteenbergen commented 5 years ago

the implication for .net is that we'll be able to provide the EOF notification via ConsumeResult

I don't get the implication? Do you mean, when set to false there will be a ConsumeResult with a boolean flag when eof is reached? But how do you know if eof is reached when you set the option to false, don't you need the setting to be true?

mhowlett commented 5 years ago

by default you don't get any eof notification. this means the code handling the ConsumeResult doesn't need to consider that possibility (IsPartitionEof will always be false) and it makes it short. if you want eof notification, you turn it on with the config parameter, but then your code needs to be longer to handle the case.
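
A rough sketch of how the flag could be used for the compacted-topic use case described earlier in the thread (keep the latest value per key, persist at EOF). FakeConsumeResult is a hypothetical stand-in with an assumed IsPartitionEOF field, not the library's ConsumeResult, and the input sequence simulates what a consume loop would see with the EOF feature turned on.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the result type under discussion.
public sealed class FakeConsumeResult
{
    public string Key;
    public string Value;
    public bool IsPartitionEOF;
}

public static class LatestValueWriter
{
    // Tracks the latest value per key and records a snapshot each time an
    // EOF marker is seen; a real application would write to a database here.
    public static List<Dictionary<string, string>> Run(IEnumerable<FakeConsumeResult> results)
    {
        var latest = new Dictionary<string, string>();
        var snapshots = new List<Dictionary<string, string>>();
        foreach (FakeConsumeResult r in results)
        {
            if (r.IsPartitionEOF)
            {
                // Only needed when EOF notification is enabled.
                snapshots.Add(new Dictionary<string, string>(latest));
                continue;
            }
            latest[r.Key] = r.Value;
        }
        return snapshots;
    }
}
```

With EOF notification left off, the IsPartitionEOF branch never runs and can simply be omitted, which is the short default loop being argued for.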

mhowlett commented 5 years ago

what i'm concerned about is making the code as short as possible in the case people don't want eof notifications, which is most of the time.

AndreSteenbergen commented 5 years ago

Ok I get it, thx

AndyPook commented 5 years ago

Hello, me again :) I've lost track of the state of branches since last time. I assume that master or the 1.0.x is where all the things are happening now?

Raising the Consumer.Consume methods should be async question again...

I see that a Task.Run has been added. Surely this is unnecessary?

But putting that to one side... why not just make this method async (as I previously tried to argue) this would make it just the same pattern as how the Producer handles the same situation?

I realise that this is quite a breaking change this late in the process. (more than happy to do the refactoring and PR)

AndyPook commented 5 years ago

would you accept a PR that tidies up a bunch of things? Like: typos; calling ToString on strings etc?

mhowlett commented 5 years ago

welcome back!

we merge into 1.0.x, then up into master.

we'll implement async consumer methods at a later time, when librdkafka facilitates doing this properly. Task.Run proved to be necessary to be 100% sure of avoiding deadlocks. I assumed it wasn't necessary as well but it apparently is (I think because ConfigureAwait(false) must be called at all levels and we can't control that). I do take your point, but I believe what we have is the best interim measure on the way to the ideal API.

absolutely feel free to open PRs like you propose on 1.0.x. If they're small and obviously good they can go in before 1.0. err on the side of more small and simple PRs. thanks :-)

We're going to release 1.0 asap. Nearly did it already, but see open PR + i'm reviewing some code that uses the library which may be enlightening, so going to do that first as well just in case.

AndyPook commented 5 years ago

I get that librdkafka is not async. But... I cannot understand why it can be done correctly in the Producer but not in the Consumer.

BTW: ConfigureAwait(false) does not need to be done at every level. Using Task.Run around an async call can cause multiple threads blocked. just method(...).ConfigureAwait(false).GetAwaiter().GetResult(); is more than sufficient.

Making the Consume methods async (using the same pattern as Producer.ProduceAsync()) will resolve a lot of these issues (still calling .ConfigureAwait(false) on that line). Note that .ConfigureAwait(false) breaks the SynchronizationContext. So doing this within a non-async method means that anyone using AsyncLocal (ie for Activity and logging context) will break after this call. Making this method async causes a boundary for SyncContext, so it will be captured when called and restored on exit. Doing this as sync over async breaks all the protections.

Lots of nuance, most of which is resolved by making these methods async. All of the pieces built in my previous several engagements are trying to be async all the way up. So we have to call this sync (not so much of a problem) but then this does sync over async (which can cause all sorts of deadlock and SyncContext issues).

mhowlett commented 5 years ago

I'm thinking the ConfigureAwait(false) after the Task.Run is probably unnecessary, and, if we remove it I think it might solve the breaking the synchronization issue you note above. I need to find time to test this out... so much to do, so little time.

the consumer consume call is blocking and the produce call is not - I don't think the ConsumeAsync method should block, and when we implement it so that it doesn't, I don't want to be suddenly changing the behavior of an existing thing.

Without the Task.Run, I definitely saw deadlock issues.

AndyPook commented 5 years ago

Here we go again...

This "blocking" concern, just isn't one. It is completely typical (especially with interop things). But we've had this discussion before. Suffice to say, doing async over sync is just fine. Of course, you don't get the nice thread offloading by letting the IO system track it. But... this is far more acceptable than doing sync over async. Which is what trying to do both sync and async serdes in the same method forces us to do. So, if I use avro, or some other async serdes, I still do not get any benefit from Task/async! And we just get all the pain with zero gain.

I get and appreciate, that you're trying to do "the right thing" and factor things in a compact way. But it leads it into an async pickle.

  1. Pull out a base class with all but the Consume methods (conveniently last in the file). 2 descendants, pure sync and pure async (for the async serdes). We've discussed this before. But maybe with all the changes, this fits better now? Even if you only do one descendant that's exactly like the current situation, it would allow us library users to build our own mechanism and it side-steps the whole discussion :) or
  2. A different way of keeping Consumer pure sync. Is to make it only use the sync serdes. But this only works if Deserializer also takes a SerializationContext (this would actually be useful if you're using json, msgpack, others). Then you could have...
    public delegate T Deserializer<T>(ReadOnlySpan<byte> data, bool isNull, SerializationContext context);

    public static class Deserializers
    {
        public static Deserializer<T> Async<T>(IAsyncDeserializer<T> asyncSerializer) {
            return (data, isNull, context) =>
            {
                return asyncSerializer.DeserializeAsync(new ReadOnlyMemory<byte>(data.ToArray()), isNull, context)
                    .ConfigureAwait(false)
                    .GetAwaiter()
                    .GetResult();
            };
        }
...

So, we keep Consumer as pure sync and just wrap the async serde with the standard sync-over-async pattern. The Builder can deal with the wrapping and the Consumer is none the wiser.

Yes this is just a different method of hiding the async'ness of the async serdes, but, at least it vastly simplifies the Consumer.

mhowlett commented 5 years ago

So, if I use avro, or some other async serdes, I still do not get any benefit from Task/async! And we just get all the pain with zero gain.

yep, you get no benefit. In fact, the implementation forces the serde to be run on a different thread (where there is no async context) to ensure deadlocks can't happen, which is far from ideal from an efficiency perspective. it's a complete hack. a further possible downside from doing this is if the async method suspends, it could resume on a different thread so the deserializer can't make any assumptions about this - but in practice, no deserializer is going to.

this is an interim measure.

you are right that it was perhaps a bad idea to have provided IAsyncDeserializer now, since there is no way for it to be used correctly. also, i did think about disallowing async usage in a sync context as you suggest. but i don't see that allowing this (in the way implemented) will lead to problems, it's just not optimal and to be advised against (when we provide the option to).

mhowlett commented 5 years ago

Note: I'm going to consult an expert on this issue on Thursday. Thanks for persisting.

mhowlett commented 5 years ago

btw: If you provided an example program that I could run, which demonstrates a scenario in which the current implementation does not work, that would really grab my attention.

mhowlett commented 5 years ago

i think the scenario under 'Real-World Example' in https://devblogs.microsoft.com/pfxteam/should-i-expose-synchronous-wrappers-for-asynchronous-methods/ might apply to the current implementation.

something that is unique to our scenario is the async deserializers will almost never need to wait on IO - they will almost always run without suspending. Also, in most scenarios, we won't want them to be executing in parallel if they do need to wait on IO (since they are probably looking up the same schema in schema registry, wasted effort and additional load). maybe it's possible to leverage this and force them to run serially. the above also has a lot to do with why I think what i'm doing is likely legitimate - our scenario is very atypical.

mhowlett commented 5 years ago

you're right - there will be changes, stay tuned.

AndyPook commented 5 years ago

My suggestion is not that the current impl will break. But is "wrong" in other ways. Doing Task.Run will create another Task that "might" cause another Thread leaving the current one blocked until it returns. Async isn't usually about parallelism. It just gives the system an opportunity to release the current thread while some IO waits for a response. As Threads are released they can be used for other work (hence the context). So it's about increasing concurrency/throughput.

Note that Task has a bool property CompletedSynchronously to signal that no release/async actually happened. It's used to optimise hot paths and paths that shortcut the IO, i.e. due to caching. But the implication is that it is very normal for an async method (aka Task) to complete synchronously, i.e. that it "blocks"; it just means that it completes running on the current thread.
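
A small sketch of an async method that usually completes synchronously because of caching, which is roughly the situation described for schema lookups. CachedLookup and GetSchemaAsync are hypothetical names, and Task.Delay stands in for the IO a real lookup would perform.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class CachedLookup
{
    static readonly ConcurrentDictionary<int, string> Cache =
        new ConcurrentDictionary<int, string>();

    // Hypothetical schema lookup. On a cache hit no await is reached, so the
    // returned Task is already completed when the caller receives it.
    public static async Task<string> GetSchemaAsync(int id)
    {
        if (Cache.TryGetValue(id, out string cached))
        {
            return cached; // synchronous completion, no thread switch
        }

        await Task.Delay(10); // stand-in for the remote call
        string schema = "schema-" + id;
        Cache[id] = schema;
        return schema;
    }
}
```

After the first call for a given id has finished, a second call returns a Task whose IsCompleted is already true.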

So I wouldn't say that this is that atypical. I've written many things that had to be async (due to an interface requirement) that are entirely sync code, then just return Task.CompletedTask. Also methods that do a bunch of sync stuff and optionally make choices about whether to do async things, so they might complete synchronously or asynchronously. So not that unusual. But always best to do async over sync.

Even if you can't buy into that, please remove the Task.Run. Just call asyncValueDeserializer.DeserializeAsync(...).ConfigureAwait(false).GetAwaiter().GetResult();

Yep, I've seen Toub's article (he is def one of the authoritative sources on this stuff). But... at that time people were asking if they should be creating async versions of every method in their systems. This is what he's advising against. It doesn't make sense. Elsewhere, people like him say the equivalent of "do not do sync over async", "async things should only be called from async methods".

Just one guy's opinion. Hoping your discussion tomorrow correlates. In either case, have fun geeking out! :)

mhowlett commented 5 years ago

I'm quite sure the current implementation can deadlock due to thread pool exhaustion (not to mention it's inefficient), which counts it out as a possibility. I didn't realize that before.

reverting to asyncValueDeserializer.DeserializeAsync(...).ConfigureAwait(false).GetAwaiter().GetResult(); isn't acceptable either because it requires all async calls in the DeserializeAsync implementation to have ConfigureAwait(false) applied (which is something I don't want to require on users of the API). This is definitely right, I've checked this - i get deadlocks in a WinForms application without it. We could get the avro serializer working in this way (i forgot some ConfigureAwaits so it doesn't work now), but it's not good enough imo for a general api.

Re: CompletedSynchronously - I don't think an async task would ever typically block on IO (which is what we would be doing) though.

I'm still not sure of the best way to resolve this. The leading candidate in my mind right now is re-instating the SerializationContext parameter in Deserializer and making the avro deserializer sync (exactly what you suggest a little bit above). On the flip side of this, the delegate types without the SerializationContext parameter are very convenient for other use-cases, so perhaps there will be more than one type - Deserializer and DeserializerWithContext (but erring on the side of no).

We could also make a ConsumeAsync that does await Task.Run on the librdkafka poll call, which gets around my complaint about the blocking on IO. The disadvantage is the Task.Run nonsense, but the advantage is the API is exactly what we'll have when we implement ConsumeAsync properly. Not favoring this however.

Re: multiple Consumer classes - tried it, very much don't like the api complexity / surface area it brings. I'd prefer to have runtime errors if one class is used inappropriately.

AndyPook commented 5 years ago

"reverting to asyncValueDeserializer.DeserializeAsync(...).ConfigureAwait(false).GetAwaiter().GetResult(); isn't acceptable either because it requires all async calls in the DeserializeAsync implementation to have ConfigureAwait(false) applied"

I am 100% confident in saying that this is not true (well, 99.9999999999...%). The point of CA(f) is to "protect" the caller from deadlock shenanigans further down the call-stack. It says to not continue the execution with the current SyncContext. Deadlocks happen because the current SyncContext is single threaded and so cannot handle the continuation being called from the IO thread while it's waiting on the current call. AspNet, WPF and others all use a single threaded SC, hence the problem and all the messing about getting the continuation to run correctly. The original problem was when people were trying to await HttpClient calls from within a Controller. Single threaded SC means a deadlock, as the SC cannot service the HttpClient continuation. The fix was to use CA(f). Problem solved, and it did not require the HttpClient to be modified. I have used other async libraries which I knew didn't do CA(f) internally (source available but not modifiable). Just adding CA(f) on the call into the library resolved the deadlock.

Another thing to note is that using CA(f) means that any "ambient" values held in things like AsyncLocal<T>, ExecutionContext, tracing.CorrelationId etc. are lost after the CA(f) line. So any e.g. logging done after this may not contain necessary info. The way this is handled is by putting CA(f) calls inside an async method. The first thing that happens when awaiting is to capture the SC, which is then restored on return. As a result, anything further up the call-stack is "protected" from the CA(f) side effects.

The guidance is actually for library developers to handle any CA(f) requirements as low as possible and within an async method. This protects any callers from needing to worry about CA(f).

I hope I've explained that well enough. Suffice to say that you only need CA(f) at some defined boundary and not all the way down the stack. Task.Run is generally not your friend in these situations.

AndyPook commented 5 years ago

... base class +... even if the only descendant "you" provide contains the current thing. It leaves the system open to library users to provide their own customisations, i.e. the Open-Closed principle. Related... I know it was me that suggested the Builder pattern. But I had assumed that the built type would still be open, i.e. still have regular ctors etc. with the builder just being a convenience. I can see the desire to keep the class count low, but... it makes for some hairy code in places and severely limits the user's ability to customise or arrange the bootstrapping of a system.

AndyPook commented 5 years ago

As I'm on a roll :) The Set*Handler methods on the ConsumerBuilder (prob the Producer version too) ought to take delegates using the IConsumer<KV> interface rather than the concrete type.

I'll try to do a PR for this first thing tomorrow

mhowlett commented 5 years ago

The Set*Handler methods on the ConsumerBuilder (prob the Producer version too) ought to take delegates using the IConsumer interface rather than the concrete type.

woah, that's a good catch. thanks for that.

mhowlett commented 5 years ago

btw: do you want some confluent/kafka swag? (t-shirt and/or book or something), if so let me know via email (look in my profile) and I'll see what I can do.

AndyPook commented 5 years ago

Ha! feels like I've just been a pita! Is that a bribe? :smile: ... I'll take it :wink:

AndyPook commented 5 years ago

#878 for the "use interfaces" thing from above