alantech / alan

Autoscalable Programming Language
https://alan-lang.org
MIT License

Array Parallelization Benchmark #387

Closed: depombo closed 9 months ago

depombo commented 3 years ago

There are many ways to fan out parallel array operations across cores. Testing how to run https://github.com/ledwards/advent-2020/blob/main/day01b.ln as fast as possible via https://github.com/alantech/alan/pull/383 siloed me into writing parallel configurations that were over-optimized for that specific workload and that don't perform well when the size and shape of the data are different. Rayon's work stealing model performs best when parallelizing heterogeneous workloads of different sizes and shapes because it keeps serialization costs to a minimum. We left that model because doing so keeps the AVM a lot leaner for now, and that was the right decision in order to build incrementally. However, introducing a static parallel configuration with no work stealing, without properly understanding which workloads are most common, could really hurt the performance of certain programs that might actually be quite common. We need that understanding to maximize parallelization wins and minimize serialization costs.
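To make the concern about static configurations concrete, here is a minimal, std-only sketch that compares the finishing time (makespan) of a static contiguous split against a greedy "next free worker takes the next item" assignment. The greedy version is only a rough stand-in for work stealing, the functions `static_makespan` and `dynamic_makespan` are hypothetical names that do not exist in the AVM, and the model deliberately ignores serialization and scheduling costs.

```rust
/// Makespan when `costs` is cut into contiguous, equally sized chunks, one per worker.
/// Hypothetical helper for illustration only.
fn static_makespan(costs: &[u64], workers: usize) -> u64 {
    let workers = workers.max(1);
    // Ceiling division, and at least 1 because `chunks(0)` would panic.
    let size = ((costs.len() + workers - 1) / workers).max(1);
    costs
        .chunks(size)
        .map(|chunk| chunk.iter().sum::<u64>())
        .max()
        .unwrap_or(0)
}

/// Makespan when each item, in order, goes to whichever worker frees up first.
/// This approximates a shared queue or work stealing; it is not Rayon's algorithm.
fn dynamic_makespan(costs: &[u64], workers: usize) -> u64 {
    let mut busy_until = vec![0u64; workers.max(1)];
    for &cost in costs {
        // The worker that becomes idle first takes the next item.
        let next = busy_until.iter_mut().min().expect("at least one worker");
        *next += cost;
    }
    busy_until.into_iter().max().unwrap_or(0)
}
```

With made-up costs such as `[9, 9, 9, 9, 1, 1, 1, 1]` on 4 workers, the static split puts both expensive pairs on two workers while the other two sit mostly idle. With uniform costs the two strategies tie, which is the case a static configuration tuned on one workload happens to hit.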

Each of the following three code snippets will perform best for some Alan programs, and it is not clear which one to go for without benchmarking different workloads or using a work stealing mechanism. The first chunks up the array's work across greenthreads, but awaits each chunk before proceeding to the next:

  io!(each => fn(args, hand_mem) {
    Box::pin(async move {
      let hand_mem_ref = Arc::new(hand_mem);
      let fractal = hand_mem_ref.read_fractal(args[0]);
      let subhandler = HandlerFragment::new(args[1], 0);
      let n = num_cpus::get();
      let l = fractal.len();
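      // Chunk size; at least 1 so arrays shorter than the core count don't divide by zero below
      let s = std::cmp::max(1, l / n);
      // Number of chunks actually needed, so that `runners[i / s]` is always in bounds
      let c = (l + s - 1) / s;
      let mut runners = Vec::with_capacity(c);
      for _ in 0..c {
        runners.push(FuturesUnordered::<HMFuture>::new());
      }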
      for i in 0..l {
        let mut hm = HandlerMemory::fork(Arc::clone(&hand_mem_ref));
        hm.register_out(args[0], i, CLOSURE_ARG_MEM_START + 1);
        hm.write_fixed(CLOSURE_ARG_MEM_START + 2, i as i64);
        let j = i / s;
        runners[j].push(Box::pin(subhandler.clone().run(hm)));
      }
      let mut hms: Vec<HandlerMemory> = Vec::with_capacity(l);
      for runner in runners {
        let mut chunked_hms = task::spawn(async move { runner.collect().await })
          .await
          .unwrap();
        hms.append(&mut chunked_hms);
      }
      for hm in &mut hms {
        hm.drop_parent();
      }
      Arc::try_unwrap(hand_mem_ref).expect("Dangling reference to parent HM in parallel opcode")
    })
  });

The second also chunks the array work across greenthreads, but awaits all of the chunks concurrently:

  io!(each => fn(args, hand_mem) {
    Box::pin(async move {
      let hand_mem_ref = Arc::new(hand_mem);
      let fractal = hand_mem_ref.read_fractal(args[0]);
      let subhandler = HandlerFragment::new(args[1], 0);
      let n = num_cpus::get();
      let l = fractal.len();
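      // Chunk size; at least 1 so arrays shorter than the core count don't divide by zero below
      let s = std::cmp::max(1, l / n);
      // Number of chunks actually needed, so that `runners[i / s]` is always in bounds
      let c = (l + s - 1) / s;
      let mut runners = Vec::with_capacity(c);
      for _ in 0..c {
        runners.push(FuturesUnordered::<HMFuture>::new());
      }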
      for i in 0..l {
        let mut hm = HandlerMemory::fork(Arc::clone(&hand_mem_ref));
        hm.register_out(args[0], i, CLOSURE_ARG_MEM_START + 1);
        hm.write_fixed(CLOSURE_ARG_MEM_START + 2, i as i64);
        let j = i / s;
        runners[j].push(Box::pin(subhandler.clone().run(hm)));
      }
      let mut chunked_futures: Vec<tokio::task::JoinHandle<Vec<HandlerMemory>>> = Vec::with_capacity(n);
      for runner in runners {
        chunked_futures.push(task::spawn(async move { runner.collect().await }));
      }
      let chunked_hms = join_all(chunked_futures).await;
      for chunked_hm in chunked_hms {
        for hm in &mut chunked_hm.unwrap() {
          hm.drop_parent();
        }
      }
      Arc::try_unwrap(hand_mem_ref).expect("Dangling reference to parent HM in parallel opcode")
    })
  });

The third awaits each chunk of the array in turn on the current task, without delegating to a new greenthread:

  io!(each => fn(args, hand_mem) {
    Box::pin(async move {
      let hand_mem_ref = Arc::new(hand_mem);
      let fractal = hand_mem_ref.read_fractal(args[0]);
      let subhandler = HandlerFragment::new(args[1], 0);
      let n = num_cpus::get();
      let l = fractal.len();
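      // Chunk size; at least 1 so arrays shorter than the core count don't divide by zero below
      let s = std::cmp::max(1, l / n);
      // Number of chunks actually needed, so that `runners[i / s]` is always in bounds
      let c = (l + s - 1) / s;
      let mut runners = Vec::with_capacity(c);
      for _ in 0..c {
        runners.push(FuturesUnordered::<HMFuture>::new());
      }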
      for i in 0..l {
        let mut hm = HandlerMemory::fork(Arc::clone(&hand_mem_ref));
        hm.register_out(args[0], i, CLOSURE_ARG_MEM_START + 1);
        hm.write_fixed(CLOSURE_ARG_MEM_START + 2, i as i64);
        let j = i / s;
        runners[j].push(Box::pin(subhandler.clone().run(hm)));
      }
      let mut hms: Vec<HandlerMemory> = Vec::with_capacity(l);
      for runner in runners {
        let mut chunked_hms = runner.collect().await;
        hms.append(&mut chunked_hms);
      }
      for hm in &mut hms {
        hm.drop_parent();
      }
      Arc::try_unwrap(hand_mem_ref).expect("Dangling reference to parent HM in parallel opcode")
    })
  });
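
All three snippets share the same chunk bookkeeping: a chunk size derived from the array length and the core count, and a chunk index computed as `i / s`. The sketch below isolates that arithmetic in a std-only function so its edge cases (an array shorter than the core count, or a length that is not a multiple of it) can be checked separately from the AVM. `chunk_ranges` is a hypothetical helper, not something the AVM provides.

```rust
use std::ops::Range;

/// Split `len` items into contiguous chunks for `workers` workers.
/// Hypothetical helper for illustration; not part of the AVM.
fn chunk_ranges(len: usize, workers: usize) -> Vec<Range<usize>> {
    // Chunk size is at least 1, so `len < workers` cannot produce a zero divisor.
    let size = std::cmp::max(1, len / std::cmp::max(1, workers));
    // Ceiling division: a trailing remainder gets its own, shorter chunk.
    let chunks = (len + size - 1) / size;
    (0..chunks)
        .map(|c| (c * size)..std::cmp::min(len, (c + 1) * size))
        .collect()
}
```

Note that with 100 items and 8 workers this yields 9 chunks rather than 8, because the remainder of 4 items lands in an extra chunk. Any vector indexed by `i / size` therefore needs `chunks` slots, not `workers` slots.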

dfellis commented 3 years ago

I think one of the earlier ideas behind the graph and not-quite-Turing-complete nature of the language is the answer here: we should compute a rough estimate of the time an opcode will take under various parallelization conditions and choose the one that is best given the overall state of the VM across threads. E.g., if there's lots of concurrent activity happening already, we know that scheduling overhead will start to dominate the gains from parallelization, while a mostly "empty" VM can fan out as far as makes sense, with the predicted execution time at each level of parallelism informing "how parallel" to go.

As for now, before we're ready to start tackling that whole part of the language? I would bias to more parallelism over less, tbh.
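
One way to picture the estimate described above is a toy cost model: compare the predicted sequential time with the predicted time for each candidate task count, and cap the candidates by how many cores are currently idle. Everything in this sketch is an assumption made for illustration, including the `Plan` type, the `choose_plan` function, the linear per-item cost, and the fixed per-task overhead. The AVM has no such API.

```rust
/// Hypothetical outcome of the estimate; not an AVM type.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Plan {
    Sequential,
    Parallel { tasks: usize },
}

/// Pick the plan with the lowest estimated wall time in nanoseconds.
/// Assumes each item costs `per_item_ns` and each spawned task adds
/// `task_overhead_ns` of scheduling and serialization cost.
fn choose_plan(items: u64, per_item_ns: u64, task_overhead_ns: u64, idle_cores: u64) -> Plan {
    let sequential = items * per_item_ns;
    let mut best = (sequential, Plan::Sequential);
    // A busy VM (few idle cores) offers few or no parallel candidates.
    for tasks in 2..=idle_cores {
        // The slowest task handles ceil(items / tasks) items.
        let per_task_items = (items + tasks - 1) / tasks;
        let estimate = per_task_items * per_item_ns + tasks * task_overhead_ns;
        if estimate < best.0 {
            best = (estimate, Plan::Parallel { tasks: tasks as usize });
        }
    }
    best.1
}
```

Under these assumed numbers, a large array on an idle 8-core VM fans out fully, a 10-element array stays sequential because the overhead dominates, and any array stays sequential when only one core is idle.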

depombo commented 3 years ago

That is a very good point and I am glad you brought it up. We should eventually be able to beat a work stealing mechanism by estimating the resources required across the board, and this would only be possible in Alan. The bias for parallelism is tricky though, because I think it is a spectrum. Things are already running in parallel since https://github.com/alantech/alan/pull/384 and https://github.com/alantech/alan/pull/385 landed, so going too far down the parallelization spectrum without understanding the most common workloads could hurt performance badly as serialization costs rise.

dfellis commented 3 years ago

@depombo what about a simple heuristic to choose sequential versions when the number of Tokio tasks crosses some multiple of CPU cores?

Not saying we do that now, but it could be a good first step.
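
As a rough sketch of that heuristic, the check itself is a single comparison once the in-flight task count is known. The function names and the `multiple` parameter below are hypothetical, and the core count comes from the standard library's `available_parallelism` rather than the `num_cpus` crate used in the snippets above, purely to keep the example dependency-free.

```rust
use std::thread;

/// Hypothetical heuristic: prefer the sequential opcode once the number of
/// in-flight tasks exceeds `multiple` times the core count.
fn prefer_sequential(in_flight_tasks: usize, cores: usize, multiple: usize) -> bool {
    in_flight_tasks > cores.saturating_mul(multiple)
}

/// Core count as reported by the standard library, falling back to 1
/// when it cannot be determined.
fn core_count() -> usize {
    thread::available_parallelism().map(|n| n.get()).unwrap_or(1)
}
```

For example, with 8 cores and a multiple of 2, the parallel version would still be chosen at 16 in-flight tasks and the sequential version at 17. How the in-flight count is obtained is left open here.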

depombo commented 3 years ago

That's an interesting idea worth looking into. It might be a flaky heuristic though, because Tokio tasks could be CPU bound or IO bound. I also don't know whether Tokio exposes the number of outstanding tasks, but it should be possible somehow.

dfellis commented 3 years ago

We could make it our own tracking metadata and simply skip actual io opcodes? The concern is it getting out of sync, so that the AVM eventually starts doing something weird because it thinks there are too many (or too few) concurrent tasks.
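
One possible way to reduce the out-of-sync risk is to tie the decrement to a guard's `Drop` implementation, so every increment is paired with exactly one decrement even when a task returns early. This is a minimal std-only sketch, assuming a single shared atomic counter is enough. `TaskTracker` and `TaskGuard` are hypothetical names, not AVM types, and skipping io opcodes would simply mean not calling `enter` for them.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical counter of CPU-bound tasks currently in flight.
#[derive(Clone, Default)]
struct TaskTracker {
    in_flight: Arc<AtomicUsize>,
}

/// Guard that decrements the counter when dropped, whether the task
/// finishes normally, returns early, or unwinds.
struct TaskGuard {
    in_flight: Arc<AtomicUsize>,
}

impl TaskTracker {
    /// Register a task; keep the returned guard alive for the task's duration.
    fn enter(&self) -> TaskGuard {
        self.in_flight.fetch_add(1, Ordering::Relaxed);
        TaskGuard { in_flight: Arc::clone(&self.in_flight) }
    }

    /// Current number of registered tasks.
    fn count(&self) -> usize {
        self.in_flight.load(Ordering::Relaxed)
    }
}

impl Drop for TaskGuard {
    fn drop(&mut self) {
        self.in_flight.fetch_sub(1, Ordering::Relaxed);
    }
}
```

In an async setting the guard would be created at the start of the spawned future and moved into it, so that cancelling or finishing the future releases the slot. The count is still only a snapshot, so a heuristic built on it should tolerate being slightly stale.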