"While the concept of publish-subscribe messaging is not new, Spring Cloud Stream takes the extra step of making it an opinionated choice for its application model."
The above is an extract from the Spring Cloud Stream documentation here. While I am all in favour of the pub-sub pattern for application integration, it is difficult for many use-cases to completely shed themselves of the request-reply interactions that exist in the overall application architecture.
Messaging middleware solutions such as the Solace PubSub+ Event Broker recognise this requirement and have the API natively support both publish-subscribe and point-to-point (i.e. request-reply) styles of interaction with equal importance.
This repository will provide a working sample in the context of a hypothetical use-case in financial services. The use-case implements event-driven microservices architecture in the backend, which are supporting an externally facing API offered over RESTful HTTP. The event-driven processing pipeline is essentially triggered by a synchronous HTTP operation, which becomes the request message that demands a corresponding response message from the end of the event-driven processing pipeline.
Two important constructs enable request-reply communication to take place over common messaging middleware:
While Spring Cloud Stream has no direct support to create a specialised 'request' message that automatically handles the mechanics of reply-to topics and correlation IDs, we can of course use the building blocks of message headers and the setting of target destination to achieve a successful request-reply interaction between two microservices.
Note: The Microgateway feature in the Solace PubSub+ broker will handle the protocol mediation, essentially converting each HTTP operation to a request message, which contains the reply-to topic the final response message needs to target.
Our hypothetical use-case involves a bank that provides co-branded credit cards and services to partner organisations that are not banks. e.g. a large grocery retailer issuing credit cards for their customers.
The partner organisation is able to call a RESTful API (hosted by the bank) to check if a credit card is suspected of fraudulent usage. If yes, the partner also has the option to request the card be blocked from further use, or still remain active. (The partner may decide to block the card in a different API call following some additional checks for example.)
While externally the HTTP API appears simple to the partner, inside the bank numerous microservices are involved to support its interaction like so:
A high-level flow is as follows:
Service | Function |
---|---|
fraudCheck Mediator | Mediate the translation from the externally facing HTTP API to the internal event-driven architecture and its topic taxonomy. It is also a suitable location to validate the request payload as being correct and 'fail-fast' to the caller if needed. |
fraudCheck Error Handling | This service generically receives any error messages from the internal services, to then produce a final, externally-suitable response message back to the caller. |
fraudCheck Orchestrator | This service holds the logic to orchestrate all the necessary internal services to support the externally facing 'fraudCheck' API. It is envisioned that there is a corresponding Mediator, Error Handler and Orchestrator Service for each of the externally presented APIs. |
Transactions History | This is a simple service that returns the requested number of recent transactions for a given card number. It can generically support flows in addition to the fraudCheck pipeline here. |
Fraud Detection | This is a simple service that takes some supplied transactions and returns a boolean state of whether fraud is detected. |
Card Block | This is a simple service that takes a supplied card number and sets it to the requested block or not-blocked state. |
The configuration files (application.yaml
) for each of the services uses a PubSub+ Event Broker available for public access and hosted in Solace Cloud. This means you are able to checkout the project and run it without needing to setup an Event Broker of your own first. Your running instances of the service will automatically join the competitive consumption against any other instance already running by someone else.
If you want to instead leverage your own broker instance:
spring > cloud > stream > binders > solace-broker
section of each application.yaml
configuration file first with connection details of your message VPN.gateway
mode for the message VPN. (More details here.)application.yaml
file. curl
or postman steps.)We will start with a minimal deployment of these two services first. It will demonstrate the ability of one service to receive the HTTP operation as a message, and another separate service to handle an error with the request and produce a response back to the API caller.
Clone this GitHub repository containing all the services to a suitable location:
git clone https://github.com/itsJamilAhmed/scs-credit-card-demo/
cd scs-credit-card-demo
In separate terminals, go into the directory for the two services and use the bundled Gradle wrapper to start the spring boot application:
Terminal 1:
cd MediatorService/
./gradlew bootRun
Terminal 2:
cd ApiErrorHandlingService/
./gradlew bootRun
To trigger the deployed microservices, issue a HTTP POST operation using a tool like curl
or postman using the details below.
(This directly hits the Microgateway feature of the above mentioned public access Event Broker, but could easily have been fronted by an API Gateway product first to pass-thru the HTTP call.)
URI | Username | Password | Content-Type |
---|---|---|---|
https://public-demo-broker.messaging.solace.cloud:9443/fraudCheck |
scs-demo-public-user |
scs-demo-public-user |
application/json |
Example if using curl:
curl -u scs-demo-public-user:scs-demo-public-user -H "Content-Type: application/json" -X POST https://public-demo-broker.messaging.solace.cloud:9443/fraudCheck
If all successful, the empty payload will trigger the Mediator Service to generate an error event message. This will be picked up by the Error Handling Service to construct a suitable JSON response to the caller like so:
{
"status": "error",
"errorMsg": "Error processing message: Did not receive a valid JSON formatted message. Unexpected token END OF FILE at position 0."
"elapsedTimeMs": 34,
}
No other service (such as the Orchestrator) was needed at this stage. We will go onto to deploy them next.
Once again across separate terminals, start the services like so:
Terminal 3:
cd OrchestratorService/
./gradlew bootRun
Terminal 4:
cd TransactionsHistoryService/
./gradlew bootRun
Terminal 5:
cd FraudDetectionService/
./gradlew bootRun
Terminal 6:
cd CardBlockService/
./gradlew bootRun
Just as in step 2 above, invoke the API again using your tool of choice. This time with this sample JSON payload:
{
"partner": "onyx",
"cardNumber": "1234-5678-1234-5688",
"blockCardIfFraudulent": true
}
Using curl this might look something like:
curl -u scs-demo-public-user:scs-demo-public-user -H "Content-Type: application/json" -X POST https://public-demo-broker.messaging.solace.cloud:9443/fraudCheck -d '{ "partner":"onyx", "cardNumber": "1234-5678-1234-5688", "blockCardIfFraudulent":true }'
The Orchestrator Service is a natural observation point of the whole event flow and processing pipeline. Multiple input channels are used to invoke processing functions to further the orchestrated pipeline.
Review those logs as you submit further requests to observe the interactions. Randomisation is introduced on the fraud check status to create different responses, as well as for simulating processing delays at each service by sleeping for a number of milliseconds.
A sample log output is below for comparison:
If the logs in your deployed service instance are not updating when you issue the API calls, yet still getting a response, it means another instance deployed by someone else has picked it up and processed it.
This last step is about demonstrating another exception path in the processing pipeline. Assuming your instance of the Orchestrator was the only one running, or you are running these tests against your own dedicated Event Broker, the lack of the Orchestrator Service effectively means the API cannot be properly serviced. After the request has passed the mediator service, what options are available to monitor the pipeline and take any recovery steps?
While the powers of persistent messaging mean that request message can sit forever waiting for the Orchestrator Service to return, it is not the desired behaviour with a synchronous call essentially waiting for a response. That call actually needs to timeout gracefully in this exceptional scenario so the caller can try again later. Fortunately, the Solace PubSub+ features of message time-to-live and dead-message-queues (DMQs) can help here.
The Mediator Service can set some message properties to make that request time out after a given period (say 3 seconds), and let that message move to the inbound channel (i.e. queue) of another waiting service instead. That is the Error Handling Service in this example, with that assuming responsibility to send an apprioriate response to the caller.
With the Orchestrator Service terminated, issue a new API call like in Step 4 earlier.
Using curl this might look something like:
curl -u scs-demo-public-user:scs-demo-public-user -H "Content-Type: application/json" -X POST https://public-demo-broker.messaging.solace.cloud:9443/fraudCheck -d '{ "partner":"onyx", "cardNumber": "1234-5678-1234-5688", "blockCardIfFraudulent":true }'
After a 3 second wait, you should see a response like so:
{
"status": "error",
"errorMsg": "This service is currently unavailable. Please try again later.",
"elapsedTimeMs": 3090
}
These sample services use a topic taxonomy to demonstrate three important concepts:
For advice on defining a comprehensive topic taxonomy, consult the Solace documentation here. The topics used by these services favour brevity and may not incorporate all the necessary best practices.
In the table below, elements in {}
are dynamic elements as determined from either the original API request contents (e.g. {partner}
) or static strings defined in the service to identify itself as a source system (e.g. {platform}
).
Service | Subscribe Topic | Publish Topic | Error Topic |
---|---|---|---|
fraudCheck Mediator | POST/fraudCheck |
myBank/cards/fraudCheckApi/status/v1/{platform}/{partner} :ledger: |
myBank/cards/fraudCheckApi/error :orange_book: |
fraudCheck Orchestrator (getRecentTransactions) | myBank/cards/fraudCheckApi/status/v1/> :ledger: |
myBank/cards/txnService/history/req/v1/{platform}/{partner}/{UUID} :closed_book: |
myBank/cards/fraudCheckApi/error :orange_book: |
fraudCheck Orchestrator (getFraudStatus) | myBank/cards/fraudCheckApi/reply/txnService/history/v1/> :spades: |
myBank/cards/fraudService/status/req/v1/{platform}/{partner}/{UUID} :green_book: |
myBank/cards/fraudCheckApi/error :orange_book: |
fraudCheck Orchestrator (requestCardBlock) | myBank/cards/fraudCheckApi/reply/fraudService/status/v1/> :hearts: |
myBank/cards/cardService/block/req/v1/{platform}/{partner}/{UUID} :blue_book: |
myBank/cards/fraudCheckApi/error :orange_book: |
fraudCheck Orchestrator (returnFinalResponse) | myBank/cards/fraudCheckApi/reply/fraudService/status/v1/> :diamonds: |
Topic string as provided in message header app_fraudCheckMediator_replyTo |
myBank/cards/fraudCheckApi/error :orange_book: |
fraudCheck Error Handling | myBank/cards/fraudCheckApi/error :orange_book: |
Topic string as provided in message header app_fraudCheckMediator_replyTo |
N/A |
Transactions History | myBank/cards/txnService/history/req/v1/> :closed_book: |
Topic string as provided in message header reply_to_destination :spades: |
N/A |
Fraud Detection | myBank/cards/fraudService/status/req/v1/> :green_book: |
Topic string as provided in message header reply_to_destination :hearts: |
N/A |
Card Block | myBank/cards/cardService/block/req/v1/> :blue_book: |
Topic string as provided in message header reply_to_destination :diamonds: |
N/A |
( :books: The colour coding is provided to visually link the publish topic of one service with the subscribe topic or wildcard of another.)
The design for these services has been modelled in PubSub+ Event Portal and a JSON export of the Application Domain is available here for those who wish to import it into their own Event Portal.
Welcome any feedback and suggestions as I plan to continue refining this sample.
See the list of contributors who participated in this project
This project is licensed under the Apache License, Version 2.0. - See the LICENSE file for details.
For more information try these resources: