bazelbuild / bazel-buildfarm

Bazel remote caching and execution service
https://bazel.build
Apache License 2.0

buildfarm out of direct memory error #698

Open · babychenge opened this issue 3 years ago

babychenge commented 3 years ago

I have 1 server and 2 workers connected to it. After some time (maybe a day, maybe only a few hours) all of the workers have to be restarted because they have run out of memory. The error looks like this:

Mar 02, 2021 11:01:27 AM build.buildfarm.cas.cfc.CASFileCache lambda$putOrReferenceGuarded$26
INFO: expired key 97b3ee41d2a70431374ca15de50314af3240fde1a96d24869019640d0230b800_2545
Mar 02, 2021 11:01:27 AM build.buildfarm.cas.cfc.CASFileCache lambda$putOrReferenceGuarded$26
INFO: expired key 0a15c03adb65715d9551398132f18069ba8b02a28f2cdb8213d3c2b775cd2784_232832107
Mar 02, 2021 11:01:28 AM build.buildfarm.cas.cfc.CASFileCache lambda$putOrReferenceGuarded$26
INFO: expired key ee54d16e23b53951c6dcff161b22c19632551b37897ded3d371f6184c0869d93_549273828
Exception in thread "grpc-default-executor-409" io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7482638615, max: 7488405504)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:725)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:680)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
    at io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
    at io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:227)
    at io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:168)
    at io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)
    at io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:53)
    at io.grpc.internal.ForwardingClientStream.writeMessage(ForwardingClientStream.java:37)
    at io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:484)
    at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:468)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
    at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
    at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:364)
    at build.buildfarm.common.grpc.StubWriteOutputStream.flushSome(StubWriteOutputStream.java:157)
    at build.buildfarm.common.grpc.StubWriteOutputStream.flush(StubWriteOutputStream.java:170)
    at build.buildfarm.common.grpc.StubWriteOutputStream.write(StubWriteOutputStream.java:256)
    at com.google.common.io.ByteStreams.copy(ByteStreams.java:113)
    at build.buildfarm.cas.cfc.CASFileCache.expireEntry(CASFileCache.java:1912)
    at build.buildfarm.cas.cfc.CASFileCache.putOrReferenceGuarded(CASFileCache.java:2618)
    at build.buildfarm.cas.cfc.CASFileCache.putOrReference(CASFileCache.java:2571)
    at build.buildfarm.cas.cfc.CASFileCache.putImpl(CASFileCache.java:2464)
    at build.buildfarm.cas.cfc.CASFileCache.newOutput(CASFileCache.java:1085)
    at build.buildfarm.cas.cfc.CASFileCache$5.getOutput(CASFileCache.java:1050)
    at build.buildfarm.cas.cfc.CASFileCache$ReadThroughInputStream.<init>(CASFileCache.java:744)
    at build.buildfarm.cas.cfc.CASFileCache.newReadThroughInput(CASFileCache.java:541)
    at build.buildfarm.cas.cfc.CASFileCache.newInput(CASFileCache.java:536)
    at build.buildfarm.cas.cfc.CASFileCache.get(CASFileCache.java:566)
    at build.buildfarm.worker.shard.ShardWorkerInstance.getBlob(ShardWorkerInstance.java:126)
    at build.buildfarm.server.ByteStreamService.readLimitedBlob(ByteStreamService.java:239)
    at build.buildfarm.server.ByteStreamService.readBlob(ByteStreamService.java:267)
    at build.buildfarm.server.ByteStreamService.maybeInstanceRead(ByteStreamService.java:308)
    at build.buildfarm.server.ByteStreamService.read(ByteStreamService.java:339)
    at com.google.bytestream.ByteStreamGrpc$MethodHandlers.invoke(ByteStreamGrpc.java:333)
    at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:180)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7482638615, max: 7488405504)
        at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:725)
        at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:680)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
        at io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123)
        at io.grpc.netty.NettyWritableBufferAllocator.allocate(NettyWritableBufferAllocator.java:51)
        at io.grpc.internal.MessageFramer.writeKnownLengthUncompressed(MessageFramer.java:227)
        at io.grpc.internal.MessageFramer.writeUncompressed(MessageFramer.java:168)
        at io.grpc.internal.MessageFramer.writePayload(MessageFramer.java:141)
        at io.grpc.internal.AbstractStream.writeMessage(AbstractStream.java:53)
        at io.grpc.internal.ForwardingClientStream.writeMessage(ForwardingClientStream.java:37)
        at io.grpc.internal.ClientCallImpl.sendMessageInternal(ClientCallImpl.java:484)
        at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:468)
        at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
        at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
        at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:37)
        at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:364)
        at build.buildfarm.common.grpc.StubWriteOutputStream.flushSome(StubWriteOutputStream.java:157)
        at build.buildfarm.common.grpc.StubWriteOutputStream.close(StubWriteOutputStream.java:132)
        at build.buildfarm.cas.cfc.CASFileCache.expireEntry(CASFileCache.java:1910)
        ... 23 more

My worker config is as follows:

# the listening port of the shard-worker grpc server
port: 8981

# the host:port of this grpc server, required to be accessible
# by all shard cluster servers
public_name: "instance1"

# This determines whether the worker is a shard of the cas.
# When not a shard, it will upload into the Sharded CAS
omit_from_cas: false

# the digest function for this worker, required
# to match out of band between the client and server
# since resource names must be determined on the client
# for a valid upload
digest_function: SHA256

# all content for the operations will be stored under this path
root: "/tmp/worker"

# total size in bytes of inline content for action results
# output files, stdout, and stderr content, in that order
# will be inlined if their cumulative size does not exceed this limit.
inline_content_limit: 1048567 # 1024 * 1024

# the period between poll operations at any stage
operation_poll_period: {
  seconds: 1
  nanos: 0
}

dequeue_match_settings: {

  # key/value set of defining capabilities of this worker
  # all execute requests must match perfectly with workers which
  # provide capabilities
  # so an action with a required platform: { arch: "x86_64" } must
  # match with a worker with at least { arch: "x86_64" } here
  platform: {
    # commented out here for illustrative purposes, a default empty
    # 'platform' is a sufficient starting point without specifying
    # any platform requirements on the actions' side
    ###
    # property: {
    #   name: "key_name"
    #   value: "value_string"
    # }
  }

}

# the worker CAS configuration
cas: {
  # present a filesystem CAS, required to host an exec root
  # the first filesystem CAS in the configuration will be used
  # as the storage for an exec filesystem for an operation
  filesystem: {
    # the local cache location relative to the 'root', or absolute
    path: "cache"

    # limit for contents of files retained
    # from CAS in the cache
    max_size_bytes: 2147483648 # 2 * 1024 * 1024 * 1024

    # limit for content size of files retained
    # from CAS in the cache
    max_entry_size_bytes: 2147483648 # 2 * 1024 * 1024 * 1024

    # whether the file directories bidirectional mapping should be stored in memory (HashMap) or
    # in sqlite
    file_directories_index_in_memory: false
  }

  # whether the transient data on the worker should be loaded into the CAS on worker startup.
  # It can be faster for worker startup to skip loading.
  skip_load: false
}

# another cas entry specification here will provide a fallback
# for the filesystem cas. Any number of consecutive filesystem
# fallbacks may be used, terminated with zero or one of grpc or
# memory types.
#
cas: {
  grpc: {
      target: "********:9092",
  }
}

# the number of concurrently available slots in the execute phase
execute_stage_width: 1

# the number of concurrently available slots in the input fetch phase
input_fetch_stage_width: 1

# Use an input directory creation strategy which creates a single
# directory tree at the highest level of the input tree containing
# no output paths of any kind, and symlinks that directory into an
# action's execroot, potentially saving large amounts of time
# spent manufacturing the same read-only input hierarchy over
# multiple actions' executions.
link_input_directories: true

# an imposed action-key-invariant timeout used in the unspecified timeout case
default_action_timeout: {
  seconds: 600
  nanos: 0
}

# a limit on the action timeout specified in the action, above which
# the operation will report a failed result immediately
maximum_action_timeout: {
  seconds: 3600
  nanos: 0
}

# prefix command executions with this path
#execution_policies: {
#  name: "test"
#  wrapper: {
#    path: "/path/to/execution/wrapper"
#  }
#}

# A backplane specification hosted with redis cluster
# Fields omitted are expected defaults, and are unused or undesirable on the workers
redis_shard_backplane_config: {
  # The URI of the redis cluster endpoint. This must
  # be a single URI, regardless of the layout of the cluster
  redis_uri: "redis://********:6379"

  # The size of the redis connection pool
  jedis_pool_max_total: 4000

  # The redis key used to store a hash of registered Workers
  # to their registration expiration time. After a worker's
  # registration has expired, they are no longer considered
  # as shards of the CAS
  workers_hash_name: "Workers"

  # A redis pubsub channel key where changes to the cluster
  # membership are announced
  worker_channel: "WorkerChannel"

  # A redis key prefix for all ActionCache entries, suffixed
  # with the action's key and mapping to an ActionResult
  action_cache_prefix: "ActionCache"

  # The ttl maintained for ActionCache entries. This is not
  # refreshed on getActionResult hit
  action_cache_expire: 2419200 # 4 weeks

  # A redis key prefix for all blacklisted actions, suffixed
  # with the action's key hash. An action which is blacklisted
  # should be rejected for all requests where it is identified via
  # its RequestMetadata
  # To meet API standards, a request which matches this condition
  # receives a transient UNAVAILABLE response, which, in bazel's
  # case, can induce a fallback to non-remote recovery, rather
  # than a catastrophic failure.
  action_blacklist_prefix: "ActionBlacklist"

  # The ttl maintained for action_blacklist entries.
  action_blacklist_expire: 3600 # 1 hour

  # A redis key prefix for all blacklisted invocations, suffixed
  # with the tool invocation id. Requests on behalf of an invocation
  # which is blacklisted should be rejected where it is identified via
  # its RequestMetadata
  # To meet API standards, a request which matches this condition
  # receives a transient UNAVAILABLE response, which, in bazel's
  # case, can induce a fallback to non-remote recovery, rather
  # than a catastrophic failure.
  invocation_blacklist_prefix: "InvocationBlacklist"

  # A redis key prefix for all Operations, suffixed with the
  # operation's name and mapping to an Operation which reflects
  # the cluster perceived state of that Operation
  operation_prefix: "Operation"

  # The ttl maintained for all Operations, updated on each
  # modification
  operation_expire: 604800 # 1 week

  # The redis key used to store a list of QueueEntrys
  # awaiting execution by workers. These are queued
  # by an operation_queuer agent, and dequeued by a worker.
  # Redis keyspace manipulation is used here to support multi-
  # key commands.
  # The string contained within {} must match that of
  # dispatching_list_name.
  queued_operations_list_name: "{Execution}:QueuedOperations"

  # The redis key of a list used to ensure reliable processing of
  # ready-to-run queue entries together with operation watch
  # monitoring.
  # The string contained within {} must match that of
  # queued_operations_list_name.
  # dispatching_list_name: "{Execution}:DispatchingOperations"

  # A redis key prefix for operations which are being dequeued
  # from the ready-to-run queue. The key is suffixed with the
  # operation name and contains the expiration time in epoch
  # milliseconds after which the operation is considered lost.
  dispatching_prefix: "Dispatching"

  # The delay in milliseconds used to populate dispatching operation
  # entries
  dispatching_timeout_millis: 10000

  # The redis key of a hash of operation names to the worker
  # lease for its execution. Entries in this hash are monitored
  # by the dispatched_monitor for expiration, and the worker
  # is expected to extend a lease in a timely fashion to indicate
  # continued handling of an operation.
  dispatched_operations_hash_name: "DispatchedOperations"

  # A redis pubsub channel prefix suffixed by an operation name
  # where updates and keepalives are transmitted as it makes its
  # way through the various processing elements of the cluster.
  operation_channel_prefix: "OperationChannel"

  # A redis key prefix suffixed with a blob digest that maps to a
  # set of workers which advertise that blob's availability.
  # This set must be intersected with the set of active worker
  # leases to be considered meaningful.
  cas_prefix: "ContentAddressableStorage"

  # The ttl maintained for cas entries. This is not refreshed on
  # any read access of the blob.
  cas_expire: 604800 # 1 week

  # Enable an agent in the backplane client which subscribes
  # to worker_channel and operation_channel events. If this is
  # disabled, the responsiveness of watchers is reduced and the
  # accuracy of the CAS is reduced.
  # When in doubt, leave this enabled.
  subscribe_to_backplane: true

  ## Specify a queue that supports min/max cpu core specification
  provisioned_queues: {
    queues: {
      name: "cpu"

      platform: {
        # Any specification (including non-specification) of min/max-cores
        # will be allowed to support cpu controls and worker resource
        # delegation.
        properties: {
          name: "min-cores"
          value: "*"
        }
        properties: {
          name: "max-cores"
          value: "*"
        }
      }
    }
  }

  # Redis connection and read timeout values in milliseconds (defaults: 2000, minimum value: 2000)
  #timeout: 20000
  # Redis maximum attempts (default: 5, minimum value: 5)
  #max_attempts: 10
}

# Create exec trees containing directories that are owned by
# this user
# The default (empty) value does not change the owner
#exec_owner: "nobody"
exec_owner: ""

I trigger remote builds frequently, but I don't think this is a buildfarm bug. Is something wrong with my configuration, or is it something else?

babychenge commented 3 years ago

And the buildfarm version is 1.5.0.

jacobmou commented 3 years ago

Looks like the error happened during the ByteStreams.copy(in, out) call in expireEntry(...) in CASFileCache.java. From the error message you provided:

Exception in thread "grpc-default-executor-409" io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 7482638615, max: 7488405504)

it's pretty clear that there is not enough direct memory to perform the copy operation. You might want to increase the amount of direct memory with the JVM flag -XX:MaxDirectMemorySize=. I also found discussions about similar errors on GitHub and Stack Overflow, which were helpful.
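
For illustration only, if the worker is launched directly from its deploy jar, the flag could be passed roughly like this (the jar name, config path, and the 12g figure are assumptions; adjust them to however you actually start the worker and to the memory available on your hosts):

    java -XX:MaxDirectMemorySize=12g \
         -jar buildfarm-shard-worker_deploy.jar /etc/buildfarm/shard-worker.config

The limit needs to comfortably exceed the peak direct-buffer demand of the gRPC/Netty transport, which in your trace was already close to 7 GiB.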

I'm closing the issue, but feel free to reopen it if you see the error again.

werkt commented 3 years ago

it's pretty clear that there is not enough direct memory to perform the copy operation. You might want to increase the amount of direct memory with the JVM flag -XX:MaxDirectMemorySize=. I also found discussions about similar errors on GitHub and Stack Overflow, which were helpful.

Reopening. This is not quite so cut and dried, and is the result of several competing factors.

While ByteStreams.copy is the activity, the reason it happens at all is the previous frame, where the cache is attempting to expire content.

In your case, you've specified a grpc endpoint as a secondary CAS in your worker config. If I might ask, what is hosting that grpc CAS? The problem comes when performing this expiration, which blocks everything within the CAS, so other memory-consumptive processes can pile up. Additionally, this upload (which we're doing at the InputStream level) has no notion of flow control, so it will pile as much content as it can into the outbound stream to write the content into the delegated CAS, which is unsurprisingly where the OOM occurred.

At a minimum, we should support flow control on those uploads. Since they really shouldn't be holding the process/expiration lock at that point, we should be async. Further, this use of the secondary cas as overflow-only may not be intended. What is your use case for your secondary cas?

Lastly, your CAS in general is actually quite small: is 2 GiB really the only data you want to keep around for all concurrent actions and hot shard segments?

babychenge commented 3 years ago

I have configured a remote cache as the secondary CAS: https://github.com/buchgr/bazel-remote, and that secondary CAS is backed by S3. Maybe memory is being used up by the queue of pending uploads. Should I modify these settings (MaxDirectMemorySize=, max_size_bytes, and max_entry_size_bytes) to alleviate this problem?

werkt commented 3 years ago

You can try increasing the direct memory limit, but I'll also say that your filesystem CAS is very small for a real workload. 2 GiB per worker may induce over-expiration of content, which can exacerbate this problem. For a reasonably sized CAS shard the ingress rate should be roughly constant, however, and you may want more workers to deal with your load. Observe the data rate coming into your workers and tune the number of workers accordingly.
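
As a rough sketch of what a larger local cache might look like in the worker config (the 64 GiB figure is purely illustrative; size it to your hot working set and available disk):

cas: {
  filesystem: {
    path: "cache"

    # illustrative only: retain far more than 2 GiB locally
    max_size_bytes: 68719476736 # 64 * 1024 * 1024 * 1024

    # the per-entry limit can stay as-is unless you store larger blobs
    max_entry_size_bytes: 2147483648 # 2 * 1024 * 1024 * 1024

    file_directories_index_in_memory: false
  }
}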

Aside from this, S3 has been observed to have haltingly slow write speeds at times, and it may be something you want to monitor as well, assuming such diagnostics are available. The maximum write throughput you can sustain into it needs to be higher than the average ingress rate of your per-worker CAS traffic, and perhaps higher than the sum of your workers' CAS traffic across the cluster if you're hitting a single bazel-remote endpoint.