Titbits

How to handle different return types from Future and ExecuteService invokeAll() ?

2much2learn - How to handle different return types from Future and ExecuteService invokeAll() ?
Clone the source code of the article from handle-different-return-types-from-future-and-executeservice

Introduction

Recently I had to stumble upon a requirement to optimize my code to perform parallel calls to multiple downstream systems returning different response objects and stitch to a specific value object.

There is no common interface implemented by these different response objects and cannot define one as they are auto generated by swagger codegen.

Had to explore options if there is any generic way of doing this, but couldn’t pursue due to time crunch and had to implement in a way which meets my requirement and also optimize my service to perform parallel execution.

Below we shall start going through brief intro about Future, ExecutorService & Executors classes and later provide insights on the solution I had to implement.

Brief intro on Future & ExecutorService

Combining java.util.concurrent.Future<V> & java.util.concurrent.ExecutorService is an awesome mechanism when trying to optimize the code to complete the compute in short period.

java.util.concurrent.Future

Future<V> is an interface that represents the result of an asynchronous computation. Once the computation is finished, we can obtain the result of it by using the get() method.

Calling get() method is a blocking operation and waits until the outcome (V) is available. It could take a considerable amount of time.

Instead of wasting time, we can apply two approaches. The first one is using get() as well, but setting a timeout value as a parameter that will prevent you from getting stuck if something goes away. The second way is by using the isDone() method, which takes a quick look at the Future and checks if it has finished its work or not.

java.util.concurrent.ExecutorService

ExecutorService represents an abstraction of thread pools and can be created by the utility methods of the Executors class. These methods can initialize a number of executors depending on the purpose they will be used for.

There are several ways to delegate a task to ExecutorService:

  • execute(Runnable) – returns void and cannot access the result.

  • submit(Runnable or Callable<T>) – returns a Future object. The main difference is that when submitting Callable<T>, the result can be accessed via the returned Future object.

  • invokeAny(Collection<Callable<T>>) – returns the result of one of the Callable<T> objects that finished its work successfully. The rest of the tasks are canceled.

  • invokeAll(Collection<Callable<T>>)) – returns a list of Future<V> objects. All tasks are executed and the outcome can be obtained via the returned result list.

When all the tasks have finished their work, the threads in ExecutorService are still running. They are not destroyed yet and are in a standby mode. This will keep JVM running.

For the purpose of bypassing this problem, Java offers you two methods – shutdown() and shutdownNow(). The key difference between them is the stopping of ExecutorService.

shutdown() will not stop it immediately and will wait for all running threads to finish. Meanwhile, ExecutorService will not accept new tasks.

shutdownNow() will try to stop it immediately. It will try to instantly stop all running tasks and to skip the processing of the waiting ones. The method returns a list of all running tasks for which there are no guarantees when they will be stopped.

java.util.concurrent.Executors

Executors provides factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes.

This class supports the following kinds of methods:

  • Methods that create and return an ExecutorService set up with commonly useful configuration settings.
  • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
  • Methods that create and return a wrapped ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
  • Methods that create and return a ThreadFactory that sets newly created threads to a known state.
  • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
Factory and Utility methods returning instance of ExecutorService
// Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
static ExecutorService	newCachedThreadPool()

// Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available, and uses the provided ThreadFactory to create new threads when needed.
static ExecutorService	newCachedThreadPool(ThreadFactory threadFactory)

// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
static ExecutorService	newFixedThreadPool(int nThreads)

// Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue, using the provided ThreadFactory to create new threads when needed.
static ExecutorService	newFixedThreadPool(int nThreads, ThreadFactory threadFactory)

// Creates an Executor that uses a single worker thread operating off an unbounded queue.
static ExecutorService	newSingleThreadExecutor()

// Creates an Executor that uses a single worker thread operating off an unbounded queue, and uses the provided ThreadFactory to create a new thread when needed.
static ExecutorService	newSingleThreadExecutor(ThreadFactory threadFactory)

// Returns an object that delegates all defined ExecutorService methods to the given executor, but not any other methods that might otherwise be accessible using casts.
static ExecutorService	unconfigurableExecutorService(ExecutorService executor)

Solution

Based on the requirement, we can get instance of ExecutorService from Executors factory methods. We can stick to newFixedThreadPool which thread pool that reuses a fixed number of threads operating off a shared unbounded queue.

Consider this use case. There are three services ServiceA, ServiceB & ServiceC which returns objects which are not compatible with each other. These services should be accessed parallelly and then the response from these three services should be collated to a value object which will be returned by the processing method.

Class Diagram
Class Diagram

Service Classes
// ServiceA.java
package com.toomuch2learn.example.a;

public class ServiceA {

    public static A processA() throws ClassNotFoundException {
        System.out.println("==> ServiceA :: processA :: Begin");
        try {
            Thread.sleep(2000);
        }
        catch (Exception e) {
            // Do Nothing
        }
        System.out.println("==> ServiceA :: processA :: End");
        return new A();
    }
}

// ServiceB.java
package com.toomuch2learn.example.b;

import java.security.InvalidKeyException;

public class ServiceB {

    public static B processB() throws InvalidKeyException {
        System.out.println("==> ServiceB :: processB :: Begin");
        try {
            Thread.sleep(1000);
        }
        catch (Exception e) {
            // Do Nothing
        }
        System.out.println("==> ServiceB :: processB :: End");
        return new B();
    }
}

// ServiceC.java
package com.toomuch2learn.example.c;

public class ServiceC {

    public static C processC() throws IllegalStateException{
        System.out.println("==> ServiceC :: processC :: Begin");
        try {
            Thread.sleep(5000);
        }
        catch (Exception e) {
            // Do Nothing
        }
        System.out.println("==> ServiceC :: processC :: End");
        return new C();
    }
}
Processor.java
public class Processor {

    public static void main(String[] args) {
        try {
            Processor processor = new Processor();
            ProcessorVO vo = processor.processRequest();

            System.out.println(vo);
        }
        catch (Exception e) {
            e.printStackTrace();;
        }
    }

    public ProcessorVO processRequest() throws Exception {
        ProcessorVO vo = null;
        try {

            // List of callable objects to handle different service calls
            List<Callable<Object>> servicesToCall = servicesToCall();
            // Prepare Executor service with fixed thread pool. Lets set to 2, as we have three services to call
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            // Invoke all callable objects to process them in independent threads asynchronously
            List<Future<Object>> futures = executorService.invokeAll(servicesToCall);
            // Shutdown the executor service
            executorService.shutdown();
            // Loop through the future objects and have the details added to VO based upon the type of instance in future object
            vo = new ProcessorVO();
            for (Future<Object> future : futures) {                if(future.get() instanceof A) {                    vo.setValueFromA(((A)future.get()).valueFromA());                }                else if(future.get() instanceof B) {                    vo.setValueFromB(((B)future.get()).valueFromB());                }                else if(future.get() instanceof C) {                    vo.setValueFromC(((C)future.get()).valueFromC());                }            }        }
        catch (LambdaWrappedException e) {
            throw new Exception(e);
        }

        return vo;
    }

    /**
     * List of callable objects to handle different service calls
     *
     * @return List<Callable<Object>>
     */
    private List<Callable<Object>> servicesToCall() {        List<Callable<Object>> servicesToCall = new ArrayList<>();

        // Request to handle ServiceA
        servicesToCall.add(() -> {            A a = null;
            try {
                a = ServiceA.processA();
            }
            catch (ClassNotFoundException e) {
                LambdaWrappedException.throwException(e);
            }
            return a;
        });

        // Request to handle ServiceB
        servicesToCall.add(() -> {            B b = null;
            try {
                b = ServiceB.processB();
            }
            catch (InvalidKeyException e) {
                LambdaWrappedException.throwException(e);
            }
            return b;
        });

        // Request to handle ServiceC
        servicesToCall.add(() -> {            C c = null;
            try {
                c = ServiceC.processC();
            }
            catch (IllegalStateException e) {
                LambdaWrappedException.throwException(e);
            }
            return c;
        });

        return servicesToCall;
    }
}

To Summarize

  • servicesToCall() method prepares List<Callable<Object>> objects by creating instance of callable using lambda expression for each service call.
  • Instance of ExecutorService is created by calling newFixedThreadPool(2) factory method from Executors class. We configured only 2 threads in this example as we have 3 services to call.
  • Prepared list of callable objects is passed to ExecutorService invokeAll method. This will kickstart the invocation by creating two threads to process the callable objects.
  • Shutdown the ExecutorService.
  • Loop through the future objects. Identify the instance of object retrieved from future object and process accordingly.
  • This way we are able to cater different object return types from future object.

Running the main method should print something like below:

Output
==> ServiceA :: processA :: Begin==> ServiceB :: processB :: Begin==> ServiceB :: processB :: End
==> ServiceC :: processC :: Begin==> ServiceA :: processA :: End
==> ServiceC :: processC :: End
ProcessorVO{valueFromA='VALUE_FROM_A', valueFromB=100, valueFromC=2020-07-11}

Conclusion

The solution presented here might not be a proper approach. But this worked for me in one of my project where in there is no common interface available and have to map responses from different downstream APIs.

Clone the source code of the article from handle-different-return-types-from-future-and-executeservice
author

Madan Narra21 Posts

Software developer, Consultant & Architect

Madan is a software developer, writer, and ex-failed-startup co-founder. He has over 10+ years of experience building scalable and distributed systems using Java, JavaScript, Node.js. He writes about software design and architecture best practices with Java and is especially passionate about Microservices, API Development, Distributed Applications and Frontend Technologies.

  • Github
  • Linkedin
  • Facebook
  • Twitter
  • Instagram

Contents

Latest Post

post preview
post preview
post preview
post preview
post preview

Get The Best Of All Hands Delivered To Your Inbox

Subscribe to our newsletter and stay updated.