dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.57k stars 717 forks source link

Internalize keys and op-codes in bulk messages #7386

Open crusaderky opened 1 year ago

crusaderky commented 1 year ago

Rationale

Scheduler <-> Worker comms are typically dwarfed by Worker <-> Worker ones.

However, Scheduler <-> Worker comms may be a lot more expensive (in terms of time and/or money) than Worker <-> Worker ones too, so I think it's worth spending some effort in trimming them down.

Real life use cases:

Proposed design

Formalize that all batched comm messages must be dict[str, Any]. Ahead of encoding them with msgpack, have a stateful (per-BatchedSend) mapping stage that finds and replaces all keys of the message, plus the value of the op, with incremental integers starting at -32 (because in msgpack ints in the [-32, 127] range cost only 1 byte). Every time a new string is encountered, it's stored and communicated in a special {-32: {-31: "str1", -30: "str2"} dict.

Example

These worker->scheduler messages:

- op: reschedule
  key: x
- op: long-running
  key: y
  compute_duration: 123.4
- op: reschedule
  key: z

Would be re-encoded into:

- -32:
    -31: op
    -30: reschedule
    -29: key
  -31: -30
  -29: x
- -32:
    -28: long-running
    -27: compute_duration
  -31: -28
  -29: y
  -27: 123.4
- -31: -30
  -29: z

Note how in the last message the -32 dict is omitted.

This whole extra encoding would be encapsulated in BatchedSend and invisible by both the worker and scheduler.

CC @ntabris @fjetter @gjoseph92 @hendrikmakait

fjetter commented 1 year ago

I'm not convinced this is actually super helpful and I'm concerned about the additional complexity. Consider the following example.

Let's take a look at this example compute-task message

from distributed.metrics import time

def get_a_key():
    return f"a-very-looong-key-{time()}"

counter = 1000
def get_address():
    global counter
    port = counter
    counter += 1
    return f"10.12.13.54:{port}"
msg = {
    "op": "compute-task",
    "key": get_a_key(),
    "priority": (1, 2, 5, 4),
    "duration": 23.28149,
    "stimulus_id": f"compute-task-{time()}",
    "who_has": {
        get_a_key(): [get_address() for ws in range(10)] for dts in range(10)
    },
    "nbytes": {get_a_key(): 778213.012839 for _ in range(10)},
    "run_spec": None,
    "function": None,
    "args": None,
    "kwargs": None,
    "resource_restrictions": [],
    "actor": False,
    "annotations": None,
}

This is the metadata for a task w/ 10 dependencies that are as well replicated 10 times. There is no runspec. Pickling this will already yield ~3KB

Running this through msgpack yields about 2.7KB. You can inspect this bytestream literally and see that the worker addresses blow up the message, not the keys. In a real message, I expect that the bulk of the message actually comes from the run_spec itself, not the keys we encode here.

