As a simulator user
I want the simulator to stop sending items when it receives an error through the PublishStreamObserver.onError() method
So that troubleshooting is easier, since I know when an error occurred
Background:
I noticed while testing #178 that the simulator continued to send data even though the server was notifying the simulator through the PublishStreamObserver.onError() method to stop sending items.
Suggested approach:
Error messages from the server are received through PublishStreamObserver.onError().
With the current design, we must signal to components outside this object that they should no longer send data to the server. Here's how I did it:
public class PublishStreamObserver implements StreamObserver<PublishStreamResponse> {

    private final Logger logger = System.getLogger(getClass().getName());
    private final AtomicBoolean allowNext;

    /** Creates a new PublishStreamObserver instance. */
    public PublishStreamObserver(final AtomicBoolean allowNext) {
        this.allowNext = allowNext;
    }

    /** what will the stream observer do with the response from the server */
    @Override
    public void onNext(PublishStreamResponse publishStreamResponse) {
        logger.log(Logger.Level.INFO, "Received Response: " + publishStreamResponse.toString());
    }

    /** what will the stream observer do when an error occurs */
    @Override
    public void onError(Throwable throwable) {
        logger.log(Logger.Level.ERROR, "Error: " + throwable.toString());
        allowNext.set(false);
    }

    /** what will the stream observer do when the stream is completed */
    @Override
    public void onCompleted() {
        logger.log(Logger.Level.DEBUG, "Completed");
    }
}
public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient {

    private final System.Logger LOGGER = System.getLogger(getClass().getName());
    private final BlockStreamServiceGrpc.BlockStreamServiceStub stub;
    private final StreamObserver<PublishStreamRequest> requestStreamObserver;
    private final AtomicBoolean allowNext = new AtomicBoolean(true);
    private final MetricsService metricsService;

    /**
     * Creates a new PublishStreamGrpcClientImpl instance.
     *
     * @param grpcConfig the gRPC configuration
     * @param metricsService the metrics service
     */
    @Inject
    public PublishStreamGrpcClientImpl(
            @NonNull GrpcConfig grpcConfig, @NonNull MetricsService metricsService) {
        ManagedChannel channel =
                ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port())
                        .usePlaintext()
                        .build();
        stub = BlockStreamServiceGrpc.newStub(channel);
        // The observer shares allowNext so that onError() can stop this client from sending.
        PublishStreamObserver publishStreamObserver = new PublishStreamObserver(allowNext);
        requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
        this.metricsService = metricsService;
    }

    /**
     * Streams a single block item to the server, unless onError() has cleared the allowNext flag.
     *
     * @param blockItem the block item to send
     * @return true if sending is still allowed, false otherwise
     */
    @Override
    public boolean streamBlockItem(BlockItem blockItem) {
        if (allowNext.get()) {
            requestStreamObserver.onNext(
                    PublishStreamRequest.newBuilder()
                            .setBlockItem(Translator.fromPbj(blockItem))
                            .build());
            metricsService.get(LiveBlockItemsSent).increment();
        } else {
            LOGGER.log(ERROR, "Not allowed to send next block item");
        }
        return allowNext.get();
    }
    @Override
    public boolean streamBlock(Block block) {
        // Send every item in the block and stop at the first one that is refused.
        for (int count = 0; count < block.items().size(); count++) {
            if (!streamBlockItem(block.items().get(count))) {
                LOGGER.log(ERROR, "Count was: " + count);
                if (count == 0) {
                    LOGGER.log(ERROR, "First block item: " +
                            block.items().get(count));
                } else {
                    LOGGER.log(ERROR, "Previous block item: " +
                            block.items().get(count - 1));
                }
                return false;
            }
        }
        return true;
    }
}
BlockStreamSimulatorApp calling streamBlockItem():
private void constantRateStreaming()
        throws InterruptedException, IOException, BlockSimulatorParsingException {
    int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
    int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;
    boolean streamBlockItem = true;
    int blockItemsStreamed = 0;
    while (streamBlockItem) {
        // get block item
        BlockItem blockItem = blockStreamManager.getNextBlockItem();
        if (blockItem == null) {
            LOGGER.log(INFO, "Block Stream Simulator has reached the end of the block items");
            break;
        }
        if (!publishStreamGrpcClient.streamBlockItem(blockItem)) {
            LOGGER.log(INFO, "Exiting");
            break;
        }
        blockItemsStreamed++;
        Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems);
        if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) {
            LOGGER.log(
                    INFO,
                    "Block Stream Simulator has reached the maximum number of block items to"
                            + " stream");
            streamBlockItem = false;
        }
    }
}
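The delay handling in constantRateStreaming() relies on Thread.sleep(long millis, int nanos), which only accepts a nanosecond argument in the range 0 to 999999. The sketch below isolates that arithmetic. It assumes, as the snippet above suggests, that delayBetweenBlockItems is a non-negative value in nanoseconds. The class name DelaySplit, its helper methods, and the 1.5 ms value are made up for illustration.

```java
public class DelaySplit {

    private static final int NANOS_PER_MILLI = 1_000_000;

    /** Whole milliseconds contained in a non-negative nanosecond delay. */
    public static long millisPart(int delayNs) {
        return delayNs / NANOS_PER_MILLI;
    }

    /** Leftover nanoseconds, always within the 0..999999 range Thread.sleep accepts. */
    public static int nanosPart(int delayNs) {
        return delayNs % NANOS_PER_MILLI;
    }

    public static void main(String[] args) throws InterruptedException {
        int delayBetweenBlockItems = 1_500_000; // hypothetical value: 1.5 ms in nanoseconds
        long ms = millisPart(delayBetweenBlockItems);
        int ns = nanosPart(delayBetweenBlockItems);
        System.out.println(ms + " ms + " + ns + " ns");
        Thread.sleep(ms, ns);
    }
}
```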
PublishStreamObserver and PublishStreamGrpcClientImpl share an AtomicBoolean flag, which the client checks before sending each item.
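The shared-flag idea can be tried without a gRPC server. The following is a minimal sketch, not the simulator's actual code: ErrorObserver and Sender are hypothetical stand-ins for PublishStreamObserver and PublishStreamGrpcClientImpl, the items are plain strings, and the server error is simulated on the same thread. In the real client the error would typically arrive on a gRPC callback thread; AtomicBoolean makes the updated value visible to the sending thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class SharedFlagSketch {

    /** Hypothetical stand-in for the response observer; only the error path is modeled. */
    static final class ErrorObserver {
        private final AtomicBoolean allowNext;

        ErrorObserver(AtomicBoolean allowNext) {
            this.allowNext = allowNext;
        }

        void onError(Throwable throwable) {
            // Clear the shared flag so the sender refuses its next item.
            allowNext.set(false);
        }
    }

    /** Hypothetical stand-in for the client; records items instead of calling gRPC. */
    static final class Sender {
        private final AtomicBoolean allowNext;
        final List<String> sent = new ArrayList<>();

        Sender(AtomicBoolean allowNext) {
            this.allowNext = allowNext;
        }

        boolean send(String item) {
            if (!allowNext.get()) {
                return false; // an error was reported, so nothing is sent
            }
            sent.add(item);
            return true;
        }
    }

    /** Sends items in order, simulating a server error once failAfter items have been sent. */
    public static int run(List<String> items, int failAfter) {
        AtomicBoolean allowNext = new AtomicBoolean(true);
        ErrorObserver observer = new ErrorObserver(allowNext);
        Sender sender = new Sender(allowNext);
        for (String item : items) {
            if (sender.sent.size() == failAfter) {
                observer.onError(new RuntimeException("simulated server error"));
            }
            if (!sender.send(item)) {
                break; // same early exit as the streaming loop
            }
        }
        return sender.sent.size();
    }

    public static void main(String[] args) {
        System.out.println("Items sent: " + run(List.of("a", "b", "c", "d"), 2));
    }
}
```

One limitation of this pattern: the flag is only checked between items, so an item already handed to the stream when the error arrives is not recalled.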