EasyNetQ / EasyNetQ

An easy to use .NET API for RabbitMQ
http://easynetq.com
MIT License
2.91k stars 750 forks source link

Library internals misunderstanding #1066

Closed natalie-o-perret closed 4 years ago

natalie-o-perret commented 4 years ago

I wanted to code my own publishing method from scratch with the official RabbitMQ .NET client, but for some reason I can't achieve the same performances as with EasyNetQ.

I was wondering if the code in TestSameChannelSeqAsync is roughly equivalent to the code in TestEasyNetQAsync

public static class Tester
{
    public static async Task TestEasyNetQAsync(IEnumerable<byte[]> data)
    {
        var stopwatch = Stopwatch.StartNew();

        using var bus = RabbitHutch.CreateBus("host=localhost");
        using var subscription = bus.Subscribe(CommonStuff.QueueName, bytes =>
        {
            return Task.Run(() =>
            {
                var message = Encoding.UTF8.GetString(bytes);
            });
        });

        var tasks = data.Select(item =>
            bus.PublishAsync(
                CommonStuff.ExchangeName,
                CommonStuff.QueueName,
                item));

        await Task.WhenAll(tasks);

        stopwatch.Stop();
        Console.WriteLine($"{nameof(TestEasyNetQAsync)}: {stopwatch.Elapsed}");
        Console.ReadKey();
    }

    public static async Task TestSameChannelSeqAsync(IEnumerable<byte[]> data)
    {
        var stopwatch = Stopwatch.StartNew();

        const string queueName = "hello";
        var factory = new ConnectionFactory
        {
            HostName = "localhost",
            DispatchConsumersAsync = true
        };
        using var connection = factory.CreateConnection();
        using var queueDeclareChannel = connection.CreateModel();

        queueDeclareChannel.QueueDeclare(
            queue: CommonStuff.QueueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        using var consumerChannel = connection.CreateModel();
        var consumer = new AsyncEventingBasicConsumer(consumerChannel);
        consumer.Received += (sender, eventArgs) =>
        {
            return Task.Run(() =>
            {
                var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
            });
        };

        consumerChannel.BasicConsume(
            queue: CommonStuff.QueueName,
            autoAck: true,
            consumer: consumer);

        using var publishChannel = connection.CreateModel();

        using var asyncLock = new AsyncLock();
        var tasks = data.Select(async body =>
        {
            using var release = await asyncLock.AcquireAsync().ConfigureAwait(false);
            publishChannel.BasicPublish(
                CommonStuff.ExchangeName,
                CommonStuff.QueueName,
                false,
                publishChannel.CreateBasicProperties()
            );
        });

        await Task.WhenAll(tasks);

        stopwatch.Stop();
        Console.WriteLine($"{nameof(TestSameChannelSeqAsync)}: {stopwatch.Elapsed}");
        Console.ReadKey();
    }
}

with the given extension methods:

public static class EasyNetQExtensions
{
    public static Task PublishAsync(
        this IBus bus,
        string exchange,
        string routingKey,
        byte[] body,
        CancellationToken cancellationToken = default) =>
        bus.Advanced.PublishAsync(
            exchange: new Exchange(exchange),
            routingKey: routingKey,
            mandatory: false,
            messageProperties: new MessageProperties(),
            body: body,
            cancellationToken: cancellationToken);

