Thursday, August 11, 2016

BlockingQueue

The BlockingQueue is a queue that has two additional special properties:
■ When trying to put() to the queue, it will cause the putting thread to wait for
space to become available if the queue is full.
■ When trying to take() from the queue, it will cause the taking thread to block
if the queue is empty.
These two properties are very useful because if one thread (or pool of threads) is outstripping
the ability of the other to keep up, the faster thread is forced to wait, thus
regulating the overall system.


Two implementations of BlockingQueue
Java ships with two basic implementations of the BlockingQueue interface: the
LinkedBlockingQueue and the ArrayBlockingQueue. They offer slightly different
properties; for example, the array implementation is very efficient when an exact
bound is known for the size of the queue, whereas the linked implementation may be
slightly faster under some circumstances.

It can be better to have this,

BlockingQueue<WorkUnit<MyAwesomeClass>>

where WorkUnit (or QueueObject, or whatever you want to call the container class) is
a packaging interface or class that may look something like this:
public class WorkUnit<T> {
 private final T workUnit;

 public T getWork() {
  return workUnit;
 }

 public WorkUnit(T workUnit_) {
  workUnit = workUnit_;
 }
}

The reason for doing this is that this level of indirection provides a place to add additional
metadata without compromising the conceptual integrity of the contained type
(MyAwesomeClass in this example).
This is surprisingly useful. Use cases where additional metadata is helpful are
abundant. Here are a few examples:
■ Testing (such as showing the change history for an object)
■ Performance indicators (such as time of arrival or quality of service)
■ Runtime system information (such as how this instance of MyAwesomeClass has
been routed)

Example

public class Appointment<T> {
 private final T toBeSeen;

 public T getPatient() {
  return toBeSeen;
 }

 public Appointment(T incoming) {
  toBeSeen = incoming;
 }
}
abstract class Pet {
 protected final String name;

 public Pet(String name) {
  this.name = name;
 }

 public abstract void examine();
}
class Cat extends Pet {
 public Cat(String name) {
  super(name);
 }

 public void examine() {
  System.out.println("Meow!");
 }
}
class Dog extends Pet {
 public Dog(String name) {
  super(name);
 }

 public void examine() {
  System.out.println("Woof!");
 }
}




From this simple model, you can see that we can model the veterinarian’s queue as
LinkedBlockingQueue<Appointment<Pet>>, with the Appointment class taking the role
of WorkUnit.




import java.util.concurrent.BlockingQueue;

public class Veterinarian extends Thread {
 protected final BlockingQueue<Appointment<Pet>> appts;
 protected String text = "";
 protected final int restTime;
 private boolean shutdown = false;

 public Veterinarian(BlockingQueue<Appointment<Pet>> lbq, int pause) {
  appts = lbq;
  restTime = pause;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   seePatient();
   try {
    Thread.sleep(restTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public void seePatient() {
  try {
   Appointment<Pet> ap = appts.take();
   Pet patient = ap.getPatient();
   patient.examine();
  } catch (InterruptedException e) {
   shutdown = true;
  }
 }
}


The following is instance to start two threads, one is en-queue pets, one is examine cats in the queue.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;


public class BlockingQueueTest {
 public static final BlockingQueue<Appointment<Pet>> QUEUE = new LinkedBlockingQueue<Appointment<Pet>>(10);
 public static void main(String[] args){
  new Thread(){
   @Override
   public void run(){
    for(int i=0;i<10;i++){
     Pet p1 = new Cat("cat"+i);
     Pet p2 = new Dog("dog"+i);
     Appointment<Pet> app1 = new Appointment<Pet>(p1);
     Appointment<Pet> app2 = new Appointment<Pet>(p2);
     try {
      QUEUE.put(app1);
      System.out.println("EnqueueCat"+i);
      QUEUE.put(app2);
      System.out.println("EnqueueDog"+i);
      Thread.sleep(300);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }
    }
   }
  }.start();
  
  Veterinarian veter = new Veterinarian(QUEUE, 500);
  veter.start();
 }
}

We have investigate on BlockingQueue Usage
http://gvace.blogspot.com/2016/08/fine-grained-control-of-blockingqueue.html

No comments:

Post a Comment