Hi. I ran into a few issues while developing an events feature that combines MDC with something similar to ReplayMerge (archive + live).
Maybe I missed something and there is a solution, but to me it looks like a bug. I checked the documentation, wiki and GitHub but did not find any explanation of some of these use (and corner) cases. I would call this a question/bug issue.
Code is attached.
Tested with version 1.46.1.
1. Two subscriptions with the same channel but different streamIds. Behavior is inconsistent depending on the control-mode config.
1.1. Simple subscriptions without any control mode
1.2. Same as 1.1. but with "control-mode=manual"
Example 1
```
import io.aeron.Aeron;
import io.aeron.ExclusivePublication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import org.agrona.ExpandableDirectByteBuffer;

public class MDCPortIssue {

    private enum Example {
        USUAL_SUBSCRIPTION,
        MDC_SUBSCRIPTION
    }

    public static void main(String[] args) {
        run(Example.USUAL_SUBSCRIPTION); // Works
        run(Example.MDC_SUBSCRIPTION); // Does not work
    }

    private static void run(Example example) {
        String dirName = "aeron/mdc";
        MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
            .errorHandler(Throwable::printStackTrace)
            .aeronDirectoryName(dirName)
            .dirDeleteOnStart(true);
        Aeron.Context aeronContext = new Aeron.Context()
            .aeronDirectoryName(mediaDriverContext.aeronDirectoryName());

        try (
            MediaDriver mediaDriver = MediaDriver.launch(mediaDriverContext);
            Aeron aeron = Aeron.connect(aeronContext)
        ) {
            String sameChannel = "aeron:udp?endpoint=localhost:10000";
            Subscription subscription1 = null;
            Subscription subscription2 = null;

            switch (example) {
                // Run 1. Works
                case USUAL_SUBSCRIPTION -> {
                    subscription1 = aeron.addSubscription(sameChannel, 1000);
                    subscription2 = aeron.addSubscription(sameChannel, 1001);
                }
                // Run 2. Does not work
                case MDC_SUBSCRIPTION -> {
                    subscription1 = aeron.addSubscription("aeron:udp?control-mode=manual", 1000);
                    subscription2 = aeron.addSubscription("aeron:udp?control-mode=manual", 1001);
                    subscription1.addDestination(sameChannel);
                    subscription2.addDestination(sameChannel);
                }
            }

            ExclusivePublication publication1 = aeron.addExclusivePublication(sameChannel, 1000);
            ExclusivePublication publication2 = aeron.addExclusivePublication(sameChannel, 1001);
            sleep(500L);

            ExpandableDirectByteBuffer buffer = new ExpandableDirectByteBuffer(64);
            buffer.putLong(0, 100L);
            publication1.offer(buffer, 0, 64);
            publication2.offer(buffer, 0, 64);
            sleep(500L);

            System.out.println(subscription1.isConnected());
            System.out.println(subscription2.isConnected());
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```
The first example works fine: both subscriptions are connected and it prints "true" twice, with no errors.
The issue is with the second one. It does not work and reports that the port is already in use, which looks wrong to me because the channel should be reused as in the first example. I also tried different tag configs, but did not find a trick there.
My use case:
All events are published to the same channel but with different stream ids, one per group (user CRUD events with streamId 1, product CRUD events with streamId 2, etc.).
The consumer side can use all streams or only a few of them.
If a consumer wants 2 or more streams, it creates a subscription for each stream but with the same channel. I need 'control-mode=manual' to have 2 publications (1 archive, 1 live) and 1 image with activeTransportCount 2 on the subscription side, as ReplayMerge does. For this case I cannot use "1.1." and need to use "1.2."
Question: Why does it work in the first case but not in the second? Is there a workaround, or did I do something wrong? Looks like a bug.
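To show what the "port already in use" error means at the socket level, here is a minimal sketch in plain Java without Aeron. It rests on an assumption I have not confirmed in the driver code: that each manual subscription tries to bind its own UDP socket for the destination instead of sharing one as in 1.1. The class and method names are mine and exist only for illustration.

```java
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;

final class UdpDoubleBind {

    /** Returns true if a second socket could NOT bind to a port already held by the first. */
    static boolean secondBindFails() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the OS pick any free port for the first socket.
        try (DatagramSocket first = new DatagramSocket(0, loopback)) {
            int port = first.getLocalPort();
            // Assumption: this mimics a second transport binding the same endpoint.
            try (DatagramSocket second = new DatagramSocket(port, loopback)) {
                return false; // both sockets bound, no conflict
            } catch (SocketException e) {
                return true; // typically "Address already in use"
            }
        } catch (SocketException e) {
            throw new IllegalStateException("could not bind the first socket", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second bind fails: " + secondBindFails());
    }
}
```

If that assumption is right, the difference between 1.1 and 1.2 would come down to whether the receive transport is shared per channel or created per subscription.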
2. Unexpected disconnection of the publication when image.activeTransportCount() == 2.
To get a subscription with 1 image whose activeTransportCount == 2 we need (from my understanding):
- a subscription with "control-mode=manual"
- two subscription destinations with different endpoints (ports)
- two publications with different endpoints (ports) matching those expected by the subscription, and both publications must have the same session-id
- both publications must send the same fragments (at the same positions), otherwise none of this makes sense
I have 1 manual subscription and 2 default publications.
From my observation it is always the second publication that is closed.
While debugging I found that it was marked as disconnected because the 'PublicationImage.timeOfLastStateChangeNs' value was not updated, and after 5 seconds (imageLivenessTimeoutNs) it got disconnected. I guess it is due to a "missed" 'onStatusMessage' or something like that: the first publication could "take" this message but the second publication did not see it. With a 1 to 1 connection I did not see this issue.
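The behavior I suspect can be modelled in a few lines of plain Java. This is only a sketch of my hypothesis, not Aeron's code: `Endpoint`, `onStatusMessage` and `simulate` are hypothetical names, and the 5 second constant is just the timeout I saw while debugging. Two endpoints start alive, but only the first one keeps receiving updates.

```java
import java.util.concurrent.TimeUnit;

final class LivenessModel {

    // Assumed value: the 5 second timeout observed in the debugger.
    static final long LIVENESS_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(5);

    /** Hypothetical stand-in for one side of a connection tracked by a timestamp. */
    static final class Endpoint {
        private long timeOfLastUpdateNs;

        Endpoint(long nowNs) {
            timeOfLastUpdateNs = nowNs;
        }

        void onStatusMessage(long nowNs) {
            timeOfLastUpdateNs = nowNs;
        }

        boolean isConnected(long nowNs) {
            return nowNs - timeOfLastUpdateNs <= LIVENESS_TIMEOUT_NS;
        }
    }

    /** Advances a synthetic clock one second at a time; only the first endpoint is refreshed. */
    static boolean[] simulate(int seconds) {
        Endpoint first = new Endpoint(0L);
        Endpoint second = new Endpoint(0L);
        long nowNs = 0L;
        for (int s = 1; s <= seconds; s++) {
            nowNs = TimeUnit.SECONDS.toNanos(s);
            first.onStatusMessage(nowNs); // the second endpoint never sees an update
        }
        return new boolean[] {first.isConnected(nowNs), second.isConnected(nowNs)};
    }

    public static void main(String[] args) {
        boolean[] state = simulate(6);
        System.out.println("first connected: " + state[0] + ", second connected: " + state[1]);
    }
}
```

In this model the second endpoint drops as soon as the clock passes the timeout, which matches what I see: the data keeps flowing, yet one side is treated as dead.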
Example 2
```
import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.SleepingIdleStrategy;

import java.util.concurrent.atomic.AtomicBoolean;

public class ActiveTransportCountExample {

    private static final int MESSAGE_AMOUNT = 1000;

    public static void main(String[] args) {
        var publishingThread = new Thread(ActiveTransportCountExample::runServer);
        var receivingThread = new Thread(ActiveTransportCountExample::runClient);
        receivingThread.start();
        publishingThread.start();
        try {
            publishingThread.join();
            receivingThread.join();
        } catch (InterruptedException ignored) {
        }
    }

    private static void runServer() {
        String dirName = "aeron/server";
        MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
            .errorHandler(Throwable::printStackTrace)
            .aeronDirectoryName(dirName)
            .dirDeleteOnStart(true);
        Aeron.Context aeronContext = new Aeron.Context()
            .aeronDirectoryName(mediaDriverContext.aeronDirectoryName());

        try (
            MediaDriver mediaDriver = MediaDriver.launch(mediaDriverContext);
            Aeron aeron = Aeron.connect(aeronContext)
        ) {
            // Simulate replay publication from the archive and live publication both connected at the same time
            Publication udpPublication1 = aeron.addPublication("aeron:udp?endpoint=localhost:10001|session-id=1", 1001);
            Publication udpPublication2 = aeron.addPublication("aeron:udp?endpoint=localhost:10002|session-id=1", 1001);
            sleep(500); // Await connections

            ExpandableDirectByteBuffer buffer = new ExpandableDirectByteBuffer(5 * 1024);
            for (int i = 1; i <= MESSAGE_AMOUNT; i++) {
                buffer.putInt(0, i);
                sendMessage(udpPublication1, buffer);
                sendMessage(udpPublication2, buffer);
                System.out.println("Message sent: " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    private static void runClient() {
        String dirName = "aeron/client";
        MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
            .errorHandler(Throwable::printStackTrace)
            .aeronDirectoryName(dirName)
            .dirDeleteOnStart(true);
        Aeron.Context aeronContext = new Aeron.Context()
            .aeronDirectoryName(mediaDriverContext.aeronDirectoryName());

        try (
            MediaDriver mediaDriver = MediaDriver.launch(mediaDriverContext);
            Aeron aeron = Aeron.connect(aeronContext)
        ) {
            Subscription subscription = aeron.addSubscription("aeron:udp?control-mode=manual", 1001);
            subscription.addDestination("aeron:udp?endpoint=localhost:10001");
            subscription.addDestination("aeron:udp?endpoint=localhost:10002");

            AtomicBoolean finished = new AtomicBoolean(false);
            FragmentAssembler fragmentAssembler = new FragmentAssembler((buffer, offset, length, header) -> {
                int messageId = buffer.getInt(offset);
                if (messageId == MESSAGE_AMOUNT) {
                    finished.set(true);
                }
                int activeTransportCount = subscription.images().isEmpty()
                    ? 0
                    : subscription.images().getFirst().activeTransportCount();
                System.out.println("Received messageId: " + messageId
                    + " Image count: " + subscription.images().size()
                    + ", atc: " + activeTransportCount
                    + " hp: " + header.position());
            });

            IdleStrategy idleStrategy = new SleepingIdleStrategy(1000L);
            while (true) {
                int workCount = subscription.poll(fragmentAssembler, 10);
                if (finished.get()) {
                    break;
                }
                idleStrategy.idle(workCount);
            }
        }
    }

    private static void sendMessage(Publication udpPublication1, ExpandableDirectByteBuffer buffer) {
        while (true) {
            long offer = udpPublication1.offer(buffer, 0, 5 * 1024);
            if (offer == Publication.BACK_PRESSURED || offer == Publication.ADMIN_ACTION) {
                continue; // Try again
            }
            if (offer == Publication.NOT_CONNECTED) {
                // For the live publication we just ignore if there are no subscribers
                // But in this example subscription is connected and should be visible, we just print message
                System.err.println("Not connected: " + udpPublication1.channel());
                break;
            }
            if (offer == Publication.CLOSED) {
                throw new IllegalStateException("Publication is closed");
            }
            if (offer == Publication.MAX_POSITION_EXCEEDED) {
                throw new IllegalStateException("Max position reached for the publication");
            }
            break;
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```
My use case:
Something like ReplayMerge. ReplayMerge itself works fine. There I had 1 dynamic UDP publication with a recording connected to it, and a live subscription that can connect to the same publication. I did not like that setup: why use UDP for the archive when IPC is available, with a second UDP-only publication that sends data only if there are subscribers? So I made 2 publications.
An interesting observation: when I comment out the lines that stop the replay inside ReplayMerge, both publications keep working and nothing is closed. After a few days of trying to understand it I gave up. The only difference I see is that it uses the replay instead of a usual publication. But that is still just a publication. Maybe it is due to the dynamic control-mode used by the publication there, I do not know.
5 seconds (or more, since it is configurable) can be enough to "switch" from the archive to live or back, but relying on such behavior is not the best solution.
I could have a subscription with 2 images and manage them manually somehow, but the transport count feature looks much better: there is a single image with a single buffer, it will probably handle packet losses across multiple publications, and so on. It would be good to have it just work.
Question: It looks strange that ReplayMerge did not disconnect the second publication after many seconds (when we do not close the replay and keep 2 publications, from archive and live), but in my simplified example it got disconnected. Looks like a bug.
3. Question about the internal logic of activeTransportCount.
I did not find any documentation about it, so I just want to clarify a few things.
Will it merge packets from both UDP streams into the same image buffer? For example, 1 message consists of 2 packets. Two publications send 1 message each (4 packets total). If the client receives the first packet from publication 1 and the second packet from publication 2, will it form a complete message and proceed as normal? I assume it will, but I did not find documentation about such cases at all.
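To make the question concrete, this is the behavior I am assuming, written as a small plain-Java model. It is not taken from the Aeron sources. It assumes every packet carries the offset where its bytes belong and that the image buffer is filled by offset, so it would not matter which publication delivered a given packet. `OffsetRebuildModel` and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class OffsetRebuildModel {

    private final byte[] buffer;
    private final boolean[] filled;

    OffsetRebuildModel(int capacity) {
        buffer = new byte[capacity];
        filled = new boolean[capacity];
    }

    /** Copies the payload to its offset; a duplicate from another source rewrites the same bytes. */
    void insert(int offset, byte[] payload) {
        System.arraycopy(payload, 0, buffer, offset, payload.length);
        Arrays.fill(filled, offset, offset + payload.length, true);
    }

    /** Returns the gap-free prefix of the buffer, i.e. what a consumer could read so far. */
    String contiguous() {
        int end = 0;
        while (end < filled.length && filled[end]) {
            end++;
        }
        return new String(buffer, 0, end, StandardCharsets.US_ASCII);
    }

    /** One message split into 2 packets, each arriving from a different publication. */
    static String demo() {
        byte[] firstPacket = "HELLO ".getBytes(StandardCharsets.US_ASCII);
        byte[] secondPacket = "WORLD".getBytes(StandardCharsets.US_ASCII);
        OffsetRebuildModel image = new OffsetRebuildModel(firstPacket.length + secondPacket.length);
        image.insert(0, firstPacket);                   // arrives from publication 1
        image.insert(firstPacket.length, secondPacket); // arrives from publication 2
        image.insert(0, firstPacket);                   // late duplicate from publication 2
        return image.contiguous();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If the real image works like this model, the 4 packets from the example would collapse into one complete message. Please correct me if the actual logic differs.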
I spent several days digging into the code but did not manage to make it work, and I assume these are just bugs. I hope my description is clear enough and useful. Code is attached.
For the second issue/example (about the transport count) I found that 1.38.1 works fine, with no disconnections at all, but 1.39.0 does not work.
So the bug was introduced somewhere in 1.39.0.
Thanks for the amazing product, Rostyslav.