Open habeep opened 3 years ago
package com.processor.config;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.processor.model.PrimaryKeyColumn; import com.processor.model.ValueColumn; import com.processor.exception.UDTFlowException; import com.processor.mapping.configuration.UDTMappingDefinition; import com.processor.mapping.model.UDTPayload; import com.processor.mapping.model.UDTRequest; import com.processor.model.UDTFlowMessage; import com.processor.model.UDTResponseDTO; import com.processor.service.UDTWSClient; import com.processor.service.impl.ControlPersistenceServiceImpl; import com.processor.common.enums.RequestControlStatus; import com.processor.abc.web.service.models.axon.ControlMessage; import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.http.HttpStatus; import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.web.reactive.function.client.WebClient;
import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map;
import static com.processor.common.common.Constants.CONTROL_ID_AXON_MESSAGE_KEY; import static com.processor.common.common.Constants.CONTROL_OPERATION_AXON_MESSAGE_KEY; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when;
@RunWith(SpringRunner.class) @ExtendWith(SpringExtension.class) @ContextConfiguration(classes = {Resilience4JConfig.class, UDTWSClient.class, UDTWebClientConfig.class, CommonConfigData.class}) @SpringBootTest(classes = {UDTWSClient.class, CommonConfigData.class}, properties = { "pcf.enabled=false", "udt.ws.uri=http://webhook.site/56352051-90cc-4187-929b-38decbb9ee13" }) @Slf4j class AbcIntegrationFlowConfigTestWithCircuitBreaker {
@InjectMocks
private AbcIntegrationFlowConfig abcIntegrationFlowConfig;
@Autowired
private WebClient webClient;
@Autowired
private UDTWSClient udtwsClient;
@Mock
private ControlPersistenceServiceImpl controlPersistenceService;
@Mock
private ObjectMapper mapper;
private UDTFlowMessage udtFlowMessage = new UDTFlowMessage();
private String payload = "[{\"primaryKeyColumns\":[{\"name\": \"account_range\"," +
"\"value\": \"5555600000000000-5555600000000099\"}]},{\"valueColumns\": [{\"name\": \"threshold\"," +
"\"value\": 50}]}]";
@Mock
Acknowledgment acknowledgment;
@Mock
Message<UDTFlowMessage> message;
@Mock
MessageHeaders messageHeaders;
@Mock
AxonConfig axonConfig;
@Autowired
Resilience4JConfig resilience4JConfig;
@Autowired
CircuitBreakerRegistry circuitBreakerRegistry;
@BeforeEach
public void setUp() {
MockitoAnnotations.initMocks(this);
UDTRequest request = new UDTRequest();
List<PrimaryKeyColumn> pk = new ArrayList<>();
PrimaryKeyColumn pk1 = new PrimaryKeyColumn();
pk1.setName("account_range");
pk1.setValue("5555600000000000-5555600000000099");
pk.add(pk1);
List<ValueColumn> vk = new ArrayList<>();
ValueColumn vk1 = new ValueColumn();
vk1.setName("threshold");
vk1.setValue("50");
vk.add(vk1);
request.setPayload((new UDTPayload.UDTPayloadBuilder()).withUDTRecord(pk, vk).build());
UDTMappingDefinition udtMappingDefinition = new UDTMappingDefinition();
udtMappingDefinition.setId("90000");
udtMappingDefinition.setUdtURIs(Collections.singletonMap("PUT", "/udt/test"));
request.setUdtMappingDefinition(udtMappingDefinition);
ControlMessage controlMessage = new ControlMessage();
Map<String, String> metaData = new HashMap<>();
metaData.put(CONTROL_ID_AXON_MESSAGE_KEY, "123456");
metaData.put(CONTROL_OPERATION_AXON_MESSAGE_KEY, "UPDATE");
controlMessage.setMetaData(metaData);
udtFlowMessage.setControlMessage(controlMessage);
udtFlowMessage.setUdtRequests(Collections.singletonList(request));
// Create a CircuitBreakerRegistry with a custom global configuration CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(circuitBreakerConfig);
when(message.getPayload()).thenReturn(udtFlowMessage);
when(message.getHeaders()).thenReturn(messageHeaders);
when(messageHeaders.get(any(), any())).thenReturn(acknowledgment);
}
@Test
void testSendToUDTWs_ShouldCallControlPersistenceAndAck_WhenUpdateRequestIsSuccessful() throws JsonProcessingException, UDTFlowException {
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(2)
.waitDurationInOpenState(Duration.ofMillis(1000))
.permittedNumberOfCallsInHalfOpenState(2)
.slidingWindowSize(2)
.ignoreExceptions(com.processor.exception.WebClientCBIgnoreException.class, io.github.resilience4j.ratelimiter.RequestNotPermitted.class)
.build();
CircuitBreakerRegistry circuitBreakerRegistry =
CircuitBreakerRegistry.of(circuitBreakerConfig);
CircuitBreaker circuitBreaker = circuitBreakerRegistry
.circuitBreaker("enterprise_circuitbreaker");
circuitBreaker.transitionToOpenState();
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
// Arrange
UDTResponseDTO udtResponseDTO = new UDTResponseDTO(HttpStatus.ACCEPTED, "SUCCESS");
when(udtwsClient.sendUDTRequest(anyString(), anyString(), "PUT")).thenReturn(udtResponseDTO);
when(mapper.writeValueAsString(any())).thenReturn(payload);
// Act & Assert
assertDoesNotThrow(() -> {
abcIntegrationFlowConfig.sendToUDTWs(message);
});
// Assert
assertEquals(1, metrics.getNumberOfBufferedCalls());
assertEquals(0, metrics.getNumberOfFailedCalls());
assertEquals(1, metrics.getNumberOfSuccessfulCalls());
verify(udtwsClient, times(1)).sendUDTRequest(anyString(), anyString(), anyString());
verify(controlPersistenceService, times(1)).updateControlOperationStatusInDb(anyString(), any(RequestControlStatus.class), isNull(), anyBoolean(), isNull());
verify(controlPersistenceService, never()).deleteControlById(anyString());
verify(acknowledgment, times(1)).acknowledge();
verify(acknowledgment, never()).nack(anyLong());
}
}
package com.processor.config;
import io.github.resilience4j.circuitbreaker.CircuitBreaker; import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; import io.github.resilience4j.circuitbreaker.event.CircuitBreakerEvent; import io.github.resilience4j.circuitbreaker.event.CircuitBreakerOnStateTransitionEvent; import io.github.resilience4j.core.registry.EntryAddedEvent; import io.github.resilience4j.core.registry.EntryRemovedEvent; import io.github.resilience4j.core.registry.EntryReplacedEvent; import io.github.resilience4j.core.registry.RegistryEventConsumer; import io.github.resilience4j.ratelimiter.RateLimiter; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.time.Duration;
import static io.github.resilience4j.circuitbreaker.CircuitBreakerConfig.SlidingWindowType.COUNT_BASED;
@Slf4j @Configuration public class Resilience4JConfig {
/**
 * Creates a registry event consumer for rate limiters that subscribes a debug-level logger
 * to the event publisher of every entry that is added, removed or replaced.
 *
 * @return the consumer bean
 */
@Bean
public RegistryEventConsumer<RateLimiter> abcACPRateLimiterRegistryEventConsumer() {
    return new RegistryEventConsumer<RateLimiter>() {

        @Override
        public void onEntryAddedEvent(EntryAddedEvent<RateLimiter> entryAddedEvent) {
            RateLimiter added = entryAddedEvent.getAddedEntry();
            added.getEventPublisher().onEvent(event -> log.debug(event.toString()));
        }

        @Override
        public void onEntryRemovedEvent(EntryRemovedEvent<RateLimiter> entryRemoveEvent) {
            RateLimiter removed = entryRemoveEvent.getRemovedEntry();
            removed.getEventPublisher().onEvent(event -> log.debug(event.toString()));
        }

        @Override
        public void onEntryReplacedEvent(EntryReplacedEvent<RateLimiter> entryReplacedEvent) {
            // Subscribe to the replacement first, then to the entry it replaced.
            RateLimiter replacement = entryReplacedEvent.getNewEntry();
            replacement.getEventPublisher().onEvent(event -> log.debug(event.toString()));
            RateLimiter previous = entryReplacedEvent.getOldEntry();
            previous.getEventPublisher().onEvent(event -> log.debug(event.toString()));
        }
    };
}
/**
 * Builds the {@link CircuitBreakerRegistry} bean whose default configuration uses a
 * count-based sliding window and ignores {@code RequestNotPermitted} exceptions.
 *
 * @return a registry created from the custom configuration below
 */
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
    CircuitBreakerConfig.Builder settings = CircuitBreakerConfig.custom();

