`
QING____
  • 浏览: 2228416 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

JAVA并发数据结构详解

 
阅读更多

前言:这是一场艰苦的旅行....

请首先参考:http://shift-alt-ctrl.iteye.com/blog/1839142

一.BlockingDeque阻塞双端队列(线程安全):

注意ArrayDeque和LinkedList仅仅扩展了Deque,是非阻塞类型的双端队列。

BlockingQueue单向队列,其内部基于ReentrantLock + Condition来控制同步和"阻塞"/"唤醒"的时机;有如下几个实现类:

  1. ArrayBlockingQueue: “浮动相对游标”的数组,来实现有界的阻塞队列。
  2. DelayQueue:支持“可延迟”的队列,DelayQueue还只接受Delayed类型的元素,Delayed接口继承自Compare接口并提供了一个long getDelay(TimeUnit),来获取指定时间到now的时间剩余量。DelayQueue底层就是使用PriorityQueue作为支撑的。
  3. PriorityBlockingQueue:有权重的队列,此队列时可以根据指定的comparator进行排序的。
  4. SynchronousQueue://
  5. LinkedBlockingDeque:有界或者无界阻塞队列

PriorityQueue为非线程安全非阻塞,有权重的队列,其权重需要根据特定的compartor来产生。

 

二.ConcurrentMap(接口):支持并发的map,支持多线程环境中安全的访问。

其提供了几个独特的方法:

  • V putIfAbsent(K,V):如果map中不存在此key,则put,否则返回现有的此key关联的值。此过程有Lock同步:

 

//等价于:
if (!map.containsKey(key)) 
      return map.put(key, value);
  else
return map.get(key);
 
Map<String,Object> map = new ConcurrentHashMap<String, Object>();
if(map.containsKey("key")){
    map.put("key", new Object());
}
//注意,concurrentHashMap并不保证contains方法和put方法直接保持"原子性",即有可能contains方法返回false之后,在put之前,可能其他线程已经put成功,即在当前线程put时,此时数据已经不一致了.建议采用putIfAbsent
 

 

  • boolean remove(Object key,Object value):比较并删除指定的key和value。
  • boolean replace(K,V oldValue,V newValue):比较并替换。

目前实现ConcurrentMap的类有ConcurrentHashMap,一种基于锁分段技术(Segement)实现的并发hashMap,锁采取了ReentrantLock(Segement扩展了ReentrantLock)。

备注:

1)HashMap非线程安全,即在并发环境中进行put、remove时,map容器中最终的数据可能并非当前线程预期的结果,比如Thread-1、Thread-2同时对map中相同的key进行put操作,因为并发的问题,可能Thread-1操作之后(尚未返回)值已经被Thread-2给覆盖,即put方法返回的V与map中实际存储的V并不一致;还有另外一个问题,HashMap内部基于数组存储,所以在并发操作时,数据的扩容并不是“同步的”,这仍然会导致一些问题。

2)为了解决HashMap线程安全问题,我们可以手动对hashMap实例进行同步操作,或者使用JAVA中自带的SynchronizedMap,可以通过Collections.synchronizedMap()创建,此map是线程安全的,其内部put/remove方法均是同步的。但是这种同步,尽管保证了数据安全,但是却牺牲了并发性。

3)所以,我们引入了ConcurrentHashMap,既是线程安全的,有可以支持并发操作。

在JDK 1.6中,ConcurrentHashMap基于锁分段手段来提高并发能力,相对于synchronizedMap任何时候只能有一个线程操作(put、remove),ConcurrentHashMap允许多个线程并发操作且可以保证数据操作是安全的;ConcurrentHashMap中有一个“CONCURRENT_LEVEL”(并发级别)的属性,默认为16,即最多允许16个线程并行操作;此外它还有一个Segements[],数组大小跟并发级别一样,对于任何put/remove操作,首先根据key的hashcode对并发级别取模,并将模数作为Segements[]数组的索引,获取segement对象,然后执行lock,并在数据操作完毕后unlock。由此可见,hashcode与并发level模数相同的key,将会被“同步执行”,模数不同的key使用不同的lock,可以并行执行。这个思想非常简单易于理解!

在JDK 1.8中,ConcurrentHashMap的实现原理进行了修正,抛弃了Segements锁的手段和并发级别,转而使用CAS + 同步操作。(具体原理比较复杂,暂不介绍)

 

ConcurrentHashMap支持并发环境下的遍历 + 删除,无论是基于entrySet()、keySet()、values(),都是安全可靠的,因为这三个方法返回的均为View,其iterator内部持有实际map数据。

 

三.ConcurrentLinkedQueue:

基于单向链表实现的,线程安全的并发队列,无界非阻塞队列,当队列需要在多线程环境中被使用,可以考虑使用它。记住,这是个非阻塞队列不过支持阻塞的队列,貌似都是线程安全的

此队列的size不是时间固定的,它的iterator也会被不断调整。ConcurrentLinkedQueue并没有使用Lock,而是采用了CAS的方式,对tail.next进行赋值操作。因为tail.next永远是null,且队列不接受null的元素。

 

注意,非并发集合(list,queue,set)的iterator以及forEach循环在并发环境中是不能正常工作的,如果原始集合被外部修改(其他线程的add,remove),将会导致异常。对于并发集合的iterator,没有做相关的size校验。

 

Lock(锁)是控制操作(action)的,可以让一个操作或者一个子操作被串行的处理。。。CAS其实只是对内存数据的变更时使用,如果想知道数据变更在并发环境中是否符合预期,才会使用到CAS。

 

四.ConcurrentSkipListMap/ConcurrentSkipListSet

两个基于SkipList(跳跃表)方式实现的、支持并发访问的数据结构。依据跳跃表的思想,可以提高数据访问的效率。其中ConcurrentSkipListSet底层使用ConcurrentSkipListMap支撑。

ConcurrentSkipListMap也是ConcurrentNavigableMap的实现类,对于SkipList,其内部元素,必须是可排序的。

 

跳跃表是一个很简单的表,(参见跳跃表概念),对底层的线性存储结构,加入类似“多级索引”的概念,“索引”的添加时基于随即化。一个跳跃表,整体设计上(设计思路很多)分为表左端head索引,右端tail索引(边界),底端存储层(排序的线性链表),和一个随机化、散列化的不同高度的多级索引“指针”。head索引是高度最高的索引,它是整个链表中值最小的元素锁产生的索引;右端为边界索引,索引指向null或者任意设计的边界值(bound).

 

跳跃表的底端是一个和普通的链表没啥区别,单向或者双向的均可,前提是必须是排序的。索引节点,就是一个有向路径的标,每个索引节点,都分别有right、down指向,对于双向跳跃表,就具有left、right、up、down四个方向指针;指针就是为了方便寻路。每个新增元素时,首先会导致底层链表的改动;根据自定义的随即算法,来决定此元素的索引高度,如果高度不为0,则依次建立相应层高的索引,并调整各个层高的所以指向。

 

跳跃表之所以这么设计,实事上就是在做一件事情:基于简单的设计思路和算法,来实现较为高效的查询策略。相对于二分查找有一定的优势.

 

五.CopyOnWriteArrayList/CopyOnWriteArraySet:

均是CopyOnWrite思想,在数据修改时(happen-before),对数据进行Copy(),read操作可以在原数据结构上继续进行,待write操作结束后,调整数据结构指针。基于这种设计思路的数据结构,通常是read操作频率远大于write操作,可以在并发环境中,支撑较高的吞吐量;避免了因为同步而带来的瓶颈,同时也能确保数据安全操作。同时需要注意,copy操作将会带来多余的空间消耗。注意,此API时支持并发的,多个线程add操作(即CopyOnWrite)将会被队列化,内部采取了ReentrantLock机制来控制.

  • CopyOnWriteArrayList底层基于数组实现,在进行write操作时(add,remove),将会导致Arrays.copy操作,创建一个新的数组;待write操作成功后,将原数组的指针替换成新数组指针.
  • CopyOnWriteArraySet底层直接使用CopyOnWriteArrayList作为支撑,只不过在add操作时会遍历整个数组结构并进行equals比较(确保具有Set的特性),只有发现此新元素不存在时才会"替换指针".

    java中这两个API,支持并发操作时,仍然可以进行遍历而无需额外的同步;即不会抛出ConcurrentModificationException。事实上,迭代器所持有的数组只是一个"创建iterator时底层数组的引用",所以在遍历期间,即使CopyOnWriteArrayList已经新增或者删除了某些元素,仍不会发生冲突,因为iterator持有的是旧数组的引用,而CopyOnWriteArrayList持有的是Copy操作时创建的新数组引用..由此可见,iterator也无法反应实时的数组变化(遍历期间,实际数组的添加、删除),但是原始数组中对象内容发生改变还是可以在迭代器中反应出来。CopyOnWrite的遍历器的remove/add/set操作不被支持,这区别于ArrayList.

    CopyOnWriteArrayList、CopyOnWriteArraySet,底层基于数组实现,采取ReentrantLock来同步add/remove/clear等操作。并采取了snapshot的简单手段:


 

//例如add:

public boolean add(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
	   Object[] elements = getArray();
	   int len = elements.length;
		//数组copy
	   Object[] newElements = Arrays.copyOf(elements, len + 1);
	   newElements[len] = e;
		//修改结束后,指针转换
	   setArray(newElements);
	   return true;
	} finally {
	   lock.unlock();
	}
}
 

 

 

六.CountDownLatch:

同步类,用于多个线程协调工作。共享锁,当锁计数器较少到0时,将释放等待的线程。使用场景是,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。当CountDownLatch的锁计数器为1时,可以作为一种“开关”来使用。计数器无法被重置,如果需要重复计数,可以使用CyclicBarrier。

CountDownLatch内部基于AQS来控制线程访问。这个API很简单,只有2个核心方法:

  • void await():如果计数器不为0,则获取锁失败,加入同步队列;即线程阻塞。
  • void countDown():释放锁,导致计数器递减,如果此时计数器为0,则表示锁释放成功,AQS会帮助“发射”因为await阻塞的线程(组)。
public class CountDownLatchTestMain {

	/**

	* @param args

	*/

	public static void main(String[] args) throws Exception{
		System.out.println("Begin");
		CountDownLatch latch = new CountDownLatch(2);
		for(int i=0;i<4;i++){
			CThread c = new CThread(i,latch);
			c.start();
			//Thread.sleep(500);
		}
		latch.await();
		System.out.println("End");
	}

	static class CThread extends Thread{
		CountDownLatch latch;
		int count;
		CThread(int count,CountDownLatch latch){
			this.count = count;
			this.latch = latch;
		}
		@Override
		public void run(){
			try{
				System.out.println("---"+count);
			}catch(Exception e){
				e.printStackTrace();
			} finally { latch.countDown();}

		}

	}

}

 
 

七.CyclicBarrier:

循环屏障,它允许一组线程互相等待,直到到达某个公共屏障点;线程(组)数量固定,线程之间需要不时的互相等待,CyclicBarrier和CountDownLatch相比,它可以在释放等待线程后被再次“重用”,所以称为循环屏障。它提供了类似“关卡”的功能。对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

 

  • CyclicBarrier(int parties):指定参与者个数
  • CyclicBarrier(int parties,Runnable barrierAction):指定一个屏障操作,此操作将会有最后一个进入barrier的线程执行。
  • int await():在所有的线程达到barrier之前,一直等待。此方法可以抛出InterrutedExeception(此线程被中断),可以抛出BrokenBarrierExeception(当其他参与者在wait期间中断,导致屏障完整性被破坏),在方法被await时,如果抛出上述异常,需要做补救的相应操作。此方法返回当前线程到达屏障时的索引。(第一个到达的,为0,最后一个为getParties() - 1);根据返回值的不同可以做一些操作,比如最先/最后达到的做一些前置、后置操作等。await方法其实就是获取锁的过程---信号量减1,初始信号量为parties,如果信号量减少到0时,将会唤醒之前所有await()的线程,并开始新的一轮(next),不需要执行reset,即此时信号量再次恢复到parties,接下来线程可以继续await。
  • boolean isBroken():屏障是否处于损坏状态。
  • void reset():重置屏障为其初始状态;如果此时有线程在await,其线程将会抛出BrokenBarrierExeception。对于reset操作,需要线程的执行方法有相应的配合(比如支持操作轮训等),否则重置会带来一些不必要的麻烦。。。如果你需要重置,尚不如重新建一个CyclicBarrier。

底层基于ReentrantLock实现。线程阻塞基于Condition方式(注意Condition底层也是通过AQS框架实现);大概伪代码:

ReentrantLock lock = new ReentrantLock();
Condition trip = lock.newCondition();
////await方法:
if(count!=0){
	trip.await();//AQS:当前线程队列化,LockSupport.park
	count--;
}else{
	trip.signalAll();
}
 

 

//////////////////代码实例

public class CyclicBarrierTestMain {

 

	/**

	* @param args

	*/

	public static void main(String[] args) throws Exception{
		CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
			@Override
			public void run() {
				System.out.println("Barrier action!!");
			}
			});
		for(int i=0;i<5;i++){
			CThread c = new CThread(barrier);
			c.start();
		}
		Thread.sleep(1000);
	}

	static class CThread extends Thread{
		CyclicBarrier barrier;
		CThread(CyclicBarrier barrier){
			this.barrier = barrier;
		}
		@Override
		public void run(){
			int count = 0;
			while(true){
				try{
					System.out.print("---" + count);
					int index = barrier.await();
					System.out.println("+++" + count);
					count++;
					if(index == barrier.getParties() - 1){
						//barrier.reset();
					}
				}catch(Exception e){
					e.printStackTrace();
					break;
				}
			}
		}
	}
}
 

 八.Exchanger

Exchanger:同步交换器,2个互相匹配(协调的对象),互相交换数据。2个线程需要把相同类型的数据,以互相等待的方式交换。比如线程1将数据A交换给B,此时线程1等待直到线程B将数据交换出去。Exchanger有一个方法,就是exchange(V x):其作用就是等待另一个线程到达交换点,然后将数据传递给线程。

如果没有其他线程到达交换点,处于调度的目的,禁用当前线程,直到某个线程到达或者某个线程中断。

伪代码:

void exchange(V item){
	//如果有线程已经到达
	for(;;){
		Node e = get();
		if(e != null){
			V i = e.getItem();
			CAS(e,i,null);//将等待匹配者移除
			Thread t = e.waiter;
			LockSupport.unpark(t);
			//
			Node ne = new Node(currentThread,ne);
			set();//将当前需要交换的数据加入,当其他线程unpart之后,可以get,并获取数据
			return i;//返回需要交换的数据
		}else{
			Node e = new Now(currentThread,item);
			set(node);
			LockSupport.park(currentThread);
		//重新回到顶层for循环,并获取交换数据
		}
	}
}
 

如下的例子是基于一个简单的Productor和Consumer模式,一个线程负责生产数据,当数据满时,交换给consumer消费;当consumer消费完时,也申请交换。

 

 

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Exchanger;

public class ExchangerTestMain {

	/**

	* @param args

	*/

	public static void main(String[] args) throws Exception{
		Exchanger<Queue<Integer>> exchanger = new Exchanger<Queue<Integer>>();
		CThread c = new CThread(exchanger);
		PThread p = new PThread(exchanger);
		c.start();
		p.start();
		Thread.sleep(2000);
	}

	static class CThread extends Thread{
		Exchanger<Queue<Integer>> exchanger ;
		Queue<Integer> current;
		CThread(Exchanger<Queue<Integer>> exchanger){
			this.exchanger = exchanger;
		}

		@Override
		public void run(){
			if(current == null){
				current = new ArrayDeque<Integer>(10);
			}
			try{
				while(true){
				//productor
				if(current.size() == 0){
					current = exchanger.exchange(current);//交换出去fullList,希望获得EmptyList
				}
					System.out.println("C:" + current.poll());
				}
			}catch(Exception e){
				e.printStackTrace();
				return;
			}
		}
	}

	static class PThread extends Thread{
		Exchanger<Queue<Integer>> exchanger ;
		Queue<Integer> current;
		PThread(Exchanger<Queue<Integer>> exchanger){
			this.exchanger = exchanger;
		}

		@Override
		public void run(){
			Random r = new Random();
			if(current == null){
				current = new ArrayDeque<Integer>(10);
			}
			try{
				while(true){
					//productor
					if(current.size() >= 10){
						current = exchanger.exchange(current);//交换出去fullList,希望获得EmptyList
					}
					Integer i = r.nextInt(20);
					System.out.println("P:" + i);
					current.add(i);
				}
			}catch(Exception e){
				e.printStackTrace();
				return;
			}
		}
	}
}
 

 

九.Semaphore:信号量

我们需要把semaphore真的看成“信号量”,它是可以被“增减”的锁引用,“0”是判断信号“过剩”的界限。

我们通常使用semaphore来控制资源访问并发量。它底层使用“共享”模式锁实现,提供了“公平”“非公平”2中策略。当“信号量”大于0时,允许获取锁;否则将阻塞直到信号量恢复。

将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。

  • Semaphore(int permits, boolean fair):指定信号量,指定公平策略。
  • void acquire():获取一个信号,如果信号量<=0,则阻塞;在非公平模式下,允许闯入。
  • void acquire(int permits).

上面2个方法都会抛出InterruptException,即在等待线程被“中断时”,将会抛出异常而返回。底层基于AQS.acquireSharedInterruptibly()

  • void acquireUninterruptibly():获取一个信号,不支持中断,当线程被中断时,此线程将继续等待,当线程确实从此方法返回后,将设置其中断状态。底层基于AQS.acquireShared();
  • void release():释放一个信号,直接导致信号量++。
  • boolean tryAcquire():获取一个信号,如果获取成功,则返回true。
分享到:
评论

相关推荐

    Java高并发编程详解:多线程与架构设计 (Java核心技术系列)

    我们可以通过synchronized关键字,或者Lock等显式锁的方式在代码的编写阶段对共享资源进行数据一致性保护,那么一个Class在完成初始化的整个过程到后在方法区(JDK8 以后在元数据空间)其数据结构是怎样确保数据一致...

    「Java学习+面试指南」一份涵盖大部分 Java 程序员所需要掌握的核心知识 准备 Java 面试,首选.zip

    ConcurrentHashMap 源码+底层数据结构分析 IO IO 基础知识总结 IO 设计模式总结 IO 模型详解 并发 知识点/面试题总结 : (必看 ) Java 并发常见知识点&面试题总结(上) Java 并发常见知识点&面试题总结(中) Jav

    「Java学习+面试指南」一份涵盖大部分 Java 程序员所需要掌握的核心知识

    ConcurrentHashMap 源码+底层数据结构分析 IO IO 基础知识总结 IO 设计模式总结 IO 模型详解 并发 知识点/面试题总结 : (必看 ) Java 并发常见知识点&面试题总结(上) Java 并发常见知识点&面试题总结(中) Java ...

    JAVA高并发高性能高可用高扩展架构视频教程

    打造高效代码结构(java性能优化) 新版本通俗易懂_观察者模式递进时讲解 ibatis连接数据库 高并发之单(多)生产者消费者线程 高并发复用数据库链接技术详解之数据库连接池 类加载器的高级特性(自定义类加器实现加密...

    汪文君高并发编程实战视频资源下载.txt

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    JAVA学习资源1.0

    17.Java数据结构和算法 18.java代码重构 19.一线架构师实践指南 20.springCloud微服务实战 21.重构改善 如此多的书籍是作者辛苦整理,多年沉淀学习的成果,今日拿出分享给新秀程序员,和想进入开发编程的小白。

    互联网公司Java面试题及核心知识点

    Dubbo,Spring Boot,Spring Cloud,数据结构与算法,设计模式,网络与操作系统等 相关技术领域的大厂面试题及详解,由于纸质书籍篇幅有限,部分面试题的解答会以电子版 的形式呈现。 适用人群:本资源适合具备一定...

    精通 Hibernate:Java 对象持久化技术详解(第2版).part2

    第1章 Java应用分层架构及软件模型  1.1 应用程序的分层体系结构  1.1.1 区分物理层和逻辑层  1.1.2 软件层的特征  1.1.3 软件分层的优点  1.1.4 软件分层的缺点  1.1.5 Java应用的持久化层  1.2 软件的模型 ...

    Java线程详解.ppt

    根据定义,进程为一个数据结构及能在其上进行的一次操作, 它有两个基本特征, 1:进程是可用于资源的独立单位, 2:进程同时又是一个可独立调度和分派的基本单位, 这两个基本属性使之能够独立运行,也能够并发...

    java基础案例与开发详解案例源码全

    1.3 Java平台的体系结构7 1.3.1 JavaSE标准版8 1.3.2 JavaEE企业版10 1.3.3 JavaME微型版11 1.4 JavaSE环境安装和配置12 1.4.1 什么是JDK12 1.4.2 JDK安装目录和实用命令工具介绍12 1.4.3 设置环境变量13 1.4.4 验证...

    免费超全面的Java基础类型,容器,并发,IO流,面向对象,Web编程等代码总结

    Java基础类型,容器,并发,IO流,面向对象,Web编程等代码总结。 2、分类文档 JVM虚拟机 JVM特点,结构与执行周期 JVM类加载机制 JVM运行时区数据 JVM执行引擎和垃圾回收 基础语法 理解Java中对象基础Object类 ...

    汪文君高并发编程实战视频资源全集

    │ Java并发编程.png │ ppt+源码.rar │ 高并发编程第二阶段01讲、课程大纲及主要内容介绍.wmv │ 高并发编程第二阶段02讲、介绍四种Singleton方式的优缺点在多线程情况下.wmv │ 高并发编程第二阶段03讲、...

    精通并发与 netty 视频教程(2018)视频教程

    76_Netty项目开发过程中常见且重要事项分析 77_Java NIO Buffer总结回顾与难点拓展 78_Netty数据容器ByteBuf底层数据结构深度剖析 79_Netty的ByteBuf底层实现大揭秘 80_Netty复合缓冲区详解与3种缓冲区适用场景分析 ...

    精通并发与netty视频教程(2018)视频教程

    78_Netty数据容器ByteBuf底层数据结构深度剖析 79_Netty的ByteBuf底层实现大揭秘 80_Netty复合缓冲区详解与3种缓冲区适用场景分析 81_Netty引用计数的实现机制与自旋锁的使用技巧 82_Netty引用计数原子更新揭秘与...

    精通并发与netty 无加密视频

    第78讲:Netty数据容器ByteBuf底层数据结构深度剖析 第79讲:Netty的ByteBuf底层实现大揭秘 第80讲:Netty复合缓冲区详解与3种缓冲区适用场景分析 第81讲:Netty引用计数的实现机制与自旋锁的使用技巧 第82讲:...

    Java思维导图xmind文件+导出图片

    redis数据结构分析 Redis主从复制原理及无磁盘复制分析 Redis管道模式详解 Redis缓存与数据库一致性问题解决方案 基于redis实现分布式实战 图解Redis中的AOF和RDB持久化策略的原理 redis读写分离架构实践 ...

    精通 Hibernate:Java 对象持久化技术详解(第2版).part4

    第1章 Java应用分层架构及软件模型  1.1 应用程序的分层体系结构  1.1.1 区分物理层和逻辑层  1.1.2 软件层的特征  1.1.3 软件分层的优点  1.1.4 软件分层的缺点  1.1.5 Java应用的持久化层  1.2 软件的模型 ...

Global site tag (gtag.js) - Google Analytics