Java Concurrency Cookbook Notes
Reading notes on the Java Concurrency Cookbook
[TOC]
I. Thread management
Target
- Creating, running, and setting the characteristics of a thread
- Interrupting a thread
- Controlling the interruption of a thread
- Sleeping and resuming a thread
- Waiting for the finalization of a thread
- Creating and running a daemon thread
- Processing uncontrolled exceptions in a thread
- Using thread local variables
- Grouping threads and processing uncontrolled exceptions in a group of threads
- Creating threads through a factory
1. Creating, running, and setting the characteristics of a thread
- Every Java program has at least one execution thread: the one that runs main().
- A Java program ends when all its non-daemon threads finish.
- If the initial thread ends, the rest of the threads continue their execution until they finish.
- If one of the threads calls System.exit() to end the program, all the threads end their execution.
- A new execution thread is created only when you call the start() method; calling run() directly executes the code in the calling thread.
How to create a thread
- Extending the Thread class and overriding the run() method
- Building a class that implements the Runnable interface and its run() method, and then creating an object of the Thread class by passing the Runnable object as a parameter. This is the preferred approach, and it gives you more flexibility.
Thread attributes
- ID: stores a unique identifier for each thread
- Name: stores the name of thread
- Priority: stores the priority of the thread. The range is [1, 10], where 1 is the lowest and 10 the highest (changing the priority of a thread is not recommended).
- Status: stores the status of the thread
  - NEW: created and not yet started
  - RUNNABLE: being executed in the JVM
  - BLOCKED: blocked and waiting for a monitor
  - WAITING: waiting for another thread
  - TIMED_WAITING: waiting for another thread with a specified waiting time
  - TERMINATED: finished its execution
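As a rough illustration of the status values, the sketch below reads getState() at three points in a thread's life. The class name ThreadStateDemo and the use of a CountDownLatch to park the worker are choices made for this example, not something taken from the book.

```java
import java.util.concurrent.CountDownLatch;

class ThreadStateDemo {
    /** Returns the states seen before start(), while parked on a latch, and after join(). */
    static Thread.State[] observeStates() throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                release.await();                    // the worker parks here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "state-demo");

        Thread.State before = worker.getState();    // created, not yet started
        worker.start();
        // Poll until the worker has actually parked on the latch.
        while (worker.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        Thread.State parked = worker.getState();
        release.countDown();                        // let the worker finish
        worker.join();
        Thread.State after = worker.getState();
        return new Thread.State[] { before, parked, after };
    }

    public static void main(String[] args) throws InterruptedException {
        for (Thread.State state : observeStates()) {
            System.out.println(state);
        }
    }
}
```

The three values come back as NEW, WAITING, and TERMINATED; RUNNABLE and BLOCKED are harder to observe reliably because they depend on scheduling.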
2. Interrupting a thread
public class PrimeGenerator extends Thread { @Override public void run() { while (true) if (isInterrupted()) return; } public static void main(String[] args) { Thread task = new PrimeGenerator(); task.start(); TimeUnit.SECONDS.sleep(5); task.interrupt(); } }
3. Sleeping and resuming a thread
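Thread.sleep(millis); // suspends the current thread for the given time; throws InterruptedException if the thread is interrupted while sleeping
Thread.yield(); // only a hint: the JVM does not guarantee that it will comply with this request; normally used only for debugging purposes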
4. Waiting for the finalization of a thread
To wait for a thread to finish, use the join() method of the Thread class.
When we call this method on a thread object, it suspends the execution of the calling thread until the thread represented by that object finishes its execution.
thread1.start(); thread2.start(); // Wait for the finalization of the two threads thread1.join(); thread2.join();
5. Creating and running a daemon thread
You can call the setDaemon() method only before you call the start() method. Once the thread is running, you can't modify its daemon status: calling setDaemon() at that point throws an IllegalThreadStateException.
//WriteTask public class WriterTask implements Runnable { @Override public void run() { //foreground work } } //CleanerTask public class CleanerTask extends Thread { public CleanerTask(Deque deque) { setDaemon(true);//background thread(Daemon thread) } @Override public void run() { while (true) { //to-do background work } } public static void main(String[] args) { Deque deque = new ConcurrentLinkedDeque(); for (int i=0;i<availableProcessors;i++) new Thread(new WriterTask(deque)).start(); new CleanerTask(deque).start(); } }
6. Processing uncontrolled exceptions in a thread
When a checked exception is thrown inside the run() method of a thread object, we have to catch and handle it, because the run() method doesn't accept a throws clause. Unchecked exceptions can escape run(); an UncaughtExceptionHandler lets us process them.
// Handler for exceptions that escape run()
public class ExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        e.printStackTrace(System.out);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Task());
        thread.setUncaughtExceptionHandler(new ExceptionHandler());
        thread.start();
        thread.join(); // start() returns void, so join() is a separate call
    }
}
When an uncaught exception is thrown in a thread, the JVM looks for three possible handlers for this exception, in this order:
- First: the uncaught exception handler of the thread object
- Second: the uncaught exception handler of the thread's ThreadGroup
- Finally: the default uncaught exception handler
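The lookup order can be checked with a small experiment. The sketch below is only an illustration: HandlerOrderDemo and whoHandled are made-up names, and the thread group step is left at its default behaviour, which delegates to the default handler.

```java
import java.util.concurrent.atomic.AtomicReference;

class HandlerOrderDemo {
    /** Runs a failing thread and reports which handler received the exception. */
    static String whoHandled(boolean withThreadHandler) throws InterruptedException {
        AtomicReference<String> handledBy = new AtomicReference<>("nobody");
        Thread.UncaughtExceptionHandler previous = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler((t, e) -> handledBy.set("default"));
        try {
            Thread thread = new Thread(() -> {
                throw new IllegalStateException("boom");
            });
            if (withThreadHandler) {
                thread.setUncaughtExceptionHandler((t, e) -> handledBy.set("thread"));
            }
            thread.start();
            thread.join();   // the handler runs in the dying thread, before join() returns
            return handledBy.get();
        } finally {
            // Restore whatever default handler was installed before the experiment.
            Thread.setDefaultUncaughtExceptionHandler(previous);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(whoHandled(true));   // prints "thread"
        System.out.println(whoHandled(false));  // prints "default"
    }
}
```

When the thread has its own handler, the default handler is never consulted; without one, the exception reaches the default handler through the thread group.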
7. Grouping threads and processing uncontrolled exceptions in a group of threads
Java provides the ThreadGroup class to work with groups of threads. A ThreadGroup object can be formed by thread objects and other ThreadGroup objects, generating a tree structure of threads.
public class MyThreadGroup extends ThreadGroup { public MyThreadGroup(String name) { super(name); } @Override public void uncaughtException(Thread t, Throwable e) { e.printStackTrace(System.out); interrupt(); } public static void main(String[] args) { MyThreadGroup threadGroup = new MyThreadGroup("MyThreadGroup"); // Create the thread objects associated to the threadGroup for (int i=0;i<availableProcessors;i++)new Thread(threadGroup,new Task()).start(); threadGroup.list(); Thread[] threads = new Thread[threadGroup.activeCount()]; threadGroup.enumerate(threads);//copy active thread reference to subArray for (int i = 0; i < threadGroup.activeCount(); i++) { System.out.println(threads[i].getName()+threads[i].getState()); } } }
The ThreadGroup class stores the thread objects and the other ThreadGroup objects associated with it, so it can access all of their information and perform operations over all its members.
8. Using thread local variables
- The thread-local variable mechanism stores a separate value of an attribute for each thread that uses the variable. You can read the value using the get() method and change it using the set() method. The ThreadLocal class also provides the remove() method, which deletes the value stored in the thread-local variable for the calling thread.
- The Java concurrency API includes the InheritableThreadLocal class, which provides inheritance of values for threads created from a thread. If thread A has a value in a thread-local variable and it creates another thread B, then thread B will have the same value as thread A in the thread-local variable. You can override the childValue() method, which is called to initialize the value of the child thread in the thread-local variable. It receives the value of the parent thread as a parameter.
public class SafeTask implements Runnable { private static ThreadLocal startDate = new ThreadLocal() { protected Date initialValue() { return new Date(); } }; @Override public void run() { TimeUnit.SECONDS.sleep(random); } public static void main(String[] args) { SafeTask task = new SafeTask(); for (int i=0;i<availableProcessors();i++)new Thread(task).start(); } }
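The inheritance behaviour described above can be sketched as follows. The class InheritedValueDemo, the CONTEXT variable, and the "/child" suffix are all invented for the illustration.

```java
class InheritedValueDemo {
    // Hypothetical variable: a child thread starts from the parent's value plus a suffix.
    private static final InheritableThreadLocal<String> CONTEXT =
            new InheritableThreadLocal<String>() {
                @Override
                protected String childValue(String parentValue) {
                    return parentValue + "/child";
                }
            };

    /** Returns { value seen by the child, value seen by the parent afterwards }. */
    static String[] run() throws InterruptedException {
        CONTEXT.set("parent");
        String[] seen = new String[2];
        Thread child = new Thread(() -> {
            seen[0] = CONTEXT.get();    // computed by childValue() when the thread was created
            CONTEXT.set("changed");     // affects only the child's own copy
        });
        child.start();
        child.join();
        seen[1] = CONTEXT.get();        // the parent still sees its own value
        CONTEXT.remove();               // clean up the calling thread's entry
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = run();
        System.out.println(seen[0]);    // prints "parent/child"
        System.out.println(seen[1]);    // prints "parent"
    }
}
```

The child's value is copied once, when the Thread object is constructed; later changes on either side are not propagated.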
9. Creating threads through a factory
The ThreadFactory interface has only one method, called newThread(). It receives a Runnable object as a parameter and returns a Thread object.
You can improve this implementation by adding some variants, as follows:
- Creating personalized threads, as in the example, using a special format for the name or even creating your own Thread class that would inherit the Java Thread class
- Saving thread creation statistics, as shown in the previous example
- Limiting the number of the threads created
- Validating the creation of the threads
public class MyThreadFactory implements ThreadFactory {
    private final String name;
    private int counter = 0;

    public MyThreadFactory(String name) {
        this.name = name; // prefix used for the names of the created threads
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, name + counter++);
    }

    public static void main(String[] args) {
        new MyThreadFactory("name").newThread(new Task()).start();
    }
}
II. Basic Thread Synchronization
A critical section is a block of code that accesses a shared resource and can't be executed by more than one thread at the same time. When a thread wants access to a critical section, it uses a synchronization mechanism to find out whether any other thread is executing the critical section. If not, the thread enters the critical section. If so, the thread is suspended by the synchronization mechanism until the thread that is currently executing the critical section leaves it.
Target
- Synchronizing a method
- Using conditions in synchronized code
- Synchronizing a block of code with a lock
- Synchronizing data access with read/write locks
- Using multiple conditions in a lock
- Advanced locking with the StampedLock class
1. Synchronizing a method
The synchronized keyword penalizes the performance of the application
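synchronized block inside a method
synchronized(this) // locks the monitor of the current object
synchronized method
public synchronized void method() // locks the monitor of the current object (this) for the whole method; other synchronized instance methods of the same object are blocked too
synchronized static method
public synchronized static void method() // locks the Class object; only other static synchronized methods and synchronized(ClassName.class) blocks are blocked
synchronized on the class
synchronized(ClassName.class) // locks the Class object, the same lock that static synchronized methods use; locks on individual instances are not affected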
2. Using conditions in synchronized code
A thread can call the wait() method only inside a synchronized block of code; calling it elsewhere throws an IllegalMonitorStateException. When the thread calls wait(), the JVM puts the thread to sleep and releases the monitor of the object that controls the synchronized block it is executing, which allows other threads to execute other blocks of synchronized code protected by this object. To wake up the thread, you must call the notify() or notifyAll() method inside a block of code protected by the same object.
//EventStorage public class EventStorage { private int maxSize = 10; private Queue storage = new LinkedList(); public synchronized void set() { while (storage.size() == maxSize)wait(); notify(); } public synchronized void get() { while (storage.size()==0) wait();//wait() notify();//notify() } } //producer public class Producer implements Runnable{ public Producer(EventStorage storage) {} @Override public void run() { for (int i = 0; i < 100; i++)storage.set(); } } //consumer public class Consumer implements Runnable{ public Consumer(EventStorage storage) {} @Override public void run() { for (int i = 0; i < 100; i++)storage.get(); } }
3. Synchronizing a block of code with a lock
- With the synchronized keyword, you only have control over a synchronized block of code in a structured way. The Lock interface allows more complex structures to implement your critical section.
- tryLock() tries to get control of the lock without blocking and returns true if it got the lock, or false otherwise.
- ReadWriteLock interface allows a separation of read and write operations with multiple readers and only one modifier.
- The Lock interface offers better performance than the synchronized keyword.
Avoiding deadlocks
You have to be very careful with the use of locks to avoid deadlocks. This situation occurs when two or more threads are blocked while waiting for locks that will never be unlocked.
Thread-A holds Lock(X) and tries to acquire Lock(Y)
Thread-B holds Lock(Y) and simultaneously tries to acquire Lock(X)
Both threads are waiting for locks that will never be released.
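One common way to avoid the X/Y scenario is to make every thread acquire the locks in the same global order. The sketch below illustrates that idea under assumptions of its own: the Account class, its id field used as the ordering key, and the transfer method are hypothetical and do not come from the book.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockOrderingDemo {
    static final class Account {
        final int id;                       // used only to decide the locking order
        final Lock lock = new ReentrantLock();
        long balance;

        Account(int id, long balance) {
            this.id = id;
            this.balance = balance;
        }
    }

    /** Always locks the account with the smaller id first, whatever the transfer direction. */
    static void transfer(Account from, Account to, long amount) {
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                from.balance -= amount;
                to.balance += amount;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    /** Two threads transfer in opposite directions; returns the final balances. */
    static long[] run() throws InterruptedException {
        Account x = new Account(1, 1000);
        Account y = new Account(2, 1000);
        Thread a = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(x, y, 1); });
        Thread b = new Thread(() -> { for (int i = 0; i < 10_000; i++) transfer(y, x, 1); });
        a.start();
        b.start();
        a.join();
        b.join();
        return new long[] { x.balance, y.balance };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] balances = run();
        System.out.println(balances[0] + " " + balances[1]);  // prints "1000 1000"
    }
}
```

Because both threads lock account 1 before account 2, neither can hold one lock while waiting for the other in the opposite order. A timed tryLock() with back-off is another option when no natural ordering exists.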
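public class PrintQueue {
    private final Lock queueLock;

    public PrintQueue(boolean fairMode) {
        queueLock = new ReentrantLock(fairMode);
    }

    public void printJob(Object document) {
        queueLock.lock();
        try {
            Thread.sleep(duration);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            queueLock.unlock(); // always release the lock, even on an exception
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PrintQueue printQueue = new PrintQueue(true); // one queue shared by all the jobs
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(new Job(printQueue), "Thread " + i);
            threads[i].start();
        }
        // Wait for the end of the threads
        for (Thread thread : threads) thread.join();
    }
}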
4. Synchronizing data access with read/write locks
There can be more than one thread using read operations simultaneously, but only one thread can use write operations. If a thread is doing a write operation, other threads can't write or read.
//PricesInfo
public class PricesInfo {
    private double price = 1.0;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public double getPrice() {
        lock.readLock().lock(); // several readers can hold the read lock at once
        try {
            return price;
        } finally {
            lock.readLock().unlock();
        }
    }

    public void setPrices(double price) {
        lock.writeLock().lock(); // exclusive: blocks readers and other writers
        try {
            this.price = price;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
//Reader
public class Reader implements Runnable {
    private final PricesInfo pricesInfo;
    public Reader(PricesInfo pricesInfo) { this.pricesInfo = pricesInfo; }
    @Override
    public void run() {
        for (int i = 0; i < 20; i++) pricesInfo.getPrice();
    }
}
//Writer
public class Writer implements Runnable {
    private final PricesInfo pricesInfo;
    public Writer(PricesInfo pricesInfo) { this.pricesInfo = pricesInfo; }
    @Override
    public void run() {
        for (int i = 0; i < 3; i++) pricesInfo.setPrices(Math.random() * 10);
    }
    public static void main(String[] args) {
        PricesInfo pricesInfo = new PricesInfo();
        for (int i = 0; i < 5; i++) new Thread(new Reader(pricesInfo)).start();
        new Thread(new Writer(pricesInfo)).start();
    }
}
5. Using multiple conditions in a lock
A lock may be associated with one or more conditions. These conditions are declared in the Condition interface. Their purpose is to allow threads that have control of a lock to check whether a condition is true or not, and to wait if it is not. The Condition interface provides the mechanism to suspend a thread and to wake up a suspended thread.
Principle
All the Condition objects are associated with a lock and are created using the newCondition() method declared in the Lock interface. Before you can do any operation with a condition, you must have control of the lock associated with it. So operations with conditions must be done in a thread that holds the lock, obtained with a call to lock().
public class Buffer { private final ReentrantLock lock = new ReentrantLock(); private final Condition linesCondition = lock.newCondition(); private final Condition spaceCondition = lock.newCondition(); public void insert(String line) { lock.lock(); while (condition)spaceCondition.await(); //to-do linesCondition.signalAll(); lock.unlock(); } public String get() { lock.lock(); while (condition)linesCondition.await(); //to-do spaceCondition.signalAll(); lock.unlock(); return result; } }
6. Advanced locking with the StampedLock class
The most important features of StampedLock locks are as follows:
- You can obtain control of the lock in three different modes
  - Write: exclusive access to the lock. No other thread can have control of the lock in this mode
  - Read: non-exclusive access to the lock
  - Optimistic Read: The thread doesn't have control over the lock. Other threads can get control of the lock in write mode, so the stamp has to be validated after reading.
//reader
public class Reader implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            long stamp = lock.readLock();
            try {
                //to-do
            } finally {
                lock.unlockRead(stamp);
            }
        }
    }
}
//writer
public class Writer implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            long stamp = lock.writeLock();
            try {
                //to-do
            } finally {
                lock.unlockWrite(stamp);
            }
        }
    }
}
//optimisticReader
public class OptimisticReader implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            long stamp = lock.tryOptimisticRead();
            //to-do: read the shared values
            boolean valid = lock.validate(stamp); // false if a writer got the lock since the stamp was issued
        }
    }
}
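An optimistic read is only useful together with a fallback for the case where validation fails. The sketch below shows one possible shape of that pattern; the OptimisticPoint class and its fields are assumptions made for the example.

```java
import java.util.concurrent.locks.StampedLock;

class OptimisticPoint {
    private final StampedLock lock = new StampedLock();
    private double x;
    private double y;

    void move(double dx, double dy) {
        long stamp = lock.writeLock();          // exclusive access
        try {
            x += dx;
            y += dy;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        long stamp = lock.tryOptimisticRead();  // no lock is held here
        double currentX = x;
        double currentY = y;
        if (!lock.validate(stamp)) {            // a writer intervened: the copies may be inconsistent
            stamp = lock.readLock();            // fall back to a real read lock
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.hypot(currentX, currentY);
    }

    public static void main(String[] args) {
        OptimisticPoint point = new OptimisticPoint();
        point.move(3, 4);
        System.out.println(point.distanceFromOrigin());  // prints 5.0
    }
}
```

The values are copied into locals before validate() is called, so the computation never uses a half-updated pair.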
III. Thread Synchronization Utilities
Target
- Controlling concurrent access to one or more copies of a resource
- Waiting for multiple concurrent events
- Synchronizing tasks at a common point
- Running concurrent-phased tasks
- Controlling phase change in concurrent-phased tasks
- Exchanging data between concurrent tasks
- Completing and linking tasks asynchronously
1. Controlling concurrent access to one or more copies of a resource
A semaphore is a counter that protects access to one or more shared resources.
- When a thread wants to access one of the shared resources, it must first acquire the semaphore. If the internal counter of the semaphore is greater than 0, the semaphore decrements the counter and allows access to the shared resource. A counter greater than 0 implies that there are free resources, so the thread can access and use one of them.
- If the counter is 0, all the shared resources are being used by other threads, and the semaphore puts the thread to sleep until the counter is greater than 0.
- When the thread has finished using the shared resource, it must release the semaphore so that another thread can access the resource. This operation increases the internal counter of the semaphore.
public class PrintQueue { private final Semaphore semaphore = new Semaphore(3); private final boolean freePrinters[] = new boolean[]{true,true,true}; private final Lock lockPrinters = new ReentrantLock(); public void printJob (Object document){ semaphore.acquire(); int assignedPrinter=getPrinter(); //to-do freePrinters[assignedPrinter]=true; semaphore.release(); } private int getPrinter() { int ret=-1; lockPrinters.lock(); // Look for the first free printer lockPrinters.unlock(); return ret; } public static void main (String args[]){ for (int i=0; i < threads.length; i++)new Thread(new Job(new PrintQueue())).start(); } }
2. Waiting for multiple concurrent events
The Java concurrency API provides a class that allows one or more threads to wait until a set of operations has completed. It's called the CountDownLatch class. This class is initialized with an integer number, which is the number of operations the threads are going to wait for. When a thread wants to wait for the execution of these operations, it calls the await() method, which puts the thread to sleep until the operations are completed. When one of these operations finishes, it calls the countDown() method to decrement the internal counter. When the counter reaches 0, the class wakes up all the threads that were sleeping in the await() method.
//Video public class Videoconference implements Runnable{ private final CountDownLatch controller; public Videoconference(int number) { controller=new CountDownLatch(number); } public void arrive(String name){ controller.countDown(); } @Override public void run() { controller.await(); } } //Participant public class Participant implements Runnable { public Participant(Videoconference conference, String name) { this.conference=conference; this.name=name; } @Override public void run() { TimeUnit.SECONDS.sleep(duration); conference.arrive(name); } public static void main(String[] args) { Videoconference conference=new Videoconference(10); new Thread(conference).start(); for (int i=0; i<10; i++)new Thread(new Participant(conference, "Participant "+i)).start(); } }
3. Synchronizing tasks at a common point
The Java concurrency API provides a synchronizing utility that allows the synchronization of two or more threads at a determined point. It's the CyclicBarrier class. This class is initialized with an integer number, which is the number of threads that will be synchronized at the determined point. When one of these threads arrives at the determined point, it calls the await() method to wait for the other threads; the CyclicBarrier class puts the thread to sleep until the other threads arrive. When the last thread calls the await() method of the CyclicBarrier object, it wakes up all the threads that were waiting, and they all continue with their jobs.
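public class Searcher implements Runnable {
    private final CyclicBarrier barrier;

    public Searcher(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        //to-do
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // One barrier shared by all the participants; the optional Runnable runs once when the last thread arrives
        CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, () -> { /* to-do */ });
        for (int i = 0; i < PARTICIPANTS; i++) new Thread(new Searcher(barrier)).start();
    }
}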
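The effect of the barrier, including the optional barrier action, can be seen in a small self-contained sketch. BarrierDemo and its partial-sum workload are invented for the illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    /** Each worker adds (id + 1) to a shared sum; returns the total each one sees after the barrier. */
    static int[] run(int parties) throws InterruptedException {
        AtomicInteger partialSum = new AtomicInteger();
        int[] total = new int[1];
        // The barrier action runs once, in the last thread to arrive, before the others are released.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> total[0] = partialSum.get());
        int[] seenAfterBarrier = new int[parties];
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                partialSum.addAndGet(id + 1);           // work done before the common point
                try {
                    barrier.await();                    // wait for all the other workers
                    seenAfterBarrier[id] = total[0];    // every contribution is visible here
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return seenAfterBarrier;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int seen : run(4)) {
            System.out.println(seen);   // prints 10 four times (1 + 2 + 3 + 4)
        }
    }
}
```

All workers share the same CyclicBarrier instance; a barrier created per thread would never trip.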
4. Running concurrent-phased tasks
One of the most complex and powerful functionalities offered by the Java concurrency API is the ability to execute concurrent-phased tasks using the Phaser class. This mechanism is useful when we have some concurrent tasks divided into steps. The Phaser class provides us with a mechanism to synchronize threads at the end of each step, so no thread will start the second step until all the threads have finished the first one.
public class FileSearch implements Runnable {
    private final Phaser phaser;

    public FileSearch(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        // 1st Phase: Look for the files
        lookForFiles();
        // If no results, deregister in the phaser and end
        if (!checkResults()) return;
        // 2nd Phase: Filter the results
        filterResults();
        // If no results after the filter, deregister in the phaser and end
        if (!checkResults()) return;
        // 3rd Phase: Show info
        showInfo();
        phaser.arriveAndDeregister();
    }

    private boolean checkResults() {
        if (results.isEmpty()) {
            //empty
            phaser.arriveAndDeregister();
            return false;
        } else {
            //to-do
            phaser.arriveAndAwaitAdvance();
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(3);
        Thread[] threads = new Thread[3];
        for (int i = 0; i < 3; i++) {
            threads[i] = new Thread(new FileSearch(phaser));
            threads[i].start();
        }
        // Join only after all three have started: joining each thread right after
        // starting it would block forever on the first arriveAndAwaitAdvance()
        for (Thread thread : threads) thread.join();
    }
}
5. Controlling phase change in concurrent-phased tasks
The Phaser class provides a method that is executed each time the Phaser changes the phase. It's the onAdvance() method. It receives two parameters: the number of the current phase and the number of registered participants. It returns false if the Phaser continues its execution, or true if the Phaser has finished and has to enter the termination state.
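A minimal sketch of overriding onAdvance() might look like the following. PhaseLogDemo, the log messages, and the rule "terminate after a fixed number of phases" are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

class PhaseLogDemo {
    /** Runs `parties` workers through `phases` phases and returns what onAdvance() logged. */
    static List<String> run(int parties, int phases) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Phaser phaser = new Phaser(parties) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                log.add("phase " + phase + " done, parties=" + registeredParties);
                // true terminates the phaser: after the last phase, or when nobody is left
                return phase >= phases - 1 || registeredParties == 0;
            }
        };
        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                for (int p = 0; p < phases; p++) {
                    // ... the work of phase p would go here ...
                    phaser.arriveAndAwaitAdvance();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run(3, 2).forEach(System.out::println);
        // prints:
        // phase 0 done, parties=3
        // phase 1 done, parties=3
    }
}
```

onAdvance() is called by the last party to arrive in each phase, before the waiting threads are released, so it is a natural place for per-phase bookkeeping.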
6. Exchanging data between concurrent tasks
The Exchanger class allows you to define a synchronization point between two threads. When the two threads arrive at this point, they interchange a data structure, so that the data structure of the first thread goes to the second one and vice versa. (The Exchanger class synchronizes only two threads.)
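public class Consumer implements Runnable {
    private List<String> buffer;
    private final Exchanger<List<String>> exchanger;

    public Consumer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            for (int cycle = 1; cycle <= 10; cycle++) {
                buffer = exchanger.exchange(buffer); // hand over the empty list, receive a full one
                for (int j = 0; j < 10; j++) buffer.remove(0);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
public class Producer implements Runnable {
    private List<String> buffer;
    private final Exchanger<List<String>> exchanger;

    public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
        this.buffer = buffer;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            for (int cycle = 1; cycle <= 10; cycle++) {
                for (int j = 0; j < 10; j++) buffer.add(message);
                buffer = exchanger.exchange(buffer); // hand over the full list, receive an empty one
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Exchanger<List<String>> exchanger = new Exchanger<>();
        new Thread(new Producer(new ArrayList<>(), exchanger)).start();
        new Thread(new Consumer(new ArrayList<>(), exchanger)).start();
    }
}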
7. Completing and linking tasks asynchronously
The CompletableFuture class implements the Future and CompletionStage interfaces, which gives it the following two characteristics:
- As the Future object, a CompletableFuture object will return a result sometime in future.
- As the CompletionStage object, you can execute more asynchronous tasks after the completion of one or more CompletableFuture objects.
// This one is a bit hard; I haven't fully understood it yet
public class SeedGenerator implements Runnable {
    private final CompletableFuture<Integer> resultCommunicator;

    public SeedGenerator(CompletableFuture<Integer> completable) {
        this.resultCommunicator = completable;
    }

    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Completing the future hands the value to whoever is waiting on get()
        resultCommunicator.complete((int) Math.rint(Math.random() * 10));
    }

    // Element types below are reconstructed from the use of Long::compare
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> seedFuture = new CompletableFuture<>();
        new Thread(new SeedGenerator(seedFuture)).start();
        int seed = seedFuture.get();
        CompletableFuture<List<Long>> startFuture = CompletableFuture.supplyAsync(new NumberListGenerator(seed));
        //Launching step 1
        CompletableFuture<Long> step1Future = startFuture.thenApplyAsync(list -> {
            //to-do
            return selected;
        });
        //Launching step 2
        CompletableFuture<Long> step2Future = startFuture.thenApplyAsync(list -> list.stream().max(Long::compare).get());
        CompletableFuture<Void> write2Future = step2Future.thenAccept(selected -> {
            //to-do
        });
        //Launching step 3
        CompletableFuture<Long> step3Future = startFuture.thenApplyAsync(new NumberSelector());
        //Waiting for the end of the three steps
        CompletableFuture<Void> waitFuture = CompletableFuture.allOf(step1Future, write2Future, step3Future);
        CompletableFuture<Void> finalFuture = waitFuture.thenAcceptAsync(param -> {
            System.out.printf("Main: The CompletableFuture example has been completed.");
        });
        finalFuture.join();
    }
}
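A smaller pipeline may make the two characteristics easier to see. The sketch below is a simplified illustration, not the book's example: PipelineDemo and the fixed input list are invented, and only supplyAsync, thenApplyAsync, allOf, thenApply, and join are used.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PipelineDemo {
    static String run() {
        // Stage 0: produce the data asynchronously (Future-like: a result arrives later).
        CompletableFuture<List<Integer>> start =
                CompletableFuture.supplyAsync(() -> Arrays.asList(4, 8, 15, 16, 23, 42));

        // Two independent stages that both depend on stage 0 (CompletionStage-like chaining).
        CompletableFuture<Integer> max = start.thenApplyAsync(list -> Collections.max(list));
        CompletableFuture<Integer> sum =
                start.thenApplyAsync(list -> list.stream().mapToInt(Integer::intValue).sum());

        // allOf() completes when both stages are done; join() on them then returns immediately.
        CompletableFuture<String> report = CompletableFuture.allOf(max, sum)
                .thenApply(ignored -> "max=" + max.join() + ", sum=" + sum.join());

        return report.join();   // block only at the very end
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints "max=42, sum=108"
    }
}
```

Nothing blocks until the final join(): each stage is scheduled when the stage it depends on completes.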
IV. Thread Executors
Target
- Creating a thread executor and controlling its rejected tasks
- Executing tasks in an executor that returns a result
- Running multiple tasks and processing the first result
- Running multiple tasks and processing all the results
- Running a task in an executor after a delay
- Running a task in an executor periodically
- Canceling a task in an executor
- Controlling a task finishing in an executor
- Separating the launching of tasks and the processing of their results in an executor
1. Creating a thread executor and controlling its rejected tasks
Create an object of the ThreadPoolExecutor class using either of the following:
- The four constructors provided by ThreadPoolExecutor
- An Executors factory method
If you send a task to an executor between the shutdown() method and the end of its execution, the task will be rejected.
getPoolSize(): returns the actual number of threads in the pool of the executor
getActiveCount(): returns the number of threads that are executing tasks in the executor
getTaskCount(): returns the number of tasks that have been scheduled for execution. The returned value is only an approximation because it changes dynamically.
getCompletedTaskCount(): returns the number of tasks completed by the executor.
public class Server { class RejectedTaskController implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {} } private final ThreadPoolExecutor executor; public Server(){ executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); executor.setRejectedExecutionHandler(new RejectedTaskController()); } public void executeTask(Task task){ executor.execute(task); } public void endServer() { executor.shutdown(); } public static void main(String[] args) { Server server=new Server(); for (int i=0; i<100; i++)server.executeTask(new Task("Task "+i)); server.endServer(); server.executeTask(new Task("Rejected task")); } }
Tips: One critical aspect of the ThreadPoolExecutor class, and of executors in general, is that you have to end them explicitly. If you don't, the executor keeps its worker threads alive waiting for new tasks, so your application will never end.
The ThreadPoolExecutor class also provides other methods related to the finalization of the executor. These methods are:
- shutdownNow(): This shuts down the executor immediately. It doesn't execute pending tasks and returns a list with them. It does not wait for the tasks that are already running to finish.
- isTerminated(): returns true if you have called shutdown() or shutdownNow() and the executor has finished shutting down, that is, all its tasks have completed.
- isShutdown(): returns true if you have called the shutdown() or shutdownNow() method of the executor.
- awaitTermination(long timeout, TimeUnit unit): blocks the calling thread until the tasks of the executor end after a shutdown request, or until the timeout occurs.
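The interplay of these methods can be sketched as follows. ShutdownDemo is a made-up class; it relies on the default rejection policy of ThreadPoolExecutor, which throws RejectedExecutionException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    /** Returns { isShutdown, lateTaskRejected, terminatedInTime, isTerminated, allTasksRan }. */
    static boolean[] run() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            executor.execute(completed::incrementAndGet);
        }

        executor.shutdown();                          // no new tasks; queued ones still run
        boolean isShutdown = executor.isShutdown();

        boolean rejected = false;
        try {
            executor.execute(() -> { });              // sent after shutdown()
        } catch (RejectedExecutionException e) {
            rejected = true;                          // default policy: the task is refused
        }

        boolean finished = executor.awaitTermination(10, TimeUnit.SECONDS);
        boolean isTerminated = executor.isTerminated();
        return new boolean[] { isShutdown, rejected, finished, isTerminated, completed.get() == 5 };
    }

    public static void main(String[] args) throws InterruptedException {
        for (boolean flag : run()) {
            System.out.println(flag);                 // prints true five times
        }
    }
}
```

isShutdown() becomes true as soon as shutdown() is called, while isTerminated() becomes true only once the already submitted tasks have finished.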
2. Executing tasks in an executor that returns a result
One of the advantages of the Executor framework is that it allows you to run concurrent tasks that return a result. The Java Concurrency API achieves this with the following two interfaces:
- Callable: This interface has the call() method. In this method, you have to implement the logic of the task. The Callable interface is a parameterized interface, meaning you have to indicate the type of data the call() method will return.
- Future: This interface has some methods to obtain the result generated by a Callable object and manage its state.
public class FactorialCalculator implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        int result = 1;
        //to-do: compute the factorial
        return result;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        List<Future<Integer>> resultList = new ArrayList<>();
        // Create and send to the executor the ten tasks
        for (int i = 0; i < 10; i++) resultList.add(executor.submit(new FactorialCalculator()));
        do { // Wait for the finalization of the ten tasks
            for (int i = 0; i < resultList.size(); i++) {
                Future<Integer> result = resultList.get(i);
                //to-do: report result.isDone()
            }
            TimeUnit.MILLISECONDS.sleep(50); // avoid a busy loop
        } while (executor.getCompletedTaskCount() < resultList.size());
        // Write the results
        for (int i = 0; i < resultList.size(); i++) {
            Integer number = resultList.get(i).get();
        }
        executor.shutdown();
    }
}
3. Running multiple tasks and processing the first result
List taskList=new ArrayList(); //ValidatorTask implements Callable taskList.add(new ValidatorTask("TASK_A")); taskList.add(new ValidatorTask("TASK_B")); ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool(); String result = executor.invokeAny(taskList); executor.shutdown();
4. Running multiple tasks and processing all the results
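When you want to wait for the finalization of a task, you can use the following two methods:
- isDone() of the Future interface, which returns true once the task has completed
- awaitTermination() of the executor, which blocks until all the tasks have finished after a call to shutdown()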
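// Result stands for the type returned by Task.call(); the notes do not show it
ExecutorService executor = Executors.newCachedThreadPool();
List<Task> taskList = new ArrayList<>();
for (int i = 0; i < 10; i++) taskList.add(new Task());
// invokeAll() blocks until every task has finished
List<Future<Result>> resultList = executor.invokeAll(taskList);
executor.shutdown();
// Get the results of the executed tasks
for (int i = 0; i < resultList.size(); i++) {
    Future<Result> future = resultList.get(i);
    Result result = future.get();
}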
5. Running a task in an executor after a delay
The Executor framework provides the ScheduledExecutorService interface along with its implementation, the ScheduledThreadPoolExecutor class.
ScheduledExecutorService executor=Executors.newScheduledThreadPool(1); for (int i=0; i<5; i++)executor.schedule(new Task(),i+1,TimeUnit.SECONDS); executor.shutdown(); executor.awaitTermination(1, TimeUnit.DAYS);
6. Running a task in an executor periodically
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
// The task is a Runnable, which is not parameterized, so the returned future is declared with the ? wildcard.
ScheduledFuture<?> result = executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
- An important point to consider is that the period between two executions is the time between the start of those two executions. If you have a periodic task that takes 5 seconds to execute and you set a period of 3 seconds, the executions do not overlap: with ScheduledThreadPoolExecutor, the next execution starts late, as soon as the previous one finishes.
- In the scheduleAtFixedRate() method, the third parameter determines the period of time between the starts of two executions. In the *scheduleWithFixedDelay()* method, the parameter determines the period of time between the end of one execution of the task and the start of the next.
7. Canceling a task in an executor
ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool(); Future result=executor.submit(new Task()); result.cancel(true); executor.shutdown();
- If the task has finished or has been canceled earlier, or it can't be canceled due to any other reason, the method will return the false value and the task won't be canceled.
- If the task is waiting in the executor to get a Thread object that will execute it, the task is canceled and will never begin its execution. If the task is already running, the result depends on the Boolean parameter of the cancel() method: if it is true, the thread running the task is interrupted in an attempt to stop it; if it is false, the running task is allowed to complete.
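The cases above can be reproduced with a single-threaded executor, where one task runs and another waits in the queue. CancelDemo and its latches are assumptions made for this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelDemo {
    /** Returns { queuedCancelled, runningCancelled, workerWasInterrupted, secondCancel }. */
    static boolean[] run() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> running = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted.countDown();          // cancel(true) interrupted the worker
                }
            });
            Future<?> queued = executor.submit(() -> { });   // stuck behind the running task
            started.await();

            boolean queuedCancelled = queued.cancel(false);  // not started yet: it will never run
            boolean runningCancelled = running.cancel(true); // running: the worker is interrupted
            boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
            boolean secondCancel = running.cancel(true);     // already cancelled: returns false
            return new boolean[] { queuedCancelled, runningCancelled, sawInterrupt, secondCancel };
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = run();
        System.out.println(result[0] + " " + result[1] + " " + result[2] + " " + result[3]);
        // prints "true true true false"
    }
}
```

Interrupting is only a request: a task that never checks its interrupt status or blocks in an interruptible call keeps running even though its Future reports isCancelled().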
8. Controlling a task finishing in an executor
The Java API provides the FutureTask class as a cancelable asynchronous computation. It implements the Runnable and Future interfaces. We can create a FutureTask object using a Callable or Runnable object.
public class ResultTask extends FutureTask { public ResultTask(ExecutableTask callable) { super(callable); this.name=callable.getName(); } @Override protected void done() { if (isCancelled()) { //Has been cancelled } else { //Has finished } } public static void main(String[] args) { ExecutorService executor=Executors.newCachedThreadPool(); ResultTask resultTasks[]=new ResultTask[5]; for (int i=0; i<5; i++) { resultTasks[i]=new ResultTask(new ExecutableTask("Task "+i)); executor.submit(resultTasks[i]); } //to-do for (int i=0; i<resultTasks.length; i++)resultTasks[i].cancel(true); for (int i=0; i<resultTasks.length; i++)if (!resultTasks[i].isCancelled())resultTasks[i].get(); executor.shutdown(); } }
9. Separating the launching of tasks and the processing of their results in an executor
You can find situations where you need to send the tasks to the executor in one object and process the results in another one. For such situations, the Java Concurrency API provides the CompletionService class.
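//ReportProcessor
public class ReportProcessor implements Runnable {
    private final CompletionService<String> service;
    private volatile boolean end = false;

    public ReportProcessor(CompletionService<String> service) {
        this.service = service;
    }

    @Override
    public void run() {
        try {
            Future<String> result = service.poll(20, TimeUnit.SECONDS);
            if (result != null) {
                String report = result.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
//ReportRequest
public class ReportRequest implements Runnable {
    private final String name;
    private final CompletionService<String> service;

    public ReportRequest(String name, CompletionService<String> service) {
        this.name = name;
        this.service = service;
    }

    @Override
    public void run() {
        service.submit(() -> name + " report"); // placeholder task that produces the report
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        CompletionService<String> service = new ExecutorCompletionService<>(executor);
        Thread faceThread = new Thread(new ReportRequest("Face", service));
        Thread onlineThread = new Thread(new ReportRequest("Online", service));
        faceThread.start();
        onlineThread.start();
        faceThread.join();
        onlineThread.join();
        new Thread(new ReportProcessor(service)).start();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
    }
}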
CompletionService also provides two other methods to obtain the Future objects of finished tasks:
- poll(): The version of poll() without arguments checks whether there are any Future objects in the queue. If the queue is empty, it returns null immediately. Otherwise, it returns the first element and removes it from the queue.
- take(): This method, which takes no arguments, checks whether there are any Future objects in the queue. If the queue is empty, it blocks the thread until an element arrives. Otherwise, it returns the first element and removes it from the queue.
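The difference between the two methods can be seen in a small sketch. It assumes a two-thread pool and a trivial task that returns the string "report"; the class and method names are illustrative only.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceDemo {
    static String demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(executor);
        try {
            // Nothing has been submitted yet, so poll() returns null immediately
            Future<String> empty = service.poll();

            service.submit(() -> "report");

            // take() blocks until a finished task is available in the queue
            Future<String> finished = service.take();
            return (empty == null ? "null" : "non-null") + "," + finished.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

poll() suits a consumer that has other work to do between checks, while take() suits a dedicated consumer thread that can simply wait.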
五、Fork/join Framework
- Fork operation: When you divide a task into smaller tasks and execute them using the framework.
- Join operation: When a task waits for the finalization of the tasks it has created. It's used to combine the results of those tasks.
The main difference between the fork/join and the Executor frameworks is the work-stealing algorithm. Unlike in the Executor framework, when a task is waiting in a join operation for the subtasks it has created, the thread executing that task (called a worker thread) looks for other tasks that have not been executed yet and begins executing them. To make this possible, tasks have the following limitations:
- Tasks can only use the fork() and join() operations as synchronization mechanisms. If they use other synchronization mechanisms, the worker threads can't execute other tasks while they are in the synchronization operation.
- Tasks should not perform I/O operations, such as reading or writing data in a file.
- Tasks can't throw checked exceptions. They have to include the code necessary to process them.
The core of the fork/join framework is formed by the following two classes:
- ForkJoinPool: This class implements the ExecutorService interface and the work-stealing algorithm. It manages the worker threads and offers information about the status of the tasks and their execution.
- ForkJoinTask: This is the base class of the tasks that execute in the ForkJoinPool. It provides the mechanisms to execute the fork() and join() operations inside a task and the methods to control the status of the tasks. Usually, to implement your fork/join tasks, you will extend one of three subclasses of this class:
- RecursiveAction: for tasks that return no result
- RecursiveTask: for tasks that return one result
- CountedCompleter: for tasks that launch a completion action when all their subtasks have finished
Target
- Creating a fork/join pool
- Joining the results of the tasks
- Running tasks asynchronously
- Throwing exceptions in the tasks
- Canceling a task
1. Creating a fork/join pool
Steps
- Create a ForkJoinPool object to execute the tasks, using the default constructor.
- Create a subclass of ForkJoinTask to be executed in the pool.
- Execute the tasks synchronously. When a task executes two or more subtasks, it waits for them to finish. Meanwhile, the thread that was executing that task (called a worker thread) looks for other tasks to execute, taking full advantage of its execution time.
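import java.util.List;
import java.util.concurrent.*;

public class Task extends RecursiveAction {
    // Serial version UID: the ForkJoinTask class implements the Serializable interface
    private static final long serialVersionUID = 1L;
    private final List<Product> products;
    private final int first, last; // range [first, last)
    private final double increment;

    public Task(List<Product> products, int first, int last, double increment) {
        this.products = products;
        this.first = first;
        this.last = last;
        this.increment = increment;
    }

    @Override
    protected void compute() {
        if (condition) { // placeholder: the range is small enough to process directly
            // process
        } else {
            int middle = (first + last) / 2;
            // Both halves share "middle" so that no element is skipped
            invokeAll(new Task(products, first, middle, increment),
                      new Task(products, middle, last, increment));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // products: a List<Product> built elsewhere (not shown in these notes)
        Task task = new Task(products, 0, products.size(), 0.20);
        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(task);
        do {
            TimeUnit.MILLISECONDS.sleep(5);
        } while (!task.isDone());
        pool.shutdown();
        if (task.isCompletedNormally()) {
            // The process has completed normally
        }
        for (int i = 0; i < products.size(); i++) {
            Product product = products.get(i);
        }
    }
}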
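A self-contained variant of this recipe may help to see the splitting at work. It is a sketch under assumptions: a plain double[] of prices replaces the Product list, the threshold of 10 elements is arbitrary, and UpdateTask and raisePrices are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class PriceUpdateDemo {
    /** Raises every price in [first, last) by the given fraction. */
    static class UpdateTask extends RecursiveAction {
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 10; // assumed cut-off for splitting
        private final double[] prices;
        private final int first, last;
        private final double increment;

        UpdateTask(double[] prices, int first, int last, double increment) {
            this.prices = prices;
            this.first = first;
            this.last = last;
            this.increment = increment;
        }

        @Override
        protected void compute() {
            if (last - first < THRESHOLD) {
                // Small range: update it directly
                for (int i = first; i < last; i++) {
                    prices[i] *= (1 + increment);
                }
            } else {
                // Large range: split in two and wait for both halves
                int middle = (first + last) / 2;
                invokeAll(new UpdateTask(prices, first, middle, increment),
                          new UpdateTask(prices, middle, last, increment));
            }
        }
    }

    static double[] raisePrices(int size, double increment) {
        double[] prices = new double[size];
        Arrays.fill(prices, 10.0);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new UpdateTask(prices, 0, prices.length, increment)); // blocks until done
        pool.shutdown();
        return prices;
    }

    public static void main(String[] args) {
        System.out.println(raisePrices(1000, 0.20)[0]);
    }
}
```

Using invoke() instead of execute() plus a polling loop keeps the example short; the polling loop in the recipe is useful when the main thread wants to report progress while it waits.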
2. Joining the results of the tasks
The fork/join framework can also execute tasks that return a result. Such tasks are implemented with the RecursiveTask class, which extends the ForkJoinTask class and implements the Future interface provided by the Executor framework.
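import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// LineTask: counts the occurrences of a word in part of a line
public class LineTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private final String[] line;
    private final int start, end;
    private final String word;

    public LineTask(String[] line, int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    @Override
    protected Integer compute() {
        Integer result;
        if (condition) { // placeholder: the range is small enough to process directly
            result = count(line, start, end, word); // helper not shown in these notes
        } else {
            int mid = (start + end) / 2;
            LineTask task1 = new LineTask(line, start, mid, word);
            LineTask task2 = new LineTask(line, mid, end, word);
            invokeAll(task1, task2);
            // join() returns the result without the checked exceptions of get()
            result = task1.join() + task2.join();
        }
        return result;
    }
}

// DocumentTask: counts the occurrences of a word in a range of lines
public class DocumentTask extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;
    private final String[][] document;
    private final int start, end;
    private final String word;

    public DocumentTask(String[][] document, int start, int end, String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    @Override
    protected Integer compute() {
        Integer result;
        if (condition) { // placeholder: few enough lines to hand over to LineTasks
            result = processLines(document, start, end, word);
        } else {
            int mid = (start + end) / 2;
            DocumentTask task1 = new DocumentTask(document, start, mid, word);
            DocumentTask task2 = new DocumentTask(document, mid, end, word);
            invokeAll(task1, task2);
            result = task1.join() + task2.join();
        }
        return result;
    }

    private Integer processLines(String[][] document, int start, int end, String word) {
        List<LineTask> tasks = new ArrayList<>();
        for (int i = start; i < end; i++) {
            tasks.add(new LineTask(document[i], 0, document[i].length, word));
        }
        invokeAll(tasks);
        int result = 0;
        for (LineTask task : tasks) {
            result += task.join();
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // document: a String[][] with at least 100 lines, built elsewhere (not shown in these notes)
        DocumentTask task = new DocumentTask(document, 0, 100, "the");
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        commonPool.execute(task);
        do {
            TimeUnit.SECONDS.sleep(1);
        } while (!task.isDone());
        commonPool.shutdown(); // has no effect on the common pool
        commonPool.awaitTermination(1, TimeUnit.DAYS);
        Integer total = task.get(); // get result
    }
}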
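The same divide-and-combine pattern can be reduced to a single runnable class. This is a minimal sketch, assuming a flat String[] of words instead of a two-dimensional document and an arbitrary threshold of four words; CountTask and count are hypothetical names.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class WordCountDemo {
    /** Counts how often target appears in words[start, end). */
    static class CountTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 4; // assumed cut-off for splitting
        private final String[] words;
        private final int start, end;
        private final String target;

        CountTask(String[] words, int start, int end, String target) {
            this.words = words;
            this.start = start;
            this.end = end;
            this.target = target;
        }

        @Override
        protected Integer compute() {
            if (end - start <= THRESHOLD) {
                int count = 0;
                for (int i = start; i < end; i++) {
                    if (words[i].equals(target)) count++;
                }
                return count;
            }
            int mid = (start + end) >>> 1;
            CountTask left = new CountTask(words, start, mid, target);
            CountTask right = new CountTask(words, mid, end, target);
            invokeAll(left, right);            // waits for both subtasks
            return left.join() + right.join(); // combine the partial results
        }
    }

    static int count(String[] words, String target) {
        return ForkJoinPool.commonPool().invoke(new CountTask(words, 0, words.length, target));
    }

    public static void main(String[] args) {
        String[] words = "the cat and the dog and the bird saw the fox".split(" ");
        System.out.println(count(words, "the"));
    }
}
```

join() is used to read the partial results because, unlike get(), it declares no checked exceptions, which matters inside compute().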
3. Running tasks asynchronously
- Synchronous methods: The task that calls one of these methods (for example, invokeAll()) is suspended until the tasks it sends to the pool finish their execution. This allows the ForkJoinPool class to use the work-stealing algorithm to assign a new task to the worker thread that was executing the suspended task.
- Asynchronous methods: The task that calls one of these methods (for example, fork()) continues with its execution, so the ForkJoinPool class can't use the work-stealing algorithm to increase the performance of the application. In this case, the ForkJoinPool class can use that algorithm only when you call the join() or get() methods to wait for the finalization of a task.
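A short sketch of the asynchronous style: one half of the work is forked, the calling task keeps computing the other half, and join() is called only when the forked result is needed. The SumTask class, the threshold of 1,000 and the sum helper are assumptions made for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class AsyncForkDemo {
    /** Sums the integers in [lo, hi) by splitting the range in two. */
    static class SumTask extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 1_000; // assumed cut-off for splitting
        private final int lo, hi;

        SumTask(int lo, int hi) {
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected Long compute() {
            if (hi - lo <= THRESHOLD) {
                long sum = 0;
                for (int i = lo; i < hi; i++) sum += i;
                return sum;
            }
            int mid = (lo + hi) >>> 1;
            SumTask left = new SumTask(lo, mid);
            SumTask right = new SumTask(mid, hi);
            left.fork();                     // asynchronous: returns at once
            long rightSum = right.compute(); // this thread keeps working meanwhile
            return left.join() + rightSum;   // wait for the forked half only now
        }
    }

    static long sum(int n) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(100_000)); // prints 4999950000
    }
}
```

With invokeAll(left, right) the calling task would be suspended for both halves; here it stays busy with the right half and waits only at join().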
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class FolderProcessor extends CountedCompleter<List<String>> {
    private static final long serialVersionUID = -1826436670135695513L;
    private final String path;
    private final String extension;
    private final List<FolderProcessor> tasks = new ArrayList<>();
    private final List<String> resultList = new ArrayList<>();

    protected FolderProcessor(CountedCompleter<?> completer, String path, String extension) {
        super(completer);
        this.path = path;
        this.extension = extension;
    }

    public FolderProcessor(String path, String extension) {
        this(null, path, extension);
    }

    @Override
    public void compute() {
        File[] content = new File(path).listFiles();
        if (content != null) { // listFiles() returns null if path is not a readable directory
            for (File file : content) {
                if (file.isDirectory()) {
                    FolderProcessor task = new FolderProcessor(this, file.getAbsolutePath(), extension);
                    addToPendingCount(1); // register the child before forking it
                    task.fork();
                    tasks.add(task);
                } else if (file.getName().endsWith(extension)) {
                    // keep only the files with the requested extension
                    resultList.add(file.getAbsolutePath());
                }
            }
        }
        tryComplete(); // Try the completion of the task
    }

    @Override
    public void onCompletion(CountedCompleter<?> completer) {
        for (FolderProcessor childTask : tasks) {
            resultList.addAll(childTask.getResultList());
        }
    }

    public List<String> getResultList() {
        return resultList;
    }

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        // Create FolderProcessor tasks for different folders
        FolderProcessor system = new FolderProcessor("path", "log");
        FolderProcessor apps = new FolderProcessor("path", "log");
        pool.execute(system);
        pool.execute(apps);
        do {
            TimeUnit.SECONDS.sleep(2);
        } while (!system.isDone() || !apps.isDone());
        pool.shutdown();
        system.getResultList();
        apps.getResultList();
    }
}
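The pending-count bookkeeping of CountedCompleter is easier to follow without file system access. The sketch below is an illustration under assumptions, not the recipe's code: RangeSum sums a range of integers, the threshold of 100 is arbitrary, and the pending count is set before the children are forked so that a fast child cannot complete the parent early.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

class CompleterDemo {
    /** Sums the integers in [lo, hi); parents merge their children in onCompletion(). */
    static class RangeSum extends CountedCompleter<Long> {
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 100; // assumed cut-off for splitting
        private final int lo, hi;
        private RangeSum left, right;
        private long result;

        RangeSum(CountedCompleter<?> parent, int lo, int hi) {
            super(parent);
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        public void compute() {
            if (hi - lo > THRESHOLD) {
                int mid = (lo + hi) >>> 1;
                left = new RangeSum(this, lo, mid);
                right = new RangeSum(this, mid, hi);
                addToPendingCount(2); // one pending completion per child, set before forking
                left.fork();
                right.fork();
            } else {
                for (int i = lo; i < hi; i++) result += i;
            }
            // Leaf: completes now. Parent: decrements the count and returns without waiting.
            tryComplete();
        }

        // Runs once the pending count has reached zero, that is, after both children finished
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (left != null) {
                result = left.result + right.result;
            }
        }

        @Override
        public Long getRawResult() {
            return result;
        }
    }

    static long sum(int n) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(null, 0, n));
    }

    public static void main(String[] args) {
        System.out.println(sum(1000)); // prints 499500
    }
}
```

No task ever blocks in a join here: a parent returns from compute() immediately, and whichever child finishes last triggers the parent's onCompletion().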
4. Throwing exceptions in the tasks
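- Checked exceptions: These exceptions must be declared in the throws clause of a method or caught inside it, for example IOException. The compute() method declares no throws clause, so a task has to handle checked exceptions itself.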
- Unchecked exception: These exceptions don't have to be specified or caught.
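import java.util.concurrent.*;

public class Task extends RecursiveTask<Integer> {
    private static final long serialVersionUID = 1L;

    @Override
    protected Integer compute() {
        // ExceptionCondition: placeholder for whatever makes the task fail
        if (ExceptionCondition) throw new RuntimeException("Exception message");
        return 0;
    }

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task();
        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(task);
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.DAYS);
        if (task.isCompletedAbnormally()) {
            // Completed abnormally: getException() returns the exception that was thrown
            System.out.println(task.getException());
        }
    }
}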
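A runnable sketch of the same idea, assuming a task that always fails with an IllegalStateException. The names FailingTask and inspect are hypothetical; quietlyJoin() is used to wait for the task without rethrowing its exception.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FailingTaskDemo {
    /** A task that always fails with an unchecked exception. */
    static class FailingTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    static String inspect() {
        ForkJoinPool pool = new ForkJoinPool();
        FailingTask task = new FailingTask();
        pool.execute(task);
        task.quietlyJoin(); // waits for completion without rethrowing the exception
        pool.shutdown();
        Throwable thrown = task.getException();
        return task.isCompletedAbnormally() + ":" + thrown.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(inspect());
    }
}
```

Calling join() or get() on such a task would rethrow the failure instead, so quietlyJoin() followed by isCompletedAbnormally() is the gentler way to check the outcome.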
5. Canceling a task
The ForkJoinTask class provides the cancel() method for this purpose. There are some points you have to take into account when you want to cancel a task, which are as follows:
- The ForkJoinPool class doesn't provide any method to cancel all the tasks it has running or waiting in the pool
- When you cancel a task, you don't cancel the tasks this task has executed.
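A small sketch of cancel() on a task that has not started yet, with hypothetical names. In ForkJoinTask the mayInterruptIfRunning argument has no effect in the default implementation, so cancellation is mainly useful before a task begins to run.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class CancelDemo {
    static class AnswerTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        protected Integer compute() {
            return 42;
        }
    }

    static String cancelBeforeStart() {
        AnswerTask task = new AnswerTask();
        boolean cancelled = task.cancel(false); // succeeds: the task has not started

        ForkJoinPool pool = new ForkJoinPool();
        String outcome;
        try {
            pool.invoke(task); // a cancelled task is never executed
            outcome = "ran";
        } catch (CancellationException e) {
            outcome = "rejected";
        } finally {
            pool.shutdown();
        }
        return cancelled + "," + task.isCancelled() + "," + outcome;
    }

    public static void main(String[] args) {
        System.out.println(cancelBeforeStart());
    }
}
```

Because cancelling one task leaves its already forked subtasks untouched, an application that needs to cancel a whole computation has to keep its own list of the tasks it created and cancel each of them.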
六、Parallel and Reactive Streams
Target
- Creating streams from different sources
- Reducing the elements of a stream
- Collecting the elements of a stream
- Applying an action to every element of a stream
- Filtering the elements of a stream
- Transforming the elements of a stream
- Sorting the elements of a stream
- Verifying conditions in the elements of a stream
- Reactive programming with reactive streams
1. Creating streams from different sources
2. Reducing the elements of a stream
3. Collecting the elements of a stream
4. Applying an action to every element of a stream
5. Filtering the elements of a stream
6. Transforming the elements of a stream
7. Sorting the elements of a stream
8. Verifying conditions in the elements of a stream
9. Reactive programming with reactive streams
七、Concurrent Collections
Target
- Using non-blocking thread-safe deques
- Using blocking thread-safe deques
- Using blocking thread-safe queue ordered by priority
- Using thread-safe lists with delayed elements
- Using thread-safe navigable maps
- Using thread-safe HashMaps
- Using atomic variables
- Using atomic arrays
- Using the volatile keyword
- Using variable handles
1. Using non-blocking thread-safe deques
2. Using blocking thread-safe deques
3. Using blocking thread-safe queue ordered by priority
4. Using thread-safe lists with delayed elements
5. Using thread-safe navigable maps
6. Using thread-safe HashMaps
7. Using atomic variables
8. Using atomic arrays
9. Using the volatile keyword
10. Using variable handles
八、Customizing Concurrency Classes
Target
- Customizing the ThreadPoolExecutor class
- Implementing a priority-based Executor class
- Implementing the ThreadFactory interface to generate custom threads
- Using our ThreadFactory in an Executor Object
- Customizing tasks running in a scheduled thread pool
- Implementing the ThreadFactory interface to generate custom threads for the fork/join framework
- Customizing tasks running in the fork/join framework
- Implementing a custom Lock class
- Implementing a transfer queue based on priorities
- Implementing your own atomic object
- Implementing your own stream generator
- Implementing your own asynchronous stream
1. Customizing the ThreadPoolExecutor class
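import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class MyExecutor extends ThreadPoolExecutor {
    private final ConcurrentHashMap<Runnable, Date> startTimes;

    public MyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        startTimes = new ConcurrentHashMap<>();
    }

    @Override
    public void shutdown() {
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return super.shutdownNow();
    }

    // Runs in the worker thread just before the task starts
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        System.out.println(t.getName() + "," + r.hashCode());
        startTimes.put(r, new Date());
    }

    // Runs in the worker thread just after the task ends
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // Tasks sent with submit() arrive wrapped in a Future; execute() passes the raw Runnable
        if (r instanceof Future) {
            try {
                System.out.println(((Future<?>) r).get());
            } catch (InterruptedException | ExecutionException | CancellationException e) {
                e.printStackTrace();
            }
        }
        startTimes.remove(r); // drop the entry so the map does not grow without bound
        super.afterExecute(r, t);
    }
}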
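The two hooks can be exercised with a reduced executor that only counts the tasks it has seen through both beforeExecute() and afterExecute(). This is a sketch with hypothetical names (TimedExecutor, runTasks) and an assumed pool of two threads, not the recipe's class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TimedExecutorDemo {
    static class TimedExecutor extends ThreadPoolExecutor {
        private final ConcurrentHashMap<Runnable, Long> startTimes = new ConcurrentHashMap<>();
        final AtomicInteger completed = new AtomicInteger();

        TimedExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startTimes.put(r, System.nanoTime()); // remember when the task started
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            Long start = startTimes.remove(r);
            if (start != null) {
                long micros = (System.nanoTime() - start) / 1_000;
                System.out.println("task took " + micros + " microseconds");
                completed.incrementAndGet();
            }
        }
    }

    static int runTasks(int n) {
        TimedExecutor executor = new TimedExecutor(2);
        for (int i = 0; i < n; i++) {
            executor.submit(() -> { });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executor.completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(5));
    }
}
```

Both hooks run in the worker thread that executes the task, which is why a thread-safe map is used to pass the start time from one hook to the other.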