- 浏览: 28294 次
- 性别:
- 来自: 北京
最新评论
-
cin_ie:
通俗易懂,此文甚好。
JAVA设计模式--建造者模式 -
asleep886:
...
JAVA设计模式--建造者模式 -
asleep886:
JAVA设计模式--建造者模式
一:java并发的开篇
1、在实际应用中,经常用到线程的并发,那为什么需要用到并发呢,不能独自单独的程序处理吗,那很明确的说,多条线程做完成一件事情和一条线程去完成一件事情,那是无法同言而语的。比如在实际的网站访问过程中,世界各地的人都去同事访问一个网站,在服务器端,如果收到一个请求就对它先处理,而其他用户的请求到达只是对 他们先存储,然后在对他们进行处理,可想而知这样的速度会有多慢,如果一天之类用千万级的用户访问,那样的速度无法想象。在现实生活中这样的例子随处可见,做 一件事情,一个人去做和多个人同事做可想这样的速度是多么快,只是在做的过程当中对与大家共用的东西应当控制并发访问问题。
2、同步:当线程并发的时候,随之而来的也带来了一些问题,如果多条线程去同时操作共享而用的数据那将是会使共享的数据很容易就出现错误。在JAVA中提供同防止多条线程同时共享数据的方式是:synchronized,volatile很好的利用这两个关键字就能防止并发而带来的问题,同时在JAVA1.5之后也提供了更多好用的类来解决这个问题
3、线程池:当有许多请求需要去处理的时候,如果只是单独的一个人去处理,可想而知那会让后面在排队的人等多久,这样就需要线程池,有请求过来了就到线程池里 面取出一条线程去处理它,处理完成就把它收回到线程池里面,然而自己实现 一个功能强大的线程池也并非易事,在java1.5之后专门提供了线程池的类库
二、java.util.concurrent包中
1、java.util.concurrent.Executors类
通过这个类可以创建不同的线程池
1.1、创建固定大小的线程池
ExecutorService service = Executors.newFixedThreadPool(3);//创建一个大小为3的线程池
//通过线程池service,可以给线程池进行提交任务等操作
ExecutorService的方法
*提交任务
四个方法:
Submit(Callable<T> task);
submit(Runnable task)
submit(Runnable task, T result)
execute()
说明:前三个方法都是带有返回值的执行任务,最后一个方法只是单纯的执行任务,方法的返回值是Future类型的,它是支持泛型的,返回值要和泛型的类型一致
Future<String> future = service.submit(new Callable<String>() {
Public String call() throws Exception() {
Return “test”;
}
});
//得到返回值
String str = future.get();
当使用submit(Runnable task)的时候,里面执行的是run方法是没有返回值的,
得到的future.get()得到的是Null值,表示线程正常执行完成
线程执行完毕关闭线程池里拥有的线程:
两个方法:
Shutdown():调用的时候会先完成当前正在执行和已经提交的任务
shutdownNow():结束当前正在执行的任务和等待的任务也不会执行,返回待执行
任务列表
1.2、创建可变大小线程池
ExecutorService service = Executors. newCachedThreadPool();
这种方式返回的是一个可变的线程池,只要有新的任务来到,如果没有可用的线程存在就新建一个线程
1.3、创建一个定时器样式的线程池:在单位时间内执行任务
1.3.1、定时器:
public static void main5() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生爆炸");
service.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("蹦............");
return "success";
}
},
5,
TimeUnit.SECONDS);
}
1.3.2:每五秒钟发生一次爆炸
public static void main7() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生连环爆炸");
service.scheduleWithFixedDelay((new Runnable() {
@Override
public void run() {
System.out.println("蹦...........");
}
}), 5, 1, TimeUnit.SECONDS);
}
scheduleWithFixedDelay或者scheduleAtFixedRate方法都是能够产生循环定时器,只是实现方式不一样
说明:ScheduledExecutorService接口实现了ExecuorService所以它一样拥有ExecutoService的功能
1.4:创建单线程的线程池
ExecutorService service = Executors.newSingleThreadExecutor();
2、部分ExecuorService接口方法说明:
invokeAll(Collection<? extends Callable<T>> tasks):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们。返回所有任务执行完成后的future集合
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,当任务都完成或者是超时时,返回所有任务的状态future集合
invokeAny(Collection<? extends Callable<T>> tasks): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,每执行完成一个线程就返回一个future值
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,在指定时间内完成了一个任务就返回Future结果
3、对线程任务完成队列的合理处理类ExecutorCompletionService
利用线程池能很好的执行任务,并且得到任务的执行返回结果,当然可以自己对所有处理的结果进行处理,但是java.util.concurrent提供了一个很简单的类给我们用就是
ExecutorCompletionService 用它来执行任务,所有任务的返回结果都将放置在一个队列上,之后可以通过队列取得任务的执行结果,根据任务执行结果做相应的操作,同时也很方便的对所有的执行结果统一的处理
public static void main8() {
ExecutorService service = Executors.newCachedThreadPool();
CompletionService<Integer> cservice = new ExecutorCompletionService<Integer>(service);
Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
for(int i=0; i<10; i++) {
final int squence = i;
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for(int j=1; j<5; j++) { System.out.println(Thread.currentThread().getName() + " task " + squence + " time " + j);
}
return squence;
}
});
}
for(Callable<Integer> task:tasks) {
cservice.submit(task);
}
//统一处理任务结果
int result = 0;
for(int i=0; i<tasks.size(); i++) {
result += cservice.take().get();
}
System.out.println(result);
可知任务完成的时候,处理结果会存放在调用take()方法所获取的队列上面,方便统一处理结果
三、java.util.concurrent.locks
3.1、锁是用来防止线程并发而带来的线程安全问题,在访问线程共享数据的时候,操作它或者读取它的时候,如果不希望别的线程也同时对它就行读取,那应该给它上锁,这样只要一个线程进去了对他上锁了,别的线程就无法在进入了。有时候线程并发不是就是单一并发访问问题,并发访问同时可能也会导致数据不一致
public class LockTest {
public static void main(String[] args) {
final Service s = new Service ();
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<10; i++) {
service.execute(new Runnable() {
public void run() {
s.service();
}
});
}
}
static class Service {
private int count = 0;
Lock lock = new ReentrantLock();
public void service() {
++ count;
try {
Thread.sleep(200l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("service :" + count + " time");
}
}
}
对与以上程序,我们希望打印的是
service : 1 time
service : 2 time
……..
Service: 10 time
实际情况确实在多线程的环境里打印的结果确实
Service:10 time
Service:10 tiem
…..
Service :10 time
所以在调用service方法操作count的时候,应该上锁,service上锁后方法如下:
public void service() {
try {
lock.lock();//加锁
++count;
Thread.sleep(200l);
System.out.println("service :" + count + " time");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();//释放锁
}
}
当然在JAVA里面同时还可以使用synchronized关键字来防止同步问题
3.2、读写锁,在JDK5.0以前是没用线程并发库的,如果用synchronized确实能实现同步问题,但是比如对与一个类的属性,希望的效果是在读取它的时候能多线程同时读,在写它的时候不能读,在读它的时候不能写。这样如果只是单单在方法上面加synchronized关键字,它会使所有的线程互斥,不能说多个线程能同时读取对象的值,虽然自己编写代码实现,但是JAVA5之后线程并发库已经给了解决方案,那就是读写锁,如下例子:
public class LockTest {
public static void main(String[] args) {
final ReadWriteLockT s = new ReadWriteLockT();
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 4; i++) {
service.execute(new Runnable() {
public void run() {
while(true) {
s.getX();
}
}
});
service.execute(new Runnable() {
public void run() {
while(true) {
s.setX(new Random().nextInt());
}
}
});
}
}
static class ReadWriteLockT{
private int x = 0;
private ReadWriteLock rock = new ReentrantReadWriteLock();
public void setX(int x) {
rock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + ".........进入写读锁");
this.x = x;
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("写完毕");
rock.writeLock().unlock();
}
public int getX() {
rock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "----------进入读读锁:");
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x);
System.out.println("读完毕");
rock.readLock().unlock();
return x;
}
}
}
说明:从打印的结果中可以看出,在多条线程操作的过程中,在读的过程中别的线程是无法调用写的方法的,在写的过程中也是无法调用读的方法的,只有在读的时候才能同时调用读的方法,当然在写的时候不能同时调用写的方法
3.3、condition的使用
Lock锁得机制是在synchronized关键字上面进行了一些功能的提升,那么condition是在wait(),notify(),notifyall()上面做了对应的提升。对与唤醒线程,是线程等待它能做到更好的实现,即时大部分其实它是更加以一种优雅的形式去表现wait(),notify(),notifyall()的一种实现。其实在某种意义上说,之前的wait这样的方法只是针对一个对象的监视器,然而现在对功能的扩展,就好比能分解成多个监视器在监视一个对象一般,同时每个监视器和对应的锁形成 的组合,能更加有序的对线程进行唤醒和等待。下面结合JDK API帮助文档里的例子谈谈它们之间的区别
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
说明:如上面的例子,线程不断的调用put 和 take方法向队列items里面放入数据,放蛮的情况下就无法在放,唤醒拿的线程。 在拿的时候如果为空唤醒对应的放的线程,协调工作。
在这里如果是之前的wait,notify这样的方式的话,因为程序是有多条put和多条take的线程同时并发访问,wati,notify这样的方式只能唤醒一天线程,比如take发现是空的,就得put了,但同时也有好多其它的take线程在等待锁,所以这时候也有可能锁再一次给了take线程,这样的话显然不是自己的意愿,要的效果是让put线程拿到锁,而condition就做到了这样,上面例子可看出,只要take是空的,就唤醒一条put线程,相反只要put时是满的,就唤醒一条take线程
四、java.util.concurrent.atomic类讲解
类摘要
AtomicBoolean
可以用原子方式更新的 boolean 值。
AtomicInteger
可以用原子方式更新的 int 值。
AtomicIntegerArray
可以用原子方式更新其元素的 int 数组。
AtomicIntegerFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。
AtomicLong
可以用原子方式更新的 long 值。
AtomicLongArray
可以用原子方式更新其元素的 long 数组。
AtomicLongFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。
AtomicMarkableReference<V>
AtomicMarkableReference 维护带有标记位的对象引用,可以原子方式对其进行更新。
AtomicReference<V>
可以用原子方式更新的对象引用。
AtomicReferenceArray<E>
可以用原子方式更新其元素的对象引用数组。
AtomicReferenceFieldUpdater<T,V>
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。
AtomicStampedReference<V>
AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
讲解:从上面的类中可以看出,它对基本的一些数值的操作做了线程同步的安全,有时候可能对一个数的加,多线线程同时访问加就会出现问题,甚至平时一般的i++,多个线程同事访问都会造成数据混乱。所以对多线程共享时候加一般用它来实现更加好,不过在java1.5之前可以使用volatile关键字来解决问题,然而现在有了这套类库就业可以避免每次为变量申明volatile关键字了。其实这些内部源码都也是基于volatitle的实现。也可以使用synchorzied关键字同步方法,但是用基于volatile的方式有更好的效率
但是volatile关键字其实还有线程安全问题,就是当多个线程对变量进行操作的时候还是可能会造成变量的值不准确性。 然而用这些类却解决了这个问题
用一个例子说明:
Class MyDate {
Private int volatile x;
Public void opera() {
X ++;
}
}
上面的类,当有多个线程同时执行opera的时候,虽然x已经用了volatile关键字同步,但是未能防止x的值出错,虽然出错的概率小
五java.util.concurrent.同步集合类的讲解
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
集合类说明:传统的集合类中,在多线程共享的环境中会出现错误,所以使用这里的安全集合类就不会出现错误了。
六java.util.concurrent.队列的讲解
所有已知子接口:
BlockingDeque<E>
所有已知实现类:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
这些队列在线程并发中是安全的,也有阻塞队列,阻塞队列实际应用比较多,所以在这也给一个实现。
阻塞队列:在放的时候要有空为才能放,要不然执行等待,在拿的时候要有数据才能执行拿,要不然执行等待。
在前面例子中,用condition方式实现了类似的功能,用这里的阻塞队列将会发现更加方便。
例子实现:
package com.moom;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingDequeTest {
public static void main(String[] args) {
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(6);//创建一个只能容纳6个容量的阻塞队列
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<3; i++) {
//生产三条拿的线程
service.execute(new Runnable() {
public void run() {
try {
while(true) {//不停的拿
Thread.sleep(200l);
System.out.println(Thread.currentThread().getName() + "即将拿");
String s = queue.take();
System.out.println(Thread.currentThread().getName() + "拿到" + s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while(true) { //不停的放
try {
System.out.println(Thread.currentThread().getName() + "即将放");
String s = String.valueOf(new Random().nextInt(10000));
queue.put(s);
System.out.println(Thread.currentThread().getName() + "放入了" + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
七java.util.concurrent.剩余部分类的讲解
5.1、CyclicBarrier,有些多线程需要在多条线程完成了相应的事情之后然后就继续去完成后续的事情,每次都是这样,就是说当只有一条线程完成了对应的事情时,他是无法继续向前运行的,只有等待。
例子:
package com.moom;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(5);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.execute(new Runnable() {
@Override
public void run() {
// 每条线程要执行的任务
System.out.println("线程" + Thread.currentThread().getName()
+ "将执行完任务");
try {
barrier.await();// 线程执行完就等待,要五条线程都执行完才能做后续操作
// 执行后续任务
System.out.println("线程" + Thread.currentThread().getName() + "后续任务执行完毕");
barrier.await();//等待所有线程执行完后续任务
//执行最后任务,随意执行不等待,结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
注释:每次开启的线程数量应当和CyclicBarrier对象的线程数一样,比如上面例子是开启5,然而如果开六条线程的话会发觉程序先前5条线程正常执行下去,第六条执行完先前任务,一直阻塞等待着其它线程的到来。等于就是五条五条线程为一组。
5.2、Semaphore,对与有些多线程共享的数据,并不是说一条线程访问的时候其它线程就不能访问了,是规定只有多少条线程可以访问,好比一个房间里面只能容纳多少个人,只要没满的时候其它的线程就可以进去
例子:
package com.moom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
final Semaphore se = new Semaphore(5);//创建同时可以五个线程访问控制器
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<20; i++) {
service.execute(new Runnable() {
public void run() {
try {
se.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "进来了开始执行任务");
Thread.sleep(100l);
System.out.println("线程" + Thread.currentThread().getName() + "将要执行完毕");
se.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
每次最多只会有5条线程在执行,没条线程执行完毕就释放资源
5.3、CountDownLatch有的多线程应用场景,一个线程唤醒多条线程,多条线程执行完毕有去唤醒这一条线程,就这个循环的唤醒或者单独的唤醒运用用CountDownLatch就比较方便的实现
例子场景:在各类运动比赛当中,裁判线程监视这所有的参赛人线程,参赛人也会监视裁判线程,当裁判说开始参赛人线程开始,参赛人比赛完成,裁判在做回应
实现:
package com.moom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch judje = new CountDownLatch(1);//产生一个裁判线程用
final CountDownLatch members = new CountDownLatch(5);//5个队员
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<5; i++) {
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(10l);
System.out.println(Thread.currentThread().getName() + "队员等待下命令");
judje.await();//等待裁判说开始
System.out.println(Thread.currentThread().getName() + "队员开始了");
//执行逻辑
members.countDown();//执行完成
System.out.println(Thread.currentThread().getName() + "结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
System.out.println(Thread.currentThread().getName() + "裁判即将通知开始");
try {
Thread.sleep(2000l);
System.out.println("开始");
judje.countDown();
members.await();//等待所有队员执行完
Thread.sleep(1000l);
System.out.println("裁判统计结果");
service.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.4、Exchanger有些线程同步需要互相交换数据,比如两天线程执行完毕之后需要交换完毕的数据用exchanger的可以方便的实现
例子实现:
package com.moom;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> ex = new Exchanger<String>();
final Exchanger<String> ex2 = new Exchanger<String>();
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(100l);
String da1 = "aaaa";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
Thread.sleep(100l);
String da1 = "bbbbb";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
八、FAQ
6.1、为什么使用volatile更有效率
同步的代价, 主要由其覆盖范围决定, 如果可以降低同步的覆盖范围, 则可以大幅提升程序性能.而volatile的覆盖范围仅仅变量级别的. 因此它的同步代价很低.
1、在实际应用中,经常用到线程的并发,那为什么需要用到并发呢,不能独自单独的程序处理吗,那很明确的说,多条线程做完成一件事情和一条线程去完成一件事情,那是无法同言而语的。比如在实际的网站访问过程中,世界各地的人都去同事访问一个网站,在服务器端,如果收到一个请求就对它先处理,而其他用户的请求到达只是对 他们先存储,然后在对他们进行处理,可想而知这样的速度会有多慢,如果一天之类用千万级的用户访问,那样的速度无法想象。在现实生活中这样的例子随处可见,做 一件事情,一个人去做和多个人同事做可想这样的速度是多么快,只是在做的过程当中对与大家共用的东西应当控制并发访问问题。
2、同步:当线程并发的时候,随之而来的也带来了一些问题,如果多条线程去同时操作共享而用的数据那将是会使共享的数据很容易就出现错误。在JAVA中提供同防止多条线程同时共享数据的方式是:synchronized,volatile很好的利用这两个关键字就能防止并发而带来的问题,同时在JAVA1.5之后也提供了更多好用的类来解决这个问题
3、线程池:当有许多请求需要去处理的时候,如果只是单独的一个人去处理,可想而知那会让后面在排队的人等多久,这样就需要线程池,有请求过来了就到线程池里 面取出一条线程去处理它,处理完成就把它收回到线程池里面,然而自己实现 一个功能强大的线程池也并非易事,在java1.5之后专门提供了线程池的类库
二、java.util.concurrent包中
1、java.util.concurrent.Executors类
通过这个类可以创建不同的线程池
1.1、创建固定大小的线程池
ExecutorService service = Executors.newFixedThreadPool(3);//创建一个大小为3的线程池
//通过线程池service,可以给线程池进行提交任务等操作
ExecutorService的方法
*提交任务
四个方法:
Submit(Callable<T> task);
submit(Runnable task)
submit(Runnable task, T result)
execute()
说明:前三个方法都是带有返回值的执行任务,最后一个方法只是单纯的执行任务,方法的返回值是Future类型的,它是支持泛型的,返回值要和泛型的类型一致
Future<String> future = service.submit(new Callable<String>() {
Public String call() throws Exception() {
Return “test”;
}
});
//得到返回值
String str = future.get();
当使用submit(Runnable task)的时候,里面执行的是run方法是没有返回值的,
得到的future.get()得到的是Null值,表示线程正常执行完成
线程执行完毕关闭线程池里拥有的线程:
两个方法:
Shutdown():调用的时候会先完成当前正在执行和已经提交的任务
shutdownNow():结束当前正在执行的任务和等待的任务也不会执行,返回待执行
任务列表
1.2、创建可变大小线程池
ExecutorService service = Executors. newCachedThreadPool();
这种方式返回的是一个可变的线程池,只要有新的任务来到,如果没有可用的线程存在就新建一个线程
1.3、创建一个定时器样式的线程池:在单位时间内执行任务
1.3.1、定时器:
public static void main5() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生爆炸");
service.schedule(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("蹦............");
return "success";
}
},
5,
TimeUnit.SECONDS);
}
1.3.2:每五秒钟发生一次爆炸
public static void main7() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生连环爆炸");
service.scheduleWithFixedDelay((new Runnable() {
@Override
public void run() {
System.out.println("蹦...........");
}
}), 5, 1, TimeUnit.SECONDS);
}
scheduleWithFixedDelay或者scheduleAtFixedRate方法都是能够产生循环定时器,只是实现方式不一样
说明:ScheduledExecutorService接口实现了ExecuorService所以它一样拥有ExecutoService的功能
1.4:创建单线程的线程池
ExecutorService service = Executors.newSingleThreadExecutor();
2、部分ExecuorService接口方法说明:
invokeAll(Collection<? extends Callable<T>> tasks):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们。返回所有任务执行完成后的future集合
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,当任务都完成或者是超时时,返回所有任务的状态future集合
invokeAny(Collection<? extends Callable<T>> tasks): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,每执行完成一个线程就返回一个future值
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,在指定时间内完成了一个任务就返回Future结果
3、对线程任务完成队列的合理处理类ExecutorCompletionService
利用线程池能很好的执行任务,并且得到任务的执行返回结果,当然可以自己对所有处理的结果进行处理,但是java.util.concurrent提供了一个很简单的类给我们用就是
ExecutorCompletionService 用它来执行任务,所有任务的返回结果都将放置在一个队列上,之后可以通过队列取得任务的执行结果,根据任务执行结果做相应的操作,同时也很方便的对所有的执行结果统一的处理
public static void main8() {
ExecutorService service = Executors.newCachedThreadPool();
CompletionService<Integer> cservice = new ExecutorCompletionService<Integer>(service);
Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
for(int i=0; i<10; i++) {
final int squence = i;
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for(int j=1; j<5; j++) { System.out.println(Thread.currentThread().getName() + " task " + squence + " time " + j);
}
return squence;
}
});
}
for(Callable<Integer> task:tasks) {
cservice.submit(task);
}
//统一处理任务结果
int result = 0;
for(int i=0; i<tasks.size(); i++) {
result += cservice.take().get();
}
System.out.println(result);
可知任务完成的时候,处理结果会存放在调用take()方法所获取的队列上面,方便统一处理结果
三、java.util.concurrent.locks
3.1、锁是用来防止线程并发而带来的线程安全问题,在访问线程共享数据的时候,操作它或者读取它的时候,如果不希望别的线程也同时对它就行读取,那应该给它上锁,这样只要一个线程进去了对他上锁了,别的线程就无法在进入了。有时候线程并发不是就是单一并发访问问题,并发访问同时可能也会导致数据不一致
public class LockTest {
public static void main(String[] args) {
final Service s = new Service ();
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<10; i++) {
service.execute(new Runnable() {
public void run() {
s.service();
}
});
}
}
static class Service {
private int count = 0;
Lock lock = new ReentrantLock();
public void service() {
++ count;
try {
Thread.sleep(200l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("service :" + count + " time");
}
}
}
对与以上程序,我们希望打印的是
service : 1 time
service : 2 time
……..
Service: 10 time
实际情况确实在多线程的环境里打印的结果确实
Service:10 time
Service:10 tiem
…..
Service :10 time
所以在调用service方法操作count的时候,应该上锁,service上锁后方法如下:
public void service() {
try {
lock.lock();//加锁
++count;
Thread.sleep(200l);
System.out.println("service :" + count + " time");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();//释放锁
}
}
当然在JAVA里面同时还可以使用synchronized关键字来防止同步问题
3.2、读写锁,在JDK5.0以前是没用线程并发库的,如果用synchronized确实能实现同步问题,但是比如对与一个类的属性,希望的效果是在读取它的时候能多线程同时读,在写它的时候不能读,在读它的时候不能写。这样如果只是单单在方法上面加synchronized关键字,它会使所有的线程互斥,不能说多个线程能同时读取对象的值,虽然自己编写代码实现,但是JAVA5之后线程并发库已经给了解决方案,那就是读写锁,如下例子:
public class LockTest {
public static void main(String[] args) {
final ReadWriteLockT s = new ReadWriteLockT();
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 4; i++) {
service.execute(new Runnable() {
public void run() {
while(true) {
s.getX();
}
}
});
service.execute(new Runnable() {
public void run() {
while(true) {
s.setX(new Random().nextInt());
}
}
});
}
}
static class ReadWriteLockT{
private int x = 0;
private ReadWriteLock rock = new ReentrantReadWriteLock();
public void setX(int x) {
rock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + ".........进入写读锁");
this.x = x;
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("写完毕");
rock.writeLock().unlock();
}
public int getX() {
rock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "----------进入读读锁:");
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x);
System.out.println("读完毕");
rock.readLock().unlock();
return x;
}
}
}
说明:从打印的结果中可以看出,在多条线程操作的过程中,在读的过程中别的线程是无法调用写的方法的,在写的过程中也是无法调用读的方法的,只有在读的时候才能同时调用读的方法,当然在写的时候不能同时调用写的方法
3.3、condition的使用
Lock锁得机制是在synchronized关键字上面进行了一些功能的提升,那么condition是在wait(),notify(),notifyall()上面做了对应的提升。对与唤醒线程,是线程等待它能做到更好的实现,即时大部分其实它是更加以一种优雅的形式去表现wait(),notify(),notifyall()的一种实现。其实在某种意义上说,之前的wait这样的方法只是针对一个对象的监视器,然而现在对功能的扩展,就好比能分解成多个监视器在监视一个对象一般,同时每个监视器和对应的锁形成 的组合,能更加有序的对线程进行唤醒和等待。下面结合JDK API帮助文档里的例子谈谈它们之间的区别
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
说明:如上面的例子,线程不断的调用put 和 take方法向队列items里面放入数据,放蛮的情况下就无法在放,唤醒拿的线程。 在拿的时候如果为空唤醒对应的放的线程,协调工作。
在这里如果是之前的wait,notify这样的方式的话,因为程序是有多条put和多条take的线程同时并发访问,wati,notify这样的方式只能唤醒一天线程,比如take发现是空的,就得put了,但同时也有好多其它的take线程在等待锁,所以这时候也有可能锁再一次给了take线程,这样的话显然不是自己的意愿,要的效果是让put线程拿到锁,而condition就做到了这样,上面例子可看出,只要take是空的,就唤醒一条put线程,相反只要put时是满的,就唤醒一条take线程
四、java.util.concurrent.atomic类讲解
类摘要
AtomicBoolean
可以用原子方式更新的 boolean 值。
AtomicInteger
可以用原子方式更新的 int 值。
AtomicIntegerArray
可以用原子方式更新其元素的 int 数组。
AtomicIntegerFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。
AtomicLong
可以用原子方式更新的 long 值。
AtomicLongArray
可以用原子方式更新其元素的 long 数组。
AtomicLongFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。
AtomicMarkableReference<V>
AtomicMarkableReference 维护带有标记位的对象引用,可以原子方式对其进行更新。
AtomicReference<V>
可以用原子方式更新的对象引用。
AtomicReferenceArray<E>
可以用原子方式更新其元素的对象引用数组。
AtomicReferenceFieldUpdater<T,V>
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。
AtomicStampedReference<V>
AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
讲解:从上面的类中可以看出,它对基本的一些数值的操作做了线程同步的安全,有时候可能对一个数的加,多线线程同时访问加就会出现问题,甚至平时一般的i++,多个线程同事访问都会造成数据混乱。所以对多线程共享时候加一般用它来实现更加好,不过在java1.5之前可以使用volatile关键字来解决问题,然而现在有了这套类库就业可以避免每次为变量申明volatile关键字了。其实这些内部源码都也是基于volatitle的实现。也可以使用synchorzied关键字同步方法,但是用基于volatile的方式有更好的效率
但是volatile关键字其实还有线程安全问题,就是当多个线程对变量进行操作的时候还是可能会造成变量的值不准确性。 然而用这些类却解决了这个问题
用一个例子说明:
Class MyDate {
Private int volatile x;
Public void opera() {
X ++;
}
}
上面的类,当有多个线程同时执行opera的时候,虽然x已经用了volatile关键字同步,但是未能防止x的值出错,虽然出错的概率小
五java.util.concurrent.同步集合类的讲解
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
集合类说明:传统的集合类中,在多线程共享的环境中会出现错误,所以使用这里的安全集合类就不会出现错误了。
六java.util.concurrent.队列的讲解
所有已知子接口:
BlockingDeque<E>
所有已知实现类:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
这些队列在线程并发中是安全的,也有阻塞队列,阻塞队列实际应用比较多,所以在这也给一个实现。
阻塞队列:在放的时候要有空为才能放,要不然执行等待,在拿的时候要有数据才能执行拿,要不然执行等待。
在前面例子中,用condition方式实现了类似的功能,用这里的阻塞队列将会发现更加方便。
例子实现:
package com.moom;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingDequeTest {
public static void main(String[] args) {
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(6);//创建一个只能容纳6个容量的阻塞队列
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<3; i++) {
//生产三条拿的线程
service.execute(new Runnable() {
public void run() {
try {
while(true) {//不停的拿
Thread.sleep(200l);
System.out.println(Thread.currentThread().getName() + "即将拿");
String s = queue.take();
System.out.println(Thread.currentThread().getName() + "拿到" + s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while(true) { //不停的放
try {
System.out.println(Thread.currentThread().getName() + "即将放");
String s = String.valueOf(new Random().nextInt(10000));
queue.put(s);
System.out.println(Thread.currentThread().getName() + "放入了" + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
七java.util.concurrent.剩余部分类的讲解
5.1、CyclicBarrier,有些多线程需要在多条线程完成了相应的事情之后然后就继续去完成后续的事情,每次都是这样,就是说当只有一条线程完成了对应的事情时,他是无法继续向前运行的,只有等待。
例子:
package com.moom;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(5);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.execute(new Runnable() {
@Override
public void run() {
// 每条线程要执行的任务
System.out.println("线程" + Thread.currentThread().getName()
+ "将执行完任务");
try {
barrier.await();// 线程执行完就等待,要五条线程都执行完才能做后续操作
// 执行后续任务
System.out.println("线程" + Thread.currentThread().getName() + "后续任务执行完毕");
barrier.await();//等待所有线程执行完后续任务
//执行最后任务,随意执行不等待,结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
注释:每次开启的线程数量应当和CyclicBarrier对象的线程数一样,比如上面例子是开启5,然而如果开六条线程的话会发觉程序先前5条线程正常执行下去,第六条执行完先前任务,一直阻塞等待着其它线程的到来。等于就是五条五条线程为一组。
5.2、Semaphore,对与有些多线程共享的数据,并不是说一条线程访问的时候其它线程就不能访问了,是规定只有多少条线程可以访问,好比一个房间里面只能容纳多少个人,只要没满的时候其它的线程就可以进去
例子:
package com.moom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
final Semaphore se = new Semaphore(5);//创建同时可以五个线程访问控制器
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<20; i++) {
service.execute(new Runnable() {
public void run() {
try {
se.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "进来了开始执行任务");
Thread.sleep(100l);
System.out.println("线程" + Thread.currentThread().getName() + "将要执行完毕");
se.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
每次最多只会有5条线程在执行,没条线程执行完毕就释放资源
5.3、CountDownLatch有的多线程应用场景,一个线程唤醒多条线程,多条线程执行完毕有去唤醒这一条线程,就这个循环的唤醒或者单独的唤醒运用用CountDownLatch就比较方便的实现
例子场景:在各类运动比赛当中,裁判线程监视这所有的参赛人线程,参赛人也会监视裁判线程,当裁判说开始参赛人线程开始,参赛人比赛完成,裁判在做回应
实现:
package com.moom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchTest {
public static void main(String[] args) {
final CountDownLatch judje = new CountDownLatch(1);//产生一个裁判线程用
final CountDownLatch members = new CountDownLatch(5);//5个队员
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<5; i++) {
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(10l);
System.out.println(Thread.currentThread().getName() + "队员等待下命令");
judje.await();//等待裁判说开始
System.out.println(Thread.currentThread().getName() + "队员开始了");
//执行逻辑
members.countDown();//执行完成
System.out.println(Thread.currentThread().getName() + "结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
System.out.println(Thread.currentThread().getName() + "裁判即将通知开始");
try {
Thread.sleep(2000l);
System.out.println("开始");
judje.countDown();
members.await();//等待所有队员执行完
Thread.sleep(1000l);
System.out.println("裁判统计结果");
service.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.4、Exchanger有些线程同步需要互相交换数据,比如两天线程执行完毕之后需要交换完毕的数据用exchanger的可以方便的实现
例子实现:
package com.moom;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> ex = new Exchanger<String>();
final Exchanger<String> ex2 = new Exchanger<String>();
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(100l);
String da1 = "aaaa";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
Thread.sleep(100l);
String da1 = "bbbbb";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
八、FAQ
6.1、为什么使用volatile更有效率
同步的代价, 主要由其覆盖范围决定, 如果可以降低同步的覆盖范围, 则可以大幅提升程序性能.而volatile的覆盖范围仅仅变量级别的. 因此它的同步代价很低.
相关推荐
多线程支持: Java内置了对多线程的支持,允许程序同时执行多个任务。这对于开发需要高并发性能的应用程序(如服务器端应用、网络应用等)非常重要。 自动内存管理(垃圾回收): Java具有自动内存管理机制,通过...
我认为要认识java线程安全,必须了解两个主要的点:java的内存模型,java的线程同步...后面我还会写java并发包的文章,详细总结如何利用java并发包编写高效安全的多线程并发程序。暂时写得比较仓促,后面会慢慢补充完善
2023年Java最新全套面试题资料,包含:Dubbo、JVM、Kafka、Linux、MongoDB、MyBatis、MySQL、Netty、Nginx、RabbitMQ消息中间件、Redis、Spring、SpringBoot、SpringCloud、SpringMVC、Tomcat、Zookeeper、并发编程...
【多线程】简述synchronized 和java.util.concurrent.locks.Lock的异同? 90 【线程】ThreadLocal的作用 90 【Spring】什么是IOC和DI?DI是如何实现的 91 【Spring】spring中的IOC(控制反转)的原理 92 【Spring】...
次年,发布 36 万字的配套详解图书《码出高效》,本书 秉持“图胜于表,表胜于言”的理念,深入浅出地将计算机基础、面向对象思想、JVM 探源、数据 结构与集合、并发与多线程、单元测试等知识客观、立体地呈现出来。...
《Linux多线程服务端编程:使用muduo C++网络库》主要讲述采用现代C++在x86-64 Linux上编写多线程TCP网络服务程序的主流常规技术,重点讲解一种适应性较强的多线程服务器的编程模型,即one loop per thread。...
我会从下图中的知识点去写这个系列,很多细节的点,可能想得不是很...并发与多线程 多线程基础知识 常见关键字 多线程锁机制 线程池知识点 常见的JUC工具类 多线程经典面试题 常用工具集 JVM问题排查工具-JMC IDEA开发
《深入理解Java虚拟机》、《并发编程的艺术》、《Java多线程核心编程艺术》、《Java8函数式编程》、《Redis设计与实现》、《RocketMQ技术内幕》、《Spring技术内幕》、《Spring源码深度解析》、《剑指Offer》、...
并发多线程部署,不管多少台服务器,多少个服务,同时发起线程进行更新、部署、启动。 提高list规则文件DIY程度,减少新增服务带来的修改代码,实现扫描list自动化安装部署。(配置、与监控属于业务范围,还需手动...
"在线签到"子系统选用目前比较流行和热门的"微服务"( Microservice)作为系统 的技术实现方案,作者在本文中将重点介绍子系统所应用的核心技术——微服务、反射、 对象序列化、多线程以及基于 TIP/IP 的 Socket ...
并发多线程部署,不管多少台服务器,多少个服务,同时发起线程进行更新、部署、启动。 提高list规则文件DIY程度,减少新增服务带来的修改代码,实现扫描list自动化安装部署。(配置、与监控属于业务范围,还需手动...
容易编码的例子 本代码为阿里的神书《代码出高效Java开发手册》的书籍原始码和相关代码示例,VSCode或Eclipse直接...目前,覆盖了最重要的第六章(数据结构与集合),第七章(并发与多线程),其他细节继承逐步完善。
2.实现了多线程编程,解决了单线程消息阻塞和不能并发的问题。 3.实现了动态的好友列表,交互感更强,有比较不错的用户体验。 4.实现了群聊与私聊并存,使功能更加的完善。 5.有比较简洁美观的用户界面,使用更加...
com.google.common.util.concurrent:多线程工具包。 类库使用手册: 一. 基本工具类:让使用Java语言更令人愉悦。 1. 使用和避免 null:null 有语言歧义, 会产生令人费解的错误, 反正他总是让人不爽。...
Java语言之所以受人推崇,是因为它确实称得上是一种新一代编程语言,具有面向对象、可移植性好、与硬件无关、系统强健安全、提供了并发机制、性能高的众多优点,并提供了分布性、多线程、动态性的支持。 Java作为一...
我的博客主要关注于java,并发,多线程,大数据,区块链,架构,人工智能等方面,一有时间就会更新。 下面分享几个我总结的规划脑图,博客的写作会参照这些脑图慢慢的完善,欢迎大家关注和star,研究的朋友也可以一...
针对系统的具体特点和系统要求,我们在进行数据库方案设计时对数据库平台提出下列 性能方面的要求: 标准化程度高,符合标准ANSI SQL 数据库语言的规范 支持Brower/SERVER 模式应用,支持对称处理和多线程技术 所...
性能测试是将系统处理能力容量测出来,而不是测试并发用户数,除了服务器长连接可能影响并发用户数外,系统处理能力不完全受并发用户数影响,可以用最小的用户数将系统处理能力容量测试出来,也可以用更多的用户将...
Java的产生与流行是当今Internet发展的客观要求,Java是一门各方面性能都很好的编程语言,它的基本特点是简单、面向对象、分布式、解释的、健壮的、安全的、结构中立的、可移植的、性能很优异的、多线程的、动态的,...