How to handle different return types from Future and ExecuteService invokeAll() ?
Last modified: 11 Jul, 2020Introduction
Recently I had to stumble upon a requirement to optimize my code to perform parallel calls to multiple downstream systems returning different response objects and stitch to a specific value object.
There is no common interface implemented by these different response objects and cannot define one as they are auto generated by swagger codegen.
Had to explore options if there is any generic way of doing this, but couldn’t pursue due to time crunch and had to implement in a way which meets my requirement and also optimize my service to perform parallel execution.
Below we shall start going through brief intro about Future, ExecutorService & Executors classes and later provide insights on the solution I had to implement.
Brief intro on Future & ExecutorService
Combining java.util.concurrent.Future<V>
& java.util.concurrent.ExecutorService
is an awesome mechanism when trying to optimize the code to complete the compute in short period.
java.util.concurrent.Future
Future<V>
is an interface that represents the result of an asynchronous computation. Once the computation is finished, we can obtain the result of it by using the get()
method.
Calling get()
method is a blocking operation and waits until the outcome (V) is available. It could take a considerable amount of time.
Instead of wasting time, we can apply two approaches. The first one is using get()
as well, but setting a timeout
value as a parameter that will prevent you from getting stuck if something goes away. The second way is by using the isDone()
method, which takes a quick look at the Future and checks if it has finished its work or not.
java.util.concurrent.ExecutorService
ExecutorService
represents an abstraction of thread pools and can be created by the utility methods of the Executors
class. These methods can initialize a number of executors depending on the purpose they will be used for.
There are several ways to delegate a task to ExecutorService:
execute(Runnable)
– returns void and cannot access the result.submit(Runnable or Callable<T>)
– returns a Future object. The main difference is that when submittingCallable<T>
, the result can be accessed via the returned Future object.invokeAny(Collection<Callable<T>>)
– returns the result of one of theCallable<T>
objects that finished its work successfully. The rest of the tasks are canceled.invokeAll(Collection<Callable<T>>))
– returns a list ofFuture<V>
objects. All tasks are executed and the outcome can be obtained via the returned result list.
When all the tasks have finished their work, the threads in ExecutorService
are still running. They are not destroyed yet and are in a standby
mode. This will keep JVM running.
For the purpose of bypassing this problem, Java offers you two methods – shutdown()
and shutdownNow()
. The key difference between them is the stopping of ExecutorService.
shutdown()
will not stop it immediately and will wait for all running threads to finish. Meanwhile, ExecutorService will not accept new tasks.
shutdownNow()
will try to stop it immediately. It will try to instantly stop all running tasks and to skip the processing of the waiting ones. The method returns a list of all running tasks for which there are no guarantees when they will be stopped.
java.util.concurrent.Executors
Executors
provides factory and utility methods for Executor
, ExecutorService
, ScheduledExecutorService
, ThreadFactory
, and Callable
classes.
This class supports the following kinds of methods:
- Methods that create and return an
ExecutorService
set up with commonly useful configuration settings. - Methods that create and return a
ScheduledExecutorService
set up with commonly useful configuration settings. - Methods that create and return a
wrapped ExecutorService
, that disables reconfiguration by making implementation-specific methods inaccessible. - Methods that create and return a
ThreadFactory
that sets newly created threads to a known state. - Methods that create and return a
Callable
out of other closure-like forms, so they can be used in execution methods requiring Callable.
// Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
static ExecutorService newCachedThreadPool()
// Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available, and uses the provided ThreadFactory to create new threads when needed.
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
static ExecutorService newFixedThreadPool(int nThreads)
// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue, using the provided ThreadFactory to create new threads when needed.
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
// Creates an Executor that uses a single worker thread operating off an unbounded queue.
static ExecutorService newSingleThreadExecutor()
// Creates an Executor that uses a single worker thread operating off an unbounded queue, and uses the provided ThreadFactory to create a new thread when needed.
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
// Returns an object that delegates all defined ExecutorService methods to the given executor, but not any other methods that might otherwise be accessible using casts.
static ExecutorService unconfigurableExecutorService(ExecutorService executor)
Solution
Based on the requirement, we can get instance of ExecutorService
from Executors
factory methods. We can stick to newFixedThreadPool
which thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
Consider this use case. There are three services ServiceA
, ServiceB
& ServiceC
which returns objects which are not compatible with each other. These services should be accessed parallelly and then the response from these three services should be collated to a value object which will be returned by the processing method.
// ServiceA.java
package com.toomuch2learn.example.a;
public class ServiceA {
public static A processA() throws ClassNotFoundException {
System.out.println("==> ServiceA :: processA :: Begin");
try {
Thread.sleep(2000);
}
catch (Exception e) {
// Do Nothing
}
System.out.println("==> ServiceA :: processA :: End");
return new A();
}
}
// ServiceB.java
package com.toomuch2learn.example.b;
import java.security.InvalidKeyException;
public class ServiceB {
public static B processB() throws InvalidKeyException {
System.out.println("==> ServiceB :: processB :: Begin");
try {
Thread.sleep(1000);
}
catch (Exception e) {
// Do Nothing
}
System.out.println("==> ServiceB :: processB :: End");
return new B();
}
}
// ServiceC.java
package com.toomuch2learn.example.c;
public class ServiceC {
public static C processC() throws IllegalStateException{
System.out.println("==> ServiceC :: processC :: Begin");
try {
Thread.sleep(5000);
}
catch (Exception e) {
// Do Nothing
}
System.out.println("==> ServiceC :: processC :: End");
return new C();
}
}
public class Processor {
public static void main(String[] args) {
try {
Processor processor = new Processor();
ProcessorVO vo = processor.processRequest();
System.out.println(vo);
}
catch (Exception e) {
e.printStackTrace();;
}
}
public ProcessorVO processRequest() throws Exception {
ProcessorVO vo = null;
try {
// List of callable objects to handle different service calls
List<Callable<Object>> servicesToCall = servicesToCall();
// Prepare Executor service with fixed thread pool. Lets set to 2, as we have three services to call
ExecutorService executorService = Executors.newFixedThreadPool(2);
// Invoke all callable objects to process them in independent threads asynchronously
List<Future<Object>> futures = executorService.invokeAll(servicesToCall);
// Shutdown the executor service
executorService.shutdown();
// Loop through the future objects and have the details added to VO based upon the type of instance in future object
vo = new ProcessorVO();
for (Future<Object> future : futures) { if(future.get() instanceof A) { vo.setValueFromA(((A)future.get()).valueFromA()); } else if(future.get() instanceof B) { vo.setValueFromB(((B)future.get()).valueFromB()); } else if(future.get() instanceof C) { vo.setValueFromC(((C)future.get()).valueFromC()); } } }
catch (LambdaWrappedException e) {
throw new Exception(e);
}
return vo;
}
/**
* List of callable objects to handle different service calls
*
* @return List<Callable<Object>>
*/
private List<Callable<Object>> servicesToCall() { List<Callable<Object>> servicesToCall = new ArrayList<>();
// Request to handle ServiceA
servicesToCall.add(() -> { A a = null;
try {
a = ServiceA.processA();
}
catch (ClassNotFoundException e) {
LambdaWrappedException.throwException(e);
}
return a;
});
// Request to handle ServiceB
servicesToCall.add(() -> { B b = null;
try {
b = ServiceB.processB();
}
catch (InvalidKeyException e) {
LambdaWrappedException.throwException(e);
}
return b;
});
// Request to handle ServiceC
servicesToCall.add(() -> { C c = null;
try {
c = ServiceC.processC();
}
catch (IllegalStateException e) {
LambdaWrappedException.throwException(e);
}
return c;
});
return servicesToCall;
}
}
To Summarize
servicesToCall()
method preparesList<Callable<Object>>
objects by creating instance of callable using lambda expression for each service call.- Instance of
ExecutorService
is created by callingnewFixedThreadPool(2)
factory method fromExecutors
class. We configured only 2 threads in this example as we have 3 services to call. - Prepared list of callable objects is passed to ExecutorService
invokeAll
method. This will kickstart the invocation by creating two threads to process the callable objects. - Shutdown the ExecutorService.
- Loop through the future objects. Identify the instance of object retrieved from future object and process accordingly.
- This way we are able to cater different object return types from future object.
Running the main method should print something like below:
==> ServiceA :: processA :: Begin==> ServiceB :: processB :: Begin==> ServiceB :: processB :: End
==> ServiceC :: processC :: Begin==> ServiceA :: processA :: End
==> ServiceC :: processC :: End
ProcessorVO{valueFromA='VALUE_FROM_A', valueFromB=100, valueFromC=2020-07-11}
Conclusion
The solution presented here might not be a proper approach. But this worked for me in one of my project where in there is no common interface available and have to map responses from different downstream APIs.