Reactor Series: How to Make an API Call with Spring Reactor
To call an API from Spring Reactor, you need to know about WebClient.
Before Spring 5, we consumed REST services using one of the following:
a. URL (provided by the java.net package)
b. RestTemplate (provided by the Spring Framework)
c. HTTP client libraries (provided by JAX-RS, Apache, etc.)
Most of the clients that help us consume APIs are blocking in nature.
For example, if you request user details from a service, your execution thread is held until the response comes back from the server. Only after the response is received are the next instructions processed.
Blocking threads become a bottleneck in high-traffic systems. Every time a client requests information from an API, a dedicated thread waits for the response from the service and stays idle until it arrives. Even when the system is busy, these idle threads cannot be reused. To avoid this bottleneck, there are libraries that make an async call and return a Future (Future was introduced in the java.util.concurrent package in Java 5).
The overheads of these async client libraries (where I/O calls don't hold the main thread) are:
- The thread pool has to be managed by the developer.
- A dedicated thread is still involved in processing the result after the async task completes.
- No support for high-level functions to process the response (like map, flatMap, etc.). High-level functions make the code more readable, which makes bugs easier to avoid.
- No built-in operators for handling errors.
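To make these overheads concrete, here is a minimal sketch using only the JDK's ExecutorService and Future. The fetchUsers method is a hypothetical stand-in for a remote call (no network is involved), and the pool size of 2 is an arbitrary assumption.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PlainFutureDemo {

    // Hypothetical stand-in for a remote call; no network is involved.
    static String fetchUsers(boolean fail) {
        if (fail) {
            throw new IllegalStateException("service unavailable");
        }
        return "alice,bob";
    }

    static String callWithFuture(boolean fail) {
        // The caller owns the pool: sizing it and shutting it down is manual work.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<String> future = pool.submit(() -> fetchUsers(fail));
            // Future has no map/flatMap, so we block on get() before transforming the value.
            String body = future.get();
            return body.toUpperCase();
        } catch (ExecutionException e) {
            // Failures only surface as a wrapped exception when get() is called.
            return "fallback: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithFuture(false)); // prints ALICE,BOB
        System.out.println(callWithFuture(true));  // prints fallback: service unavailable
    }
}
```

Notice that the pool handling, the blocking get() and the try/catch are all written by hand, which is the plumbing the list above is talking about.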
Sample Client Code:
RestTemplate:
RestTemplate restTemplate = new RestTemplate();
String resource = "http://localhost:8080/employees";
ResponseEntity<String> response = restTemplate.getForEntity(resource, String.class);
Async HttpClient (java.net.http, Java 11+):
HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_2).build();
HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8080/employees")).build();
CompletableFuture<HttpResponse<String>> response = client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
WebClient:
WebClient client = WebClient.create("http://localhost:8080/");
Mono<ClientResponse> result = client.get()
        .uri("employees")
        .accept(MediaType.TEXT_PLAIN)
        .exchange(); // deprecated since Spring 5.3 in favor of retrieve() and exchangeToMono()
With the async HttpClient, the actual call is made as soon as client.sendAsync is invoked. This is not the case with WebClient: the client.get() chain only describes the request and does not invoke the actual call.
A Mono or Flux is like a promise (Future) in that it says a result will be available, but the actual call is deferred. Unless something subscribes to the Mono or blocks on it, the action is not invoked.
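The eager versus deferred difference can be mimicked with plain JDK types. This is only an analogy under stated assumptions, not how Reactor is implemented: CompletableFuture.supplyAsync plays the role of sendAsync, a Supplier plays the role of an unsubscribed Mono, and fakeCall is a hypothetical stand-in that just counts how often it runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class EagerVsDeferredDemo {

    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for an HTTP call; it only counts how often it runs.
    static String fakeCall() {
        CALLS.incrementAndGet();
        return "response";
    }

    // Eager, like sendAsync: creating the future already schedules the call.
    static int callsAfterCreatingFuture() {
        CALLS.set(0);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(EagerVsDeferredDemo::fakeCall);
        future.join(); // only waits for completion; the call was already under way
        return CALLS.get();
    }

    // Deferred, like an unsubscribed Mono: defining the work runs nothing.
    static int[] callsBeforeAndAfterTrigger() {
        CALLS.set(0);
        Supplier<String> deferred = EagerVsDeferredDemo::fakeCall;
        int before = CALLS.get(); // still 0, nothing has asked for the value yet
        deferred.get();           // plays the role of subscribe() or block()
        int after = CALLS.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        System.out.println(callsAfterCreatingFuture()); // prints 1
        int[] counts = callsBeforeAndAfterTrigger();
        System.out.println(counts[0] + " -> " + counts[1]); // prints 0 -> 1
    }
}
```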
For the WebClient call to actually run, we should add one of the lines below:
result.subscribe(e -> System.out.println(e));
or
result.flatMap(r -> r.bodyToMono(String.class)).block();
Let's now try to solve the problem below.
A company wants to organize an event to celebrate gender diversity and wants to send invites to all of its female employees.
The company has a paginated API that returns all users with details like id, gender and DOB.
There is another API that returns the details of a single user, like email id, address, etc.
Our task is to use these two APIs and print the email ids of all the female employees in the company.
Let's try this task first with AsyncRestTemplate and then with WebClient.
AsyncRestTemplate
AsyncRestTemplate restTemplate = new AsyncRestTemplate(); // deprecated since Spring 5.0
String baseUrl = "https://gorest.co.in/public-api/users?page=";
// rangeClosed(1, 5) yields pages 1 to 5; range(1, 5) would stop at page 4
List<Future<ResponseEntity<Example>>> collect = IntStream.rangeClosed(1, 5)
        .mapToObj(e -> {
            Future<ResponseEntity<Example>> futureEntity = restTemplate.getForEntity(baseUrl + e, Example.class);
            return futureEntity;
        })
        .collect(Collectors.toList());
// uncheckedSupplier (helper not shown) is assumed to wrap the checked exceptions of Future.get()
collect.stream()
        .parallel()
        .forEach(future -> logger.info(uncheckedSupplier(future::get).get().getBody().toString()));
Wait, wait!!! Did this code solve the problem? NO
In this code we have just called the API 5 times and fetched the body. Next we would have to filter the body, then make API calls again like above, and then trigger the notification. Uffff….
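To give a feel for how much hand-written plumbing the remaining steps need on the Future side, here is a sketch that finishes the task with the JDK's CompletableFuture rather than AsyncRestTemplate. Everything in it is an assumption made for illustration: the UserSummary class, the PAGES and EMAILS maps and the fetchPage/fetchEmail methods are in-memory stand-ins for the two APIs, not the real service.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FutureCompositionDemo {

    // Hypothetical shape of one entry in the paginated user list.
    static final class UserSummary {
        final int id;
        final String gender;

        UserSummary(int id, String gender) {
            this.id = id;
            this.gender = gender;
        }
    }

    // Hypothetical in-memory stand-ins for the two APIs.
    static final Map<Integer, List<UserSummary>> PAGES = Map.of(
            1, List.of(new UserSummary(1, "female"), new UserSummary(2, "male")),
            2, List.of(new UserSummary(3, "Female"), new UserSummary(4, "male")));

    static final Map<Integer, String> EMAILS = Map.of(
            1, "a@example.com", 2, "b@example.com", 3, "c@example.com", 4, "d@example.com");

    static CompletableFuture<List<UserSummary>> fetchPage(int page) {
        return CompletableFuture.supplyAsync(() -> PAGES.getOrDefault(page, List.of()));
    }

    static CompletableFuture<String> fetchEmail(int id) {
        return CompletableFuture.supplyAsync(() -> EMAILS.get(id));
    }

    static List<String> femaleEmails(int firstPage, int lastPage) {
        // Step 1: start one request per page.
        List<CompletableFuture<List<UserSummary>>> pages = IntStream.rangeClosed(firstPage, lastPage)
                .mapToObj(FutureCompositionDemo::fetchPage)
                .collect(Collectors.toList());

        // Step 2: wait for each page, filter, and start one detail request per matching id.
        List<CompletableFuture<String>> emails = pages.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .filter(u -> u.gender.equalsIgnoreCase("female"))
                .map(u -> fetchEmail(u.id))
                .collect(Collectors.toList());

        // Step 3: wait for every detail request and collect the results.
        return emails.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(femaleEmails(1, 2)); // prints [a@example.com, c@example.com]
    }
}
```

Each stage needs its own intermediate list of futures and an explicit join, and the calling thread blocks between stages.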
Okay, now let's give it a try with WebClient.
Flux.range(1, 5)
        .flatMap(i -> client.get()
                .uri("public-api/users?page={id}", i)
                .retrieve()
                .bodyToMono(Example.class))
        .flatMapIterable(e -> e.getData()
                .stream()
                .filter(in -> in.getGender().equalsIgnoreCase("female"))
                .map(i -> i.getId())
                .collect(Collectors.toList()))
        .delayElements(Duration.ofMillis(1000))
        .flatMap(i -> client.get()
                .uri("public-api/users/" + i)
                .retrieve()
                .bodyToMono(User.class))
        .doOnNext(e -> logger.info(e.getData().getEmail()))
        .blockLast();
It's done!!! Continue reading to understand what the code is doing.
We have an API that returns a list of users. The API is paginated and we get 20 results per page.
The ask is to call the API (here I have called 5 pages). The code below does this work.
** Please look at the code comments for more clarity
Flux.range(1, 5) // Emits the integers 1 to 5 (start 1, count 5)
        .flatMap(i -> client.get() // Builds an HTTP GET request
                .uri("public-api/users?page={id}", i)
                .retrieve() // Declares how the response should be extracted
                .bodyToMono(Example.class)) // Each page returns a single result, so bodyToMono is used
Our next task is to filter the list and get all female users.
        // flatMapIterable is used because each page maps to a list of ids;
        // it flattens those lists into a single stream of ids.
        .flatMapIterable(e -> e.getData()
                .stream()
                .filter(in -> in.getGender().equalsIgnoreCase("female"))
                .map(i -> i.getId())
                .collect(Collectors.toList()))
The next task is to get the email id of each selected user.
        .delayElements(Duration.ofMillis(1000)) // Delays each id by one second to avoid a spike in load on the service
        .flatMap(i -> client.get()
                .uri("public-api/users/" + i)
                .retrieve()
                .bodyToMono(User.class))
Next, we log those email ids.
        .doOnNext(e -> logger.info(e.getData().getEmail())) // Called once for each user that is emitted
        .blockLast(); // Subscribes and blocks the calling thread until the Flux emits its last item
Key points to remember:
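- flatMap in Reactor is different from flatMap in Java streams. In Reactor, flatMap subscribes to the publisher returned for each element and merges the results into one Flux, so the function passed to it must return a Mono or Flux.
- flatMap does not switch threads by itself. Any async processing runs on whatever threads the inner publisher uses (for WebClient, the threads of the underlying HTTP client).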
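- Reactor works with a small pool of threads that are reused. The exact number depends on the scheduler or HTTP client in use (for example, Schedulers.parallel() creates one worker per CPU core) rather than being a fixed 4 per core.
- In our sample, if the first flatMap uses 4 threads, the second flatMap will reuse those same threads, but the order of the results is not guaranteed.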
- Unless something subscribes to or blocks on the stream, no action is performed. Without a subscribe or block call, the stream is basically a blueprint.
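The "order is not guaranteed" point can be seen without Reactor. The sketch below is a JDK-only analogy, not Reactor code: slowEcho is a hypothetical stand-in for a remote call whose latency depends on the input, and the delays are arbitrary. Results are recorded in the order the calls complete, which is how merged inner publishers behave in flatMap.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class CompletionOrderDemo {

    // Hypothetical stand-in for a remote call that answers after delayMillis.
    static CompletableFuture<Integer> slowEcho(int id, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> id,
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    static List<Integer> completionOrder() {
        List<Integer> arrived = new CopyOnWriteArrayList<>();
        // All three calls are in flight at once; id 1 is the slowest, id 3 the fastest.
        List<CompletableFuture<Void>> inFlight = List.of(
                slowEcho(1, 300).thenAccept(arrived::add),
                slowEcho(2, 150).thenAccept(arrived::add),
                slowEcho(3, 10).thenAccept(arrived::add));
        inFlight.forEach(CompletableFuture::join);
        return arrived; // completion order, which usually differs from 1, 2, 3
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

If the source order matters, Reactor also offers concatMap and flatMapSequential, which keep results in the order of the source elements.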