`

JAVA多线程并发完善

阅读更多
一:java并发的开篇
1、在实际应用中,经常用到线程的并发,那为什么需要用到并发呢,不能独自单独的程序处理吗,那很明确的说,多条线程做完成一件事情和一条线程去完成一件事情,那是无法同言而语的。比如在实际的网站访问过程中,世界各地的人都去同事访问一个网站,在服务器端,如果收到一个请求就对它先处理,而其他用户的请求到达只是对 他们先存储,然后在对他们进行处理,可想而知这样的速度会有多慢,如果一天之类用千万级的用户访问,那样的速度无法想象。在现实生活中这样的例子随处可见,做 一件事情,一个人去做和多个人同事做可想这样的速度是多么快,只是在做的过程当中对与大家共用的东西应当控制并发访问问题。
2、同步:当线程并发的时候,随之而来的也带来了一些问题,如果多条线程去同时操作共享而用的数据那将是会使共享的数据很容易就出现错误。在JAVA中提供同防止多条线程同时共享数据的方式是:synchronized,volatile很好的利用这两个关键字就能防止并发而带来的问题,同时在JAVA1.5之后也提供了更多好用的类来解决这个问题
3、线程池:当有许多请求需要去处理的时候,如果只是单独的一个人去处理,可想而知那会让后面在排队的人等多久,这样就需要线程池,有请求过来了就到线程池里 面取出一条线程去处理它,处理完成就把它收回到线程池里面,然而自己实现 一个功能强大的线程池也并非易事,在java1.5之后专门提供了线程池的类库   
       
二、java.util.concurrent包中
1、java.util.concurrent.Executors类
    通过这个类可以创建不同的线程池
    1.1、创建固定大小的线程池
     ExecutorService service = Executors.newFixedThreadPool(3);//创建一个大小为3的线程池
     //通过线程池service,可以给线程池进行提交任务等操作
     ExecutorService的方法
       *提交任务
       四个方法:
       Submit(Callable<T> task);
       submit(Runnable task)
      submit(Runnable task, T result)
execute()
说明:前三个方法都是带有返回值的执行任务,最后一个方法只是单纯的执行任务,方法的返回值是Future类型的,它是支持泛型的,返回值要和泛型的类型一致
Future<String> future = service.submit(new Callable<String>() {
    Public String call() throws Exception() {
   Return “test”;
}
});
//得到返回值
String str = future.get();
当使用submit(Runnable task)的时候,里面执行的是run方法是没有返回值的,
得到的future.get()得到的是Null值,表示线程正常执行完成

线程执行完毕关闭线程池里拥有的线程:
两个方法:
Shutdown():调用的时候会先完成当前正在执行和已经提交的任务
shutdownNow():结束当前正在执行的任务和等待的任务也不会执行,返回待执行
任务列表
   1.2、创建可变大小线程池
      ExecutorService service = Executors. newCachedThreadPool();
      这种方式返回的是一个可变的线程池,只要有新的任务来到,如果没有可用的线程存在就新建一个线程
   1.3、创建一个定时器样式的线程池:在单位时间内执行任务
        1.3.1、定时器:
        public static void main5() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生爆炸");
service.schedule(new Callable<String>() {

@Override
public String call() throws Exception {
System.out.println("蹦............");
return "success";
}
},
5,
TimeUnit.SECONDS);
}
      1.3.2:每五秒钟发生一次爆炸
public static void main7() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
System.out.println("5秒钟后将发生连环爆炸");
service.scheduleWithFixedDelay((new Runnable() {

@Override
public void run() {
System.out.println("蹦...........");
}
}), 5, 1,  TimeUnit.SECONDS);
}

scheduleWithFixedDelay或者scheduleAtFixedRate方法都是能够产生循环定时器,只是实现方式不一样
说明:ScheduledExecutorService接口实现了ExecuorService所以它一样拥有ExecutoService的功能
1.4:创建单线程的线程池
ExecutorService service = Executors.newSingleThreadExecutor();

2、部分ExecuorService接口方法说明:
   invokeAll(Collection<? extends Callable<T>> tasks):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们。返回所有任务执行完成后的future集合
   invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,当任务都完成或者是超时时,返回所有任务的状态future集合
invokeAny(Collection<? extends Callable<T>> tasks): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,每执行完成一个线程就返回一个future值
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 事先把所一个集合的任务放到一个集合类里面,然后在用线程池去执行它们,在指定时间内完成了一个任务就返回Future结果
3、对线程任务完成队列的合理处理类ExecutorCompletionService
利用线程池能很好的执行任务,并且得到任务的执行返回结果,当然可以自己对所有处理的结果进行处理,但是java.util.concurrent提供了一个很简单的类给我们用就是
ExecutorCompletionService 用它来执行任务,所有任务的返回结果都将放置在一个队列上,之后可以通过队列取得任务的执行结果,根据任务执行结果做相应的操作,同时也很方便的对所有的执行结果统一的处理
public static void main8() {
ExecutorService service = Executors.newCachedThreadPool();
CompletionService<Integer> cservice = new ExecutorCompletionService<Integer>(service);
Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
for(int i=0; i<10; i++) {
final int squence = i;
tasks.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
for(int j=1; j<5; j++) { System.out.println(Thread.currentThread().getName() + " task " + squence + " time " + j);
}
return squence;
}
});
}
for(Callable<Integer> task:tasks) {
cservice.submit(task);
}
//统一处理任务结果
int result = 0;
for(int i=0; i<tasks.size(); i++) {
result += cservice.take().get();
}
System.out.println(result);
可知任务完成的时候,处理结果会存放在调用take()方法所获取的队列上面,方便统一处理结果
三、java.util.concurrent.locks
3.1、锁是用来防止线程并发而带来的线程安全问题,在访问线程共享数据的时候,操作它或者读取它的时候,如果不希望别的线程也同时对它就行读取,那应该给它上锁,这样只要一个线程进去了对他上锁了,别的线程就无法在进入了。有时候线程并发不是就是单一并发访问问题,并发访问同时可能也会导致数据不一致
public class LockTest {
public static void main(String[] args) {
final Service s = new Service ();
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<10; i++) {
service.execute(new Runnable() {
public void run() {
s.service();
}
});
}
}
static class Service {
private int count = 0;
Lock lock = new ReentrantLock();
public void service() {
++ count;
try {
Thread.sleep(200l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("service :" + count + " time");
}
}
}
对与以上程序,我们希望打印的是
service : 1 time
service : 2 time
……..
Service: 10 time
实际情况确实在多线程的环境里打印的结果确实
Service:10 time
Service:10 tiem
…..
Service :10 time
所以在调用service方法操作count的时候,应该上锁,service上锁后方法如下:

public void service() {
try {
lock.lock();//加锁
++count;
Thread.sleep(200l);
System.out.println("service :" + count + " time");
} catch (Exception e) {
e.printStackTrace();
}finally {
lock.unlock();//释放锁
}
}

当然在JAVA里面同时还可以使用synchronized关键字来防止同步问题
3.2、读写锁,在JDK5.0以前是没用线程并发库的,如果用synchronized确实能实现同步问题,但是比如对与一个类的属性,希望的效果是在读取它的时候能多线程同时读,在写它的时候不能读,在读它的时候不能写。这样如果只是单单在方法上面加synchronized关键字,它会使所有的线程互斥,不能说多个线程能同时读取对象的值,虽然自己编写代码实现,但是JAVA5之后线程并发库已经给了解决方案,那就是读写锁,如下例子:
public class LockTest {
public static void main(String[] args) {
final ReadWriteLockT s = new ReadWriteLockT();
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 4; i++) {
service.execute(new Runnable() {
public void run() {
while(true) {
s.getX();
}
}
});
service.execute(new Runnable() {
public void run() {
while(true) {
s.setX(new Random().nextInt());
}
}
});
}
}

static class ReadWriteLockT{
private int x = 0;

private ReadWriteLock rock = new ReentrantReadWriteLock();

public void setX(int x) {
rock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + ".........进入写读锁");
this.x = x;
try {
Thread.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("写完毕");
rock.writeLock().unlock();
}

public int getX() {
rock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "----------进入读读锁:");
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x);
System.out.println("读完毕");
rock.readLock().unlock();
return x;
}
}
}
说明:从打印的结果中可以看出,在多条线程操作的过程中,在读的过程中别的线程是无法调用写的方法的,在写的过程中也是无法调用读的方法的,只有在读的时候才能同时调用读的方法,当然在写的时候不能同时调用写的方法
3.3、condition的使用
Lock锁得机制是在synchronized关键字上面进行了一些功能的提升,那么condition是在wait(),notify(),notifyall()上面做了对应的提升。对与唤醒线程,是线程等待它能做到更好的实现,即时大部分其实它是更加以一种优雅的形式去表现wait(),notify(),notifyall()的一种实现。其实在某种意义上说,之前的wait这样的方法只是针对一个对象的监视器,然而现在对功能的扩展,就好比能分解成多个监视器在监视一个对象一般,同时每个监视器和对应的锁形成 的组合,能更加有序的对线程进行唤醒和等待。下面结合JDK API帮助文档里的例子谈谈它们之间的区别
class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition();
   final Condition notEmpty = lock.newCondition();
   final Object[] items = new Object[100];
   int putptr, takeptr, count;
   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
}

说明:如上面的例子,线程不断的调用put 和 take方法向队列items里面放入数据,放蛮的情况下就无法在放,唤醒拿的线程。 在拿的时候如果为空唤醒对应的放的线程,协调工作。

在这里如果是之前的wait,notify这样的方式的话,因为程序是有多条put和多条take的线程同时并发访问,wati,notify这样的方式只能唤醒一天线程,比如take发现是空的,就得put了,但同时也有好多其它的take线程在等待锁,所以这时候也有可能锁再一次给了take线程,这样的话显然不是自己的意愿,要的效果是让put线程拿到锁,而condition就做到了这样,上面例子可看出,只要take是空的,就唤醒一条put线程,相反只要put时是满的,就唤醒一条take线程
四、java.util.concurrent.atomic类讲解
类摘要
AtomicBoolean
可以用原子方式更新的 boolean 值。
AtomicInteger
可以用原子方式更新的 int 值。
AtomicIntegerArray
可以用原子方式更新其元素的 int 数组。
AtomicIntegerFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile int 字段进行原子更新。
AtomicLong
可以用原子方式更新的 long 值。
AtomicLongArray
可以用原子方式更新其元素的 long 数组。
AtomicLongFieldUpdater<T>
基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。
AtomicMarkableReference<V>
AtomicMarkableReference 维护带有标记位的对象引用,可以原子方式对其进行更新。
AtomicReference<V>
可以用原子方式更新的对象引用。
AtomicReferenceArray<E>
可以用原子方式更新其元素的对象引用数组。
AtomicReferenceFieldUpdater<T,V>
基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。
AtomicStampedReference<V>
AtomicStampedReference 维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。
讲解:从上面的类中可以看出,它对基本的一些数值的操作做了线程同步的安全,有时候可能对一个数的加,多线线程同时访问加就会出现问题,甚至平时一般的i++,多个线程同事访问都会造成数据混乱。所以对多线程共享时候加一般用它来实现更加好,不过在java1.5之前可以使用volatile关键字来解决问题,然而现在有了这套类库就业可以避免每次为变量申明volatile关键字了。其实这些内部源码都也是基于volatitle的实现。也可以使用synchorzied关键字同步方法,但是用基于volatile的方式有更好的效率

但是volatile关键字其实还有线程安全问题,就是当多个线程对变量进行操作的时候还是可能会造成变量的值不准确性。 然而用这些类却解决了这个问题

用一个例子说明:

Class  MyDate {
   Private int volatile x;
   Public void opera() {
      X ++;
}
}
上面的类,当有多个线程同时执行opera的时候,虽然x已经用了volatile关键字同步,但是未能防止x的值出错,虽然出错的概率小
五java.util.concurrent.同步集合类的讲解
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet

集合类说明:传统的集合类中,在多线程共享的环境中会出现错误,所以使用这里的安全集合类就不会出现错误了。
六java.util.concurrent.队列的讲解
所有已知子接口:
BlockingDeque<E>
所有已知实现类:
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue
这些队列在线程并发中是安全的,也有阻塞队列,阻塞队列实际应用比较多,所以在这也给一个实现。
阻塞队列:在放的时候要有空为才能放,要不然执行等待,在拿的时候要有数据才能执行拿,要不然执行等待。
在前面例子中,用condition方式实现了类似的功能,用这里的阻塞队列将会发现更加方便。
例子实现:
package com.moom;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingDequeTest {
public static void main(String[] args) {
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(6);//创建一个只能容纳6个容量的阻塞队列
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<3; i++) {
//生产三条拿的线程
service.execute(new Runnable() {
public void run() {
try {
while(true) {//不停的拿
Thread.sleep(200l);
System.out.println(Thread.currentThread().getName() + "即将拿");
String s = queue.take();
System.out.println(Thread.currentThread().getName() + "拿到" + s);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
while(true) { //不停的放
try {
System.out.println(Thread.currentThread().getName() + "即将放");
String s = String.valueOf(new Random().nextInt(10000));
queue.put(s);
System.out.println(Thread.currentThread().getName() + "放入了" + s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
七java.util.concurrent.剩余部分类的讲解
5.1、CyclicBarrier,有些多线程需要在多条线程完成了相应的事情之后然后就继续去完成后续的事情,每次都是这样,就是说当只有一条线程完成了对应的事情时,他是无法继续向前运行的,只有等待。
例子:
package com.moom;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(5);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
service.execute(new Runnable() {
@Override
public void run() {
// 每条线程要执行的任务
System.out.println("线程" + Thread.currentThread().getName()
+ "将执行完任务");
try {
barrier.await();// 线程执行完就等待,要五条线程都执行完才能做后续操作
// 执行后续任务
System.out.println("线程" + Thread.currentThread().getName() + "后续任务执行完毕");
barrier.await();//等待所有线程执行完后续任务

//执行最后任务,随意执行不等待,结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
service.shutdown();
}
}
注释:每次开启的线程数量应当和CyclicBarrier对象的线程数一样,比如上面例子是开启5,然而如果开六条线程的话会发觉程序先前5条线程正常执行下去,第六条执行完先前任务,一直阻塞等待着其它线程的到来。等于就是五条五条线程为一组。

5.2、Semaphore,对与有些多线程共享的数据,并不是说一条线程访问的时候其它线程就不能访问了,是规定只有多少条线程可以访问,好比一个房间里面只能容纳多少个人,只要没满的时候其它的线程就可以进去
例子:
package com.moom;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
public static void main(String[] args) {
final Semaphore se = new Semaphore(5);//创建同时可以五个线程访问控制器
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<20; i++) {
service.execute(new Runnable() {
public void run() {
try {
se.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "进来了开始执行任务");
Thread.sleep(100l);
System.out.println("线程" + Thread.currentThread().getName() + "将要执行完毕");
se.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

service.shutdown();

}
}
每次最多只会有5条线程在执行,没条线程执行完毕就释放资源
5.3、CountDownLatch有的多线程应用场景,一个线程唤醒多条线程,多条线程执行完毕有去唤醒这一条线程,就这个循环的唤醒或者单独的唤醒运用用CountDownLatch就比较方便的实现
例子场景:在各类运动比赛当中,裁判线程监视这所有的参赛人线程,参赛人也会监视裁判线程,当裁判说开始参赛人线程开始,参赛人比赛完成,裁判在做回应
实现:
package com.moom;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest {
public static void main(String[] args) {
    final CountDownLatch judje = new CountDownLatch(1);//产生一个裁判线程用
final CountDownLatch members = new CountDownLatch(5);//5个队员
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<5; i++) {
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(10l);
System.out.println(Thread.currentThread().getName() + "队员等待下命令");
judje.await();//等待裁判说开始
System.out.println(Thread.currentThread().getName() + "队员开始了");
//执行逻辑
members.countDown();//执行完成
System.out.println(Thread.currentThread().getName() + "结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
System.out.println(Thread.currentThread().getName() + "裁判即将通知开始");
try {
Thread.sleep(2000l);
System.out.println("开始");
judje.countDown();
members.await();//等待所有队员执行完
Thread.sleep(1000l);
System.out.println("裁判统计结果");
service.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5.4、Exchanger有些线程同步需要互相交换数据,比如两天线程执行完毕之后需要交换完毕的数据用exchanger的可以方便的实现
例子实现:
package com.moom;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
final Exchanger<String> ex = new Exchanger<String>();
final Exchanger<String> ex2 = new Exchanger<String>();
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep(100l);
String da1 = "aaaa";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
Thread.sleep(100l);
String da1 = "bbbbb";
System.out.println(Thread.currentThread().getName() + "要交换的数据是:" + da1);
String end = ex.exchange(da1);
System.out.println("正在执行交换");
System.out.println(Thread.currentThread().getName() + "得到的数据是:" + end);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
八、FAQ
  6.1、为什么使用volatile更有效率
同步的代价, 主要由其覆盖范围决定, 如果可以降低同步的覆盖范围, 则可以大幅提升程序性能.而volatile的覆盖范围仅仅变量级别的. 因此它的同步代价很低.
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics