janestreet / memtrace

Streaming client for OCaml's Memprof
MIT License
65 stars 13 forks source link

How to merge traces? #14

Open ljw1004 opened 2 years ago

ljw1004 commented 2 years ago

Since multithreaded-ocaml isn't mature, we're using forked processes to do work in parallel. I can make individual forked process write its own memtrace .ctf file -- but, how to merge them?

It'd be great if there could be a test/merge.ml, to go alongside test/copy.ml.

Here's my attempt. It seems to work. But I had to obliterate the timestamps...

let merge_memtraces srcs dst =
  let open Memtrace.Trace in
  (* Memtrace requires us to claim a unique pid for each trace. We have to lie; we'll pick this one. *)
  let pid = Int64.of_int (Unix.getpid ()) in
  let fd = fd_of_path dst in
  (* We'll pick out the single earliest memtrace, and all other timestamps will be relative to this. *)
  let infos =
    List.map
      (fun filename ->
        let reader = Reader.open_ ~filename in
        let info = Reader.info reader in
        Reader.close reader;
        info)
      srcs
    |> List.sort (fun info1 info2 ->
           Float.compare
             (Timestamp.to_float info1.Info.start_time)
             (Timestamp.to_float info2.Info.start_time))
  in
  begin
    match infos with
    | [] -> ()
    | info :: _ ->
      let info = { info with Info.pid } in
      let writer = Writer.create fd ~getpid:(fun () -> info.Info.pid) info in
      List.iter
        (fun src ->
          let obj = Obj_id.Tbl.create 100 in
          let reader = Reader.open_ ~filename:src in
          Reader.iter ~parse_backtraces:true reader (fun _time ev ->
              (* TODO: I can't figure out how to put the correct timestamp here. *)
              let time = info.Info.start_time in
              match ev with
              | Event.Alloc
                  {
                    obj_id;
                    length;
                    nsamples;
                    source;
                    backtrace_buffer;
                    backtrace_length;
                    common_prefix = _;
                  } ->
                let btrev =
                  Array.init backtrace_length (fun i ->
                      backtrace_buffer.(backtrace_length - 1 - i))
                in
                let decode_callstack_entry loc =
                  Reader.lookup_location_code reader loc
                in
                let id =
                  Writer.put_alloc
                    writer
                    time
                    ~length
                    ~nsamples
                    ~source
                    ~callstack:btrev
                    ~decode_callstack_entry
                in
                Obj_id.Tbl.add obj obj_id id
              | Event.Promote id ->
                let id = Obj_id.Tbl.find obj id in
                Writer.put_promote writer time id
              | Event.Collect id ->
                let id = Obj_id.Tbl.find obj id in
                Writer.put_collect writer time id);
          Reader.close reader)
        srcs;
      Writer.flush writer;
      ()
  end;
  Unix.close fd;
  ()

When I tried to respect the original timestamps, (and sort them in order since that's required), then it still produced a trace with the correct number of samples, but its content was incorrect - i.e. it ascribed the wrong backtraces to allocations. memtrace_dump reveals that it got the wrong number in a lot of the samples, in the "33" position below. (Unfortunately I couldn't find an explanation of what memtrace_dump produces, and don't know what this number means).

0000029720 0000000001 alloc 1 len=5   33: $12345 module@file:line:cols... 

Anyway, here's my my incorrect attempt at preserving timestamps:

let merge_memtraces srcs dst =
  let open Memtrace.Trace in
  (* Memtrace requires us to claim a unique pid for each trace. We have to lie; we'll pick this one. *)
  let pid = Int64.of_int (Unix.getpid ()) in
  let fd = fd_of_path dst in
  (* We'll pick out the single earliest memtrace, and all other timestamps will be relative to this. *)
  let srcs =
    List.map
      (fun filename ->
        let reader = Reader.open_ ~filename in
        let info = Reader.info reader in
        (reader, info, Obj_id.Tbl.create 100))
      srcs
    |> List.sort (fun (_, info1, _) (_, info2, _) ->
           Float.compare
             (Timestamp.to_float info1.Info.start_time)
             (Timestamp.to_float info2.Info.start_time))
  in
  (* Turn it into a flat list of events, sorted by timestamp, since that's what's needed for a ctf file *)
  let events = ref [] in
  List.iter
    (fun (reader, info, objs) ->
      Reader.iter ~parse_backtraces:true reader (fun time_delta ev ->
          let time = Timedelta.offset info.Info.start_time time_delta in
          let decode_callstack_entry = Reader.lookup_location_code reader in
          let event = (time, decode_callstack_entry, objs, ev) in
          events := event :: !events))
    srcs;
  let events =
    List.sort
      (fun (time1, _, _, _) (time2, _, _, _) ->
        Int64.compare (Timestamp.to_int64 time1) (Timestamp.to_int64 time2))
      !events
  in
  (* We'll copy the info of the first source *)
  let info =
    match srcs with
    | [] -> None
    | (_, info, _) :: _ -> Some { info with Info.pid }
  in
  (* Now write each of the merged events, in order, into the file.
     [relative_id] is the id within an individual src;
     [absolute_id] is the id within dst;
     each src has its own [objs], a map from its relative_id to absolute_id. *)
  Option.iter info ~f:(fun info ->
      let writer = Writer.create fd ~getpid:(fun () -> info.Info.pid) info in
      List.iter
        (fun (time, decode_callstack_entry, objs, ev) ->
          match ev with
          | Event.Alloc
              {
                obj_id = relative_id;
                length;
                nsamples;
                source;
                backtrace_buffer;
                backtrace_length;
                common_prefix = _;
              } ->
            let btrev =
              Array.init backtrace_length (fun i ->
                  backtrace_buffer.(backtrace_length - 1 - i))
            in
            let absolute_id =
              Writer.put_alloc
                writer
                time
                ~length
                ~nsamples
                ~source
                ~callstack:btrev
                ~decode_callstack_entry
            in
            Obj_id.Tbl.add objs relative_id absolute_id
          | Event.Promote relative_id ->
            let absolute_id = Obj_id.Tbl.find objs relative_id in
            Writer.put_promote writer time absolute_id
          | Event.Collect relative_id ->
            let absolute_id = Obj_id.Tbl.find objs relative_id in
            Writer.put_collect writer time absolute_id)
        events;
      Writer.flush writer);
  List.iter (fun (reader, _, _) -> Reader.close reader) srcs;
  Unix.close fd;
  ()
stedolan commented 2 years ago

Apologies about the very slow response!

I don't see anything wrong with that code at first glance, do you have some trace files handy that fail to merge? I wouldn't worry about the "33: " being different: that shows the length of the common prefix between this backtrace and the last, which you'd expect should change when you merge files together. (That field isn't very useful in the dump output and should probably be removed. It was more for debugging memtrace itself)