handle-different-return-types-from-future-and-executeservice

Introduction

Recently I had a requirement to optimize my code by making parallel calls to multiple downstream systems, each returning a different response object, and stitching the responses into a single value object.

These response objects do not implement a common interface, and I cannot define one because they are auto-generated by Swagger Codegen.

I wanted to explore whether there is a generic way of doing this, but could not pursue it due to a time crunch. I had to implement something that met my requirement and still let my service make the calls in parallel.

Below is a brief intro to Future, ExecutorService and Executors, followed by the solution I implemented.

Brief intro on Future & ExecutorService

Combining java.util.concurrent.Future<V> and java.util.concurrent.ExecutorService is an awesome way to shorten the overall compute time by running independent pieces of work in parallel.

java.util.concurrent.Future

Future<V> is an interface that represents the result of an asynchronous computation. Once the computation is finished, we can obtain the result of it by using the get() method.

Calling get() is a blocking operation: it waits until the outcome (V) is available, which could take a considerable amount of time.

Instead of waiting indefinitely, we can apply two approaches. The first one still uses get(), but passes a timeout so that the caller does not get stuck if something goes wrong. The second one uses the isDone() method, which takes a quick look at the Future and reports whether it has finished its work.
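Both approaches can be sketched as below. This is only a minimal illustration: the class name, the 500 ms task and the timeout values are assumptions made for the example and are not part of the solution described later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureGetExample {

    // Hypothetical slow task, used only for illustration
    static final Callable<String> SLOW_TASK = () -> {
        Thread.sleep(500);
        return "done";
    };

    // Approach 1: block for at most timeoutMillis, then give up and use a fallback
    static String getWithTimeout(long timeoutMillis, String fallback) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(SLOW_TASK);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            future.cancel(true); // the result is no longer needed, so interrupt the task
            return fallback;
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            executor.shutdown();
        }
    }

    // Approach 2: poll isDone() so the caller can do other work in between
    static String pollUntilDone() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(SLOW_TASK);
        try {
            while (!future.isDone()) {
                Thread.sleep(50); // placeholder for other useful work
            }
            return future.get(); // does not block here, the task has already finished
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout(100, "timed out"));
        System.out.println(getWithTimeout(2000, "timed out"));
        System.out.println(pollUntilDone());
    }
}
```

The timeout variant suits calls where a late answer is useless, while polling suits callers that have other work to do in the meantime.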

java.util.concurrent.ExecutorService

ExecutorService represents an abstraction of thread pools and can be created by the utility methods of the Executors class. These methods can initialize a number of executors depending on the purpose they will be used for.

There are several ways to delegate a task to ExecutorService: execute(Runnable), submit(Runnable) or submit(Callable), invokeAny(...) and invokeAll(...).
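As a rough sketch of how these delegation methods differ, the example below uses execute, submit, invokeAll and invokeAny on one pool. The class name and the trivial string-returning tasks are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DelegateTasksExample {

    static List<String> run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<String> results = new ArrayList<>();
        try {
            // execute(Runnable): fire and forget, no handle on the outcome
            executor.execute(() -> System.out.println("execute: running"));

            // submit(Callable): returns a Future that carries the result
            Future<String> submitted = executor.submit(() -> "submit");
            results.add(submitted.get());

            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "first");
            tasks.add(() -> "second");

            // invokeAll: runs every task; the futures come back in task-list order
            for (Future<String> future : executor.invokeAll(tasks)) {
                results.add(future.get());
            }

            // invokeAny: returns the result of one task that completed successfully
            results.add(executor.invokeAny(tasks));
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

invokeAll is the method the solution in this post relies on, since it hands back one Future per task in a predictable order.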

When all the tasks have finished their work, the threads in ExecutorService are still alive. They are not destroyed and sit in standby mode, which keeps the JVM running.

To deal with this, ExecutorService offers two methods: shutdown() and shutdownNow(). The key difference between them is how they stop the ExecutorService.

shutdown() does not stop the ExecutorService immediately. It stops accepting new tasks and lets the already submitted tasks finish, but the call itself returns without waiting for them; use awaitTermination() if you need to block until they complete.

shutdownNow() tries to stop it immediately. It attempts to stop all running tasks, typically by interrupting their threads, and skips the waiting ones. The method returns the list of tasks that were awaiting execution and never started; there is no guarantee about when, or whether, the running tasks will actually stop.
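A common way to combine the two methods is to request an orderly shutdown first and fall back to shutdownNow() only if the tasks do not finish in time. The sketch below assumes hypothetical helper names (shutdownGracefully, runAndShutdown) and arbitrary timeouts chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownExample {

    // Returns true if all tasks finished within the timeout after an orderly shutdown
    static boolean shutdownGracefully(ExecutorService executor, long timeoutMillis) {
        executor.shutdown(); // no new tasks accepted; submitted tasks keep running
        try {
            if (executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            // Timed out: try to cancel running tasks and collect the ones that never started
            List<Runnable> neverStarted = executor.shutdownNow();
            System.out.println("Tasks that never started: " + neverStarted.size());
            return false;
        }
        catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Hypothetical driver: one task sleeping for taskMillis, then a graceful shutdown
    static boolean runAndShutdown(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            Thread.sleep(taskMillis);
            return null;
        });
        return shutdownGracefully(executor, timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdown(10, 2000));
        System.out.println(runAndShutdown(5000, 100));
    }
}
```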

java.util.concurrent.Executors

Executors provides factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes.

This class supports the following kinds of methods:

Factory and Utility methods returning instance of ExecutorService
// Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
static ExecutorService	newCachedThreadPool()

// Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available, and uses the provided ThreadFactory to create new threads when needed.
static ExecutorService	newCachedThreadPool(ThreadFactory threadFactory)

// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
static ExecutorService	newFixedThreadPool(int nThreads)

// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue, using the provided ThreadFactory to create new threads when needed.
static ExecutorService	newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

// Creates an Executor that uses a single worker thread operating off an unbounded queue.
static ExecutorService	newSingleThreadExecutor()

// Creates an Executor that uses a single worker thread operating off an unbounded queue, and uses the provided ThreadFactory to create a new thread when needed.
static ExecutorService	newSingleThreadExecutor(ThreadFactory threadFactory)

// Returns an object that delegates all defined ExecutorService methods to the given executor, but not any other methods that might otherwise be accessible using casts.
static ExecutorService	unconfigurableExecutorService(ExecutorService executor)
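To show how one of these factory methods might be used, here is a small sketch of newFixedThreadPool with a custom ThreadFactory. The thread naming scheme and the class name are assumptions for the example; naming pool threads is simply a convenient way to recognise them in logs and thread dumps.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorsFactoryExample {

    // Runs one task on a named pool thread and returns that thread's name
    static String threadNameOfFirstTask() {
        AtomicInteger counter = new AtomicInteger();

        // Hypothetical naming scheme, used only to make the pool threads recognisable
        ThreadFactory namedThreads = runnable -> {
            Thread thread = new Thread(runnable, "downstream-call-" + counter.incrementAndGet());
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        };

        ExecutorService executor = Executors.newFixedThreadPool(2, namedThreads);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadNameOfFirstTask());
    }
}
```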

Solution

Based on the requirement, we can get an instance of ExecutorService from one of the Executors factory methods. We can stick to newFixedThreadPool, which creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.

Consider this use case. There are three services, ServiceA, ServiceB and ServiceC, which return objects that are not compatible with each other. These services should be called in parallel, and the responses from all three should then be collated into a value object that is returned by the processing method.

Class Diagram

Service Classes
// ServiceA.java
package com.toomuch2learn.example.a;

public class ServiceA {

    public static A processA() throws ClassNotFoundException {
        System.out.println("==> ServiceA :: processA :: Begin");
        try {
            Thread.sleep(2000);
        }
        catch (Exception e) {
            // Do Nothing
        }
        System.out.println("==> ServiceA :: processA :: End");
        return new A();
    }
}

// ServiceB.java
package com.toomuch2learn.example.b;

import java.security.InvalidKeyException;

public class ServiceB {

    public static B processB() throws InvalidKeyException {
        System.out.println("==> ServiceB :: processB :: Begin");
        try {
            Thread.sleep(1000);
        }
        catch (Exception e) {
            // Do Nothing
        }
        System.out.println("==> ServiceB :: processB :: End");
        return new B();
    }
}

// ServiceC.java
package com.toomuch2learn.example.c;

public class ServiceC {

    public static C processC() throws IllegalStateException{
        System.out.println("==> ServiceC :: processC :: Begin");
        try {
            Thread.sleep(5000);
        }
        catch (Exception e) {
            // Do Nothing
        }
        System.out.println("==> ServiceC :: processC :: End");
        return new C();
    }
}
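
Processor.java
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// A, B and C are assumed to live in the same packages as their services
import com.toomuch2learn.example.a.A;
import com.toomuch2learn.example.a.ServiceA;
import com.toomuch2learn.example.b.B;
import com.toomuch2learn.example.b.ServiceB;
import com.toomuch2learn.example.c.C;
import com.toomuch2learn.example.c.ServiceC;

// ProcessorVO and LambdaWrappedException are not shown here and are assumed to be in the same package as Processor
public class Processor {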

    public static void main(String[] args) {
        try {
            Processor processor = new Processor();
            ProcessorVO vo = processor.processRequest();

            System.out.println(vo);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public ProcessorVO processRequest() throws Exception {
        ProcessorVO vo = null;
        try {

            // List of callable objects to handle different service calls
            List<Callable<Object>> servicesToCall = servicesToCall();
            // Prepare an executor service with a fixed thread pool. The pool size is 2 here, so with three services the third call waits until a thread is free
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            // Invoke all callable objects to process them in independent threads asynchronously
            List<Future<Object>> futures = executorService.invokeAll(servicesToCall);
            // Shutdown the executor service
            executorService.shutdown();
            // Loop through the future objects and have the details added to VO based upon the type of instance in future object
            vo = new ProcessorVO();
            for (Future<Object> future : futures) {
                // get() blocks until the task completes; call it once and reuse the result
                Object result = future.get();
                if (result instanceof A) {
                    vo.setValueFromA(((A) result).valueFromA());
                }
                else if (result instanceof B) {
                    vo.setValueFromB(((B) result).valueFromB());
                }
                else if (result instanceof C) {
                    vo.setValueFromC(((C) result).valueFromC());
                }
            }
        }
        catch (LambdaWrappedException e) {
            throw new Exception(e);
        }

        return vo;
    }

    /**
     * List of callable objects to handle different service calls
     *
     * @return List<Callable<Object>>
     */
    private List<Callable<Object>> servicesToCall() {
        List<Callable<Object>> servicesToCall = new ArrayList<>();

        // Request to handle ServiceA
        servicesToCall.add(() -> {
            A a = null;
            try {
                a = ServiceA.processA();
            }
            catch (ClassNotFoundException e) {
                LambdaWrappedException.throwException(e);
            }
            return a;
        });

        // Request to handle ServiceB
        servicesToCall.add(() -> {
            B b = null;
            try {
                b = ServiceB.processB();
            }
            catch (InvalidKeyException e) {
                LambdaWrappedException.throwException(e);
            }
            return b;
        });

        // Request to handle ServiceC
        servicesToCall.add(() -> {
            C c = null;
            try {
                c = ServiceC.processC();
            }
            catch (IllegalStateException e) {
                LambdaWrappedException.throwException(e);
            }
            return c;
        });

        return servicesToCall;
    }
}
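
One detail worth keeping in mind when reading the Processor code: an exception thrown inside a Callable is captured by its Future, and get() rethrows it wrapped in java.util.concurrent.ExecutionException, with the original exception available through getCause(). The sketch below illustrates that behaviour on its own. LambdaWrappedException is not shown in this post, so a plain IllegalStateException with a made-up message stands in for it here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutionExceptionExample {

    // Returns the message of the exception that the task originally threw
    static String causeMessage() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical failing task standing in for a downstream call
            Callable<String> failing = () -> {
                throw new IllegalStateException("downstream call failed");
            };
            Future<String> future = executor.submit(failing);
            try {
                return future.get();
            }
            catch (ExecutionException e) {
                // getCause() is the exception the task actually threw
                return e.getCause().getMessage();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeMessage());
    }
}
```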

To Summarize

Running the main method should print something like below:

Output
==> ServiceA :: processA :: Begin
==> ServiceB :: processB :: Begin
==> ServiceB :: processB :: End
==> ServiceC :: processC :: Begin
==> ServiceA :: processA :: End
==> ServiceC :: processC :: End
ProcessorVO{valueFromA='VALUE_FROM_A', valueFromB=100, valueFromC=2020-07-11}

Conclusion

The solution presented here might not be the ideal approach, but it worked for me in one of my projects where no common interface was available and I had to map responses from different downstream APIs.
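
For comparison, one possible variation is to submit each call separately so that every Future keeps its own type, which removes the need for instanceof checks. The sketch below is only an illustration of that idea under stated assumptions: the nested A and B classes are hypothetical stand-ins for the generated response types, and the lambdas stand in for calls such as ServiceA.processA().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TypedFuturesExample {

    // Hypothetical stand-ins for the generated response types
    static class A { String valueFromA() { return "VALUE_FROM_A"; } }
    static class B { int valueFromB() { return 100; } }

    // Runs both calls in parallel and collates the typed results
    static String collate() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Callable<A> callA = () -> new A(); // stands in for ServiceA.processA()
            Callable<B> callB = () -> new B(); // stands in for ServiceB.processB()

            // Each submit() returns a Future of the matching type, so no casts are needed
            Future<A> futureA = executor.submit(callA);
            Future<B> futureB = executor.submit(callB);

            return futureA.get().valueFromA() + " / " + futureB.get().valueFromB();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collate());
    }
}
```

The trade-off is one variable per service instead of a single list, which is manageable for three calls but grows with the number of downstream systems.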
