mookid8000 / Cirqus

:x: d60 event sourcing + CQRS framework
MIT License

Refresh cirqus system / aggregate root #85

Open minhhungit opened 7 years ago

minhhungit commented 7 years ago

I have an issue and I don't know where I should post it, so I am writing here.

My application has a tool that creates events for existing data. When I start it, it runs very fast (I only use Emit() and don't use Apply() or views). It stays fast for the first few thousand records (< 10,000) but then becomes slower and slower.

If I stop the tool and run it again (it resumes from the point where it was stopped), it is fast again, and then it gradually slows down again. I don't know why. How can I refresh the system or the aggregate root so that it behaves as it does right after the tool starts?

I have 32 GB of RAM, and the tool uses only about 3 GB of RAM and about 30% CPU.

minhhungit commented 7 years ago

Here is my command and event:

public class UpdateAllServicesDemandForDay : ExecutableCommand
    {
        public string PartNumber { get; private set; }
        public string OemNumber { get; private set; }
        public string HicNumber { get; private set; }
        public string PartDescription { get; private set; }
        public DateTime DemandDate { get; private set; }
        public int ZoneId { get; private set; }
        public long ApuDemand { get; private set; }
        public long CccDemand { get; private set; }
        public long PartsTraderDemand { get; private set; }

        public UpdateAllServicesDemandForDay(
            string partNumber,
            string oemNumber,
            string hicNumber,
            string partDescription,
            DateTime demandDate,
            int zoneId,
            long apuDemand,
            long cccDemand,
            long partsTraderDemand)
        {
            this.PartNumber = partNumber == null ? partNumber : partNumber.ToUpper();
            this.OemNumber = oemNumber == null ? oemNumber : oemNumber.ToUpper();
            this.HicNumber = hicNumber == null ? hicNumber : hicNumber.ToUpper();
            this.PartDescription = partDescription;
            this.DemandDate = demandDate;
            this.ZoneId = zoneId;
            this.ApuDemand = apuDemand;
            this.CccDemand = cccDemand;
            this.PartsTraderDemand = partsTraderDemand;
        }

        public override void Execute(ICommandContext context)
        {
            var root = context.TryLoad<DemandRoot>(DemandRoot.GetAggregateRootId(this.DemandDate));
            if (root == null) root = context.Create<DemandRoot>(DemandRoot.GetAggregateRootId(this.DemandDate));
            root.TrackDemandForDayForAllServices(this);
        }
    }

namespace DemoNamespace
{
    public partial class DemandRoot : AggregateRoot,
        IEmit<AllServicesDemandAddedForDay>
    {
        internal void TrackDemandForDayForAllServices(UpdateAllServicesDemandForDay request)
        {
            if (this.Id != GetAggregateRootId(request.DemandDate)) throw new InvalidOperationException("Cannot add this demand to the Aggregate Root for a different day");

            var demandEntry = new AllServicesDemandAddedForDay(this.Id, request.PartNumber, request.OemNumber, request.HicNumber, request.PartDescription, request.DemandDate.Date, request.ZoneId, request.ApuDemand, request.CccDemand, request.PartsTraderDemand);
            if (string.IsNullOrWhiteSpace(demandEntry.PartNumber) || demandEntry.ApuDemand < 0 || demandEntry.CccDemand < 0 || demandEntry.PartsTraderDemand < 0 || demandEntry.ZoneId < 1 || demandEntry.ZoneId > 8)
            {
                throw new InvalidOperationException("Cannot add this demand for this day for an empty or null PartNumber or negative demand number");
            }
            else
            {
                this.Emit(demandEntry);
            }
        }

        public void Apply(AllServicesDemandAddedForDay e) {  }
    }
}

mookid8000 commented 7 years ago

It makes sense for command processing to become slower as the number of events for the loaded aggregate root becomes greater. But if DemandRoot.GetAggregateRootId(this.DemandDate) works by getting a new ID for each day, it should distribute emitted events nicely over the days, leading to a command execution time that looks like this (tL = time to load an aggregate root, t = time):

 tL
  |   /    /    /
  |  /    /    /
  | /    /    /
  |---------------> t

basically resetting on each day (assuming here that commands are executed in such a way that DemandDate is increasing).
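To make the sawtooth concrete, here is a minimal sketch that does not use Cirqus at all. It assumes that the cost of loading a root is proportional to the number of events already in its stream and that each command emits exactly one event. The names `SawtoothSketch`, `LoadCosts`, `days`, and `commandsPerDay`, as well as the ID format, are hypothetical and only serve the illustration.

```csharp
using System;
using System.Collections.Generic;

public static class SawtoothSketch
{
    // Returns, for each command in order, how many events must be replayed
    // to load the root that the command targets (one root per day).
    public static List<int> LoadCosts(int days, int commandsPerDay)
    {
        var eventsPerRoot = new Dictionary<string, int>();
        var costs = new List<int>();

        for (var day = 0; day < days; day++)
        {
            var rootId = "demand/day-" + day; // hypothetical ID, one per day

            for (var i = 0; i < commandsPerDay; i++)
            {
                // 'existing' is 0 when the root has no events yet.
                eventsPerRoot.TryGetValue(rootId, out var existing);
                costs.Add(existing);                  // replay cost of this load
                eventsPerRoot[rootId] = existing + 1; // the command emits one event
            }
        }

        return costs;
    }

    public static void Main()
    {
        // Prints: 0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3
        Console.WriteLine(string.Join(", ", LoadCosts(3, 4)));
    }
}
```

The cost climbs within a day and drops back to zero when the day changes. If every command carried the same DemandDate, there would be only one root and the cost would never reset.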

Combined with seemingly ever-increasing memory usage, my initial presumption is that you must have a memory leak somewhere.

How do you create the UpdateAllServicesDemandForDay command? Could you show me some of the code that surrounds this?

minhhungit commented 7 years ago

Our system uses NancyFX to expose an API service. I send a request to the API, and the API processes the command. The code below calls the API:

var t = SendCommand(cmd, "V1/Demand/UpdateDemandForAllServices");

The SendCommand method is defined here:

public abstract class ClientBase
    {
        protected string baseUrl = "";
        static List<HttpClient> Clients = new List<HttpClient>();

        protected Task<HttpResponseMessage> SendCommand<T>(T cmd, string path)
        {
            if (string.IsNullOrEmpty(baseUrl))
            {
                this.baseUrl = string.Format("http://{0}:{1}/",
                                       ConfigurationManager.AppSettings["domain"],
                                       int.Parse(ConfigurationManager.AppSettings["port"]));
            }

            HttpClient client = null;
            lock (Clients)
            {
                if (Clients.Count > 0)
                {
                    client = Clients.First();
                }
                else
                {
                    client = new HttpClient();

                    client.DefaultRequestHeaders.Accept.Clear();
                    client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                    client.DefaultRequestHeaders.Add("ApiKey", "{guid-value}");

                    Clients.Add(client);
                }

                // HTTP POST
                //var body = JsonConvert.SerializeObject(cmd);
                var t = client.PostAsJsonAsync(this.baseUrl + path, cmd);
                t.Wait();
                return t;
            }
        }
    }

Note: the code for the UpdateAllServicesDemandForDay command was posted in my previous comment.

And here is the API:

var dto = this.Bind<UpdateDemandForAllServicesDto>();
            var cmd = new UpdateAllServicesDemandForDay(dto.PartNumber, dto.OemNumber, dto.HicNumber, dto.PartDescription, dto.DemandDate, dto.ZoneId, dto.ApuDemand, dto.CccDemand, dto.PartsTraderDemand);
            var cmdResult = HostService.CommandBus.ProcessCommand(cmd);
            return Response.AsJson(cmdResult);

For reference, this is the code for GetAggregateRootId:

        public static string GetAggregateRootId(DateTime date)
        {
            return string.Format("demand/{0}", date.ToString("yyyy-MM-dd"));
        }

mookid8000 commented 7 years ago

It could be the HttpClient leak that you are experiencing. I don't know what you are achieving with the static List<HttpClient>, but HttpClient is meant to be newed up ONCE and kept around for the entire duration of your application's lifetime.

This also means that headers like API keys need to be added to each HttpRequestMessage instead of setting them as default headers.
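A minimal sketch of that advice could look like the following. It assumes that the command has already been serialized to JSON by the caller; `ApiClientSketch`, `BuildRequest`, and `SendCommandAsync` are hypothetical names, and the "ApiKey" header name is taken from the code posted above.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;

public static class ApiClientSketch
{
    // One HttpClient instance for the lifetime of the application.
    private static readonly HttpClient Client = new HttpClient();

    // Builds a POST request that carries its own headers,
    // so nothing has to be stored as a default header on the client.
    public static HttpRequestMessage BuildRequest(Uri baseUri, string path, string json, string apiKey)
    {
        var request = new HttpRequestMessage(HttpMethod.Post, new Uri(baseUri, path))
        {
            Content = new StringContent(json, Encoding.UTF8, "application/json")
        };
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
        request.Headers.Add("ApiKey", apiKey);
        return request;
    }

    // Sends the request without blocking the calling thread.
    public static async Task<HttpResponseMessage> SendCommandAsync(Uri baseUri, string path, string json, string apiKey)
    {
        using (var request = BuildRequest(baseUri, path, json, apiKey))
        {
            return await Client.SendAsync(request);
        }
    }
}
```

Keeping the request construction in its own method makes it possible to check the URL and headers without making a network call.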

Moreover, this is bad:

protected Task<HttpResponseMessage> SendCommand<T>(T cmd, string path)
{
    // (....)

    var t = client.PostAsJsonAsync(this.baseUrl + path, cmd);
    t.Wait();
    return t;
}

The t.Wait() makes the executing thread block and wait for the task to finish. In ASP.NET this can lead to deadlocks. You should change it into this:

protected async Task<HttpResponseMessage> SendCommand<T>(T cmd, string path)
{
    // (....)

    var result = await client.PostAsJsonAsync(this.baseUrl + path, cmd);

    return result;
}

(notice the async in the method signature and the await performed on the Task returned from the client)

I don't know if these things are somehow related to the memory leak it seems you are experiencing though – but it would be nice to get these things out of the way.

minhhungit commented 7 years ago

So I won't worry about the Nancy API and HttpClient. Anyway, thanks for your suggestion.

This is a screenshot from the StressTest tool.

I think the problem occurs when we load the aggregate.

minhhungit commented 7 years ago

Dear @mookid8000: After investigating the cause, I saw that it is slow when I load the aggregate root. Here is the Execute method:

public override void Execute(ICommandContext context)
{
    var root = context.TryLoad<DemandRoot>(DemandRoot.GetAggregateRootId(this.DemandDate));
    if (root == null) root = context.Create<DemandRoot>(DemandRoot.GetAggregateRootId(this.DemandDate));
    root.TrackDemandForDayForAllServices(this);
}

Do you have any ideas, sir?

mookid8000 commented 7 years ago

> After investigating the cause, I saw that it is slow when I load the aggregate root

Loading an aggregate root will become slower as the number of events for that particular root increases. That's what I meant with this one:

 tL
  |   /    /    /
  |  /    /    /
  | /    /    /
  |---------------> t

which is meant to illustrate what I imagined was your scenario, where a new root instance would be created for each day.

Generally, it is best to avoid having roots that "live forever" in the sense that their event stream just keeps growing and growing. There are several ways you can make the loading of aggregate roots faster, though...

Cirqus has a built-in snapshotting mechanism, which you can use to make aggregate root hydration start from a snapshot and then apply only the remaining events to bring the root up to date.
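The general idea can be sketched as follows. This is not the Cirqus API (its snapshotting configuration is not shown in this thread); `SnapshotSketch`, `CounterState`, `Snapshot`, and `Hydrate` are hypothetical names, and the "state" is just a counter that stands in for whatever Apply would update.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotSketch
{
    // Hypothetical aggregate state: counts how many events it has seen.
    public sealed class CounterState
    {
        public int EventsApplied;
        public CounterState Clone() => new CounterState { EventsApplied = EventsApplied };
    }

    // Hypothetical snapshot: a copy of the state plus the number of events it covers.
    public sealed class Snapshot
    {
        public CounterState State;
        public int Version;
    }

    // Rebuilds the state from an optional snapshot and replays only the events
    // that come after it. 'replayed' reports how many events were applied.
    public static CounterState Hydrate(IReadOnlyList<string> events, Snapshot snapshot, out int replayed)
    {
        var state = snapshot != null ? snapshot.State.Clone() : new CounterState();
        var start = snapshot != null ? snapshot.Version : 0;

        replayed = 0;
        for (var i = start; i < events.Count; i++)
        {
            state.EventsApplied++; // stands in for Apply(events[i])
            replayed++;
        }

        return state;
    }
}
```

With a stream of 10,000 events and a snapshot taken at version 9,990, this hydration applies only the last 10 events instead of all 10,000.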

You should probably go and find your memory leak first, though. I suggest you grab a memory profiler (if you have one, or get a trial version of [dotMemory](https://www.jetbrains.com/dotmemory/) if you don't) and let the program run for a while. After a while you can snapshot the process' memory and get a report on which types of objects are currently allocated, which can probably provide a good hint as to where memory is leaking.

minhhungit commented 7 years ago

Can you please explain the snapshot feature in more detail, including how to use it? Right now there is no documentation for it.

minhhungit commented 7 years ago

@mookid8000: Every time I execute a command, it replays all events for the aggregate.

So, if I have an aggregate with 10,000 events and I send a command (command 10,001) to that aggregate, it replays the 10,000 old events. Next, when I send command 10,002, it replays all 10,001 old events.
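As a rough check of this cost, assume that each command emits exactly one event and that every load replays the full stream. Command number $k$ then replays $k-1$ events, so processing $n$ commands against a single root replays in total

$$\sum_{k=1}^{n} (k-1) = \frac{n(n-1)}{2}$$

events. For $n = 10\,000$ that is $49\,995\,000$ event applications, which would explain why throughput keeps degrading when all commands hit the same root.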

How can we load the aggregate root with its latest state without replaying events? I mean, can we load the aggregate with its latest state from memory or something like that?

I also cannot use snapshots, because my tool sends commands for this aggregate only one time.