日期:2014-05-16  浏览次数:20470 次

LinkedBlockingQueue 例子

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.RandomStringUtils;

/**
 * @author zhaoqilong 
 * @version 创建时间:2012-6-7 上午9:16:56
 *
 */

public class Test {
  // Queue shared by the producer (Worker) and the consumer (Saler) tasks.
  private static LinkedBlockingQueue<String> queue =new LinkedBlockingQueue<String>();
  // Start gate: both tasks wait on this latch until service() counts it down.
  private final CountDownLatch latch = new CountDownLatch(1);
  // Thread pool the tasks are submitted to.
  private final ExecutorService pool;  
 // Counter of produced items.
  private final AtomicLong output = new AtomicLong(0);   
 // Counter of sold items.
  private final AtomicLong sales = new AtomicLong(0);   
  // Passed to Saler: when true, the seller sells off what is left in the queue after being interrupted.
  private final boolean clear;   
 
  /**
   * Creates the demo with a cached thread pool.
   *
   * @param clear flag handed to the seller task; when true the seller sells
   *              off the remaining queue contents after it is interrupted
   */
  public Test(boolean clear){
	  this.clear = clear;
	  this.pool = Executors.newCachedThreadPool();
  }
   
   /**
    * Submits one seller task and one worker task to the pool, then opens the
    * latch so that both start working.
    *
    * @throws InterruptedException declared for callers of this method
    */
   public void service() throws InterruptedException{
	   Saler seller = new Saler(queue, sales, latch, clear);
	   Worker producer = new Worker(queue, output, latch);
	   pool.submit(seller);
	   pool.submit(producer);
	   // Both tasks block on latch.await() first; this releases them.
	   latch.countDown();
   }
   /**
    * Entry point: builds the demo with {@code clear} set to false and starts
    * the producer and consumer tasks.
    *
    * @param args command-line arguments (not used)
    */
   public static void main(String[] args) {
	   Test t = new Test(false);
	   try {
		   t.service();
	   } catch (InterruptedException e) {
		   // Restore the interrupt status instead of silently dropping it,
		   // so code running later on this thread can still observe it.
		   Thread.currentThread().interrupt();
		   e.printStackTrace();
	   }
   }
   /**
    * Consumer ("seller") task: repeatedly takes products off the queue and
    * counts every product it sold.
    *
    * <p>The task first waits on the latch, then calls {@link #sale()} and
    * sleeps 500 ms in an endless loop until its thread is interrupted. On
    * interruption it either sells off whatever is still queued (when
    * {@code clear} is true) or prints a message, and then returns with the
    * thread's interrupt status restored.
    */
   class Saler implements Runnable{
	   private final LinkedBlockingQueue<String> queue;
	   private final AtomicLong sales;
	   private final CountDownLatch latch;
	   private final boolean clear;

	   /**
	    * @param queue queue the products are taken from
	    * @param sales counter incremented once per product sold
	    * @param latch start gate awaited before the first sale
	    * @param clear when true, the remaining products are sold off after an interrupt
	    */
	   public Saler(LinkedBlockingQueue<String> queue, AtomicLong sales, CountDownLatch latch, boolean clear){
		   this.queue = queue;
		   this.sales = sales;
		   this.latch = latch;
		   this.clear = clear;
	   }

	   /**
	    * Waits for the latch, then sells in a loop until the thread is interrupted.
	    */
	   @Override
	   public void run() {
		   try {
			   latch.await(); // wait until the gate is opened
			   for (;;) {
				   sale();
				   // sale() re-asserts an interrupt it received while polling,
				   // so this sleep throws immediately and the loop terminates.
				   Thread.sleep(500);
			   }
		   } catch (InterruptedException e) {
			   if (clear) {
				   // Sell off what is left in the queue before terminating.
				   cleanWarehouse();
			   } else {
				   System.out.println("Seller Thread will be interrupted...");
			   }
			   // Leave the interrupt visible to whoever runs this task.
			   Thread.currentThread().interrupt();
		   }
	   }

	   /**
	    * Tries to sell one product, waiting up to 50 ms for one to arrive.
	    * Prints the polled value (which is {@code null} on timeout) and
	    * increments the sales counter only when a product was obtained.
	    *
	    * <p>If the thread is interrupted while polling, the interrupt status
	    * is restored rather than swallowed, so the caller can react to it.
	    */
	   public void sale(){
		   System.out.println("==取take=");
		   try {
			   String item = queue.poll(50, TimeUnit.MILLISECONDS);
			   System.out.println(item);
			   if (item != null) {
				   sales.incrementAndGet();
			   }
		   } catch (InterruptedException e) {
			   // Previously only a stack trace was printed here, which cleared
			   // the interrupt and kept run() looping forever.
			   Thread.currentThread().interrupt();
		   }
	   }

	   /**
	    * Sells every product currently left in the queue.
	    *
	    * <p>Uses the non-blocking {@code poll()} so that draining works
	    * regardless of the thread's interrupt status; a {@code null} result
	    * means the queue is empty.
	    */
	   private void cleanWarehouse() {
		   String item;
		   while ((item = queue.poll()) != null) {
			   System.out.println("==取take=");
			   System.out.println(item);
			   sales.incrementAndGet();
		   }
	   }
   }
   
   /**
    * 生产者
    * @author Administrator
    *
    */
   class Worker implements Runnable{
	   private  LinkedBlockingQueue<String> queue;   
	   private  CountDownLatch latch;   
	   private  AtomicLong output;
	   // No-arg constructor: assigns nothing, so queue, latch and output keep
	   // their default null values. run() calls latch.await(), so a Worker
	   // built this way fails with a NullPointerException when run.
	   public Worker(){
		   
	   }
	   /**
	    * @param queue  queue the products are offered to
	    * @param output counter of produced items
	    * @param latch  start gate awaited before production begins
	    */
	   public Worker(LinkedBlockingQueue<String> queue, AtomicLong output,CountDownLatch latch){
		   this.output = output;
		   this.latch = latch;
		   this.queue = queue;
	   }
	   /**
	    * Waits for the latch, then calls work() followed by a 100 ms sleep in
	    * an endless loop; prints a message and returns once interrupted.
	    */
	   public void run() {
		   try {
			   // Block until the start gate is opened.
			   latch.await();
			   while (true) {
				   work();
				   Thread.sleep(100);
			   }
		   } catch (InterruptedException interrupted) {
			   System.out.println("Worker thread will be interrupted...");
		   }
	   }
 		/**
 		 *  工作
 		 */
 		public void work(){
 			 try {
 				 String product=RandomStringUtils.randomAscii(3);
 				 boolean success=queue.offer(product, 100, TimeUnit.MILLISECONDS);
 				 if(success){
 					output.incrementAndGet();// 可以声明long型的参数获得返回值,作为日志的参数  
 				 }
 			} catch (InterruptedException e) {
 				e.printStac