Friday, August 12, 2016

TransferQueue

If someone is already waiting to take the element, we don't need to put it in the queue, just directly transfer it to receiver.

Java 7 introduced the TransferQueue. This is essentially a BlockingQueue with an
additional operation—transfer(). This operation will immediately transfer a work
item to a receiver thread if one is waiting. Otherwise it will block until there is a thread
available to take the item. This can be thought of as the “recorded delivery” option—
the thread that was processing the item won’t begin processing another item until it
has handed off the current item. This allows the system to regulate the speed at which
the upstream thread pool takes on new work.
It would also be possible to regulate this by using a blocking queue of bounded
size, but the TransferQueue has a more flexible interface. In addition, your code may
show a performance benefit by replacing a BlockingQueue with a TransferQueue.
This is because the TransferQueue implementation has been written to take into
account modern compiler and processor features and can operate with great efficiency.
As with all discussions of performance, however, you must measure and prove
benefits and not simply assume them. You should also be aware that Java 7 ships with
only one implementation of TransferQueue—the linked version.
In the next code example, we’ll look at how easy it is to drop in a TransferQueue as
a replacement for a BlockingQueue. Just these simple changes to listing 4.13 will
upgrade it to a TransferQueue implementation, as you can see here.



public class BlockingQueueTest1 {
 public static void main(String[] args) {

  final Update.Builder ub = new Update.Builder();
  final TransferQueue<Update> lbq = new LinkedTransferQueue<Update>();
  MicroBlogExampleThread t1 = new MicroBlogExampleThread(lbq, 10) {
   public void doAction() {
    text = text + "X";
    Update u = ub.author(new Author("Tallulah")).updateText(text)
      .build();
    boolean handed = false;
    try {
     handed = updates.tryTransfer(u, 100, TimeUnit.MILLISECONDS);//tryTransfer
    } catch (InterruptedException e) {
    }
    if (!handed)
     System.out.println("Unable to hand off Update to Queue due to timeout");
    else System.out.println("Offered");
   }
  };
  MicroBlogExampleThread t2 = new MicroBlogExampleThread(lbq, 200) {
   public void doAction() {
    Update u = null;
    try {
     u = updates.take();
     System.out.println("Took");
    } catch (InterruptedException e) {
     return;
    }
   }
  };

  t1.start();
  t2.start();
 }
}


abstract class MicroBlogExampleThread extends Thread {
 protected final TransferQueue<Update> updates;
 protected String text = "";
 protected final int pauseTime;
 private boolean shutdown = false;

 public MicroBlogExampleThread(TransferQueue<Update> lbq_, int pause_) {
  updates = lbq_;
  pauseTime = pause_;
 }

 public synchronized void shutdown() {
  shutdown = true;
 }

 @Override
 public void run() {
  while (!shutdown) {
   doAction();
   try {
    Thread.sleep(pauseTime);
   } catch (InterruptedException e) {
    shutdown = true;
   }
  }
 }

 public abstract void doAction();
}









No comments:

Post a Comment