Open xuyao178 opened 2 years ago
I have the same problem. How can it be solved? Thank you!
But the gateway gets the error below: there is no encoder for the Part type. org.springframework.core.codec.CodecException: No suitable writer found for part: file
Use BodyInserters.fromMultipartData instead of BodyInserters.fromPublisher.
If you do this, you need to make some additional hack modifications:
serverRequest.bodyToMono(new ParameterizedTypeReference<MultiValueMap<String, Part>>() {})
        .map(multipartData -> {
            // modify the request body like this
            return BodyInserters.fromMultipartData(multipartData).with("age", 18);
        })
You also need to implement a special CachedBodyOutputMessage. MultipartInserter needs it, because it extends FormInserter, which writes to a ClientHttpRequest rather than a plain ReactiveHttpOutputMessage:
public interface FormInserter<T> extends BodyInserter<MultiValueMap<String, T>, ClientHttpRequest> { }
public class CachedBodyOutputMessage implements ClientHttpRequest { }
Do you have more detailed code? I have the same problem. Thank you.
@gengxiaoxiaoxin According to your tips, I have solved this problem. Thank you. This is my complete code.
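import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;

import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FormDataFilter implements GatewayFilter {

    private FormDataDecryptor formDataDecryptor;

    public FormDataFilter() {
        formDataDecryptor = new FormDataDecryptor();
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return exchange.getMultipartData().map(multiValueMap -> {
            // Here we do data decryption and reconstruct the form data.
            MultiValueMap<String, FormDataDecryptor.FormPart> decryptedMap = formDataDecryptor.decryptFromData(multiValueMap);
            MultipartBodyBuilder multipartBodyBuilder = new MultipartBodyBuilder();
            for (Map.Entry<String, List<FormDataDecryptor.FormPart>> entry : decryptedMap.entrySet()) {
                for (FormDataDecryptor.FormPart formPart : entry.getValue()) {
                    MultipartBodyBuilder.PartBuilder partBuilder = multipartBodyBuilder.part(entry.getKey(), formPart.getContent());
                    // Copy the headers of the original part onto the rebuilt part.
                    formPart.getHeaders().entrySet()
                            .forEach(stringListEntry -> {
                                partBuilder.header(stringListEntry.getKey(), stringListEntry.getValue().toArray(new String[stringListEntry.getValue().size()]));
                            });
                }
            }
            BodyInserter bodyInserter = BodyInserters.fromMultipartData(multipartBodyBuilder.build());
            return bodyInserter;
        })
        .flatMap(bodyInserter -> {
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(exchange.getRequest().getHeaders());
            // the new content type will be computed by bodyInserter
            // and then set in the request decorator
            headers.remove(HttpHeaders.CONTENT_LENGTH);
            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
            return bodyInserter.insert(outputMessage, new BodyInserterContext())
                    .log("modify_request", Level.INFO)
                    .then(Mono.defer(() -> {
                        ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
                        return chain.filter(exchange.mutate().request(decorator).build());
                    })).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(exchange,
                            outputMessage, throwable));
        });
    }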

    // Release any buffers already cached, then propagate the original error.
    protected Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage,
            Throwable throwable) {
        if (outputMessage.isCached()) {
            return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
        }
        return Mono.error(throwable);
    }

    ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
            CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                }
                else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required' on httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }

    static class CachedBodyOutputMessage implements ClientHttpRequest {

        private ServerWebExchange serverWebExchange;
        private ServerHttpRequest serverRequest;
        private HttpHeaders httpHeaders;
        private boolean cached = false;
        private Flux<DataBuffer> body = Flux
                .error(new IllegalStateException("The body is not set. " + "Did handling complete with success?"));

        public CachedBodyOutputMessage(ServerWebExchange exchange, HttpHeaders httpHeaders) {
            this.serverWebExchange = exchange;
            this.serverRequest = exchange.getRequest();
            this.httpHeaders = httpHeaders;
        }

        @Override
        public HttpMethod getMethod() {
            return serverRequest.getMethod();
        }

        @Override
        public URI getURI() {
            return serverRequest.getURI();
        }

        @Override
        public MultiValueMap<String, HttpCookie> getCookies() {
            return serverRequest.getCookies();
        }

        @Override
        public <T> T getNativeRequest() {
            return (T) serverRequest;
        }

        @Override
        public DataBufferFactory bufferFactory() {
            return serverWebExchange.getResponse().bufferFactory();
        }

        @Override
        public void beforeCommit(Supplier<? extends Mono<Void>> action) {
        }

        @Override
        public boolean isCommitted() {
            return false;
        }

        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            // Cache the body instead of writing it anywhere.
            this.body = Flux.from(body);
            this.cached = true;
            return Mono.empty();
        }

        @Override
        public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
            return writeWith(Flux.from(body).flatMap(p -> p));
        }

        @Override
        public Mono<Void> setComplete() {
            return writeWith(Flux.empty());
        }

        @Override
        public HttpHeaders getHeaders() {
            return this.httpHeaders;
        }

        boolean isCached() {
            return this.cached;
        }

        /**
         * Return the request body, or an error stream if the body was never set.
         * @return body as {@link Flux}
         */
        public Flux<DataBuffer> getBody() {
            return this.body;
        }
    }
}
Can you give me a simple sample GitHub repo that implements this?
I created this https://github.com/hendisantika/spring-boot-logging-filter2
Thanks
Spring Boot version: 2.7.2; Spring Cloud Gateway version: 3.1.3
There is an API like the one below:
On the gateway side, we check the sign and then re-sign with another secret key to replace the old one. I wrote a filter to handle this logic, like below.
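For readers unfamiliar with the check-and-re-sign step, it could look roughly like the plain-JDK sketch below. This is not the filter from this issue, and the thread does not say how the sign is computed. The sketch assumes HMAC-SHA256 over the sorted form fields (excluding a field named "sign"), hex-encoded. The class name, method names, and field name are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper: the real signing scheme is not given in this thread.
final class ResignSketch {

    // ASSUMPTION: canonical form is "k1=v1&k2=v2" with keys sorted and "sign" excluded.
    static String canonical(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : new TreeMap<>(fields).entrySet()) {
            if (e.getKey().equals("sign")) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    // ASSUMPTION: the sign is a hex-encoded HMAC-SHA256 of the canonical form.
    static String sign(Map<String, String> fields, String secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] digest = mac.doFinal(canonical(fields).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (GeneralSecurityException ex) {
            throw new IllegalStateException("HMAC-SHA256 unavailable", ex);
        }
    }

    // Verifies the incoming sign with the client key, then returns a copy
    // of the fields whose sign was recomputed with the backend key.
    static Map<String, String> verifyAndResign(Map<String, String> fields,
            String clientSecret, String backendSecret) {
        byte[] expected = sign(fields, clientSecret).getBytes(StandardCharsets.UTF_8);
        byte[] actual = fields.getOrDefault("sign", "").getBytes(StandardCharsets.UTF_8);
        if (!MessageDigest.isEqual(expected, actual)) {
            throw new SecurityException("signature mismatch");
        }
        Map<String, String> resigned = new HashMap<>(fields);
        resigned.put("sign", sign(fields, backendSecret));
        return resigned;
    }

    public static void main(String[] args) {
        Map<String, String> form = new HashMap<>();
        form.put("name", "demo");
        form.put("sign", sign(form, "client-key"));
        System.out.println(verifyAndResign(form, "client-key", "backend-key"));
    }
}
```

In a gateway filter, the re-signed text fields would then be put back into the multipart body next to the untouched file parts.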
But the gateway gets the error below: there is no encoder for the Part type.
I tried converting the Part to a DataBuffer, like below:
After that, there is no error in the gateway, but the backend server cannot get the 'template_file' field. I have no idea how to solve this problem. Is there a good way to make this modification?
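One likely reason the backend no longer sees 'template_file' after each Part is turned into a bare DataBuffer: a multipart/form-data body is more than the concatenated part contents. Every part is framed by a boundary line and carries a Content-Disposition header holding the field name, so writing only the raw content drops the information the backend uses to find the field. The Spring-free sketch below builds such a body by hand for text fields only, just to show the framing. The class name, method name, and boundary value are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only: renders text fields with
// multipart/form-data framing (boundary lines plus per-part headers).
final class MultipartWireSketch {

    static String render(String boundary, Map<String, String> fields) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> field : fields.entrySet()) {
            // Each part starts with "--" + boundary on its own line.
            body.append("--").append(boundary).append("\r\n");
            // The field name lives in this header, not in the content.
            body.append("Content-Disposition: form-data; name=\"")
                .append(field.getKey()).append("\"\r\n");
            // A blank line separates the part headers from the part content.
            body.append("\r\n");
            body.append(field.getValue()).append("\r\n");
        }
        // The closing delimiter has a trailing "--".
        body.append("--").append(boundary).append("--\r\n");
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("sign", "abc123");
        fields.put("template_file", "file content here");
        String body = render("demo-boundary", fields);
        System.out.print(body);
        System.out.println(body.getBytes(StandardCharsets.UTF_8).length + " bytes");
    }
}
```

BodyInserters.fromMultipartData produces this framing (and a matching Content-Type boundary) itself, which is why the replies in this thread rebuild the parts with it instead of forwarding raw buffers.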