Closed simone-fariselli closed 9 months ago
Attention: 17 lines
in your changes are missing coverage. Please review.
Comparison is base (
d70458a
) 92.15% compared to head (f939c9b
) 92.10%.
Files | Patch % | Lines |
---|---|---|
RabbitMQ.Stream.Client/ConnectionsPool.cs | 6.25% | 15 Missing :warning: |
RabbitMQ.Stream.Client/Client.cs | 95.23% | 1 Missing :warning: |
Tests/UnitTests.cs | 0.00% | 1 Missing :warning: |
:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.
Thank you @simone-fariselli
env: https://github.com/rabbitmq/rabbitmq-oauth2-tutorial/blob/main/use-cases/keycloak.md
Note: you need the pivotalrabbitmq/rabbitmq:token-expiration-in-stream-connections-otp-min-bazel image,
since this feature has not been released in an RC yet.
Tested with:
using System.Buffers;
using System.Net;
using System.Net.Http.Headers;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
namespace example;
/// <summary>
/// Example showing how to connect to RabbitMQ Streams with an OAuth2 access token
/// issued by Keycloak and how to keep the connection alive by periodically
/// replacing the token through <c>StreamSystem.UpdateSecret</c>.
/// </summary>
public class KeyCloak
{
    // Token endpoint of the local Keycloak realm used by the example.
    private const string TokenEndpoint = "http://localhost:8080/realms/test/protocol/openid-connect/token";

    private const string ClientId = "producer";

    // Environment variable that can override the client secret below.
    private const string ClientSecretVariable = "KEYCLOAK_CLIENT_SECRET";

    // Demo secret used when the environment variable is not set.
    // Never ship a real secret in source code.
    private const string DefaultClientSecret = "kbOFBXI9tANgKUq8vXHLhT6YhbivgXxn";

    private const string StreamName = "test-keycloak";

    // Number of messages published by the example (one per second, ten minutes in total).
    private const int MessageCount = 10 * 60;

    // How long to wait between two token refreshes.
    private static readonly TimeSpan s_tokenRefreshInterval = TimeSpan.FromSeconds(50);

    // A single shared HttpClient: creating one per request leaks sockets.
    private static readonly HttpClient s_httpClient = new();

    /// <summary>
    /// Requests a new access token from Keycloak using the client-credentials grant.
    /// </summary>
    /// <returns>The value of the <c>access_token</c> field of the token response.</returns>
    /// <exception cref="HttpRequestException">The token endpoint answered with a non-success status code.</exception>
    /// <exception cref="InvalidOperationException">The <c>access_token</c> field of the response is null.</exception>
    private static async Task<string> NewAccessToken()
    {
        var clientSecret = Environment.GetEnvironmentVariable(ClientSecretVariable) ?? DefaultClientSecret;

        // FormUrlEncodedContent sets "Content-Type: application/x-www-form-urlencoded" itself,
        // so no header has to be added by hand.
        using var form = new FormUrlEncodedContent(new[]
        {
            new KeyValuePair<string?, string?>("client_id", ClientId),
            new KeyValuePair<string?, string?>("client_secret", clientSecret),
            new KeyValuePair<string?, string?>("grant_type", "client_credentials"),
        });

        using var response = await s_httpClient.PostAsync(TokenEndpoint, form);

        // Fail with the HTTP error instead of a confusing JSON lookup failure on an error payload.
        response.EnsureSuccessStatusCode();

        var payload = await response.Content.ReadAsStringAsync();
        using var json = System.Text.Json.JsonDocument.Parse(payload);
        return json.RootElement.GetProperty("access_token").GetString()
               ?? throw new InvalidOperationException("no access token");
    }

    /// <summary>
    /// Replaces the secret of <paramref name="system"/> with a freshly issued access token
    /// every <c>s_tokenRefreshInterval</c> until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="system">The stream system whose secret is updated.</param>
    /// <param name="cancellationToken">Stops the refresh loop when cancelled.</param>
    private static async Task RefreshSecretPeriodically(StreamSystem system, CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                await Task.Delay(s_tokenRefreshInterval, cancellationToken).ConfigureAwait(false);
                Console.WriteLine($"{DateTime.Now} - Updating the secret....");
                await system.UpdateSecret(await NewAccessToken().ConfigureAwait(false)).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Expected: the example finished and asked the loop to stop.
        }
    }

    /// <summary>
    /// Runs the example: creates the stream, starts a consumer and a producer,
    /// publishes one message per second and refreshes the access token in the background.
    /// </summary>
    public static async Task Start()
    {
        var accessToken = await NewAccessToken();
        Console.WriteLine(accessToken);

        var serviceCollection = new ServiceCollection();
        serviceCollection.AddLogging(builder => builder
            .AddSimpleConsole(options =>
            {
                options.IncludeScopes = true;
                options.SingleLine = true;
                options.TimestampFormat = "[HH:mm:ss] ";
                options.ColorBehavior = LoggerColorBehavior.Default;
            })
            .AddFilter(level => level >= LogLevel.Debug)
        );

        // Disposed when the method exits, i.e. after every client has been closed.
        using var serviceProvider = serviceCollection.BuildServiceProvider();
        var loggerFactory = serviceProvider.GetService<ILoggerFactory>();
        if (loggerFactory is null)
        {
            return;
        }

        var producerLogger = loggerFactory.CreateLogger<Producer>();
        var consumerLogger = loggerFactory.CreateLogger<Consumer>();

        // With OAuth2 the access token is sent as the password.
        var system = await StreamSystem.Create(new StreamSystemConfig()
        {
            UserName = "producer",
            Password = accessToken,
            VirtualHost = "/",
        }, loggerFactory.CreateLogger<StreamSystem>());

        await system.CreateStream(new StreamSpec(StreamName)
        {
            MaxLengthBytes = 1_000_000,
        });

        using var refreshCancellation = new CancellationTokenSource();
        var refreshTask = RefreshSecretPeriodically(system, refreshCancellation.Token);

        var consumer = await Consumer.Create(new ConsumerConfig(system, StreamName)
        {
            OffsetSpec = new OffsetTypeFirst(),
            MessageHandler = (_, _, _, message) =>
            {
                Console.WriteLine(
                    $"{DateTime.Now} - Received: {Encoding.UTF8.GetString(message.Data.Contents.ToArray())} ");
                return Task.CompletedTask;
            }
        }, consumerLogger);

        var producer = await Producer.Create(new ProducerConfig(system, StreamName), producerLogger);

        for (var i = 0; i < MessageCount; i++)
        {
            await producer.Send(new Message(Encoding.UTF8.GetBytes($"Hello KeyCloak! {i}")));
            Console.WriteLine($"{DateTime.Now} - Sent: Hello KeyCloak! {i}");
            await Task.Delay(TimeSpan.FromSeconds(1));
        }

        // Stop the refresh loop and wait for it, so UpdateSecret never runs on a closed system
        // and a refresh failure does not go unobserved.
        refreshCancellation.Cancel();
        try
        {
            await refreshTask;
        }
        catch (Exception e)
        {
            Console.WriteLine($"{DateTime.Now} - Secret refresh failed: {e.Message}");
        }

        Console.WriteLine("Closing...");
        await consumer.Close();
        await producer.Close();
        await system.Close();
        Console.WriteLine("Closed.");
    }
}
Add update-secret functionality. Add a test for the UpdateSecret function.
Solves #340 with the introduction of the UpdateSecret method in StreamSystem. Usage example: