JavaConcurrencycookbook Notes

Java Concurrency cookbook reading notes

[TOC]

一. Thread management

Target

  • creating,running,and setting the characteristics of a thread
  • Interrupting a thread
  • Controlling the interruption of a thread
  • Sleeping and resuming a thread
  • waiting for the finalization of a thread
  • creating and running a daemon thread
  • processing uncontrolled exceptions in a thread
  • using thread local variables
  • grouping threads and processing uncontrolled exceptions in a group of threads
  • creating threads through a factory

1. Creating,running,and setting the characteristics of a thread

  • Every java program has at least one execution thread>>>main
  • A java program ends when all its threads finish
  • If the initial thread ends, the rest of the thread will continue with their execution until they finish
  • If one of the threads uses the System.exit() instruction to end the execution of the program, all the threads will end their respective execution
  • Only when you call the start()method, a new execution thread is created.

How to create a thread

  • extending the thread class and overriding the run() method;
  • Building a class that implements the Runnable interface and the run() method and then creating an object of the Thread class by passing the Runnable object as a parameter-- this is the preferred approach and it gives your more flexibility

Thread attributes

  • ID: stores a unique identifier for each thread
  • Name: stores the name of thread
  • Priority: stores the priority of the thread objects priority range[0,10],1 is the lowest(not recommended to change the priority of thread)
  • Status: stores the status of thread
    • NEW: created and not yet started
    • RUNNABLE: blocked and waiting for a monitor
    • WAITING: waiting for another thread
    • TIME_WAITING: waiting for another thread with a specified waiting time
    • TERMINATED: finished its execution

2. Interrupting a thread

public class PrimeGenerator extends Thread {
    @Override
    public void run() {
        while (true) if (isInterrupted()) return;
    }
    public static void main(String[] args) {
        Thread task = new PrimeGenerator();
        task.start();
        TimeUnit.SECONDS.sleep(5);
        task.interrupt();
    }
}

3. Sleeping and resuming a thread

Thread.sleep();
//JVM does not guarantee that it will comply with this request
Thread.yield();//only used for debuggin purposes

4. Waiting for the finalization of a thread

For this purpose, we can use the join() method of the Thread class

when we call this method using a thread object. it suspends the execution of the call thread until the object that is called finishes its execution.

thread1.start();
thread2.start();
// Wait for the finalization of the two threads
thread1.join();
thread2.join();

5. Creating and running a daemon thread

you only can call the setDaemon() method before you call the start()method. Once the thread is running, you can't modify its daemon status calling the setDaemon() method.

//WriteTask
public class WriterTask implements Runnable {
    @Override
    public void run() {
        //foreground work
    }
}
//CleanerTask
public class CleanerTask extends Thread {
    public CleanerTask(Deque deque) {
        setDaemon(true);//background thread(Daemon thread)
    }
    @Override
    public void run() {
        while (true) {
            //to-do background work
        }
    }
    public static void main(String[] args) {
        Deque deque = new ConcurrentLinkedDeque();
        for (int i=0;i<availableProcessors;i++) new Thread(new WriterTask(deque)).start();
        new CleanerTask(deque).start();
    }
}

6. Processing uncontrolled exceptions in a thread

when a checked exception is thrown inside the run() method of a thread object, we have to catch and treat them because the run() method doesn't accept a throws clause.

//solve method
public class ExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        e.printStackTrace(System.out);
    }
    public static void main(String[] args) {
        Thread thread = new Thread(new Task());
        thread.setUncaughtExceptionHandler(new ExceptionHandler());
        thread.start().join();
    }
}

When an uncaught exception is thrown in the thread, the JVM looks for three possible handlers for this exception.

  • First: Looks for the uncaught exception handler of the thread objects
  • Second: Looks for the uncaught exception handler of ThreadGroup
  • Finally: looks for the default uncaught exception handler

7. Grouping thread and processing uncontrolled exceptions in a group of threads

Java provides the ThreadGroup class to work with a groups of threads. A ThreadGroup object can be formed by thread objects and another ThreadGroup object, generating a tree structure of threads.

public class MyThreadGroup extends ThreadGroup {
    public MyThreadGroup(String name) {
        super(name);
    }
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        e.printStackTrace(System.out);
        interrupt();
    }
    public static void main(String[] args) {
        MyThreadGroup threadGroup = new MyThreadGroup("MyThreadGroup");
        // Create the thread objects associated to the threadGroup
        for (int i=0;i<availableProcessors;i++)new Thread(threadGroup,new Task()).start();
        threadGroup.list();
        Thread[] threads = new Thread[threadGroup.activeCount()];
        threadGroup.enumerate(threads);//copy active thread reference to subArray
        for (int i = 0; i < threadGroup.activeCount(); i++) {
            System.out.println(threads[i].getName()+threads[i].getState());
        }
    }
}

The ThreadGroup class stores thread objects and other ThreadGroup objects associated with it can access all of their information and perform operations over all its members

8. Using thread local variables

  • The thread-local variables mechanism stores a value of an attribute for each thread that uses one of these variables, you can read the value using the get() method and change the value using the set() method. The thread-local variable class provides the remove() method that deletes the value stored in a thread-local variable for the thread that it's calling.
  • The java concurrency API includes the InheritableThreadLocal class that provides inheritance of values for threads created form a thread. If thread A has a value in a thread-local variable and it creates another thread B will have the same value as thread A om the thread-local variable. You can override the childValue() method that is called to initialize the value of the child thread in the thread-local variable. It receives the value of the parent thread as a parameter in the thread-local variable.
public class SafeTask implements Runnable {
    private static ThreadLocal startDate = new ThreadLocal() {
        protected Date initialValue() {
            return new Date();
        }
    };
    @Override
    public void run() {
        TimeUnit.SECONDS.sleep(random);
    }
    public static void main(String[] args) {
        SafeTask task = new SafeTask();
        for (int i=0;i<availableProcessors();i++)new Thread(task).start();
    }
}

9. Creating threads through a factory

The ThreadFactory interface has only method, called newThread(). It receives a Runnable Object as a parameter and return a Thread object.

you can improve this implementation by adding some variants, as follows:

  • Creating personalized threads, as in the example, using a special format for the name or even creating your own Thread class that would inherit the Java Thread class
  • Saving thread creation statistics, as shown in the previous example
  • Limiting the number of the threads created
  • Validating the creation of the threads
public class MyThreadFactory implements ThreadFactory {
    private int counter = 0;
    public MyThreadFactory(String name) {}
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r,name+counter++);
    }
    public static void main(String[] args) {
        new MyThreadFactory("name").newThread(new Task()).start();
    }
}

二、Basic Thread Synchronization

A critical section is a block of code that accesses a shared resource and can't be executed by more than one thread at the same time. when a thread wants access to a critical section, it uses one of these synchronization mechanisms to find out whether there is any other thread executing the critical section. If not, the thread enters the critical section. If yes, the thread is suspended bu the synchronization mechanism until the thread that is currently executing the critical section ends it.

Target

  • Synchronizing a method
  • Using conditions in synchronized code
  • Synchronizing a block of code with a lock
  • Synchronizing data access with read/write locks
  • Using multiple conditions in a lock
  • Advanced locking with the StampedLock class

1. Synchronizing a method

The synchronized keyword penalizes the performance of the application

synchronized code block inner method

synchronized(this)//lock current object

synchronized method

public synchronized void method()//lock current object's current method

synchronized static method

public synchronized static void method()//lock all methods and objects of  current class

synchronized class

synchronized(ClassName.class)//lock all object belong to the class

2. Using conditions in synchronized code

A thread can only call the wait() method inside a synchronized block of code. when the thread calls the wait() method, JVM puts the thread to sleep and releases the object that controls the synchronized block of code that it's executing and allows other threads to execute other blocks of synchronized code protected by this object .To wake up the thread, you must call the notify()or notifyAll() methods inside a block of code protected by the same object.

//EventStorage
public class EventStorage {
    private int maxSize = 10;
    private Queue storage = new LinkedList();
    public synchronized void set() {
        while (storage.size() == maxSize)wait();
        notify();
    }
    public synchronized void get() {
        while (storage.size()==0) wait();//wait()
        notify();//notify()
    }
}
//producer
public class Producer implements Runnable{
    public Producer(EventStorage storage) {}
    @Override
    public void run() {
        for (int i = 0; i < 100; i++)storage.set();
    }
}
//consumer
public class Consumer implements Runnable{
    public Consumer(EventStorage storage) {}
    @Override
    public void run() {
        for (int i = 0; i < 100; i++)storage.get();
    }
}

3. Synchronizing a block of code with a lock

  • Synchronized keyword, you only have control over a synchronized block of code in a structured way, However Lock interface allows you to get more complex structures to implement your critical section.
  • tryLock() tries to get control of the lock, return true or false
  • ReadWriteLock interface allows a separation of read and write operations with multiple readers and only one modifier.
  • The Lock interface offers better performance than the synchronized keyword.

Avoiding deadlocks

You have to be very careful with the use of locks to avoid DeadLocks.This situation occurs when two or more threads are blocked while waiting for locks that will never be unlocked.

Thread-A locks Lock(X) && try to locks Lock(Y)

Thread-B locks Lock(Y) && simultaneously try to locks Lock(X)

they are waiting for locks that will never be liberated.

public class PrintQueue {
    private Lock queueLock;
    public PrintQueue(boolean fairMode) {
        queueLock = new ReentrantLock(fairMode);
    }
    public void printJob(Object document) {
        queueLock.lock();
        Thread.sleep(duration);
        queueLock.unlock();
    }
    public static void main (String args[]){
        // Wait for the end of the threads
        for (int i=0; i<10; i++)
        new Thread(new Job(new PrintQueue(true)),"Thread "+i).start().join();
    }
}

4. Synchronizing data access with Read/Write

There can be more than one thread using read operations simultaneously, but only one thread can use write operations. If a thread is doing a write operation, other threads can't write or read.

//PricesInfo
public class PricesInfo {
    private double price = 1.0;
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    public double getPrice() {
        lock.readLock().lock();
        double value=price;
        lock.readLock().unlock();
        return value;
    }
    public void setPrices(double price) {
        lock.writeLock().lock();
        this.price=price;
        lock.writeLock().unlock();
    }
}
//Reader
public class Reader implements Runnable {
    public Reader (PricesInfo pricesInfo){}
    @Override
    public void run() {
        for (int i=0; i<20; i++)pricesInfo.getPrice1();
    }
}
//Writer
public class Writer implements Runnable {
    public Writer(PricesInfo pricesInfo){}
    @Override
    public void run() {
        for (int i=0; i<3; i++) pricesInfo.setPrices(Math.random()*10, Math.random()*8);
    }
    public static void main(String[] args) {
        PricesInfo pricesInfo = new PricesInfo();
        for (int i=0; i<5; i++)new Thread(new Reader(pricesInfo)).start();
        new Thread(new Writer(pricesInfo)).start();
    }
}

5. Using multiple conditions in a lock

A lock may be associated with one or more conditions. These conditions are declared in the Condition interface. The purpose of these conditions is to allow threads to have control of a lock and check whether a condition is true or not. The condition interface provides the mechanism to suspend a thread and wake up a suspended thread and weak up a suspended thread.

principle

All the Condition objects are associated with a lock and are created using the newCondition() method declared in the Lock interface. Before we can do any operation with a condition, you have to have control of the lock associated with the condition. So operations with conditions must be done in a thread that holds the lock with a call to a lock()

public class Buffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition linesCondition = lock.newCondition();
    private final Condition spaceCondition = lock.newCondition();
    public void insert(String line) {
        lock.lock();
        while (condition)spaceCondition.await();
        //to-do
        linesCondition.signalAll();
        lock.unlock();
    }
    public String get() {
        lock.lock();
        while (condition)linesCondition.await();
        //to-do
        spaceCondition.signalAll();
        lock.unlock();
        return result;
    }
}

Advanced locking with the StampedLock class

The most important features of StampedLock locks are as follows:

  • You can obtain control of the lock in three different modes
    • Write: exclusive access to the lock. No other thread can have control of the lock in this mode
    • Read: non-exclusive access to the lock
    • Optimistic Read: The thread doesn't have control over the block. Other threads can get control of the lock in write mode.
//reader
public class Reader implements Runnable {
    @Override
    public void run() {
        for (int i=0; i<50; i++) {
            long stamp=lock.readLock();
            //to-do
            lock.unlockRead(stamp);
        }
    }
}
//writer
public class Writer implements Runnable {
    @Override
    public void run() {
        for (int i=0; i<10; i++) {
            long stamp = lock.writeLock();
            //to-do
            lock.unlockWrite(stamp);
        }
    }
}
//optimisticReader
public class OptimisticReader implements Runnable {
    @Override
    public void run() {
        for (int i=0; i<100; i++) {
            stamp=lock.tryOptimisticRead();
            bool result = lock.validate(stamp);//validate stamp
            //to-do
        }
    }
}

三、Thread Synchronization Utilities

Target

  • Controlling concurrent access to one or more copies of a resource
  • Waiting for multiple concurrent events
  • Synchronizing tasks at a common point
  • Running concurrent-phased tasks
  • Controlling phase change in concurrent-phased tasks
  • Exchanging data between concurrent tasks
  • Completing and linking tasks asynchronously

1. Controlling concurrent access to one or more copies of a resources

A semaphore is a counter that protects access to one or more shared resources.

  • When a thread wants to access one of the shared resources, it must first acquire the semaphore. If the internal counter of the semaphore is greater than 0, the semaphore decrements the counter and allows access to the shared resource. A counter bigger than 0 implies that there are free resources that can be used, so the thread can access an use one of them.
  • If the counter is 0, the semaphore puts the thread to sleep until the counter is greater than 0(means all the shared resources are used by other threads)
  • When the thread has finished using the shared resource, it must release the semaphore so that another thread can access the resource, This operation increases the internal counter of the semaphore.
public class PrintQueue {
    private final Semaphore semaphore = new Semaphore(3);
    private final boolean freePrinters[] = new boolean[]{true,true,true};
    private final Lock lockPrinters = new ReentrantLock();
    public void printJob (Object document){
        semaphore.acquire();
        int assignedPrinter=getPrinter();
        //to-do
        freePrinters[assignedPrinter]=true;
        semaphore.release();            
    }
    private int getPrinter() {
        int ret=-1;
        lockPrinters.lock();
        // Look for the first free printer
        lockPrinters.unlock();
        return ret;
    }
    public static void main (String args[]){
        for (int i=0; i < threads.length; i++)new Thread(new Job(new PrintQueue())).start();
    }
}

2. Waiting for multiple concurrent events

The Java concurrency API provides a class that allows one or more threads to wait until a set of operations are made. It's called the CountDownLatch class. This class is initialized with an integer number, which is the number of operations the threads are going to wait for. When a thead wants to wait for the execution of these operations, it uses the await()method, This method puts the thread to sleep until the operations are completed. When one of these operations finishes, it uses the countDown() ,when the counter arrives at 0, the class wakes up all the threads that were sleeping in the await() method.

//Video 
public class Videoconference implements Runnable{
    private final CountDownLatch controller;
    public Videoconference(int number) {
        controller=new CountDownLatch(number);
    }
    public void arrive(String name){
        controller.countDown();
    }
    @Override
    public void run() {
        controller.await();
    }
}
//Participant
public class Participant implements Runnable {
    public Participant(Videoconference conference, String name) {
        this.conference=conference;
        this.name=name;
    }
    @Override
    public void run() {
        TimeUnit.SECONDS.sleep(duration);
        conference.arrive(name);
    }
    public static void main(String[] args) {
        Videoconference conference=new Videoconference(10);
        new Thread(conference).start();
        for (int i=0; i<10; i++)new Thread(new Participant(conference, "Participant "+i)).start();
    }
}

3. Synchronizing tasks at a common point

The Java concurrency API provides a synchronizing utility that allows the synchronization of two or more threads at a determined point. It's the CyclicBarrier class. This class is initialized with an integer number, which is the number of threads that will be synchronized at the determined point. When one of these threads arrives at the determined point, it calls the await() method to wait for the other threads(the CyclicBarrier class blocks the thread that is sleeping until the other threads arrive.) When the last thread calls the await() method of the CyclicBarrier Object, it wakes up all the threads that were waiting and continues with its job.

public class Searcher implements Runnable {
    public Searcher(CyclicBarrier barrier){
        this.barrier = barrier;
    }
    @Override
    public void run() {
        //to-do
        barrier.await();
    }
    public static void main(String[] args) {
        for (int i=0; i<PARTICIPANTS; i++)new Thread(new Searcher(new CyclicBarrier(new Runnable()))).start();
    }
}

4. Running concurrent-phased tasks

One of the most complex and powerful functionalities offered is the ability to execute concurrent-phased tasks using the Phase class. This mechanism is useful when we have some concurrent tasks divided into steps. The Phase class provides us with a mechanism to synchronize threads at the point of each step, so no thread will start with the second step until all the threads have finished the first one.

public class FileSearch implements Runnable {
    public FileSearch(Phaser phaser) {
        this.phaser = phaser;
    }
    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        // 1st Phase: Look for the files
        lookForFiles();
        // If no results, deregister in the phaser and ends
        if (!checkResults())return;
        // 2nd Phase: Filter the results
        filterResults();
        // If no results after the filter, deregister in the phaser and ends
        if (!checkResults())return;
        // 3rd Phase: Show info
        showInfo();
        phaser.arriveAndDeregister();
    }
    private boolean checkResults() {
        if (results.isEmpty()) {
            //empty 
            phaser.arriveAndDeregister();
            return false;
        } else {
            //to-do
            phaser.arriveAndAwaitAdvance();
            return true;
        }
    }
    public static void main(String[] args) {
        Phaser phaser=new Phaser(3);
        new Thread(new FileSearch(phaser)).start().join();
        new Thread(new FileSearch(phaser)).start().join();
        new Thread(new FileSearch(phaser)).start().join();
    }
}

5. Controlling phase change in concurrent-phased tasks

The Phaser class provides a method that is executed each time Phaser changes the phase. It's the onAdvance() method. It receives two parameters: the number of the current phases and the number of registered participants. It returns a Boolean value false if Phaser continues its execution or the value true if Phaser has finished and has to enter the termination state.

6. Exchanging data between concurrent tasks

The Exchanger class allows you to have a definition of a synchronization point between two threads. When the two threads arrive at this point, they interchange a data structure such that the data structure of the first thread goes to the second one and vice versa.(Exchanger class synchronizes only two threads)

public class Consumer implements Runnable {
    public Consumer(List buffer, Exchanger> exchanger){}
    @Override
    public void run() {    
        for (int cycle = 1; cycle <= 10; cycle++){
            buffer=exchanger.exchange(buffer);
            for (int j=0; j<10; j++)buffer.remove(0);
        }
    }
}
public class Producer implements Runnable {
    public Producer (List buffer, Exchanger> exchanger){}
    @Override
    public void run() {    
        for (int cycle=1; cycle<=10; cycle++){
            for (int j=0; j<10; j++)buffer.add(message);
            buffer=exchanger.exchange(buffer);
        }
    }
    public static void main(String[] args) {
        Exchanger> exchanger=new Exchanger();
        new Thread(new Producer(new ArrayList(), exchanger)).start();
        new Thread(new Consumer(new ArrayList(), exchanger)).start();
    }
}

7. Completing and linking tasks asynchronously

The CompletionStage interface that gives it the following two characteristics:

  • As the Future object, a CompletableFuture object will return a result sometime in future.
  • As the CompletionStage object, you can execute more asynchronous tasks after the completion of one or more CompletableFuture objects.
//有点难呀,没搞懂
public class SeedGenerator implements Runnable {
    public SeedGenerator (CompletableFuture completable) {
        this.resultCommunicator=completable;
    }
    @Override
    public void run() {
        TimeUnit.SECONDS.sleep(5);
        resultCommunicator.complete((int) Math.rint(Math.random() * 10));
    }
}
public static void main(String[] args) {
    CompletableFuture seedFuture = new CompletableFuture();
    new Thread(new SeedGenerator(seedFuture)).start();
    int seed = seedFuture.get();
    CompletableFuture> startFuture = CompletableFuture.supplyAsync(new NumberListGenerator(seed));
    //Launching step 1
    CompletableFuture step1Future = startFuture.thenApplyAsync(list -> {
        //to-do
        return selected;
    });
    //Launching step 2
    CompletableFuture step2Future = startFuture.thenApplyAsync(list -> list.stream().max(Long::compare).get());
    CompletableFuture write2Future = step2Future.thenAccept(selected -> {
        //to-do
    });
    //Launching step 3
    CompletableFuture step3Future = startFuture.thenApplyAsync(new NumberSelector());
    //Waiting for the end of the three steps
    CompletableFuture waitFuture = CompletableFuture.allOf(step1Future, write2Future, step3Future);
    CompletableFuture finalFuture = waitFuture.thenAcceptAsync((param) -> {
        System.out.printf("Main: The CompletableFuture example has been completed.");
    });
    finalFuture.join();
}

四、Thread Executors

Target

  • Creating a thread executor and controlling its rejected tasks
  • Executing tasks in an executor that returns a result
  • Running multiple tasks and processing the first result
  • Running multiple tasks and processing all the results
  • Running a task in an executor after a delay
  • Running a task in an executor periodically
  • Canceling a task in an executor
  • Controlling a task finishing in an executor
  • Separating the launching of tasks and the processing of their results in an executor

1. Creating a thread executor and controlling its rejected tasks

  • create an object of the ThreadPoolExecutor class

    • Four constructor provided by ThreadPoolExecutor
    • Executors factory method

    If you send a task to an executor between the shutdown() method and the end of its execution, the task will be rejected.

  • getPoolSize(): returned the actual number of the threads in the pool of the executor

  • getActiveCount: returned the number of threads that were executing tasks in the executor

  • getTaskCount():returned the number of task that were scheduled for execution. The returned value is only an approximation because it changes dynamically.

  • getCompletedTaskCount():returned the number of tasks completed by the executor.

public class Server {
    class RejectedTaskController implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {}
    }
    private final ThreadPoolExecutor executor;
    public Server(){
        executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        executor.setRejectedExecutionHandler(new RejectedTaskController());
    }
    public void executeTask(Task task){
        executor.execute(task);
    }
    public void endServer() {
        executor.shutdown();
    }
    public static void main(String[] args) {
        Server server=new Server();
        for (int i=0; i<100; i++)server.executeTask(new Task("Task "+i));
        server.endServer();
        server.executeTask(new Task("Rejected task"));    
    }
}

Tips: One critical aspect of the ThreadPoolExecutor class, and of executors in general, is that you have to end them explicitly. If you don't do this, the executor will continues its execution and the program won't end. so, is you don't terminate the executor, your application will never end.

The ThreadPoolExecutor class also provides other methods related to the finalization of the executor. These methods are:

  • shutdownNow(): This shuts down the executor immediately. It doesn't execute pending tasks.
  • isTerminated(): returns true if you call either the shutdown() or shutdownNow()
  • isShutdown(): return true if you call the shutdown() method of the executor
  • awaitTermination(long timeout,TimeUnit unit): blocks the calling thread until the tasks of the executor end or a timeout occurs.

2. Executing tasks in an executor that returns a result

one of the advantages of the Executor framework is that it allows you to run concurrent tasks that return a result. The Java Concurrency API achieves this with the following two interfaces:

  • Callable: This interface has the call() method. In this method, you have to implement the logic of the task. The Callable interface is a parameterized interface, meaning you have to indicate the type of data the call() method will return.
  • Future: This interface has some methods to obtain the result generated by a Callable object and manage its state.
public class FactorialCalculator implements Callable {
    @Override
    public Integer call() throws Exception {
        return Integer;
    }
    public static void main(String[] args) {
        ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(2);
        List> resultList=new ArrayList();
        // Create and send to the executor the ten tasks
        for (int i=0; i<10; i++)resultList.add(executor.submit(new FactorialCalculator()));
        do {// Wait for the finalization of the ten tasks
            for (int i=0; i result=resultList.get(i);
        } while (executor.getCompletedTaskCount()<resultList.size());
        //write the result
        for (int i=0; i    number=resultList.get(i).get();
        executor.shutdown();
    }
}

3. Running multiple tasks and processing the first result

List taskList=new ArrayList();
//ValidatorTask implements Callable
taskList.add(new ValidatorTask("TASK_A"));
taskList.add(new ValidatorTask("TASK_B"));
ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();
String result = executor.invokeAny(taskList);
executor.shutdown();

4. Running multiple tasks and processing all the results

when you want to wait for the finalization of a task, you can use the following two method:

  • isDone()
  • awaitTermination()
ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();
List taskList=new ArrayList();
for (int i=0; i<10; i++)taskList.add(new Task());
List> resultList=executor.invokeAll(taskList);
executor.shutdown();
//get the result of execution task
for (int i=0; i future=resultList.get(i).get();

5. Running a task in an executor after a delay

The Executor framework provides the scheduledExecutorService interface along with its implementation, namely the ScheduledThreadPoolExecutor class

ScheduledExecutorService executor=Executors.newScheduledThreadPool(1);
for (int i=0; i<5; i++)executor.schedule(new Task(),i+1,TimeUnit.SECONDS);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);

6. Running a task in an executor periodically

ScheduleExecutorService executor = Executors.newScheduledThreadPool(1);
// as your task was a Runnable object that was not parameterized, you had to parameterize them with the ? symbol as a parameter. 
ScheduledFuture result = executor.scheduleAtFixedRate(task,1,2,TimeUnit.SECONDS);
  • An important point to consider is that the period between two executions is the period of time between the start of those two executions. If you have a periodic task that takes 5 seconds to execute and you put in a period of 3 seconds, you will have two instances of the task executing at a time
  • In the scheduledAtFixedRate() method, the third parameter determines the period of time between the starting of two executions. In the *scheduledWithFixedRate() *method, the parameter determines the period of time between the end of an execution of the task and its beginning.

7. Canceling a task in an executor

ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();
Future result=executor.submit(new Task());
result.cancel(true);
executor.shutdown();
  • If the task has finished or has been canceled earlier, or it can't be canceled due to any other reason, the method will return the false value and the task won't be canceled.
  • If the task is waiting in the executor to get a Thread object that will execute it, the task is canceled and will never begin its execution. If the task is already running, it depends on the parameter of the method. The cancel() method receives a Boolean value as a parameter. If the value of this parameter is true and the task is running, it will be canceled. If false, It won't be canceled.

8. Controlling a task finishing in an executor

Java API provides the FutureTask class as a cancelable asynchronous computation. It implement the Runnable and Future interfaces. we can create a FutureTask class using a Callable or Runnable object.

public class ResultTask extends FutureTask {
    public ResultTask(ExecutableTask callable) {
        super(callable);
        this.name=callable.getName();
    }
    @Override
    protected void done() {
        if (isCancelled()) {
            //Has been cancelled
        } else {
            //Has finished
        }
    }
    public static void main(String[] args) {
        ExecutorService executor=Executors.newCachedThreadPool();
        ResultTask resultTasks[]=new ResultTask[5];
        for (int i=0; i<5; i++) {
            resultTasks[i]=new ResultTask(new ExecutableTask("Task "+i));
            executor.submit(resultTasks[i]);
        }
        //to-do
        for (int i=0; i<resultTasks.length; i++)resultTasks[i].cancel(true);
        for (int i=0; i<resultTasks.length; i++)if (!resultTasks[i].isCancelled())resultTasks[i].get();
        executor.shutdown();
    }
}

9. Separating the launching of tasks and the procecssing of their results in an executor

You can find situations where you need to send the tasks to the executor in one object and proccess the results in another one, For such situations, Java Concurrency API provides the CompletionService class

//ReportProcessor
public class ReportProcessor implements Runnable {
    private final CompletionService service;
    private volatile boolean end = false;
    public ReportProcessor (CompletionService service){}
    @Override
    public void run() {
        Future result=service.poll(20, TimeUnit.SECONDS);
        if (result!=null)String report=result.get();        
    }
}
//ReportRequest
public class ReportRequest implements Runnable {
    public ReportRequest(CompletionService service){}
    @Override
    public void run() {
        service.submit(new Runnable());
    }
    public static void main(String[] args) {
        ExecutorService executor=Executors.newCachedThreadPool();
        CompletionService service=new ExecutorCompletionService(executor);
        new Thread(new ReportRequest("Face", service)).start().join();
        new Thread(new ReportRequest("Online", service)).start().join();
        new Thread(new ReportProcessor(service)).start();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
    }
}

This class also provides two other method to obtain the Future objects of the finished tasks. These methods are as follows:

  • poll(): The version of poll() method without arguments checks whether there are any Future objects in the queue. If the queue is empty, it returns null immediately. Otherwise, it returns its first element and removes it from the queue.
  • take() : This method, without arguments, check whether there are any Future objects in the queue. If it is empty, it blocks the thread until the queue has an element. If the queue has elements, it returns and deletes its first element from the queue.

五、Fork/join Framework

  • Fork operation: When you divide a task into smaller tasks and execute them using the framework.
  • Join operation: When a task waits for the finalization of the tasks it has created. It's used to combine the results of those tasks.

The main difference between the fork/join and the Executor frameworks is the work-stealing algorithm. Unlike the Executor framework, when a task is waiting for the finalization of the subtasks it has created using the join operation, the thread that is executing that task(called worker thread) looks for other tasks that have not been executed yet and begins their execution.

  • Tasks can only use the fork() and join() operations as synchronization mechanisms. If they use other synchronization mechanisms, the worker threads can't execute other tasks when they are in the synchronization operation.
  • Tasks should not perform I/O operations such as read or write data in a file.
  • Tasks can't throw checked exceptions. They have to include the code necessary to process them.

The core of the fork/join framework is formed bu the following two classes:

  • ForkJoinPool: This class implements the ExecutorService interface and the work-stealing algorithm. It manages the worker threads and offers information about the status of the tasks and their execution.
  • ForkJoinTask: This is the base class of the tasks that will execute in the ForkJoinPool It provides the mechanisms to execute the fork() and join() operations inside a task and the methods to control the status of the tasks. Usually, to implement your fork /join tasks, you will implement a subclass of three subclasses of this class:
    • RecursiveAction: no return
    • RecursiveTask :one result
    • CountedConpleter: tasks that launch a completion action when all the subtasks have finished.

Target

  • Creating a fork/join pool
  • Joining the results of the tasks
  • Running tasks asynchronously
  • Throwing exceptions in the tasks
  • Canceling a task

1. Creating a fork/join pool

Steps

  • Creating a ForkJoinPool object to execute the tasks

    using the default constructor

  • Creating a subclass of ForkJoinTask to be executed in the pool

    execute the tasks in a synchronized way. When a task executes two or more subtasks, it waits for their finalizations. In this way, the thread that was executing that task(called worker thread) will look for other tasks to execute, taking full advantage of their execution time.

public class Task extends RecursiveAction {
    //serial version UID. The ForkJoinTask class implements the serializable interface.
    private static final long serialVersionUID = 1L;
    public Task (List products, int first, int last) {}
    @Override
    protected void compute() {
        if (condition)//process
        else invokeAll(new Task(products,first,middle),new Task(products,middle+1,last));
    }
    public static void main(String[] args) {
        Task task=new Task(products, 0, products.size(), 0.20);
        ForkJoinPool pool=new ForkJoinPool();
        pool.execute(task);
        do {TimeUnit.MILLISECONDS.sleep(5);
        } while(!task.isDone());
        pool.shutdown();
        if (task.isCompletedNormally()){}//The process has completed normally
        for (int i=0; i<products.size(); i++)Product product=products.get(i);
    }
}

2. Joining the results of the tasks

The fork/join framework provides the ability to execute tasks return a result. This kind of tasks is implemented by the RecursiveTask class. This class extends the ForkJoinTask class and implements the Future interface provided by the Executor framework.

//Line Task
public class LineTask extends RecursiveTask{
    private static final long serialVersionUID = 1L;
    public LineTask(String line[], int start, int end, String word) {}
    @Override
    protected Integer compute() {
        Integer result=null;
        if (condition){
            result=count(line, start, end, word);
        }else {
            LineTask task1=new LineTask(line,start,mid,word);
            LineTask task2=new LineTask(line,mid,end,word);
            invokeAll(task1, task2);
            result=task1.get()+task2.get();
        }
        return result;
    }
}
//DocumentTask 
public class DocumentTask extends RecursiveTask {
    private static final long serialVersionUID = 1L;
    public DocumentTask (String document[][],int start, int end, String word){}
    @Override
    protected Integer compute() {
        Integer result=null;
        if (condition){
            result=processLines(document,start,end,word);
        } else {
            int mid=(start+end)/2;
            DocumentTask task1=new DocumentTask(document,start,mid,word);
            DocumentTask task2=new DocumentTask(document,mid,end,word);
            invokeAll(task1,task2);
            result=task1.get()+task2.get();
        }
        return result;
    }
    private Integer processLines(String[][] document, int start, int end,String word) {
        List tasks=new ArrayList();
        for (int i=start;i<end;i++)tasks.add(new LineTask(document[i],0,document[i].length,word));
        invokeAll(tasks);
        for (int i=0; i<tasks.size(); i++)result=result+tasks.get(i).get();
        return result;
    }
    public static void main(String[] args) {
        DocumentTask task=new DocumentTask(document,0,100,"the");
        ForkJoinPool commonPool=ForkJoinPool.commonPool();
        commonPool.execute(task);
        do {TimeUnit.SECONDS.sleep(1);
        } while (!task.isDone());
        commonPool.shutdown();
        commonPool.awaitTermination(1, TimeUnit.DAYS);
        task.get()//get result 
    }
}

3. Running tasks asynchronously

  • Synchronous methods: The task that calls one of these methods(invokeAll() ) is suspended until the tasks it send to the pool finish their execution. This allows the ForkJoinPool class to use the work-steal algorithm to assign a new task to the worker thread that executed the sleeping task.
  • Asynchronous methods: the task continues with its execution(fork()), so the ForkJoinPool class can't use the work-stealing algorithm to increase the performance of the application. In this case, only when you call the join() or get() methods to wait for the finalization of a task, the ForkJoinPool class can use that algorithm.
public class FolderProcessor extends CountedCompleter> {
    private static final long serialVersionUID = -1826436670135695513L;
    private List tasks = new ArrayList();
    private List resultList = new ArrayList();    
    protected FolderProcessor (CountedCompleter completer, String path, String extension) {super(completer);}
    public FolderProcessor (String path, String extension) {}
    @Override
    public void compute() {
        File content[] = new File(path).listFiles();
        for (int i = 0; i < content.length; i++) {
            if (content[i].isDirectory()) {
                FolderProcessor task=new FolderProcessor(this,content[i].getAbsolutePath(), extension);
                task.fork();
                addToPendingCount(1);
                tasks.add(task);
            } else {
                resultList.add(content[i].getAbsolutePath());
            }
        }
        tryComplete();// Try the completion of the task
    }
    @Override
    public void onCompletion(CountedCompleter completer) {
        for (FolderProcessor childTask:tasks) {
            resultList.addAll(childTask.getResultList());
        }
    }
    public static void main(String[] args) {
        ForkJoinPool pool=new ForkJoinPool();
        // Create three FolderProcessor tasks for three diferent folders
        FolderProcessor system=new FolderProcessor("path", "log");
        FolderProcessor apps=new FolderProcessor("path","log");
        pool.execute(system);
        pool.execute(apps);
        do {TimeUnit.SECONDS.sleep(2);
        } while ((!system.isDone())||(!apps.isDone()));
        pool.shutdown();
        system.getResultList();
        apps.getResultList();
    }
}

4. Throwing exceptions in the tasks

  • Checked exception: These exceptions must be specified in the throws clause of a method or caught them. Form example, IOException.
  • Unchecked exception: These exceptions don't have to be specified or caught.
public class Task extends RecursiveTask {
    private static final long serialVersionUID = 1L;
    @Override
    protected Integer compute() {
        if (ExceptionCondition)throw new RuntimeException("Exception message");
        return 0;
    }
    public static void main(String[] args) {
        Task task=new Task();
        ForkJoinPool pool=new ForkJoinPool();
        pool.execute(task);
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.DAYS);
        if (task.isCompletedAbnormally()){}//completedAbnormally
    }
}

5. Canceling a task

The ForkJoinTask class provides the cancel() method for this purpose. There are some points you have to take into account when you want to cancel a task, which are as follows:

  • The ForkJoinPool class doesn't provide any method to cancel all the tasks it has running or waiting in the pool
  • When you cancel a task, you don't cancel the tasks this task has executed.

六、Parallel and Reactive Steams

Target

  • Creating streams from different sources
  • Reducing the elements of a stream
  • Collecting the elements of a stream
  • Applying an action to every element of a stream
  • Filtering the elements of a stream
  • Transforming the elements of a stream
  • Sorting the elements of a stream
  • Verifying conditions in the elements of a stream
  • Reactive programming with reactive streams

1. Creating streams from different sources

2. Reducing the elements of a stream

3. Collecting the elements of a stream

4. Applying an action to every element of a stream

5. Filtering the elements of a stream

6. Transforming the elements of a stream

7. Sorting the elements of a stream

8. Verifying conditions in the elements of a stream

9. Reactive programming with reactive streams

七、Concurrent Collections

  • Using non-blocking thread-safe deques
  • Using blocking thread-safe deques
  • Using blocking thread-safe queue ordered by priority
  • Using thread-safe lists with delayed elements
  • Using thread-safe navigable maps
  • Using thread-safe HashMaps
  • Using atomic variables
  • Using atomic arrays
  • Using the volatile keyword
  • Using variable handles

Using non-blocking thread-safe deques

Using blocking thread-safe deques

Using blocking thread-safe queue ordered by priority

Using thread-safe lists with delayed elements

Using thread-safe navigable maps

Using thread-safe HashMaps

Using atomic variables

Using atomic arrays

Using the volatile keyword

Using variable handles

八、Customizing Concurrency Classes

Target

  • Customizing the ThreadPoolExecutor class
  • Implementing a priority-based Executor class
  • Implementing the ThreadFactory interface to generate custom threads
  • Using our ThreadFactory in an Executor Object
  • Customizing tasks running in a scheduled thread pool
  • Implementing the ThreadFactory interface to generate custom threads for the fork/join framework
  • Customizing tasks running in the fork/join framework
  • Implementing a custom Lock class
  • Implementing a transfer queue-based on priorities
  • Implementing your own atomic object
  • Implementing you own stream generator
  • Implementing your own asynchronous stream

1. Customizing the ThreadPoolExecutor class

public class MyExecutor extends ThreadPoolExecutor {
    private final ConcurrentHashMap startTimes;
    public MyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        startTimes = new ConcurrentHashMap();
    }
    @Override
    public void shutdown() {
        super.shutdown();
    }
    @Override
    public List shutdownNow() {
        return super.shutdownNow();
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println(t.getName()+","+r.hashCode());
        startTimes.put(r,new Date());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Future result = (Future) r;
        System.out.println(result.get());
        super.afterExecute(r, t);
    }
}
全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务