dotnetcore / CAP

Distributed transaction solution in micro-service base on eventually consistency, also an eventbus with Outbox pattern
http://cap.dotnetcore.xyz
MIT License

NATS feature: add the possibility to disable dynamic consumer subject/topic creation through NatsCapOptions #1545

Open davte-beijer opened 2 months ago

davte-beijer commented 2 months ago

Currently, NATS clients create any missing topics that are not configured on a stream, through NATSConsumerClient.FetchTopics. This requires the client to be granted extra permissions in the NATS server configuration, e.g.:

"permissions": {
   "publish": [
      "JS.API.STREAM.INFO.VolatileStream",
      "JS.API.STREAM.CREATE.VolatileStream.*",
      "JS.API.STREAM.UPDATE.VolatileStream.*"
   ]
}

If you want to limit the permissions a NATS client needs by creating streams and topics through a centralized solution, dynamic creation by clients gets in the way, because it currently forces every client to hold the publish permissions listed above.

Therefore I propose extending NATSOptions with a new property, EnableSubscriberClientStreamAndTopicCreation, defaulting to true so that the change is backwards compatible.

Pseudo code:

public class NATSOptions
{
    ...
    /// <summary>
    /// Allows a NATS client to dynamically create a stream and configure its expected topics.
    /// </summary>
    public bool EnableSubscriberClientStreamAndTopicCreation { get; set; } = true;
    ...
}
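For illustration, a service that relies on centrally provisioned streams might switch the option off when registering CAP, roughly as sketched below. EnableSubscriberClientStreamAndTopicCreation is the property proposed in this issue and does not exist in NATSOptions yet, so this will not compile against the current package; the server address is a placeholder, `services` is assumed to be the application's IServiceCollection, and the storage and other CAP settings are left out.

```csharp
// Sketch only: fragment of the application's service registration.
services.AddCap(cap =>
{
    cap.UseNATS(nats =>
    {
        // Placeholder address; point this at your own NATS server.
        nats.Servers = "nats://localhost:4222";

        // Proposed property (not available yet): streams and subjects are
        // provisioned elsewhere, so the client never calls the JetStream
        // management API and does not need the publish permissions above.
        nats.EnableSubscriberClientStreamAndTopicCreation = false;
    });
});
```

Leaving the property at its default of true would keep today's behaviour, which is why existing deployments should be unaffected.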

This option would then be used in the NATSConsumerClient.FetchTopics method as follows, to skip the JetStream management context calls that require the client publish permissions described above.

public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
{
    // New guard: only touch the JetStream management API when the option is enabled.
    if (_natsOptions.EnableSubscriberClientStreamAndTopicCreation)
    {
        Connect();

        var jsm = _consumerClient!.CreateJetStreamManagementContext();

        // Group the requested topics by the stream they belong to.
        var streamGroup = topicNames.GroupBy(x => _natsOptions.NormalizeStreamName(x));

        foreach (var subjectStream in streamGroup)
        {
            var builder = StreamConfiguration.Builder()
                .WithName(subjectStream.Key)
                .WithNoAck(false)
                .WithStorageType(StorageType.Memory)
                .WithSubjects(subjectStream.ToList());

            // Let the user customize the stream configuration.
            _natsOptions.StreamOptions?.Invoke(builder);

            try
            {
                jsm.GetStreamInfo(subjectStream.Key); // this throws if the stream does not exist

                jsm.UpdateStream(builder.Build());
            }
            catch (NATSJetStreamException)
            {
                // The stream is missing (or the update failed): try to create it.
                try
                {
                    jsm.AddStream(builder.Build());
                }
                catch
                {
                    // ignored
                }
            }
        }
    }

    return topicNames.ToList();
}

yang-xiaodong commented 2 months ago

Hi @davte-beijer,

Looks good. Would you like to submit a PR?

davidterins commented 2 months ago

Will do!