    public static IDisposable Subscribe(this IBus bus, string queueName, Func<byte[], Task> handler) =>
        bus.Advanced.Consume(
            new Queue(queueName),
            (bytes, messageProperties, messageReceivedInfo) =>
                handler(bytes));
}

and how it is called:

using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MaxiMichelle
{
    public static class Program
    {
        public static async Task Main()
        {
            var item = EmbeddedResource.Load("File.xml");
            var itemBytes = Encoding.UTF8.GetBytes(item);
            var data = Enumerable.Repeat(itemBytes, 500_000).ToArray();
            await Tester.TestEasyNetQAsync(data);
            await Tester.TestSameChannelSeqAsync(data);
        }
    }
}

The data in "File.xml":

<?xml version="1.0" encoding="UTF-8"?>
<Document>
  <CstmrCdtTrfInitn>
    <GrpHdr>
      <MsgId>ABC/090928/CCT001</MsgId>
      <CreDtTm>2009-09-28T14:07:00</CreDtTm>
      <NbOfTxs>3</NbOfTxs>
      <CtrlSum>11500000</CtrlSum>
      <InitgPty>
        <Nm>ABC Corporation</Nm>
        <PstlAdr>
          <StrtNm>Times Square</StrtNm>
          <BldgNb>7</BldgNb>
          <PstCd>NY 10036</PstCd>
          <TwnNm>New York</TwnNm>
          <Ctry>US</Ctry>
        </PstlAdr>
      </InitgPty>
    </GrpHdr>
    <PmtInf>
      <PmtInfId>ABC/086</PmtInfId>
      <PmtMtd>TRF</PmtMtd>
      <BtchBookg>false</BtchBookg>
      <ReqdExctnDt>2009-09-29</ReqdExctnDt>
      <Dbtr>
        <Nm>ABC Corporation</Nm>
        <PstlAdr>
          <StrtNm>Times Square</StrtNm>
          <BldgNb>7</BldgNb>
          <PstCd>NY 10036</PstCd>
          <TwnNm>New York</TwnNm>
          <Ctry>US</Ctry>
        </PstlAdr>
      </Dbtr>
      <DbtrAcct>
        <Id>
          <Othr>
            <Id>00125574999</Id>
          </Othr>
        </Id>
      </DbtrAcct>
      <DbtrAgt>
        <FinInstnId>
          <BIC>BBBBUS33</BIC>
        </FinInstnId>
      </DbtrAgt>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090928/CCT001/01</InstrId>
          <EndToEndId>ABC/4562/2009-09-08</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="JPY">10000000</InstdAmt>
        </Amt>
        <ChrgBr>SHAR</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>AAAAGB2L</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>DEF Electronics</Nm>
          <PstlAdr>
            <AdrLine>Corn Exchange 5th Floor</AdrLine>
            <AdrLine>Mark Lane 55</AdrLine>
            <AdrLine>EC3R7NE London</AdrLine>
            <AdrLine>GB</AdrLine>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <Othr>
              <Id>23683707994125</Id>
            </Othr>
          </Id>
        </CdtrAcct>
        <Purp>
          <Cd>CINV</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Nb>4562</Nb>
              <RltdDt>2009-09-08</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090628/CCT001/2</InstrId>
          <EndToEndId>ABC/ABC-13679/2009-09-15</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="EUR">500000</InstdAmt>
        </Amt>
        <ChrgBr>CRED</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>DDDDBEBB</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>GHI Semiconductors</Nm>
          <PstlAdr>
            <StrtNm>Avenue Brugmann</StrtNm>
            <BldgNb>415</BldgNb>
            <PstCd>1180</PstCd>
            <TwnNm>Brussels</TwnNm>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <IBAN>BE30001216371411</IBAN>
          </Id>
        </CdtrAcct>
        <InstrForCdtrAgt>
          <Cd>PHOB</Cd>
          <InstrInf>+32/2/2222222</InstrInf>
        </InstrForCdtrAgt>
        <Purp>
          <Cd>GDDS</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Tp>
                <CdOrPrtry>
                  <Cd>CINV</Cd>
                </CdOrPrtry>
              </Tp>
              <Nb>ABC-13679</Nb>
              <RltdDt>2009-09-15</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
      <CdtTrfTxInf>
        <PmtId>
          <InstrId>ABC/090928/CCT001/3</InstrId>
          <EndToEndId>ABC/987-AC/2009-09-27</EndToEndId>
        </PmtId>
        <Amt>
          <InstdAmt Ccy="USD">1000000</InstdAmt>
        </Amt>
        <ChrgBr>SHAR</ChrgBr>
        <CdtrAgt>
          <FinInstnId>
            <BIC>BBBBUS66</BIC>
          </FinInstnId>
        </CdtrAgt>
        <Cdtr>
          <Nm>ABC Corporation</Nm>
          <PstlAdr>
            <Dept>Treasury department</Dept>
            <StrtNm>Bush Street</StrtNm>
            <BldgNb>13</BldgNb>
            <PstCd>CA 94108</PstCd>
            <TwnNm>San Francisco</TwnNm>
            <Ctry>US</Ctry>
          </PstlAdr>
        </Cdtr>
        <CdtrAcct>
          <Id>
            <Othr>
              <Id>4895623</Id>
            </Othr>
          </Id>
        </CdtrAcct>
        <Purp>
          <Cd>INTC</Cd>
        </Purp>
        <RmtInf>
          <Strd>
            <RfrdDocInf>
              <Tp>
                <CdOrPrtry>
                  <Cd>CINV</Cd>
                </CdOrPrtry>
              </Tp>
              <Nb>987-AC</Nb>
              <RltdDt>2009-09-27</RltdDt>
            </RfrdDocInf>
          </Strd>
        </RmtInf>
      </CdtTrfTxInf>
    </PmtInf>
  </CstmrCdtTrfInitn>
</Document>

It turns out that my version coded from scratch takes about 2 min while EasyNetQ takes only about 40 seconds.

I'm not really sure to understand why, since I've basically followed the path taken in the EasyNetQ.

I just would like to know why EasyNetQ is blazing fast in that particular case?

Pliner commented 4 years ago

Hi @kerry-perret,

It's much better to have such examples as a github repo which compiles and works ;)

Anyway, without a launch, I could advice two things:

  1. To separate test cases to avoid interference between them
  2. To setup thread pool min threads(see #458 for more details)

I don't see reason except (2) explaining why code behaves differently.

natalie-o-perret commented 4 years ago

It's much better to have such examples as a github repo which compiles and works ;)

Sure, will create a .NET Core solution for that along with the dockerfile.

Pliner commented 4 years ago

It's much better to have such examples as a github repo which compiles and works ;)

Sure, will create a .NET Core solution for that along with the dockerfile.

Did you try to tune thread pool settings?

natalie-o-perret commented 4 years ago

Not yet, actually I plan to have everything in that solution first, there is a huge lack of resources about performances when it comes to code with .NET.

I mean there are like general guidelines and some part of the RabbitMQ tutorial but that's pretty much about it:

Don't worry I will also check out the thread starvation.

natalie-o-perret commented 4 years ago

@Pliner If you want, you can check this out: https://github.com/kerry-perret/CSharp.RabbitMQ.Benchmarks

Pliner commented 4 years ago

@kerry-perret Have you had a chance to adjust thread pool and benchmark again?

natalie-o-perret commented 4 years ago

@Pliner sorry I didnt have much time lately, I had 6 medical appointment in a row + work.

Will try to double check things this coming weekend or next week.

natalie-o-perret commented 4 years ago

Going thru a hectic schedule at the moment: lawyer + tax residency issues + incoming surgeries + daily job + OTing.

I haven't fogotten that issue, will get back to this asap.

Pliner commented 4 years ago

Feel free to reopen when you will have results.