Created
November 3, 2022 09:45
-
-
Save alabotski/a8629913d8606e25d5e120c4a02e19ab to your computer and use it in GitHub Desktop.
An example of how to read a value from the request body in a WebFilter of a reactive WebFlux application and store it in the exchange attributes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Order | |
@Slf4j | |
@Component | |
@RequiredArgsConstructor | |
public class PrepareHeaderWebFilter implements WebFilter { | |
private static final byte[] EMPTY_BYTES = {}; | |
public static final String CACHED_REQUEST_BODY_ATTR = "cachedRequestBody"; | |
public static final String CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR = "cachedServerHttpRequestDecorator"; | |
private static final List<HttpMessageReader<?>> MESSAGE_READERS = HandlerStrategies.withDefaults() | |
.messageReaders(); | |
@Override | |
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { | |
return cacheRequestBody(exchange, serverHttpRequest -> { | |
var exchangeMutate = exchange.mutate() | |
.request(serverHttpRequest) | |
.build(); | |
var serverRequest = ServerRequest.create(exchangeMutate, MESSAGE_READERS); | |
return serverRequest.bodyToMono(BaseRequest.class) | |
.map(baseRequest -> { | |
exchange.getAttributes() | |
.put(BaseRequest.Fields.rqUID, baseRequest.getRqUID()); | |
return baseRequest; | |
}) | |
.then(removeCacheAndChain(exchange, chain)); | |
}); | |
} | |
private static Mono<Void> cacheRequestBody(ServerWebExchange exchange, Function<ServerHttpRequest, Mono<Void>> function) { | |
ServerHttpResponse response = exchange.getResponse(); | |
DataBufferFactory factory = response.bufferFactory(); | |
return DataBufferUtils.join(exchange.getRequest() | |
.getBody()) | |
.defaultIfEmpty(factory.wrap(EMPTY_BYTES)) | |
.map(dataBuffer -> decorate(exchange, dataBuffer)) | |
.switchIfEmpty(Mono.just(exchange.getRequest())) | |
.flatMap(function); | |
} | |
private static Mono<Void> removeCacheAndChain(ServerWebExchange exchange, WebFilterChain chain) { | |
ServerHttpRequest cachedRequest = exchange.getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR); | |
Assert.notNull(cachedRequest, "cache request shouldn't be null"); | |
exchange.getAttributes() | |
.remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR); | |
return chain.filter(exchange.mutate() | |
.request(cachedRequest) | |
.build()); | |
} | |
private static ServerHttpRequest decorate(ServerWebExchange exchange, DataBuffer dataBuffer) { | |
if (dataBuffer.readableByteCount() > 0) { | |
if (log.isTraceEnabled()) { | |
log.trace("retaining body in exchange attribute"); | |
} | |
exchange.getAttributes() | |
.put(CACHED_REQUEST_BODY_ATTR, dataBuffer); | |
} | |
ServerHttpRequest decorator = new ServerHttpRequestDecorator(exchange.getRequest()) { | |
@Override | |
public Flux<DataBuffer> getBody() { | |
return Mono.fromSupplier(() -> { | |
if (exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null) == null) { | |
// probably == downstream closed or no body | |
return null; | |
} | |
if (dataBuffer instanceof NettyDataBuffer) { | |
NettyDataBuffer pdb = (NettyDataBuffer) dataBuffer; | |
return pdb.factory() | |
.wrap(pdb.getNativeBuffer() | |
.retainedSlice()); | |
} else if (dataBuffer instanceof DefaultDataBuffer) { | |
DefaultDataBuffer ddf = (DefaultDataBuffer) dataBuffer; | |
return ddf.factory() | |
.wrap(Unpooled.wrappedBuffer(ddf.getNativeBuffer()) | |
.nioBuffer()); | |
} else { | |
throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass()); | |
} | |
}) | |
.flux(); | |
} | |
}; | |
exchange.getAttributes() | |
.put(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, decorator); | |
return decorator; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment