并发容器
这些容器的关键方法大部分都实现了线程安全的功能,却不使用同步关键字(synchronized)。值得注意的是Queue接口本身定义的几个常用方法的区别,
1.add方法和offer方法的区别在于超出容量限制时前者抛出异常,后者返回false;
2.remove方法和poll方法都从队列中拿掉元素并返回,但是他们的区别在于空队列下操作前者抛出异常,而后者返回null;
3.element方法和peek方法都返回队列顶端的元素,但是不把元素从队列中删掉,区别在于前者在空队列的时候抛出异常,后者返回null。
阻塞队列:
非阻塞队列:
转移队列:
其它容器:
同步设备
这些类大部分都是帮助做线程之间同步的,简单描述,就像是提供了一个篱笆,线程执行到这个篱笆的时候都得等一等,等到条件满足以后再往后走。
任何时候都有一个party的总数,即注册(registered)的party数,它可以在Phaser构造器里指定,也可以任意时刻调用方法动态增减;
每一个party都有unarrived和arrived两种状态,可以通过调用arriveXXX方法使得它从unarrived变成arrived;
每一个线程到达barrier后会等待(调用arriveAndAwaitAdvance方法),一旦所有party都到达(即arrived的party数量等于registered的数量),就会触发advance操作,同时barrier被打破,线程继续向下执行,party重新变为unarrived状态,重新等待所有party的到达;
在绝大多数情况下一个线程就只负责操控一个party的到达,因此很多文章说party指的就是线程,但是这是不准确的,因为一个线程完全可以操控多个party,只要它执行多次的arrive方法。
一个是phase,表示当前在哪一个阶段,每碰到一次barrier就会触发advance操作(触发前调用onAdvance方法),一旦越过这道barrier就会触发phase+1,这很容易理解;
另一个是party,很多文章说它就是线程数,但是其实这并不准确,它更像一个用于判断advance是否被允许发生的计数器:
给出一个Phaser使用的最简单的例子:
public
class
T {
public
static
void
main(String args[]) {
final
int
count = 3;
final
Phaser phaser = new
Phaser(count);
// 总共有3个registered parties
for(int
i = 0; i < count; i++) {
final
Thread thread = new
Thread(new
Task(phaser));
thread.start();
}
}
public
static
class
Task implements
Runnable {
private
final
Phaser phaser;
public
Task(Phaser phaser) {
this.phaser = phaser;
}
@Override
public
void
run() {
phaser.arriveAndAwaitAdvance();
// 每执行到这里,都会有一个party arrive,
//如果arrived parties等于registered parties,
//就往下继续执行,否则等待
}
}
}
原子对象
这些对象都的行为在不使用同步的情况下保证了原子性。值得一提的有两点:
1.weakCompareAndSet方法:compareAndSet方法很明确,但是这个是啥?根据JSR规范,调用weakCompareAndSet时并不能保证happen-before的一致性,因此允许存在重排序指令等等虚拟机优化导致这个操作失败(较弱的原子更新操作),但是从Java源代码看,它的实现其实和compareAndSet是一模一样的;
2.lazySet方法:延时设置变量值,这个等价于set方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能损耗,所以如果不需要立即读取设置的新值,那么此方法就很有用。
锁
class
BoundedBuffer {
final
Lock lock = new
ReentrantLock();
final
Condition notFull = lock.newCondition();
final
Condition notEmpty = lock.newCondition();
final
Object[] items = new
Object[100];
int
putptr, takeptr, count;
public
void
put(Object x) throws
InterruptedException {
lock.lock();
try
{
while
(count == items.length)
notFull.await();
items[putptr] = x;
if
(++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
// 既然已经放进了元素,肯定不空了,唤醒“notEmpty”
} finally
{
lock.unlock();
}
}
public
Object take() throws
InterruptedException {
lock.lock();
try
{
while
(count == 0)
notEmpty.await();
Object x = items[takeptr];
if
(++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
// 既然已经拿走了元素,肯定不满了,唤醒“notFull”
return
x;
} finally
{
lock.unlock();
}
}
}
Fork-join框架
这是一个JDK7引入的并行框架,它把流程划分成fork(分解)+join(合并)两个步骤(怎么那么像MapReduce?),传统线程池来实现一个并行任务的时候,经常需要花费大量的时间去等待其他线程执行任务的完成,但是fork-join框架使用work stealing技术缓解了这个问题:
1.每个工作线程都有一个双端队列,当分给每个任务一个线程去执行的时候,这个任务会放到这个队列的头部;
2.当这个任务执行完毕,需要和另外一个任务的结果执行合并操作,可是那个任务却没有执行的时候,不会干等,而是把另一个任务放到队列的头部去,让它尽快执行;
3.当工作线程的队列为空,它会尝试从其他线程的队列尾部偷一个任务过来;
4.取得的任务可以被进一步分解。
class SortTask extends RecursiveAction { final long[] array; final int lo; final int hi; private int THRESHOLD = 30; public SortTask(long[] array) { this.array = array; this.lo = 0; this.hi = array.length - 1; } public SortTask(long[] array, int lo, int hi) { this.array = array; this.lo = lo; this.hi = hi; } @Override protected void compute() { if (hi - lo < THRESHOLD) sequentiallySort(array, lo, hi); else { int pivot = partition(array, lo, hi); coInvoke(new SortTask(array, lo, pivot - 1), new SortTask(array, pivot + 1, hi)); } } private int partition(long[] array, int lo, int hi) { long x = array[hi]; int i = lo - 1; for (int j = lo; j < hi; j++) { if (array[j] <= x) { i++; swap(array, i, j); } } swap(array, i + 1, hi); return i + 1; } private void swap(long[] array, int i, int j) { if (i != j) { long temp = array[i]; array[i] = array[j]; array[j] = temp; } } private void sequentiallySort(long[] array, int lo, int hi) { Arrays.sort(array, lo, hi + 1); }} |
---|
测试的调用代码:
@Test
public
void
testSort() throws
Exception {
ForkJoinTask sort = new
SortTask(array);
ForkJoinPool fjpool = new
ForkJoinPool();
fjpool.submit(sort);
fjpool.shutdown();
fjpool.awaitTermination(30, TimeUnit.SECONDS);
assertTrue(checkSorted(array));
}
class
Fibonacci extends
RecursiveTask {
final
int
n;
Fibonacci(int
n) {
this.n = n;
}
private
int
compute(int
small) {
final
int[] results = { 1, 1, 2, 3,
5, 8, 13, 21, 34, 55, 89
};
return
results[small];
}
public
Integer compute() {
if
(n <= 10) {
return
compute(n);
}
Fibonacci f1 = new
Fibonacci(n - 1);
Fibonacci f2 = new
Fibonacci(n - 2);
f1.fork();
f2.fork();
return
f1.join() + f2.join();
}
}
RecursiveTask和RecursiveAction的区别在于它的compute是可以有返回值的,子任务的计算使用fork()方法,结果的获取使用join()方法:
执行器和线程池
这个是我曾经举过的例子:
public class FutureUsage { public static void main(String[] args) { ExecutorService executor = Executors. newSingleThreadExecutor(); Callable<Object> task = new Callable<Object>() { public Object call() throws Exception { Thread.sleep(4000); Object result = "finished"; return result; } }; Future<Object> future = executor.submit(task); System.out.println("task submitted"); try { System.out.println(future.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } // Thread won't be destroyed. }} |
---|
线程池具备这样的优先级处理策略:
1.请求到来首先交给coreSize内的常驻线程执行
2.如果coreSize的线程全忙,任务被放到队列里面
3.如果队列放满了,会新增线程,直到达到maxSize
4.如果还是处理不过来,会把一个异常扔到RejectedExecutionHandler中去,用户可以自己设定这种情况下的最终处理策略
对于大于coreSize而小于maxSize的那些线程,空闲了keepAliveTime后,会被销毁。观察上面说的优先级顺序可以看到,假如说给ExecutorService一个无限长的队列,比如LinkedBlockingQueue,那么maxSize>coreSize就是没有意义的。
ExecutorService:
ScheduledExecutor:
CompletionService:
其它:
ThreadLocalRandom.class,随机数生成器,它和Random类差不多,但是它的性能要高得多,因为它的种子内部生成后,就不再修改,而且随机对象不共享,就会减少很多消耗和争用,由于种子内部生成,因此生成随机数的方法略有不同:
ThreadLocalRandom.current().nextX(…)
(完)
出处:http://www.raychase.net/1912
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。