    settings.slidingWindowType(COUNT_BASED);
    settings.minimumNumberOfCalls(3);
    settings.waitDurationInOpenState(Duration.ofMillis(1000));
    settings.failureRateThreshold(5);
    settings.permittedNumberOfCallsInHalfOpenState(5);
    settings.slowCallRateThreshold(70);
    settings.slowCallDurationThreshold(Duration.ofMillis(1000));
    settings.slidingWindowSize(100);
    settings.ignoreExceptions(io.github.resilience4j.ratelimiter.RequestNotPermitted.class);

    CircuitBreakerConfig defaultConfig = settings.build();
    return CircuitBreakerRegistry.of(defaultConfig);
}
/**
 * Registry event consumer used to validate circuit breaker processing: it logs state
 * transitions and events so breaker state changes and configurations can be tracked.
 * It is not needed for normal functionality.
 *
 * @return the consumer bean
 */
@Bean
public RegistryEventConsumer<CircuitBreaker> abcACPCircuitBreakerRegistryEventConsumer() {
    return new RegistryEventConsumer<CircuitBreaker>() {

        @Override
        public void onEntryAddedEvent(EntryAddedEvent<CircuitBreaker> entryAddedEvent) {
            CircuitBreaker cb = entryAddedEvent.getAddedEntry();
            // Log every state transition of the newly added breaker.
            cb.getEventPublisher().onStateTransition(transition ->
                    log.info(String.format("%s state : %s", transition.getCircuitBreakerName(), transition)));
            // Log every event together with the breaker state at the time of logging.
            cb.getEventPublisher().onEvent(event ->
                    log.info(String.format("%s event(%s) : %s", event.getCircuitBreakerName(), cb.getState(), event)));
        }

        @Override
        public void onEntryRemovedEvent(EntryRemovedEvent<CircuitBreaker> entryRemoveEvent) {
            CircuitBreaker removed = entryRemoveEvent.getRemovedEntry();
            removed.getEventPublisher().onEvent(event -> log.info(event.toString()));
        }

        @Override
        public void onEntryReplacedEvent(EntryReplacedEvent<CircuitBreaker> entryReplacedEvent) {
            // Subscribe to the replaced entry first, then to its replacement.
            CircuitBreaker previous = entryReplacedEvent.getOldEntry();
            previous.getEventPublisher().onEvent(event -> log.info(event.toString()));
            CircuitBreaker replacement = entryReplacedEvent.getNewEntry();
            replacement.getEventPublisher().onEvent(event -> log.info(event.toString()));
        }
    };
}
package com.processor.service;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.github.resilience4j.circuitbreaker.CallNotPermittedException; import io.github.resilience4j.ratelimiter.RequestNotPermitted; import io.netty.handler.timeout.TimeoutException; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.kafka.dsl.Kafka; import org.springframework.integration.kafka.dsl.KafkaMessageDrivenChannelAdapterSpec; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.ErrorMessage; import org.springframework.web.reactive.function.client.WebClientResponseException;
import java.util.Objects; import java.util.UUID;
/**
 * Spring Integration configuration whose service activator forwards the UDT requests of an
 * incoming flow message to the UDT web service through {@link UDTWSClient}.
 */
