Closed dlg99 closed 6 years ago
For crc32: are you considering moving to CRC32C, with the Java 9 built-in support?
And using the v2 wire protocol?
In 4.6 we introduced the Netty recycler; it should help a lot with GC.
@eolivelli internally I have code that ports Java 9's CRC32C into BK's package so it is usable on Java 8. I have implementations of both CRC32C and Adler-32 digests; I still need to do more perf testing as time permits.
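For reference, Java 9 exposes the Castagnoli checksum directly as `java.util.zip.CRC32C`, which implements the same `Checksum` interface as `CRC32`, so swapping digests is mostly a drop-in change. A minimal sketch (standalone; BookKeeper's real digest plumbing goes through its DigestManager):

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import java.util.zip.CRC32C;
import java.util.zip.Checksum;

public class Crc32cDemo {
    // Compute a checksum over a payload using any java.util.zip.Checksum impl.
    public static long digest(Checksum c, byte[] payload) {
        c.update(payload, 0, payload.length);
        return c.getValue();
    }

    public static void main(String[] args) {
        byte[] data = "123456789".getBytes(StandardCharsets.US_ASCII);
        // CRC-32 (zlib polynomial) vs CRC-32C (Castagnoli polynomial)
        // give different values for the same input:
        System.out.println(Long.toHexString(digest(new CRC32(), data)));  // cbf43926
        System.out.println(Long.toHexString(digest(new CRC32C(), data))); // e3069283
    }
}
```

CRC-32C also has hardware support (the SSE4.2 `crc32` instruction), which the JDK intrinsifies, so it is typically much faster than the zlib CRC-32.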
For proto3, I'd love to keep it compatible with the current wire protocol, but proto3 is a bit lower on my priority list at the moment.
I'll take a look at the recycler changes; it is possible we already have them in our repo, I need to double-check. Somewhat related to Netty: we had to add the "-Djdk.nio.maxCachedBufferSize=0" JVM flag to prevent out-of-direct-memory errors, and Netty 4 is a major user of direct memory in our case.
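For anyone hitting the same errors, the flag above goes on the JVM command line; a minimal sketch (the jar name and direct-memory size are placeholders, not from this thread):

```shell
# jdk.nio.maxCachedBufferSize=0 disables the per-thread cache of temporary
# direct ByteBuffers that java.nio keeps for I/O on heap buffers; with many
# threads that cache competes with Netty's own direct-memory usage and can
# exhaust the -XX:MaxDirectMemorySize budget.
java -Djdk.nio.maxCachedBufferSize=0 -XX:MaxDirectMemorySize=2g -jar your-app.jar
```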
@eolivelli I am not sure if you are trying to push for Java 9 here. I think we can make BookKeeper function on Java 9, but we should not depend on it.
Regarding crc32: Pulsar already has an implementation with a ByteBuf binding - it is integrated with Netty's ByteBuf, so we should just take this module: https://github.com/apache/incubator-pulsar/blob/master/pulsar-checksum/src/main/java/org/apache/pulsar/checksum/utils/Crc32cChecksum.java
@jvrao @merlimat and I talked about that before, and there is a task for it: #590.
@sijie I am not pushing for Java 9. I think we will keep Java 8 compatibility; I am just migrating my projects to run cleanly on Java 9 and Java 10. No hurry.
@sijie @eolivelli @dlg99 I have been looking into object allocations in the AddEntry code path a bit. There have been lots of changes since 4.5 and quite a few improvements, especially in 4.7.
But we still have a few copies that I believe we can eliminate. For example, if the write set size is 3, we currently copy the same buffer 6 times. We could replace all 6 copies with just a single copy. I have a few simple code changes that accomplish this here: https://github.com/nicmichael/bookkeeper/commit/6c18c3b402d228e1b913c53ea53ea547dc680b42. I thought I'd get some feedback and your thoughts before I open a pull request.
@nicmichael very interesting work!
Can you explain in more detail why we need to use an unpooled buffer? Is it because it would be hard to manage the reference count?
@nicmichael - I see the idea behind this, and it is very smart: you are basically trading one memory copy for an unpooled structure so that you can use UnsafeByteOperations in protobuf.
One suggestion around the digest manager: I would have it take a flag indicating whether it is the v2 or v3 protocol. If it is v2, it is better to use a pooled buffer; if it is v3, use an unpooled one. That way you keep the behavior unchanged for v2 and reduce the memory copies for v3.
@eolivelli, I think Sijie's comment covers most of it already. Currently we're using a pooled ByteBuf, but unfortunately it doesn't have an accessible backing array. So when we later convert it to a protobuf, we're forced to allocate a new array and copy the data into it... and since that part is done in PCBC, we make those copies for every bookie in the write set. My idea was to move the allocation (and copy) out of PCBC and do it once upfront by allocating a new unpooled ByteBuf. That ByteBuf has an accessible array, so no further copies are needed later on. So we do one copy upfront to save 6 copies later on.
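The distinction driving this is whether the buffer is backed by an accessible `byte[]`. Netty's default pooled buffers are direct, and the same rule shows up in plain `java.nio`; a minimal illustration using `ByteBuffer` as a stand-in for Netty's `ByteBuf` (not BookKeeper code):

```java
import java.nio.ByteBuffer;

public class BackingArrayDemo {
    public static void main(String[] args) {
        // Heap buffer: backed by a byte[] that can be handed to protobuf
        // without copying.
        ByteBuffer heap = ByteBuffer.allocate(64);
        System.out.println(heap.hasArray()); // true -> heap.array() is accessible

        // Direct buffer: memory lives outside the Java heap, no accessible byte[].
        ByteBuffer direct = ByteBuffer.allocateDirect(64);
        System.out.println(direct.hasArray()); // false -> a copy is unavoidable
        byte[] copy = new byte[direct.remaining()];
        direct.get(copy); // this is the kind of copy currently made per bookie
    }
}
```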
@sijie I'm afraid I'm not familiar enough with the code and the differences between protocol versions. How does v2 differ from v3, and why would we prefer a pooled buffer with extra array copies for v2?
@nicmichael v2 doesn't use protobuf. It basically combines two ByteBufs into one ByteBufList: one is the header + digest, the other is the payload. v2 has very limited support for request types and only works for certain requests. In the v2 case there is no extra copy, so a pooled buffer is preferred.
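The "combine two ByteBufs into one ByteBufList" framing avoids ever merging header+digest and payload into one buffer; the two pieces are handed to the channel separately and stitched together at write time. A stdlib sketch of the same idea using a gathering write (the file, header, and payload contents here are made up for the example; Netty does this with CompositeByteBuf/ByteBufList instead of a FileChannel):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class GatheringWriteDemo {
    public static void main(String[] args) throws IOException {
        // Two independent buffers: header+digest and payload, never merged.
        ByteBuffer header = ByteBuffer.wrap("HDR+CRC|".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.US_ASCII));

        Path out = Files.createTempFile("v2-frame", ".bin");
        try (FileChannel ch = FileChannel.open(out, StandardOpenOption.WRITE)) {
            // Gathering write: both buffers go out as one frame, no merge copy.
            ch.write(new ByteBuffer[] { header, payload });
        }
        System.out.println(new String(Files.readAllBytes(out), StandardCharsets.US_ASCII));
        // -> HDR+CRC|payload
        Files.delete(out);
    }
}
```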
what you actually can do here:
@sijie Thanks, got it!
@nicmichael I think you can send a PR and we will continue the discussion there. The high-level idea of the change is clear now. Thank you!
@sijie @eolivelli Thanks for your feedback. I just opened https://github.com/apache/bookkeeper/pull/1361.
While looking at it a bit more, I actually found that we can use pooled buffers (how did I miss that before?). All we care about is that they're heap buffers (backed by an array) rather than direct buffers. My updated code includes this change. By using PooledByteBufAllocator(false), we can eliminate even the original buffer allocation, thus eliminating all 6 allocations (for a write set size of 3). All that remains is a single copy in DigestManager to copy the payload into the (pooled) heap buffer. We're trading this copy against the savings we achieve in PCBC.
I am not sure we can use a pooled buffer. When you use UnsafeByteOperations to wrap the underlying buffer, we might release the pooled buffer before protobuf has handed the unsafe-wrapped bytes to the Netty channel; at that point the underlying byte array or NIO buffer might already be reused by other requests, which can potentially cause corruption. So I think using unpooled buffers with UnsafeByteOperations is much safer than using pooled buffers with UnsafeByteOperations.
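The hazard with wrapping a pooled buffer can be shown with plain arrays: a zero-copy wrap keeps pointing at the backing storage, so if the pool hands that storage to another request before the wrapped bytes are flushed, the frame is corrupted. A toy illustration (pool reuse is simulated by overwriting the array; `ByteBuffer.wrap` stands in for `UnsafeByteOperations.unsafeWrap`):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WrapVsCopyDemo {
    public static void main(String[] args) {
        byte[] pooled = "request-A".getBytes(StandardCharsets.US_ASCII);

        // Zero-copy wrap: the view shares the pooled backing array.
        ByteBuffer wrapped = ByteBuffer.wrap(pooled);
        // Defensive copy: what ByteString.copyFrom effectively does.
        byte[] copied = Arrays.copyOf(pooled, pooled.length);

        // Simulate the pool recycling the array for another request
        // before the wrapped view was written to the channel.
        byte[] other = "request-B".getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(other, 0, pooled, 0, other.length);

        byte[] seen = new byte[wrapped.remaining()];
        wrapped.get(seen);
        System.out.println(new String(seen, StandardCharsets.US_ASCII));   // request-B (corrupted!)
        System.out.println(new String(copied, StandardCharsets.US_ASCII)); // request-A (safe)
    }
}
```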
I have created #4293 to address this concern and to fix the issue that this PR #791 causes.
We (Salesforce) upgraded the NICs on our perf cluster to 20 Gbps (10 Gbps had become a bottleneck) and could not get throughput above 15 Gbps. The proxy (our BK client Java app) used about 18 of 36 cores (1800% CPU utilization in 'top').
During the investigation I noticed that GC runs roughly twice per second (111 GCs in 60 seconds), with GC pauses of about 20 ms.
From the profiler (skipping proxy-internal stacks) one can see that roughly 7 GB/sec of allocations come from protobuf + the digest manager (see below), plus some extra (excluded here) from internal code.
I have some changes in the works that reduce allocations to ~1.1 GB/sec; I need to finish some extra perf benchmarks. The changes include an upgrade to protobuf 3.4 (still with proto2 semantics) and the use of unsafe wrappers. With these changes, throughput went up close to 20 Gbps. CPU utilization is lower but still high (~1500%), with a lot of it coming from crc32 - subject to a separate investigation/tuning. The majority (90%) of the 1.1 GB/sec still comes from protobuf, and it looks like the only other thing I can try there is moving to the proto3 code gen. A quick prototype showed that simply moving to proto3 (with its reduced immutability guarantees) results in flapping tests, so that change is not safe and requires more work; it is excluded from scope at this moment.
Top TLAB allocations, expanded to the minimal relevant stacks (60-second interval):
| Stack Trace | TLABs | Total TLAB Size (bytes) | Pressure (%) |
| --- | ---: | ---: | ---: |
| `bk-shade.com.google.proto_2.6.1.CodedInputStream.readRawBytesSlowPath(int)` | 347,980 | 192,147,664,488 | 35.738 |
| `bk-shade.com.google.proto_2.6.1.CodedInputStream.readBytes()` | 347,980 | 192,147,664,488 | 35.738 |
| `org.apache.bookkeeper.proto.BookkeeperProtocol$ReadResponse.<init>(CodedInputStream, ExtensionRegistryLite)` | 347,980 | 192,147,664,488 | 35.738 |
| `java.util.zip.CRC32.update(ByteBuffer)` | 36,317 | 97,655,274,184 | 18.163 |
| `org.apache.bookkeeper.client.CRC32DigestManager.update(ByteBuf)` | 36,317 | 97,655,274,184 | 18.163 |
| `org.apache.bookkeeper.client.DigestManager.verifyDigest(long, ByteBuf, boolean)` | 36,317 | 97,655,274,184 | 18.163 |
| `bk-shade.com.google.proto_2.6.1.ByteString.copyFrom(byte[], int, int)` | 15,601 | 50,946,922,616 | 9.476 |
| `bk-shade.com.google.proto_2.6.1.ByteString.copyFrom(byte[])` | 15,595 | 50,946,907,320 | 9.476 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.addEntry(long, byte[], long, ByteBuf, BookkeeperInternalCallbacks$WriteCallback, Object, int)` | 15,595 | 50,946,907,320 | 9.476 |
| `bk-shade.com.google.proto_2.6.1.TextFormat.unescapeBytes(CharSequence)` | 6 | 15,296 | 0 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.addEntry(long, byte[], long, ByteBuf, BookkeeperInternalCallbacks$WriteCallback, Object, int)` | 15,528 | 50,528,061,008 | 9.398 |
| `org.apache.bookkeeper.proto.BookieClient$2.operationComplete(int, PerChannelBookieClient)` | 15,528 | 50,528,061,008 | 9.398 |
| `org.apache.bookkeeper.proto.BookieClient$2.operationComplete(int, Object)` | 15,528 | 50,528,061,008 | 9.398 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.completeOperation(BookkeeperInternalCallbacks$GenericCallback, int)` | 15,528 | 50,528,061,008 | 9.398 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.connectIfNeededAndDoOp(BookkeeperInternalCallbacks$GenericCallback)` | 15,528 | 50,528,061,008 | 9.398 |
| `org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.obtain(BookkeeperInternalCallbacks$GenericCallback, long)` | 15,528 | 50,528,061,008 | 9.398 |
| `org.apache.bookkeeper.proto.BookieClient.addEntry(BookieSocketAddress, long, byte[], long, ByteBuf, BookkeeperInternalCallbacks$WriteCallback, Object, int)` | 15,528 | 50,528,061,008 | 9.398 |
| `org.apache.bookkeeper.client.PendingAddOp.sendWriteRequest(int)` | 15,528 | 50,528,061,008 | 9.398 |
| `bk-shade.com.google.proto_2.6.1.CodedInputStream.<init>(InputStream)` | 24,594 | 13,702,219,976 | 2.548 |
| `bk-shade.com.google.proto_2.6.1.CodedInputStream.newInstance(InputStream)` | 24,594 | 13,702,219,976 | 2.548 |
| `bk-shade.com.google.proto_2.6.1.AbstractParser.parsePartialFrom(InputStream, ExtensionRegistryLite)` | 24,594 | 13,702,219,976 | 2.548 |
| `bk-shade.com.google.proto_2.6.1.AbstractParser.parseFrom(InputStream, ExtensionRegistryLite)` | 24,594 | 13,702,219,976 | 2.548 |
| `bk-shade.com.google.proto_2.6.1.AbstractParser.parseFrom(InputStream, ExtensionRegistryLite)` | 24,594 | 13,702,219,976 | 2.548 |
| `org.apache.bookkeeper.proto.BookkeeperProtocol$Response.parseFrom(InputStream, ExtensionRegistryLite)` | 24,594 | 13,702,219,976 | 2.548 |
| `org.apache.bookkeeper.proto.BookieProtoEncoding$ResponseEnDecoderV3.decode(ByteBuf)` | 24,594 | 13,702,219,976 | 2.548 |
| `io.netty.buffer.AbstractByteBuf.slice(int, int)` | 1,286 | 3,807,799,016 | 0.708 |
| `io.netty.buffer.AbstractUnpooledSlicedByteBuf.slice(int, int)` | 1,263 | 3,746,733,704 | 0.697 |
| `org.apache.bookkeeper.client.DigestManager.verifyDigest(long, ByteBuf, boolean)` | 1,263 | 3,746,733,704 | 0.697 |
Top allocations outside of TLABs (60-second interval):
| Stack Trace | Objects | Total Size (bytes) | Pressure (%) |
| --- | ---: | ---: | ---: |
| `bk-shade.com.google.proto_2.6.1.CodedInputStream.readRawBytesSlowPath(int)` | 161,115 | 10,544,910,872 | 6.288 |
| `bk-shade.com.google.proto_2.6.1.CodedInputStream.readBytes()` | 161,115 | 10,544,910,872 | 6.288 |
| `org.apache.bookkeeper.proto.BookkeeperProtocol$ReadResponse.<init>(CodedInputStream, ExtensionRegistryLite)` | 161,115 | 10,544,910,872 | 6.288 |
| `java.util.zip.CRC32.update(ByteBuffer)` | 114,243 | 7,599,834,224 | 4.532 |
| `org.apache.bookkeeper.client.CRC32DigestManager.update(ByteBuf)` | 114,243 | 7,599,834,224 | 4.532 |
| `bk-shade.com.google.proto_2.6.1.ByteString.copyFrom(byte[], int, int)` | 2,440 | 929,957,352 | 0.555 |
| `bk-shade.com.google.proto_2.6.1.ByteString.copyFrom(byte[])` | 2,440 | 929,957,352 | 0.555 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.addEntry(long, byte[], long, ByteBuf, BookkeeperInternalCallbacks$WriteCallback, Object, int)` | 2,440 | 929,957,352 | 0.555 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.addEntry(long, byte[], long, ByteBuf, BookkeeperInternalCallbacks$WriteCallback, Object, int)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.proto.BookieClient$2.operationComplete(int, PerChannelBookieClient)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.proto.BookieClient$2.operationComplete(int, Object)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.completeOperation(BookkeeperInternalCallbacks$GenericCallback, int)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.proto.PerChannelBookieClient.connectIfNeededAndDoOp(BookkeeperInternalCallbacks$GenericCallback)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.proto.DefaultPerChannelBookieClientPool.obtain(BookkeeperInternalCallbacks$GenericCallback, long)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.proto.BookieClient.addEntry(BookieSocketAddress, long, byte[], long, ByteBuf, BookkeeperInternalCallbacks$WriteCallback, Object, int)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.client.PendingAddOp.sendWriteRequest(int)` | 2,045 | 735,510,048 | 0.439 |
| `org.apache.bookkeeper.client.PendingAddOp.initiate(ByteBuf)` | 2,045 | 735,510,048 | 0.439 |