b'\x8e\xa2op\xaccompute-task\xa3key\xd9$a-very-looong-key-1670606097.7190049\xa8priority\x94\x01\x02\x05\x04\xa8duration\xcb@7H\x0f\xba\x88&\xab\xabstimulus_id\xbecompute-task-1670606097.719009\xa7who_has\x8a\xd9"a-very-looong-key-1670606097.71901\x9a\xb010.12.13.54:1000\xb010.12.13.54:1001\xb010.12.13.54:1002\xb010.12.13.54:1003\xb010.12.13.54:1004\xb010.12.13.54:1005\xb010.12.13.54:1006\xb010.12.13.54:1007\xb010.12.13.54:1008\xb010.12.13.54:1009\xd9#a-very-looong-key-1670606097.719014\x9a\xb010.12.13.54:1010\xb010.12.13.54:1011\xb010.12.13.54:1012\xb010.12.13.54:1013\xb010.12.13.54:1014\xb010.12.13.54:1015\xb010.12.13.54:1016\xb010.12.13.54:1017\xb010.12.13.54:1018\xb010.12.13.54:1019\xd9#a-very-looong-key-1670606097.719017\x9a\xb010.12.13.54:1020\xb010.12.13.54:1021\xb010.12.13.54:1022\xb010.12.13.54:1023\xb010.12.13.54:1024\xb010.12.13.54:1025\xb010.12.13.54:1026\xb010.12.13.54:1027\xb010.12.13.54:1028\xb010.12.13.54:1029\xd9$a-very-looong-key-1670606097.7190201\x9a\xb010.12.13.54:1030\xb010.12.13.54:1031\xb010.12.13.54:1032\xb010.12.13.54:1033\xb010.12.13.54:1034\xb010.12.13.54:1035\xb010.12.13.54:1036\xb010.12.13.54:1037\xb010.12.13.54:1038\xb010.12.13.54:1039\xd9#a-very-looong-key-1670606097.719022\x9a\xb010.12.13.54:1040\xb010.12.13.54:1041\xb010.12.13.54:1042\xb010.12.13.54:1043\xb010.12.13.54:1044\xb010.12.13.54:1045\xb010.12.13.54:1046\xb010.12.13.54:1047\xb010.12.13.54:1048\xb010.12.13.54:1049\xd9#a-very-looong-key-1670606097.719025\x9a\xb010.12.13.54:1050\xb010.12.13.54:1051\xb010.12.13.54:1052\xb010.12.13.54:1053\xb010.12.13.54:1054\xb010.12.13.54:1055\xb010.12.13.54:1056\xb010.12.13.54:1057\xb010.12.13.54:1058\xb010.12.13.54:1059\xd9#a-very-looong-key-1670606097.719029\x9a\xb010.12.13.54:1060\xb010.12.13.54:1061\xb010.12.13.54:1062\xb010.12.13.54:1063\xb010.12.13.54:1064\xb010.12.13.54:1065\xb010.12.13.54:1066\xb010.12.13.54:1067\xb010.12.13.54:1068\xb010.12.13.54:1069\xd9#a-very-looong-key-1670606097.719032\x9a\xb010.12.13.54:1070\xb010.12.13.54:1071\xb010.12.13.54:1072\xb010.12.13.54:1073\xb010.12.13.54:1074\xb010.12.13.54:1075\xb010.12.13.54:1076\xb010.12.13.54:1077\xb010.12.13.54:1078\xb010.12.13.54:1079\xd9$a-very-looong-key-1670606097.7190342\x9a\xb010.12.13.54:1080\xb010.12.13.54:1081\xb010.12.13.54:1082\xb010.12.13.54:1083\xb010.12.13.54:1084\xb010.12.13.54:1085\xb010.12.13.54:1086\xb010.12.13.54:1087\xb010.12.13.54:1088\xb010.12.13.54:1089\xd9$a-very-looong-key-1670606097.7190368\x9a\xb010.12.13.54:1090\xb010.12.13.54:1091\xb010.12.13.54:1092\xb010.12.13.54:1093\xb010.12.13.54:1094\xb010.12.13.54:1095\xb010.12.13.54:1096\xb010.12.13.54:1097\xb010.12.13.54:1098\xb010.12.13.54:1099\xa6nbytes\x87\xd9#a-very-looong-key-1670606097.719039\xcbA\'\xbf\xca\x06\x92\xd5Z\xd9"a-very-looong-key-1670606097.71904\xcbA\'\xbf\xca\x06\x92\xd5Z\xd9#a-very-looong-key-1670606097.719041\xcbA\'\xbf\xca\x06\x92\xd5Z\xd9#a-very-looong-key-1670606097.719042\xcbA\'\xbf\xca\x06\x92\xd5Z\xd9#a-very-looong-key-1670606097.719043\xcbA\'\xbf\xca\x06\x92\xd5Z\xd9$a-very-looong-key-1670606097.7190442\xcbA\'\xbf\xca\x06\x92\xd5Z\xd9$a-very-looong-key-1670606097.7190452\xcbA\'\xbf\xca\x06\x92\xd5Z\xa8run_spec\xc0\xa8function\xc0\xa4args\xc0\xa6kwargs\xc0\xb5resource_restrictions\x90\xa5actor\xc2\xabannotations\xc0'

Realistically speaking, I expect the batched send to always include more messages than this one s.t. compression can actually be used which currently kicks in at 10KB. Compressing this message yields 777 bytes with default settings (just reducing the threshold)

I'd be surprised if these op codes would matter before or after compression.

TLDR

I don't think we should do encoding for our msg dict keys to safe on message size.

fjetter commented 1 year ago

I'm open to investigate lowering the compression threshold but I'd run a couple of micro benchmarks before doing so

crusaderky commented 1 year ago

The algorithm gives a 10% reduction in overall size: https://gist.github.com/crusaderky/ee00cc6416831b0154fef1499101a812

The POC took me ~30min to write; the full production PR would take barely more than that.

fjetter commented 1 year ago
  1. I do not doubt that key encoding will safe some bandwidth but I'm not convinced that the size of Scheduler<->Worker messages are currently a problem
  2. Even if it was a problem, I'd still be interested to investigate the compression approach first by lowering the compression threshold or increasing the batched send interval to batch up more messages s.t. we benefit from compression better. (There was an issue where @gjoseph92 benchmarked BatchedSend interval settings w/ inconclusive results. If somebody knows which issue I am talking about, I'd prefer a link since this relates.)

Even if we were to go down this route, I would prefer us to not special case anything for batched send but try to do this for our entire RPC layer.

10% reduction doesn't strike me as a big victory considering the absolute numbers we're talking about here.

crusaderky commented 1 year ago

Reducing the compression threshold doesn't seem to help:

maybe_compress min_size BatchedSend.byte_count
10_000 (main) 19168976
1_000 19150519
500 19157295
250 18952894
200 19115250
150 19104396

On such small blobs the compression rate ends up in the whereabouts of 85% ~ 105%, and maybe_compress will refuse to compress if it gets less than 90% compression.

crusaderky commented 1 year ago

No problem moving the logic from BatchedSend to Comm.

I managed to further slim down the output to -35% by adding key, keys, who_has and nbytes to the internalization (with a small-ish LRU of 1000 records per worker):

https://gist.github.com/crusaderky/c721d13c0bc537bc431c174739639c78

Of course this means implementing in Comm hardcoded insight on what WorkerState and SchedulerState will send through the line, which is a bit ugly.

As discussed offline, I'm going to park this ticket for the time being.