Closed mhsimkin closed 6 years ago
What is your requirement/end goal?
What to what? I might have a viable example.
On Wed, Jun 27, 2018, 1:35 PM mhsimkin notifications@github.com wrote:
I'm evaluating this library, as well as ENQ, to determine which would be a good replacement for our in-house developed RabbitMQ wrapper.
I have already determined that I can override the naming convention to use our current exchange/queue naming methodology.
I'm trying to determine if it is possible to bypass RawRabbit's message serializer or provide my own serialization logic?
Thanks
-marc
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/pardahlman/RawRabbit/issues/355, or mute the thread https://github.com/notifications/unsubscribe-auth/ABWT-sRrHruPVSV586lLoC65_biE7z60ks5uA8JzgaJpZM4U6G2T .
Our in-house developed library used a custom binary payload. There are over 100+ apps in production that have been running 5+ years. Therefore, converting to a new library will be done over time.
When I get ready to roll out a new wrapper around Rabbit, I will need to be able to handle subscriptions that will return a payload that is either binary or json. The messages will be the same class, only how they were serialized will be different.
Thanks
Okay so the Func 0=> BodyContent is overloadable to outbound messages.
This goes in your PublishMiddleware choice.
BodyFunc = c => c.Get<byte[]>(PipeKey.SerializedMessage)
Of course the SerializeMiddleware is replaceable too:
Default: .Use<BodySerializationMiddleware>()
YourExotic: .Use<MyCustomBodySerializationMiddleware>()
by customizing your action pipes.
Inbound, shouldn't really matter, you get your message from an AckableResult or a BasicGetResult from an IModel. BasicGetResult has the property Body which is just byte[]. The bytes in this body are yours to do with as you please include binary/json deserialization.
Are you trying to do a flow logic based on the ContentType? For example if the ContentType is of type MyNameSpace.MyLibrary.MyObject? So one Subscriber / Consumer gets a type ambiguous queue that then routes the logic on the IBasicProperties ContentType?
Although I am in the middle of adding a custom simplified middleware for PR to RawRabbit, I will be adding code that will handle binary, xml, and json deserialization in my own personal RabbitMQ wrapper. I started with Gzip content first this morning. https://github.com/thyams/CookedRabbit
Mine is grossly stupid in comparison to RawRabbit. RawRabbit is very feature rich.
@thyams Thank you. I will try and mock something up and see if I can get it to work.
Our current library is not that sophisticated. Messages that are rendered to binary are strongly typed by adding an ObjectId integer as a field in the headers collection.
Queues/Exchanges are named with the application that owns the queue, then functionality, and optionally the priority or the direction (input, output). For example: q.pds.ren.aggregation.pipeline.high, q.pds.ren.aggregation.pipeline.regular.
We are using topic exchanges and rely on the routing key. Typically when an exchange/queue is handling different message types, the routing key is used to determine the message type, regardless of whether the payload is binary or json.
We are working on switching to all text for many reasons. All new apps created within the last two years are not using binary payloads, but they are using our in-house library.
There are lots of issues with that library:
Thanks for your help.
Let us know what you come up with and perhaps we can abstract it away and start building How-To wiki!
@thyams where do I replace the serializer middleware? I have this code so far:
var _client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions
{
ClientConfiguration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("rawrabbit.json")
.Build()
.Get<RawRabbitConfiguration>(),
Plugins = p => p.UseGlobalExecutionId()
.UseMessageContext<MessageContext>()
});
await _client.SubscribeAsync<ScribeMessage, MessageContext>((requested, ctx) => OnMessage(requested, ctx), ctx => ctx
.UseSubscribeConfiguration(cfg => cfg
.Consume(c => c
.WithRoutingKey("#").WithAutoAck(false))
.FromDeclaredQueue(queue => queue.WithName("q.pds.tools.scribe.marc"))
.OnDeclaredExchange(e => e.WithName("e.pds.tools.scribe.input")
.WithType(ExchangeType.Topic)))
);
Do I need to build my own version of SubscribeMessageContextExtension that has SubscribeAsync method in order to change the pipeline?
Thanks
If you visit the Test projects (by cloning RawRabbit) you will see many of them use PublishAsync().
If you step into it, you will see PublishMessageExtension. This is the layer where we have written most of our code. Instead of calling PublishAsync, we manually call InvokeAsync and have customized this action pipe.
As you can see BodySerializationMiddleware is then replaceable with your own internal object.
public static readonly Action<IPipeBuilder> PublishPipeAction = pipe => pipe
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.Initialized))
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.ProducerInitialized))
.Use<PublishConfigurationMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.PublishConfigured))
.Use<ExchangeDeclareMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(PublishStage.ExchangeDeclared))
.Use<BodySerializationMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(PublishStage.MessageSerialized))
.Use<BasicPropertiesMiddleware>(new BasicPropertiesOptions
{
PostCreateAction = (ctx, props) =>
{
props.Headers.TryAdd(PropertyHeaders.Sent, DateTime.UtcNow.ToString("O"));
}
})
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.BasicPropertiesCreated))
.Use<PooledChannelMiddleware>(new PooledChannelOptions { PoolNameFunc = c => PublishKey.Publish })
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(PublishStage.ChannelCreated))
.Use<ReturnCallbackMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(PublishStage.PreMessagePublish))
.Use<PublishAcknowledgeMiddleware>()
.Use<BasicPublishMiddleware>(new BasicPublishOptions
{
BodyFunc = c => c.Get<byte[]>(PipeKey.SerializedMessage)
})
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(PublishStage.MessagePublished));
public static Task PublishAsync<TMessage>(this IBusClient client, TMessage message, Action<IPublishContext> context = null, CancellationToken token = default(CancellationToken))
{
return client.InvokeAsync(
PublishPipeAction,
ctx =>
{
ctx.Properties.Add(PipeKey.MessageType, message?.GetType() ?? typeof(TMessage));
ctx.Properties.Add(PipeKey.Message, message);
context?.Invoke(new PublishContext(ctx));
}, token);
}
I'm evaluating this library, as well as ENQ, to determine which would be a good replacement for our in-house developed RabbitMQ wrapper.
I have already determined that I can override the naming convention to use our current exchange/queue naming methodology.
I'm trying to determine if it is possible to bypass RawRabbit's message serializer or provide my own serialization logic?
Thanks
-marc