`
QING____
  • 浏览: 2228440 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

SynchronousQueue(同步队列)

    博客分类:
  • JAVA
 
阅读更多

SynchronousQueue:同步Queue,属于线程安全的BlockingQueue的一种,此队列设计的理念类似于"单工模式",对于每个put/offer操作,必须等待一个take/poll操作,类似于我们的现实生活中的"火把传递":一个火把传递地他人,需要2个人"触手可及"才行. 因为这种策略,最终导致队列中并没有一个真正的元素;这是一种pipleline思路的基于queue的"操作传递".

  •  void put(E o):向队列提交一个元素,阻塞直到其他线程take或者poll此元素.
  • boolean offer(E o):向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或者"碰巧"有poll操作,那么将返回true,否则返回false.
  •   E take():获取并删除一个元素,阻塞直到有其他线程offer/put.
  •  boolean poll():获取并删除一个元素,如果此时有其他线程正在被put阻塞(即其他线程提交元素正等待被接收)或者"碰巧"有offer操作,那么将返回true,否则返回false.
  •  E peek():总会返回null,硬编码.

这个队列中,对我们有意义的操作时put/take,以及put/offer + take或者put/take + poll,对于无法进入队列的元素,需要有额外的"拒绝"策略支持.

SynchronousQueue经常用来,一端或者双端严格遵守"单工"(单工作者)模式的场景,队列的两个操作端分别是productor和consumer.常用于一个productor多个consumer的场景。

在ThreadPoolExecutor中,通过Executors创建的cachedThreadPool就是使用此类型队列.已确保,如果现有线程无法接收任务(offer失败),将会创建新的线程来执行.

下面为简单的例子:

public class SynchronousQueueTest {
	/**
	 * @param args
	 */
	public static void main(String[] args) throws Exception{
		SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
		for(int i=0;i<5;i++){
			Thread t = new SQThread(queue, 1);
		t.start();
		}
		//Thread.sleep(1000);
		for(int i=0;i<10;i++){
			if(!queue.offer(new Object())){
				System.out.println("Failure");
			}
		}
	}
	public static class SQThread extends Thread{
		private SynchronousQueue<Object> queue;
		int mode;
		SQThread(SynchronousQueue<Object> queue,int mode){
			this.queue = queue;
			this.mode = mode;
		}
		@Override
		public void run(){
			Object item = null;
			try{
				System.out.println(Thread.currentThread().getId());
				if(mode == 1){
					while((item = queue.take()) != null){
					System.out.println(item.toString());
				}
				}else{
				//
				}
			}catch(Exception e){
			//
			}
			System.out.println("end");
		}
	}
}

 

事实上多次执行此程序,会发现不同的结果,同是注释部分Thread.sleep的时间不同,也会对结果不同.所以在线程池中,我们需要"拒绝策略"的支持.

 

SynchronousQueue支持2种策略:公平和非公平,默认为非公平.

1) 公平策略:内部实现为TransferQueue,即一个队列.(consumer和productor将会被队列化)

2) 非公平策略:内部实现为TransferStack,即一个stack(内部模拟了一个单向链表,允许闯入行为)

内部实现比较复杂,尽管支持线程安全,但是其内部并没有使用lock(事实上无法使用lock),使用了LockSupport来控制线程,使用CAS来控制栈的head游标(非公平模式下)。

 

如下为非公平策略下实现

对于consumer而言,其为data消费者,每次take将会检测是否已经有productor的线程在等待,如果有,此时head应该不为null并且head的数据类型为“DATA”(两种类型:DATA 和REQUEST)

此时consumer将消费数据。取消productor的阻塞效果(使用LockSupport.unpark).

如果此时没有productor提供数据,即head为null,此时consumer也将创建一个mode为REQUEST的元素,并阻塞线程。当有productor提供数据时,有productor来取消其阻塞效果。

 

对于productor而言,情况类似。。当put数据时,如果队列中无元素,和take一样,创建一个node为DATA的元素,并阻塞进程。。

如果队列中有元素,则CAS更新head,使head指向新元素,并不断去tryMatch是否有正在阻塞的尚未take到数据的consumer,如果有,就unpark它,并返回数据,并变更列表的next指针。(有点像俄罗斯方块)

如果队列中没有元素(即没有消费者),则自己阻塞。

每个NODE,都有一个waiter属性,标示此在此元素上操作时阻塞的线程(put操作产生的元素未被消费,那么waiter就是put线程;take产生的元素单无元素消费成功时,那么waiter就是take线程)。

 

 

分享到:
评论

相关推荐

    java 同步器SynchronousQueue详解及实例

    主要介绍了java 同步器SynchronousQueue详解及实例的相关资料,需要的朋友可以参考下

    SynchronousQueue实现原理.pdf

    转载的一篇博客资源

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    同步队列 SynchronousQueue,阻塞双端队列 BlockingDeque, 链阻塞双端队列 LinkedBlockingDeque,并发 Map(映射) ConcurrentMap, 并发导航映射 ConcurrentNavigableMap,交换机 Exchanger, 信号量 Semaphore,执行器...

    SynchronousQueue详解

    SynchronousQueue是BlockingQueue的一种,所以SynchronousQueue是线程安全的。SynchronousQueue和其他的BlockingQueue不同的是SynchronousQueue的capacity是0。即SynchronousQueue不存储任何元素。 也就是说...

    SynchronousQueue核心属性和方法源码的分析

    SynchronousQueue核心属性和方法源码的分析的代码

    java并发工具包 java.util.concurrent中文版用户指南pdf

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航 映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    java并发包资源

    7. 同步队列 SynchronousQueue 8. 阻塞双端队列 BlockingDeque 9. 链阻塞双端队列 LinkedBlockingDeque 10. 并发 Map(映射) ConcurrentMap 11. 并发导航映射 ConcurrentNavigableMap 12. 闭锁 CountDownLatch 13. ...

    2004_DISC_dual_DS.pdf

    SynchronousQueue 底层算法相关实现论文

    常见的Java笔试题-JVM-JUC-Core:JUCJVM核心知识点

    SynchronousQueue Callable接口 阻塞队列的应用——生产者消费者 传统模式 阻塞队列模式 阻塞队列的应用——线程池 线程池基本概念 线程池三种常用创建方式 线程池创建的七个参数 线程池底层原理

    第7章-JUC多线程v1.1.pdf

    以SynchronousQueue作为等待队列, 从而每次往队列中插入一个元素, 必须等待另一个线程从这个队列删除一个元素. 定时调度 4.定时调度的线程池 ExecutorService newCachedThreadPool = Executors....

    JAVA高并发_学习笔记

    同步非阻塞式IO:Buffere Channel Selector Concurrent包:Blocking:Queue\Concurrent Hash Map \ExcutorService\lock\原子性 BlockingQueue:ArrayBlockingQueue , DelayQueue , LinkedBlockingDeque , ...

    java线程池概念.txt

     SynchronousQueue;  ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。 threadFactory:线程工厂,主要用来创建线程:...

    JAVA核心知识点整理(有效)

    1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM ........................

    java核心知识点整理.pdf

    1. 目录 1. 2. 目录 .........................................................................................................................................................1 JVM .........................

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段20讲、同步代码块以及同步方法之间的区别和关系.mp4 │ 高并发编程第一阶段21讲、通过实验分析This锁的存在.mp4 │ 高并发编程第一阶段22讲、通过实验分析Class锁的存在.mp4 │ 高并发编程...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第一阶段20讲、同步代码块以及同步方法之间的区别和关系.mp4 │ 高并发编程第一阶段21讲、通过实验分析This锁的存在.mp4 │ 高并发编程第一阶段22讲、通过实验分析Class锁的存在.mp4 │ 高并发编程...

Global site tag (gtag.js) - Google Analytics