Closed gwbaik9717 closed 1 month ago
The recent changes introduce a broadcasting feature in the Client
and Document
classes, allowing clients to subscribe to broadcast events. New event types, Broadcast
and LocalBroadcast
, are added, along with corresponding interfaces and methods for managing subscriptions and broadcasting payloads with validation. Comprehensive test cases validate the new functionality, ensuring robust error handling and event management.
Files | Change Summary |
---|---|
packages/sdk/src/client/client.ts |
Added broadcast method in Client class for payload broadcasting with validation and error handling. |
packages/sdk/src/document/document.ts |
Introduced Broadcast and LocalBroadcast event types, BroadcastEvent and LocalBroadcastEvent interfaces, and methods for subscribing to broadcast events. Updated event handling logic. |
packages/sdk/test/integration/client_test.ts |
Added test cases for successful broadcasting of serializable payloads and error handling for unserializable payloads, enhancing test coverage. |
In the meadow where bytes play,
A broadcast hops and bounds today.
With messages shared, both near and far,
Events flutter like a shining star.
So let us dance, both rabbit and friend,
With joy in code, our fun won't end! 🐇✨
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?
@sejongk
There are a few things I'd like to share:
1. How Handlers for Each Topic Are Managed:
Inside the Document class, I've added a broadcastEventHandlers
which is a map to manage the handlers for each topic.
// document.ts
export class Document<T, P extends Indexable = Indexable> {
// skipped
private broadcastEventHandlers: Map<
string,
(topic: string, payload: any) => void
>;
}
When a user subscribes to a specific topic for broadcast events:
// User subscribes to a broadcast event with the topic "TOPIC_NAME"
doc.subscribeBroadcastEvent('TOPIC_NAME', (topic, payload) => {
// Handle the broadcast event for the specified topic
});
The handler is registered in the broadcastEventHandlers map:
export class Document<T, P extends Indexable = Indexable> {
public subscribeBroadcastEvent(
topic: string,
handler: (topic: string, payload: any) => void,
error?: ErrorFn,
): Unsubscribe {
// Register the handler in broadcastEventHandlers
this.broadcastEventHandlers.set(topic, handler);
2. How PbDocEventType
is Converted to DocEventType
:
Within the applyWatchStream
method of the Document
class, which applies the given watch stream response to the document, the conversion from PbDocEventType
to DocEventType
occurs as follows:
public applyWatchStream(resp: WatchDocumentResponse) {
// skipped
} else if (type === PbDocEventType.DOCUMENT_BROADCAST) {
if (resp.body.value.body) {
const { topic, payload } = resp.body.value.body;
const decoder = new TextDecoder();
event.push({
type: DocEventType.Broadcast,
value: { topic, payload: JSON.parse(decoder.decode(payload)) },
});
}
}
if (event.length > 0) {
this.publish(event);
}
}
}
3. Test Case for Sending Unserializable Payloads:
As discussed earlier, I've added a test case in client_test.ts
to handle unserializable payloads. If there are any additional test cases needed, please let me know.
Lastly, I've verified that the broadcast API works as expected in several examples, but I believe further testing is necessary to ensure complete coverage.
@gwbaik9717 CI failed due to incorrect pnpm version in CI configuration. It is currently fixed in the main branch.
@sejongk
Here is an update.
1. Unified Broadcast Event Subscription
Users can now use the existing subscribe
method to subscribe to broadcast events. Since subscribing to a broadcast event requires both a type
and a topic
, I've introduced a new SubscribePair
type, which is structured as follows:
type SubscribePair = {
type: string;
};
Building on this, the BroadcastSubscribePair
extends SubscribePair
:
type BroadcastSubscribePair = {
type: 'broadcast';
topic: string;
} & SubscribePair;
Here’s an example of how to subscribe to a broadcast event:
const unsubscribe = d2.subscribe(
{ type: 'broadcast', topic: "TOPIC_NAME" },
(topic, payload) => {},
);
2. Call broadcast
from Document
not Client
Broadcasting is now handled directly by the Document
not Client
, aligning with the subscription mechanism.
To support this, I’ve added a client
attribute within the Document
class. This client
attribute is an instance of the Client
and is set when the client attaches to the document, and unset when it detaches.
export class Document<T, P extends Indexable = Indexable> {
// skipped
private client?: Client;
/**
* `setClient` sets the client of this document.
*
* @internal
*/
public setClient(client?: Client): void {
this.client = client;
}
}
Here’s an example of how to broadcast a topic with payload.
// must be serializable
const payload = {a:1, b:"2"}
await doc.broadcast("TOPIC_NAME", payload);
@sejongk
Sorry for the late review. I'll check it today.
CC) @chacha912, @gwbaik9717
@hackerwins, I've reviewed, but could you please check it once more in case I missed anything? (Please check the comments I've left with cc regarding the subscribe interface and client property)
@hackerwins @chacha912 @sejongk
Here's an update.
In order to resolve the circular reference issue, we’ve removed the client
reference from the Document
class. Instead, we've decided to have Client
subscribe to Document
's broadcast events and handle them by calling Client
's broadcast
method.
However, this solution presents two challenges:
1. Unsubscribing Handlers
We had to ensure that handlers registered in subscribe
are properly unsubscribed when document is detached from client.
To address this, we’ve decided to manage unsubscribeBroadcastEvent
within the Attachment
class, since the lifecycle of the client’s attachment and the sub/unsub process are the same.
export class Attachment<T, P extends Indexable> {
// skipped
unsubscribeBroadcastEvent: Unsubscribe;
}
With this approach, when attaching a document to a client using the attach
method of Client, we subscribe to the document’s event stream, and we unsubscribe
when detaching:
public attach<T, P extends Indexable>(
doc: Document<T, P>,
options: {
initialPresence?: P;
syncMode?: SyncMode;
} = {},
): Promise<Document<T, P>> {
// skipped
const unsubscribeBroacastEvent = doc.subscribe(
'broadcast',
(topic, payload, onBroadcastError) => {
try {
this.broadcast(doc.getKey(), topic, payload);
} catch (e: unknown) {
if (e instanceof Error) {
onBroadcastError?.(e);
}
}
},
);
this.attachmentMap.set(
doc.getKey(),
new Attachment(
this.reconnectStreamDelay,
doc,
res.documentId,
syncMode,
unsubscribeBroacastEvent,
),
);
private detachInternal(docKey: DocumentKey) {
// skipped
attachment.unsubscribeBroadcastEvent();
2. Error Handling for Broadcast Events
The second challenge involves handling errors during broadcast events, such as when broadcasting an unserializable payload. Initially, we could simply use a try-catch block to handle errors when directly accessing the client's broadcast method:
try {
doc.broadcast(broadcastTopic, payload)
}catch(e){
// do something
}
However, since we’re now dealing with an event stream, a different approach to error handling is required. To accommodate this, I've introduced a new interface for broadcasting events. This interface includes an error handling function that triggers when a broadcast event fails.
// Usage
const errorFn = () => {
// Do something
}
doc.broadcast("topic", payload, errorFn)
Here's an update.
As mentioned earlier, we've decided to have the Client
subscribe to the Document
's broadcast events and handle them by invoking the Client
's broadcast method. The challenge we encountered was distinguishing between local
and remote
broadcast events. Since the client is now subscribed to the document's broadcast events, both local
and remote
events are received, which creates the following issue:
Without differentiation, this setup can lead to an unexpected infinite loop. This happens because the client, upon receiving its own broadcast event, will rebroadcast it, creating a never-ending cycle.
This scenario can be reproduced with the following test code. The test simulates a situation where the publisher is broadcasting an event and, at the same time, subscribing to the broadcast event. Currently, since Yorkie does not support self-broadcast filtering, the subscribe handler gets called, leading to numerous recursive broadcasts, as evidenced by the test results:
it('Should not trigger the handler for a broadcast event sent by the publisher to itself', async ({
task,
}) => {
await withTwoClientsAndDocuments<{ t: Text }>(
async (c1, d1, c2, d2) => {
const eventCollector1 = new EventCollector<[string, any]>();
const eventCollector2 = new EventCollector<[string, any]>();
const broadcastTopic = 'test';
const payload = { a: 1, b: '2' };
// Publisher subscribes to the broadcast event
const unsubscribe1 = d1.subscribe('broadcast', (topic, payload) => {
if (topic === broadcastTopic) {
eventCollector1.add([topic, payload]);
}
});
const unsubscribe2 = d2.subscribe('broadcast', (topic, payload) => {
if (topic === broadcastTopic) {
eventCollector2.add([topic, payload]);
}
});
d1.broadcast(broadcastTopic, payload);
// Assuming that subscribers can receive the broadcast event within 1000ms.
await new Promise((res) => setTimeout(res, 1000));
unsubscribe1();
unsubscribe2();
assert.equal(eventCollector1.getLength(), 0);
assert.equal(eventCollector2.getLength(), 1);
},
task.name,
SyncMode.Realtime,
);
});
To resolve this issue, I've introduced a LocalBroadcast
event type in addition to the existing Broadcast
event. The LocalBroadcast
event is only triggered when the local document attempts to broadcast. This addition allows us to clearly distinguish between local
and remote
broadcast events, preventing the infinite loop scenario by ensuring that a client does not handle its own broadcast events.
export enum DocEventType {
/**
* `Broadcast` means that the broadcast event is received from the remote client.
*/
Broadcast = 'broadcast',
/**
* `LocalBroadcast` means that the broadcast event is sent from the local client.
*/
LocalBroadcast = 'local-broadcast',
}
/**
* `broadcast` the payload to the given topic.
*/
public broadcast(topic: string, payload: any, error?: ErrorFn) {
const broadcastEvent: LocalBroadcastEvent = {
type: DocEventType.LocalBroadcast,
value: { topic, payload },
error,
};
this.publish([broadcastEvent]);
}
However, the above method still has a drawback. We may want to hide the LocalBroadcast
event from users, as it is primarily intended for internal handling rather than user-facing logic.
doc.subscribe('local-broadcast', (event) => {
// Users do not need to know about this event
},
);
What this PR does / why we need it?
This PR implements a broadcast API, which enables the sharing of a broader range of general events beyond the current document and presence events in Yorkie's Publish-Subscribe model.
Any background context you want to provide?
1. Broadcast Events:
Users can now broadcast custom events with a specified topic and payload. The payload can be of any type, as long as it is serializable.
2. Subscribe to Broadcast Events:
Users can subscribe to specific topics and handle the events via a callback function. The callback is triggered whenever an event with the corresponding topic is broadcast.
What are the relevant tickets?
Related to https://github.com/yorkie-team/yorkie/issues/628
Checklist
Summary by CodeRabbit
Summary by CodeRabbit
New Features
broadcast
method in the Client class to send messages to specified topics.Broadcast
andLocalBroadcast
to enable clients to subscribe to broadcast messages.subscribe
method for clients to register handlers for broadcast events.Bug Fixes
Tests