alexander-held opened this issue 7 months ago
The fsspec https filesystem library doesn't seem to have a retry for HTTP accesses that go to S3. For example, if we get the dreaded "Slow Down!" error, we should back off and retry the request. Nick S thinks this might require a modification in the fsspec library, unfortunately. How persistent the error is remains to be seen.
Not only do we need that retry, we are also finding that we hit a rate limit when Dask hammers CEPH, and it starts sending "Slow Down!" responses. That means we either need to back things off (see the sketch below) or raise the rate limits on CEPH (which are probably there for a reason).
The limit is in requests per second. We do not know what the sustained rate out of CEPH is, however, so we don't know how fast we could run if we spread our requests out.
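A minimal sketch of what client-side backoff could look like, assuming the overload shows up as a "Slow Down" message in the raised exception (the function name and retry parameters here are hypothetical, not part of fsspec):

import random
import time

import fsspec

def read_with_backoff(url, start=None, end=None, max_retries=5, base_delay=0.5):
    # hypothetical helper: retry a ranged HTTPS read with exponential backoff
    fs = fsspec.filesystem("https")
    for attempt in range(max_retries):
        try:
            return fs.cat_file(url, start=start, end=end)
        except Exception as err:
            # assumption: the S3/CEPH overload error surfaces as "Slow Down"
            if "Slow Down" not in str(err) or attempt == max_retries - 1:
                raise
            # exponential backoff with jitter: ~0.5 s, 1 s, 2 s, ... plus noise
            time.sleep(base_delay * 2**attempt + random.uniform(0, base_delay))

The jitter matters at the scale of hundreds of Dask workers: without it, workers that were throttled together retry together and trip the rate limit again.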
ceph rgw seems to allow rate-limiting ops or bandwidth per bucket or user. With the current Ceph version (Pacific) deployed on the AF, though, the rate-limit setting is not well documented and I couldn't even find what the limit is set to. The next Ceph version, Quincy, has the rate-limit setting well documented, and we plan to update our Ceph version during our next maintenance, which is in about two weeks. With Dask's scale-out capabilities, I imagine some way of handling a rate-limit setting from the server side is important. After all, the servers have set capacities that may not be able to match those of the clients.
Copied and summarized from Slack.
Starting point: "the file is in RAM (virtual memory)". If the file is being transferred over a network to get into RAM, then that latency will have to be added to these figures.
Stopping point: "we're getting our first TBasket bytes", in other words, the start of the physics data transfer.
This time is measured for two files:
DAOD_PHYSLITE.37233417._000052.pool.root.1 (ATLAS DAOD_PHYSLITE)
85C799FF-7460-6840-AFA9-2D7F7CFE1DCC.root (CMS NanoAOD)
Using this instrumented code. It's a mostly (entirely?) single-threaded process on a 3.2 GHz CPU.
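For reference (not the linked instrumented code, which hooks into uproot internals), a minimal sketch of the same kind of measurement, timing from uproot.open to the first branch read; it assumes the ATLAS file above is available locally, so no network latency is included, and the branch name is an arbitrary illustrative choice:

import time

import uproot

t0 = time.perf_counter()
with uproot.open("DAOD_PHYSLITE.37233417._000052.pool.root.1") as f:
    tree = f["CollectionTree"]  # interprets the TTree/TBranch metadata
    print(f"{time.perf_counter() - t0:.6f}  TTree metadata interpreted")
    tree["AnalysisJetsAuxDyn.pt"].array(library="np")  # triggers TBasket requests
    print(f"{time.perf_counter() - t0:.6f}  first branch read")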
For the ATLAS DAOD_PHYSLITE:
0.000000 uproot.open
0.000770 requesting single chunk
0.000793 got data for TFile header
0.000824 requesting single chunk
0.000837 got data for root directory header
0.001104 requesting single chunk
0.002300 got data for compressed TTree
0.002308 starting decompression of b'ZS'
0.014363 finished decompression of b'ZS'
0.014697 got data for TTree from uncompressed block
0.014780 interpreting members of TTree
0.014999 just before TBranches
0.707694 between TBranches and TLeaves (1343 branches!)
time spent in the TBranches themselves: 0.465910
0.711089 just after TLeaves
0.711097 finished interpreting TTree
0.715396 finished postprocessing TTree
0.728686 requesting 368 chunks
0.736777 got data for Model_TBasket
0.736807 starting decompression of b'ZS'
For the CMS NanoAOD:
0.000000 uproot.open
0.000477 requesting single chunk
0.000883 got data for TFile header
0.000919 requesting single chunk
0.000933 got data for root directory header
0.001113 requesting single chunk
0.002069 got data for compressed TTree
0.002076 starting decompression of b'ZS'
0.013841 finished decompression of b'ZS'
0.014167 got data for TTree from uncompressed block
0.014244 interpreting members of TTree
0.014457 just before TBranches
0.589086 between TBranches and TLeaves (1852 branches!)
time spent in the TBranches themselves: 0.457403
0.593271 just after TLeaves
0.593279 finished interpreting TTree
0.596977 finished postprocessing TTree
0.606837 requesting 239 chunks
0.614208 got data for Model_TBasket
0.614240 starting decompression of b'ZS'
As you can see, the biggest time-step is in interpreting the TBranch metadata. This instrumentation measures the time spent interpreting only the TBranches themselves, without the surrounding TObjArray and read_object_any parts.
(In principle, we could replace just the TBranch reading with a compiled fast path that would save up to ~450 ms (in both files), maybe for the special case of no subbranches and exactly one unshared TLeaf. Drat: DAOD_PHYSLITE has nested TBranches. And while its TLeaves are not directly shared, they point to shared TLeaves, so it would be necessary to get into the complexity of read_object_any to implement this fast path, which makes it more difficult. Maybe a month of work could save ~400 ms per file.)
The other important piece is remote-file latency, which is not included in the above test, but you can add 3 × your round-trip latency by hand for a good estimate. The three steps that say "requesting single chunk" are round-trip requests for (possibly remote) data, and they are blocking. The first requests the TFile header (starting at byte 0), which contains the location of the root TDirectory. The second requests the root TDirectory, which contains the location of the TTree metadata. The third requests the TTree metadata, which has all of the TBranches to interpret. The TBranches contain the locations of all of the TBaskets, so the next step is to request all of the TBaskets that are needed for the desired branches, all at once.
In the past, we've tried to avoid this triple round-trip latency by preemptively requesting a large chunk at the beginning and at the end of the file, hoping that the root TDirectory and TTree are contained within it, but it wasn't successful often enough to be worth the cost, and even knowing where the end of the file is requires data from the TFile header. (Uproot still has that machinery as the begin_chunk_size and hopefully also end_chunk_size options to uproot.open, though I don't see the latter in the documentation; these can also be passed to uproot.dask.)
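For illustration, a sketch of turning that knob; the URL and tree name are placeholders, begin_chunk_size is the documented uproot.open option, and end_chunk_size may or may not still be accepted:

import uproot

with uproot.open(
    "https://example.org/some_file.root",  # hypothetical remote file
    begin_chunk_size=16 * 1024,  # preemptively read 16 kB instead of the default
) as f:
    # if the root TDirectory and TTree metadata happen to fall inside that
    # first chunk, the second and third round trips are avoided
    tree = f["CollectionTree"]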
I did a quick test comparing the 8 different XCache instances; the code below can be plugged into the materialize_branches.ipynb notebook. It ran over the full wjets sample. Nothing suspicious here: all instances perform similarly.
import matplotlib.pyplot as plt
import numpy as np

# one entry per XCache instance
all_ips = ['192.170.240.141',
           '192.170.240.142',
           '192.170.240.143',
           '192.170.240.144',
           '192.170.240.145',
           '192.170.240.146',
           '192.170.240.147',
           '192.170.240.148']

fig, ax = plt.subplots(nrows=8, constrained_layout=True, figsize=(4, 16))
bins = np.linspace(0, 20, 41)

for i, ip in enumerate(all_ips):
    print("--- ip", ip)
    # `out` is the list of per-file results produced by the notebook
    rates = []
    for entry in out:
        if ip not in entry["fname"]:
            continue
        rates.append(entry["num_entries"] / entry["runtime"] / 1_000)
    ax[i].hist(rates, bins=bins)
    ax[i].set_title(ip)
    ax[i].set_xlabel("event rate [kHz]")
From the most recent 50 TB ServiceX test (see #68).
Finally, some things @Ilija Vukotic and I learned about the system (Ilija, please add more conclusions!). Rates were measured running with 1200 pods in the AF and 500 pods in River.
This issue will gather lessons learned; we might want to switch to another format eventually, but this at least will have to do as an intermediate solution.
- AnalysisJetsAuxDyn.EnergyPerSampling (AsObjects(AsVector(True, AsVector(False, dtype('>f4'))))) can be expensive to interpret (done with Forth), so the choice of branches can matter for total CPU cost (see the sketch after this list).
- PrimaryVerticesAuxDyn.neutralParticleLinks seems to take an extremely long time to read.
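A small sketch for spotting such branches up front, assuming the PHYSLITE file and CollectionTree from earlier; branch.interpretation is a real uproot property, and matching on "AsObjects" in its repr is just a heuristic:

import uproot

with uproot.open("DAOD_PHYSLITE.37233417._000052.pool.root.1") as f:
    tree = f["CollectionTree"]
    for name, branch in tree.items():
        # AsObjects interpretations are deserialized with Forth and can
        # dominate CPU cost; flag them before choosing which branches to read
        if "AsObjects" in repr(branch.interpretation):
            print(name, branch.interpretation)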