@Configuration
@EnableIntegration
@AllArgsConstructor
@IntegrationComponentScan
@Slf4j
public class AbcIntegrationFlowConfig {

    private UDTWSClient udtwsClient;

    /**
     * Sends every UDT request contained in the message payload to the UDT web service.
     *
     * <p>The UDT operation is derived from the control-operation entry in the control message
     * metadata; it selects the target URI of each request and is passed on as the method.
     *
     * @param message message whose payload carries the control message and the UDT requests
     * @throws UDTFlowException when the client call fails with a {@code WebClientCBIgnoreException},
     *         a 5xx {@code WebClientResponseException}, a timeout, or a rejected
     *         circuit-breaker/rate-limiter call; every other failure is rethrown wrapped in a
     *         {@code UDTFlowUnexpectedException}
     */
    @ServiceActivator(inputChannel = UDT_INPUT_CHANNEL)
    public void sendToUDTWs(Message<UDTFlowMessage> message) throws UDTFlowException {
        UDTFlowMessage flowMessage = message.getPayload();
        String controlOperation = flowMessage.getControlMessage().getMetaData()
                .get(CONTROL_OPERATION_AXON_MESSAGE_KEY);
        UDTRequestOperation operation = UDTRequestOperation.valueOf(controlOperation);
        String udtOperation = operation.getUdtOperation();

        try {
            for (UDTRequest request : flowMessage.getUdtRequests()) {
                String targetUri = request.getUdtMappingDefinition().getUdtURIs().get(udtOperation);
                String body = mapper.writeValueAsString(request.getPayload().getRecords());
                udtwsClient.sendUDTRequest(targetUri, body, udtOperation);
            }
        } catch (JsonProcessingException serializationFailure) {
            throw new UDTFlowUnexpectedException(serializationFailure);
        } catch (WebClientCBIgnoreException ignoredByBreaker) {
            throw new UDTFlowException(true, true, false, ignoredByBreaker);
        } catch (WebClientResponseException responseFailure) {
            // Only server-side (5xx) responses map to UDTFlowException; the rest is unexpected.
            if (!responseFailure.getStatusCode().is5xxServerError()) {
                throw new UDTFlowUnexpectedException(responseFailure);
            }
            throw new UDTFlowException(false, true, true, responseFailure);
        } catch (TimeoutException | CallNotPermittedException | RequestNotPermitted rejectedOrTimedOut) {
            throw new UDTFlowException(false, false, false, rejectedOrTimedOut);
        } catch (Throwable anythingElse) {
            throw new UDTFlowUnexpectedException(anythingElse);
        }
    }
}
package com.processor.service;
import com.processor.common.Constants; import com.processor.exception.WebClientCBIgnoreException; import com.processor.model.UDTHealthCheckResponse; import com.processor.model.UDTResponseDTO; import com.processor.model.UDTResponseEntity; import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; import io.github.resilience4j.ratelimiter.annotation.RateLimiter; import io.vavr.collection.Seq; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException;
import java.util.Collections;
import static com.processor.common.Constants.ENTERPRISE_RATELIMITER;
@Component @Slf4j public class UDTWSClient {
// Base URI of the UDT web service; assigned in the constructor from the "udt.ws.uri" property
// and prepended to the relative URI in sendUDTRequest.
private String udtWsUri;
// HTTP client used by the request methods of this class.
private WebClient webClient;
// Field-injected registry; not referenced by the methods visible in this excerpt.
@Autowired
CircuitBreakerRegistry circuitBreakerRegistry;
/**
 * Creates the client.
 *
 * @param udtWsUri  base URI of the UDT web service, resolved from the {@code udt.ws.uri} property
 * @param webClient HTTP client used to perform the requests
 */
@Autowired
public UDTWSClient(@Value("${udt.ws.uri}") String udtWsUri, WebClient webClient) {
    this.webClient = webClient;
    this.udtWsUri = udtWsUri;
}
/**
 * Performs a blocking GET against the given health-check URI.
 *
 * @param udtWsHealthUri URI to call; used as given, without prepending the base URI
 * @return the response entity with the body decoded as {@code UDTHealthCheckResponse}
 */
public ResponseEntity udtWebServiceHealthCheck(String udtWsHealthUri) {
    WebClient.ResponseSpec healthResponse = webClient.get()
            .uri(udtWsHealthUri)
            .retrieve();
    return healthResponse.toEntity(UDTHealthCheckResponse.class).block();
}
/**
 * Sends a UDT request to the web service and blocks until the response arrives.
 *
 * @param uri     path appended to the configured base URI
 * @param payload request body
 * @param method  HTTP method name, resolved with {@code HttpMethod.valueOf}
 * @return the response status together with the result of the response body, or a
 *         {@code null} message when the response has no body
 * @throws WebClientCBIgnoreException when the service answers with a 4xx status
 * @throws WebClientResponseException for any other error status (rethrown unchanged)
 */
@CircuitBreaker(name = "enterprise_circuitbreaker")
public UDTResponseDTO sendUDTRequest(String uri, String payload, String method) {
    String requestUri = udtWsUri + uri;
    log.debug("Sending UDT Request [{}:{}] with payload: {}", method, requestUri, payload);

    ResponseEntity<UDTResponseEntity> response;
    try {
        WebClient.ResponseSpec responseSpec = webClient
                .method(HttpMethod.valueOf(method))
                .uri(requestUri)
                .headers(httpHeaders -> httpHeaders.addAll(initializeHeaders()))
                .body(BodyInserters.fromValue(payload))
                .retrieve();
        response = responseSpec.toEntity(UDTResponseEntity.class).block();
    } catch (WebClientResponseException exception) {
        // Anything other than a client error propagates unchanged.
        if (!exception.getStatusCode().is4xxClientError()) {
            throw exception;
        }
        // Client errors are wrapped in the dedicated exception type.
        throw new WebClientCBIgnoreException(exception.getMessage(), exception);
    }

    UDTResponseEntity body = response.getBody();
    UDTResponseDTO responseDTO;
    if (body == null) {
        responseDTO = new UDTResponseDTO(response.getStatusCode(), null);
    } else {
        responseDTO = new UDTResponseDTO(response.getStatusCode(), body.getResult());
    }
    log.info("Response from UDT WS: Status=[{}] Body=[{}]", responseDTO.getHttpStatus(), responseDTO.getMessage());
    return responseDTO;
}
/**
 * Builds the headers attached to each UDT request: JSON content type, the client-id header
 * and a JSON-only accept list.
 *
 * @return a new {@link HttpHeaders} instance
 */
private HttpHeaders initializeHeaders() {
    MediaType json = MediaType.APPLICATION_JSON;
    HttpHeaders requestHeaders = new HttpHeaders();
    requestHeaders.setContentType(json);
    requestHeaders.add(Constants.OPENAPI_CLIENT_ID, Constants.DSE_CLIENT_ID);
    requestHeaders.setAccept(Collections.singletonList(json));
    return requestHeaders;
}
Hello, we are trying to write an integration/unit test, but it is failing or not working as we expected. Can you please help us?
dependency details: