Sunday, 18 January 2015

Insight into BlockingQueue in Java -- ArrayBlockingQueue vs LinkedBlockingQueue vs PriorityBlockingQueue

Producer Consumer Design Pattern:

As usual, before we look at the BlockingQueue API and its use cases, let us go through a real-world problem. This will help us understand where BlockingQueue fits.

We are part of a generation that saw the era of the postal department, a time when the communication media were inland letters and postcards. Communication has changed drastically since then, from postcards to STD booths to pagers to cell phones.

Continuing the story of the postal department: to send a letter to a relative, we had to write the complete recipient address on the letter and drop it into a nearby post box. The postman would come on schedule, collect the letters and postcards from the box, and take them to the post office for their onward journey.

Now look at this scenario as a process: your work (writing and sending the postcard) has no dependency on the postman. You drop the letter into the post box and assume it will reach its destination safely, without worrying about when the postman will come to collect it, what his or her name is, or where he or she will take the letter.

So in this story the post box acts as the "work queue", the person who drops the letter is the "producer", and the postman who collects the letters is the "consumer".

Suppose we remove the post box from the scenario. We would then have to find out when the postman usually comes, wait until he or she arrives, and hand over the letter in person.

Simply having a post box reduces the effort on both the producer side and the consumer side.

Here the story of the "Producer Consumer Design Pattern" comes to an end, and it should be clear how this pattern decouples producers from consumers in real-world scenarios.

This is a good starting point for looking at the implementations that Java provides for the "Producer Consumer Design Pattern".

BlockingQueue is one of the interfaces Java provides to solve producer-consumer problems.

Important points on BlockingQueue:
1. It is an interface added in Java 5 as part of the concurrency framework (java.util.concurrent).
2. There are multiple implementations of BlockingQueue, such as ArrayBlockingQueue, LinkedBlockingQueue and PriorityBlockingQueue.

3. To understand any framework, the best way is to look at the interfaces it provides. Classes are merely implementations of those interfaces, so once you understand the interfaces, the classes are easy to follow.

4. BlockingQueue provides two types of methods.
The blocking methods put and take block the calling thread while the queue is full or empty, respectively. The other flavor is the timed methods offer(e, timeout, unit) and poll(timeout, unit), which wait only up to the specified time: offer returns false if no space becomes available before the timeout elapses, and poll returns null if no element becomes available. The untimed offer(e) and poll() do not wait at all.

5. A BlockingQueue can be bounded or unbounded.
With a bounded BlockingQueue (e.g. ArrayBlockingQueue), once the capacity is exhausted no thread can put a task into the work queue until space is freed. For an optionally bounded BlockingQueue (e.g. LinkedBlockingQueue), specifying a bound is optional; the capacity, if unspecified, is equal to Integer.MAX_VALUE.
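
To make the difference between the blocking and the timed methods concrete, here is a minimal sketch. The class name TimedQueueMethodsSketch, the helper method names and the 100 ms timeout are my own choices for illustration; only offer, poll and the queue classes come from java.util.concurrent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedQueueMethodsSketch {

    // Tries to add an item, waiting at most 100 ms for free space.
    // Returns false if the queue stayed full (or the thread was interrupted).
    static boolean offerWithTimeout(BlockingQueue<String> queue, String item) {
        try {
            return queue.offer(item, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return false;
        }
    }

    // Tries to remove the head, waiting at most 100 ms for an element.
    // Returns null if the queue stayed empty (or the thread was interrupted).
    static String pollWithTimeout(BlockingQueue<String> queue) {
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(1);
        System.out.println(offerWithTimeout(bounded, "first"));  // true
        System.out.println(offerWithTimeout(bounded, "second")); // false, queue is full
        System.out.println(pollWithTimeout(bounded));            // first
        System.out.println(pollWithTimeout(bounded));            // null, queue is empty
    }
}
```

With put and take in place of the timed calls, the second offer and the second poll would wait indefinitely, because nobody else is touching the queue in this single-threaded sketch.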

Some Points and Example of ArrayBlockingQueue:
1. A bounded blocking queue which is backed by an array.
2. The head of the queue is that element that has been on the queue the longest time.
3. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

 import java.util.Iterator;  
 import java.util.concurrent.ArrayBlockingQueue;  
 import java.util.concurrent.BlockingQueue;  
 public class ArrayBlockingQueueExample {  
      public static void main(String[] args) throws InterruptedException {  
           BlockingQueue<String> workQueue = new ArrayBlockingQueue<String>(2);  
           ProducerTask task1 = new ProducerTask(workQueue,  
                     "create simple java project");  
           ProducerTask task2 = new ProducerTask(workQueue,  
                     "create package structure");  
           ProducerTask task3 = new ProducerTask(workQueue, "create java class");  
           Thread prodThread1 = new Thread(task1);  
           Thread prodThread2 = new Thread(task2);  
           Thread prodThread3 = new Thread(task3);  
           prodThread1.start();  
           prodThread2.start();  
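           prodThread3.start(); // one producer will wait inside put(), as the queue capacity is only 2  
           Thread.sleep(500); // crude pause so the producers can reach put() before we inspect the queue  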
           // checking work queue for queued tasks  
           System.out.println("Checking status of queue, before any consumer thread picked task from queue");  
           Iterator<String> workQueueItr = workQueue.iterator();  
           while (workQueueItr.hasNext()) {  
                System.out.println("Queued task in work queue: "+ workQueueItr.next());  
           }  
           System.out.println("*************************************************************************");  
           ConsumerTask task4 = new ConsumerTask(workQueue);  
           Thread consTask4 = new Thread(task4);  
           consTask4.start();  
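           consTask4.join();  
           // once a slot is free, the waiting producer can finish its put()  
           prodThread1.join();  
           prodThread2.join();  
           prodThread3.join();  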
           // checking work queue for queued tasks  
           System.out.println("Checking status of queue, after a consumer thread picked a task from queue");  
           workQueueItr = workQueue.iterator();  
           while (workQueueItr.hasNext()) {  
                System.out.println("Queued task in work queue: "+ workQueueItr.next());  
           }  
           System.out.println("*************************************************************************");  
      }  
 }  
 class ProducerTask implements Runnable {  
      private BlockingQueue<String> workQueue;  
      private String taskName;  
      public ProducerTask(BlockingQueue<String> workQueue, String taskName) {  
           this.workQueue = workQueue;  
           this.taskName = taskName;  
      }  
      @Override  
      public void run() {  
           try {  
                workQueue.put(taskName);  
           } catch (InterruptedException e) {  
                e.printStackTrace();  
           }  
      }  
 }  
 class ConsumerTask implements Runnable {  
      private BlockingQueue<String> workQueue;  
      public ConsumerTask(BlockingQueue<String> workQueue) {  
           this.workQueue = workQueue;  
      }  
      @Override  
      public void run() {  
           try {  
                String taskName = workQueue.take();  
                System.out.println("Task taken from work queue: " + taskName);  
                System.out.println("*************************************************************************");  
           } catch (InterruptedException e) {  
                e.printStackTrace();  
           }  
      }  
 }  

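If you check the result, you will see that one producer thread was blocked and could not put its task into the work queue until a consumer thread came and picked a task from it. Which producer ends up waiting depends on thread scheduling, so the order of the tasks can differ between runs.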

Result: 
Checking status of queue, before any consumer thread picked task from queue
Queued task in work queue: create simple java project
Queued task in work queue: create package structure
*************************************************************************
Task taken from work queue: create simple java project
*************************************************************************
Checking status of queue, after a consumer thread picked a task from queue
Queued task in work queue: create package structure
Queued task in work queue: create java class
*************************************************************************
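
Because the three producers above race each other, the exact output depends on scheduling. The following is a small sketch of the same idea with a fixed outcome; the class name BlockedProducerSketch and the task names are assumptions made for this illustration. The main thread fills the queue itself, so the only producer thread is guaranteed to wait in put() until take() frees a slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockedProducerSketch {

    static List<String> run() {
        BlockingQueue<String> workQueue = new ArrayBlockingQueue<>(2);
        workQueue.add("task-1"); // the queue is now half full
        workQueue.add("task-2"); // the queue is now full

        // This producer cannot complete put() until a slot is freed.
        Thread producer = new Thread(() -> {
            try {
                workQueue.put("task-3");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> events = new ArrayList<>();
        try {
            // The head is the oldest element, so this always takes task-1.
            events.add("taken: " + workQueue.take());
            producer.join(); // wait until the pending put() has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        events.add("remaining: " + new ArrayList<>(workQueue));
        return events;
    }

    public static void main(String[] args) {
        for (String event : run()) {
            System.out.println(event);
        }
        // prints:
        // taken: task-1
        // remaining: [task-2, task-3]
    }
}
```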


Some Points and Example of LinkedBlockingQueue:
1. It is an optionally bounded blocking queue based on linked nodes.
2. The head of the queue is that element that has been on the queue the longest time.
3. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.
4. The capacity, if unspecified, is equal to Integer.MAX_VALUE.
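
The optional bound can be checked with remainingCapacity(). This is only a sketch; the class name LinkedCapacitySketch and its helper methods are made up for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LinkedCapacitySketch {

    // Free slots in a LinkedBlockingQueue created without a capacity argument.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Free slots left in a queue bounded at `capacity` after adding `items` elements.
    static int freeSlots(int capacity, int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < items; i++) {
            queue.add(i); // add throws IllegalStateException if the queue is full
        }
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE); // true
        System.out.println(freeSlots(10, 3));                       // 7
    }
}
```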


 import java.util.Random;  
 import java.util.concurrent.BlockingQueue;  
 import java.util.concurrent.LinkedBlockingQueue;  
 public class LinkedBlockingQueueExample {  
      public static void main(String[] args) {  
           final BlockingQueue<String> workQueue = new LinkedBlockingQueue<String>(10);  
           Random randomGenerator = new Random();  
           ProducerTask prodTask = new ProducerTask(workQueue, randomGenerator);  
           Thread prodThread = new Thread(prodTask);  
           prodThread.start();  
           ConsumerTask consTask = new ConsumerTask(workQueue);  
           Thread consThread = new Thread(consTask);  
           consThread.start();  
      }  
 }  
 class ProducerTask implements Runnable {  
      private BlockingQueue<String> workQueue;  
      private Random randomGenerator;  
      public ProducerTask(BlockingQueue<String> workQueue, Random randomGenerator) {  
           this.workQueue = workQueue;  
           this.randomGenerator = randomGenerator;  
      }  
      @Override  
      public void run() {  
           try {  
                while(true){  
                     String taskNumber = randomGenerator.nextInt(1000)+"";  
                     System.out.println("putting task no: "+taskNumber+" in work queue");  
                     workQueue.put(taskNumber);  
                     System.out.println("*************************************************************************");  
                     Thread.sleep(200);  
                }                 
           } catch (InterruptedException e) {  
                e.printStackTrace();  
           }  
      }  
 }  
 class ConsumerTask implements Runnable {  
      private BlockingQueue<String> workQueue;  
      public ConsumerTask(BlockingQueue<String> workQueue) {  
           this.workQueue = workQueue;  
      }  
      @Override  
      public void run() {  
           try {  
                while(true){  
                     String taskNumber = workQueue.take();  
                     System.out.println("Task taken from work queue: " + taskNumber);  
                     System.out.println("*************************************************************************");  
                     Thread.sleep(500);  
                }                 
           } catch (InterruptedException e) {  
                e.printStackTrace();  
           }  
      }  
 }  


Result:

putting task no: 403 in work queue
*************************************************************************
Task taken from work queue: 403
*************************************************************************
putting task no: 496 in work queue
*************************************************************************
putting task no: 446 in work queue
*************************************************************************
Task taken from work queue: 496
*************************************************************************
putting task no: 967 in work queue
*************************************************************************
putting task no: 405 in work queue
*************************************************************************
putting task no: 994 in work queue
*************************************************************************
Task taken from work queue: 446
*************************************************************************

Some Points and Example of PriorityBlockingQueue:
1. An unbounded blocking queue that uses the same ordering rules as the java.util.PriorityQueue class. Because it is unbounded, put never blocks; only take blocks, when the queue is empty.
2. null elements are not permitted.
3. A priority queue relying on natural ordering does not permit insertion of non-comparable objects (doing so results in a ClassCastException).
4. Unless a Comparator is supplied to the constructor, all elements inserted into the PriorityBlockingQueue must implement the java.lang.Comparable interface.
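
The ordering rules can be tried out with a short sketch. The class name PriorityOrderingSketch and its helper methods are assumptions for this illustration, and Comparator.reverseOrder() requires Java 8 or later. Passing null as the comparator falls back to natural ordering.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderingSketch {

    // Inserts B, C, A and removes everything in priority order.
    // A null comparator means "use the natural ordering of the elements".
    static List<String> drainInOrder(Comparator<String> comparator) {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, comparator);
        queue.add("B");
        queue.add("C");
        queue.add("A");
        List<String> result = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) { // poll returns null once the queue is empty
            result.add(next);
        }
        return result;
    }

    // Plain Object does not implement Comparable, so insertion is rejected.
    static boolean rejectsNonComparable() {
        PriorityBlockingQueue<Object> queue = new PriorityBlockingQueue<>();
        try {
            queue.add(new Object());
            queue.add(new Object());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder(null));                              // [A, B, C]
        System.out.println(drainInOrder(Comparator.<String>reverseOrder())); // [C, B, A]
        System.out.println(rejectsNonComparable());                          // true
    }
}
```

Note that the ordering applies to removal (take, poll); the iterator of a PriorityBlockingQueue is not guaranteed to traverse the elements in any particular order.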



 import java.util.concurrent.BlockingQueue;  
 import java.util.concurrent.PriorityBlockingQueue;  
 public class PriorityBlockingQueueExample {  
      public static void main(String[] args){  
           BlockingQueue<String> workQueue = new PriorityBlockingQueue<String>();  
           try {  
                workQueue.put("A");  
                workQueue.put("C");  
                workQueue.put("F");  
                workQueue.put("E");  
                workQueue.put("D");  
                workQueue.put("B");  
           } catch (InterruptedException e) {  
                e.printStackTrace();  
           }  
           // take tasks until the queue is empty to check that they come out in natural order  
           // (with while(true), take() would block forever once the queue is empty)  
           while(!workQueue.isEmpty()){  
                try {  
                     System.out.println("Task name: "+workQueue.take());  
                } catch (InterruptedException e) {  
                     e.printStackTrace();  
                }  
           }  
      }  
 }  

Result:
As you can see in the result, all tasks are ordered as per the natural ordering of String.
Task name: A
Task name: B
Task name: C
Task name: D
Task name: E
Task name: F

That's all from my end on the Producer Consumer Design Pattern and BlockingQueue in Java.
