Closed wdbaruni closed 2 weeks ago
The changes in this pull request involve extensive modifications across multiple packages, primarily focusing on enhancing event handling, message processing, and logging within the compute and orchestrator components. Key updates include the introduction of pointer types for event structures, the transition from using the messages
package to the legacy
package for various request and response types, and the addition of new constants and methods to improve protocol handling. New test suites have been created to validate the functionality of the dispatcher and message handler, ensuring robust error handling and processing logic.
File Path | Change Summary |
---|---|
pkg/compute/bidder.go | Updated handleBidResult and handleError methods to use []*models.Event . |
pkg/compute/constants.go | Added constant messageHandlerErrorComponent . |
pkg/compute/endpoint.go | Removed unused fields from BaseEndpointParams and BaseEndpoint , updated method signatures to use legacy types. |
pkg/compute/executor.go | Updated event handling in UpdateExecutionState , modified cleanup logic in prepareWasmVolumes . |
pkg/compute/executor_buffer.go | Changed Events field type in UpdateExecutionRequest to []*models.Event . |
pkg/compute/management_client.go | Added SupportedProtocols field to NodeInfo . |
pkg/compute/message_handler.go | Introduced MessageHandler struct with methods for processing messages. |
pkg/compute/message_handler_test.go | Organized under compute package. |
pkg/compute/mocks.go | Updated mock types to use legacy package types. |
pkg/compute/startup.go | Updated failExecution method to use []*models.Event . |
pkg/compute/store/boltdb/store.go | Updated event handling methods to use []*models.Event . |
pkg/compute/store/boltdb/utils.go | Removed toPtrSlice function. |
pkg/compute/store/mocks.go | Updated mock methods to use *models.Event . |
pkg/compute/store/test/store_suite.go | Updated test methods to use []*models.Event . |
pkg/compute/store/types.go | Updated AddExecutionEvent and CreateExecution methods to use []*models.Event . |
pkg/compute/types.go | Updated Endpoint and Callback interfaces to use legacy types. |
pkg/compute/watchers/bprotocol_dispatcher.go | Renamed CallbackForwarder to BProtocolDispatcher , updated methods for legacy types. |
pkg/compute/watchers/bprotocol_dispatcher_test.go | Renamed test suite to match dispatcher name. |
pkg/devstack/devstack.go | Modified orchestrator address assignment logic. |
pkg/jobstore/boltdb/store.go | Updated event registration and handling for job executions. |
pkg/jobstore/boltdb/store_test.go | Enhanced event handling tests for job execution states. |
pkg/jobstore/context.go | Removed TracingContext type and related methods. |
pkg/jobstore/errors.go | Added NewErrInvalidExecutionDesiredState function. |
pkg/jobstore/types.go | Updated UpdateExecutionRequest to include Events field. |
pkg/lib/boltdblib/tracing_context.go | Introduced TracingContext for transaction tracing. |
pkg/lib/ncl/publisher.go | Updated publish methods to use PublishRequest . |
pkg/lib/ncl/types.go | Added PublishRequest struct for message publishing. |
pkg/lib/watcher/boltdb/boltdb.go | Enhanced logging in GetEvents method. |
pkg/lib/watcher/types.go | Added WatcherID field to GetEventsRequest . |
pkg/models/constants.go | Updated orchestrator metadata constants. |
pkg/models/event.go | Changed return type of EventFromError to *Event . |
pkg/models/execution_upsert.go | Introduced ExecutionUpsert struct for tracking execution states. |
pkg/models/job.go | Added OrchestratorID method to Job struct. |
pkg/models/messages/base.go | Added BaseRequest and BaseResponse types for message handling. |
pkg/models/messages/bidding.go | Updated request/response types to use BaseRequest and BaseResponse . |
pkg/models/messages/constants.go | Defined message type constants. |
pkg/models/messages/execution.go | Removed CancelExecutionRequest , updated other types to inherit from base types. |
pkg/models/messages/legacy/bidding.go | Introduced new legacy message types for bidding. |
pkg/models/messages/legacy/execution.go | Added legacy execution message types. |
pkg/models/messages/legacy/metadata.go | Renamed package from messages to legacy . |
pkg/models/messages/logging.go | Modified ExecutionLogsRequest to include NodeID . |
pkg/models/node_info.go | Added SupportedProtocols field to NodeInfo . |
pkg/models/protocol.go | Defined Protocol type and related constants. |
pkg/nats/proxy/callback_proxy.go | Updated method signatures to use legacy types. |
pkg/nats/proxy/compute_handler.go | Removed streaming capabilities from ComputeHandler . |
pkg/nats/proxy/compute_proxy.go | Updated request/response types to use legacy package. |
pkg/nats/proxy/logstream_handler.go | Introduced LogStreamHandler for logstream operations. |
pkg/nats/proxy/logstream_proxy.go | Added LogStreamProxy for handling log stream requests. |
pkg/nats/transport/nats.go | Enhanced NATSTransport with logstream capabilities. |
pkg/node/compute.go | Updated NewComputeNode to include NCL publisher/subscriber. |
pkg/node/constants.go | Renamed watcher ID constants for clarity. |
pkg/node/heartbeat/client.go | Updated SendHeartbeat to use PublishRequest . |
pkg/node/ncl.go | Expanded message type registrations and added new functions. |
pkg/node/node.go | Added SupportedProtocols to NodeStateProviderParams . |
pkg/node/requester.go | Updated NewRequesterNode to include logstreamServer . |
pkg/orchestrator/callback.go | Updated callback methods to use legacy types. |
pkg/orchestrator/endpoint.go | Updated endpoint structures to include LogstreamServer . |
pkg/orchestrator/evaluation/watcher.go | Modified event handling logic in WatchHandler . |
pkg/orchestrator/message_handler.go | Introduced MessageHandler for job execution messages. |
pkg/orchestrator/message_handler_test.go | Added tests for MessageHandler . |
pkg/orchestrator/planner/compute_forwarder.go | Removed ComputeForwarder type and related methods. |
pkg/orchestrator/planner/compute_forwarder_test.go | Deleted test suite for ComputeForwarder . |
pkg/orchestrator/planner/utils_test.go | Removed ComputeRequestMatcher type and methods. |
pkg/orchestrator/scheduler/batch_service_job.go | Updated job failure handling in createMissingExecs . |
pkg/orchestrator/scheduler/ops_job.go | Updated job failure handling in createMissingExecs . |
pkg/orchestrator/scheduler/utils_test.go | No changes to exported entities. |
pkg/orchestrator/transformer/job.go | Enhanced job metadata handling in RequesterInfo . |
pkg/orchestrator/watchers/bprotocol_dispatcher.go | Introduced BProtocolDispatcher for event handling. |
pkg/orchestrator/watchers/bprotocol_dispatcher_test.go | Added tests for BProtocolDispatcher . |
buildkite/pipelines/bacalhau-golang.yaml | Added unit and integration test steps for NCL protocol. |
pkg/orchestrator/watchers/dispatcher_test.go | Introduced test suite for Dispatcher . |
In the meadow where the code does play,
A rabbit hops with joy today.
With events and messages all in line,
The changes made are simply divine!
From bids accepted to logs that stream,
Our code now dances, a coder's dream! 🐇✨
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?
The PR introduces an async messaging based communication protocol between orchestrator and compute nodes based on NCL library. We are still supporting the existing request/response protocol (bprotocol) for backward compatibility, and it is still the default protocol while NCL protocol is stabilized before v1.6
Compute nodes now share their supported protocols in their NodeInfo to the orchestrators, and the orchestrator will use the preferred protocol out of the supported ones.
To force using the new protocol, set
BACALHAU_PREFER_NCL_PROTOCOL
env variable.Testing Done
Summary by CodeRabbit
Release Notes
New Features
MessageHandler
for processing job execution messages in the orchestrator.LogstreamServer
in various components.BProtocolDispatcher
for managing execution state changes in a backward-compatible manner.Improvements
createMissingExecs
method to ensure proper event handling and state updates.Tests
MessageHandler
,BProtocolDispatcher
, andDispatcher
.LogStreamHandler
andLogStreamProxy
.Chores