Java 并發編程
Runnable與Thread的區別
使用Runnable的好處
1、可以避免由于Java的單繼承特性而帶來的局限;
2、增強程序的健壯性,代碼能夠被多個線程共享,代碼與數據是獨立的;
3、適合多個相同程序代碼的線程區處理同一資源的情況。
class MyThread implements Runnable {
private int ticket = 5;
public void run() {
for (int i = 0; i < 10; i++) {
if (ticket > 0) {
System.out.println("ticket = " + ticket--);
}
}
}
}
class RunnableDemo {
public static void main(String[] args) {
MyThread my = new MyThread();
new Thread(my).start();
new Thread(my).start();
new Thread(my).start();
}
}</code></pre>
Volatile
在多線程,同步變量。 線程為了提高效率,將某成員變量(如A)拷貝了一份(如B),線程中對A的訪問其實訪問的是B。只在某些動作時才進行A和B的同步。因此存在A和B不一致的情況。volatile就是用來避免這種情況的。volatile告訴jvm, 它所修飾的變量不保留拷貝,直接訪問主內存中的(也就是上面說的A) 假如pleaseStop沒有被聲明為volatile,線程執行run的時候檢查的是自己的副本, 就不能及時得知其他線程已經調用tellMeToStop()修改了pleaseStop的值。 簡單的說就是synchronized的代碼塊是確保可見性和原子性的, volatile只能確保可見性, 當且僅當下面條件全部滿足時, 才能使用volatile 對變量的寫入操作不依賴于變量的當前值, (++i/i++這種肯定不行), 或者能確保只有單個線程在更新該變量不會與其他狀態變量一起納入不變性條件中訪問變量時不需要加鎖volatile關鍵字不起作用,也就是說如下的表達式都不是原子操作n = n + 1 ;n ++ ; 所以在使用volatile關鍵時一定要謹慎,如果自己沒有把握,可以使用synchronized來代替volatile。
訪問變量時不需要加鎖volatile關鍵字不起作用,使用n = n + 1的情況,若線程安全結果是100,而現在結果是小于100
class JoinThread extends Thread
{
public static volatile int n = 0 ;
public void run()
{
for ( int i = 0 ; i < 10 ; i ++ )
try
{
n = n + 1 ;
sleep( 3 ); // 為了使運行結果更隨機,延遲3毫秒
}
catch (Exception e)
{
}
}
public static void main(String[] args) throws Exception
{
Thread threads[] = new Thread[ 100 ];
for ( int i = 0 ; i < threads.length; i ++ )
// 建立100個線程
threads[i] = new JoinThread();
for ( int i = 0 ; i < threads.length; i ++ )
// 運行剛才建立的100個線程
threads[i].start();
for ( int i = 0 ; i < threads.length; i ++ )
// 100個線程都執行完后繼續
threads[i].join();
System.out.println( " n= " + JoinThread.n);
}
}
加入synchronized的代碼塊是確保可見性和原子性的,現在結果是正確的
class JoinThreadText extends Thread
{
public static int n = 0 ;
public static synchronized void inc()
{
n ++ ;
}
public void run()
{
for ( int i = 0 ; i < 10 ; i ++ )
try
{
inc(); // n = n + 1 改成了 inc();
sleep( 3 ); // 為了使運行結果更隨機,延遲3毫秒
}
catch (Exception e)
{
}
}
public static void main(String[] args) throws Exception
{
Thread threads[] = new Thread[ 100 ];
for ( int i = 0 ; i < threads.length; i ++ )
// 建立100個線程
threads[i] = new JoinThread();
for ( int i = 0 ; i < threads.length; i ++ )
// 運行剛才建立的100個線程
threads[i].start();
for ( int i = 0 ; i < threads.length; i ++ )
// 100個線程都執行完后繼續
threads[i].join();
System.out.println( " n= " + JoinThread.n);
}
}</code></pre>
Wait、notify、notifyAll
在調用wait(), notify()或notifyAll()的時候,必須先獲得鎖,且狀態變量須由該鎖保護,而固有鎖對象與固有條件隊列對象又是同一個對象。也就是說,要在某個對象上執行wait,notify,先必須鎖定該對象,而對應的狀態變量也是由該對象鎖保護的。
AtomicInteger
AtomicInteger,一個提供原子操作的Integer的類。在Java語言中,++i和i++操作并不是線程安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則通過一種線程安全的加減操作接口。來看AtomicInteger提供的接口。
//獲取當前的值
public final int get()
//取當前的值,并設置新的值
public final int getAndSet(int newValue)
//獲取當前的值,并自增
public final int getAndIncrement()
//獲取當前的值,并自減
public final int getAndDecrement()
//獲取當前的值,并加上預期的值
public final int getAndAdd(int delta)
時間比較
public class AtomicIntegerCompareTest {
private int value;
public AtomicIntegerCompareTest(int value) {
this.value = value;
}
public synchronized int increase() {
return value++;
}
public static void main(String args[]) {
long start = System.currentTimeMillis();
AtomicIntegerCompareTest test = new AtomicIntegerCompareTest(0);
for (int i = 0; i < 1000000; i++) {
test.increase();
}
long end = System.currentTimeMillis();
System.out.println("time elapse:" + (end - start));
long start1 = System.currentTimeMillis();
AtomicInteger atomic = new AtomicInteger(0);
for (int i = 0; i < 1000000; i++) {
atomic.incrementAndGet();
}
long end1 = System.currentTimeMillis();
System.out.println("time elapse:" + (end1 - start1));
}
}</code></pre>
BlockingQueue阻塞隊列
作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時 (LinkedBlockingQueue可以通過構造函數指定該值), 才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> bqueue = new ArrayBlockingQueue<String>(20);
for (int i = 0; i < 30; i++) {
//將指定元素添加到此隊列中
bqueue.put("加入元素" + i);
System.out.println("向阻塞隊列中添加了元素:" + i);
}
System.out.println("程序到此運行結束,即將退出----");
}
}
從執行結果中可以看出,由于隊列中元素的數量限制在了20個,因此添加20個元素后,其他元素便在隊列外阻塞等待,程序并沒有終止。
class BlockingQueueTest1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> bqueue = new ArrayBlockingQueue<String>(20);
for (int i = 0; i < 30; i++) {
//將指定元素添加到此隊列中
bqueue.put("" + i);
System.out.println("向阻塞隊列中添加了元素:" + i);
if (i > 18) {
//從隊列中獲取隊頭元素,并將其移出隊列
System.out.println("從阻塞隊列中移除元素:" + bqueue.take());
}
}
System.out.println("程序到此運行結束,即將退出----");
}
}
從結果中可以看出,當添加了第20個元素后,我們從隊首移出一個元素,這樣便可以繼續向隊列中添加元素, 之后每添加一個元素,便從將隊首元素移除,這樣程序便可以執行結束。</code></pre>
FutureTask
FutureTask可以返回執行完畢的數據,并且FutureTask的get方法支持阻塞這兩個特性, 我們可以用來預先加載一些可能用到資源,然后要用的時候,調用get方法獲取(如果資源加載完,直接返回;否則繼續等待其加載完成)。
FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個接口, 另外它還可以包裝Runnable(實際上會轉換為Callable)和Callable ,所以一般來講是一個符合體了,它可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行 ,并且還可以通過v get()返回執行結果,在線程體沒有執行完成的時候,主線程一直阻塞等待,執行完則直接返回結果。
其中Runnable實現的是void run()方法,無返回值;Callable實現的是 call()方法,并且可以返回執行結果。其中Runnable可以提交給Thread來包裝下 ,直接啟動一個線程來執行,而Callable則一般都是提交給ExecuteService來執行。
public class RunnableFutureTask {
/**
* ExecutorService
*/
static ExecutorService mExecutor = Executors.newSingleThreadExecutor();
/**
*
* @param args
*/
public static void main(String[] args) {
runnableDemo();
futureDemo();
}
/**
* runnable, 無返回值
*/
static void runnableDemo() {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("runnable demo : " + fibc(20));
}
}).start();
}
static void futureDemo() {
try {
/**
* 提交runnable則沒有返回值, future沒有數據
*/
Future<?> result = mExecutor.submit(new Runnable() {
@Override
public void run() {
fibc(20);
}
});
System.out.println("future result from runnable : " + result.get());
/**
* 提交Callable, 有返回值, future中能夠獲取返回值
*/
Future<Integer> result2 = mExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return fibc(20);
}
});
System.out
.println("future result from callable : " + result2.get());
FutureTask<Integer> futureTask = new FutureTask<Integer>(
new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return fibc(20);
}
});
// 提交futureTask
mExecutor.submit(futureTask) ;
System.out.println("future result from futureTask : "
+ futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
/**
* 效率底下的斐波那契數列, 耗時的操作
*
* @param num
* @return
*/
static int fibc(int num) {
if (num == 0) {
return 0;
}
if (num == 1) {
return 1;
}
return fibc(num - 1) + fibc(num - 2);
}
}</code></pre>
Execute、ExecuteService
Executor接口中之定義了一個方法execute(Runnable command),該方法接收一個Runable實例,它用來執行一個任務,即一個實現了Runnable接口的類。ExecutorService接口繼承自Executor接口,它提供了更豐富的實現多線程的方法,比如,ExecutorService提供了關閉自己的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成 Future 的方法。 可 以調用ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,調用該方法后,將導致ExecutorService 停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的), 當所有已經提交的任務執行完畢后將會關閉ExecutorService。因此我們一般用該接口來實現和管理多線程。
public class ExecutorDemo {
private static final int MAX = 10;
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
// fixedThredPool(3);
newCachedThreadPool();
}
private static void fixedThredPool(int size) {
ExecutorService executorService = Executors.newFixedThreadPool(size);
for (int i = 0; i < MAX; i++) {
Future<Integer> task = executorService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
// TODO Auto-generated method stub
System.out.println("執行線程:" + Thread.currentThread().getName());
return fibc(20);
}
});
try {
System.out.println("第" + i + "次計算結果" + task.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private static void newCachedThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < MAX; i++) {
executorService.submit(new Runnable() {
public void run() {
// TODO Auto-generated method stub
System.out.println("執行線程:" + Thread.currentThread().getName() + ",結果" + fibc(20));
}
});
}
}
private static int fibc(int num) {
// TODO Auto-generated method stub
if (num == 0) {
return 0;
}
if (num == 1) {
return 1;
}
return fibc(num - 1) + fibc(num - 2);
}
}</code></pre>
Wait與Sleep的區別
sleep()睡眠時,保持對象鎖,仍然占有該鎖;而wait()睡眠時,釋放對象鎖。但是wait()和sleep()都可以通過interrupt()方法打斷線程的暫停狀態, 從而使線程立刻拋出InterruptedException(但不建議使用該方法)。
public class ThreadTest implements Runnable {
int number = 10;
public void firstMethod() throws Exception {
synchronized (this) {
number += 100;
System.out.println(number);
}
}
public void secondMethod() throws Exception {
synchronized (this) {
/**
* (休息2S,阻塞線程)
* 以驗證當前線程對象的機鎖被占用時,
* 是否被可以訪問其他同步代碼塊
*/
// Thread.sleep(2000);
this.wait(2000);
number *= 200;
}
}
@Override
public void run() {
try {
firstMethod();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ThreadTest threadTest = new ThreadTest();
Thread thread = new Thread(threadTest);
thread.start();
threadTest.secondMethod();
}
}</code></pre>
ReadWiteLock
下面要介紹的是讀寫鎖(ReadWriteLock), 我們會有一種需求,在對數據進行讀寫的時候,為了保證數據的一致性和完整性, 需要讀和寫是互斥的,寫和寫是互斥的,但是讀和讀是不需要互斥的,這樣讀和讀不互斥性能更高些
public class ReadWriteLockTest {
public static void main(String[] args) {
final Data data = new Data();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
public void run() {
for (int j = 0; j < 5; j++) {
data.set(new Random().nextInt(30));
}
}
}).start();
}
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
public void run() {
for (int j = 0; j < 5; j++) {
data.get();
}
}
}).start();
}
}
}
class Data {
private int data;// 共享數據
private ReadWriteLock rwl = new ReentrantReadWriteLock();
public void set(int data) {
rwl.writeLock().lock();// 取到寫鎖
try {
System.out.println(Thread.currentThread().getName() + "準備寫入數據");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.data = data;
System.out.println(Thread.currentThread().getName() + "寫入" + this.data);
} finally {
rwl.writeLock().unlock();// 釋放寫鎖
}
}
public void get() {
rwl.readLock().lock();// 取到讀鎖
try {
System.out.println(Thread.currentThread().getName() + "準備讀取數據");
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "讀取" + this.data);
} finally {
rwl.readLock().unlock();// 釋放讀鎖
}
}
}</code></pre>
ReentrantLock
可重入鎖最大的作用是避免死鎖
ReentrantLock 類實現了 Lock ,它擁有與 synchronized 相同的并發性和內存語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性
public class ReentrantloackTest implements Runnable {
ReentrantLock lock = new ReentrantLock();
public void get() {
lock.lock();
System.out.println(Thread.currentThread().getId());
set();
lock.unlock();
}
private void set() {
lock.lock();
System.out.println(Thread.currentThread().getId());
lock.unlock();
}
@Override
public void run() {
get();
}
public static void main(String... args) {
ReentrantloackTest reentrantlock = new ReentrantloackTest();
new Thread(reentrantlock).start();
new Thread(reentrantlock).start();
new Thread(reentrantlock).start();
new Thread(reentrantlock).start();
}
}</code></pre>
synchronized
synchronized(this)以及非static的synchronized方法(至于stati csynchronized方法請往下看), 只能防止多個線程同時執行同一個對象的同步代碼段。
用synchronized(Sync.class)實現了全局鎖的效果。synchronized鎖住的是代碼還是對象。答案是:synchronized鎖住的是括號里的對象,而不是代碼。對于非static的synchronized方法,鎖的就是對象本身也就是this。最后說說static synchronized方法,static方法可以直接類名加方法名調用,方法中無法使用this,所以它鎖的不是this,
而是類的Class對象,所以,static synchronized方法也相當于全局鎖,相當于鎖住了代碼段。
鎖作為并發共享數據,
保證一致性的工具,在JAVA平臺有多種實現(如 synchronized 和 ReentrantLock等等是廣義上的可重入鎖, 而不是單指JAVA下的ReentrantLock。 可重入鎖,也叫做遞歸鎖,指的是同一線程 外層函數獲得鎖之后 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響。 在JAVA環境下 ReentrantLock 和synchronized 都是 可重入鎖
class Sync {
public void test() {
synchronized (Sync.class) {
System.out.println("test開始..");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("test結束..");
}
}
}
class MyThread extends Thread {
public void run() {
Sync sync = new Sync();
sync.test();
}
}
class Main {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
Thread thread = new MyThread();
thread.start();
}
}
}</code></pre>
Callable
當將一個Callable的對象傳遞給ExecutorService的submit方法,則該call方法自動在一個線程上執行,并且會返回執行結果Future對象。同樣,將Runnable的對象傳遞給ExecutorService的submit方法, 則該run方法自動在一個線程上執行,并且會返回執行結果Future對象,但是在該Future對象上調用get方法,將返回null。
public class CallableDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
//創建10個任務并執行
for (int i = 0; i < 10; i++) {
//使用ExecutorService執行Callable類型的任務,并將結果保存在future變量中
Future<String> future = executorService.submit(new TaskWithResult(i));
//將任務執行結果存儲到List中
resultList.add(future);
}
//遍歷任務的結果
for (Future<String> fs : resultList) {
try {
// while (!fs.isDone) ;//Future返回如果沒有完成,則一直循環等待,直到Future返回完成
System.out.println(fs.get()); //打印各個線程(任務)執行的結果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
//啟動一次順序關閉,執行以前提交的任務,但不接受新任務
executorService.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
/**
* 任務的具體過程,一旦任務傳給ExecutorService的submit方法,
* 則該方法自動在一個線程上執行
*/
public String call() throws Exception {
System.out.println("call()方法被自動調用!!! " + Thread.currentThread().getName());
//該返回結果將被Future的get方法得到
return "call()方法被自動調用,任務返回的結果是:" + id + " " + Thread.currentThread().getName();
}
}</code></pre>
CyclicBarrier
CyclicBarrier(又叫障礙器)同樣是Java 5中加入的新特性,使用時需要導入java.util.concurrent.CylicBarrier。它適用于這樣一種情況:你希望創建一組任務,它們并發地執行工作,另外的一個任務在這一組任務并發執行結束前一直阻塞等待, 直到該組任務全部執行結束,這個任務才得以執行。這非常像CountDownLatch,只是CountDownLatch是只觸發一次的事件,而CyclicBarrier可以多次重用。
public class CyclicBarrierTest {
public static void main(String[] args) {
//創建CyclicBarrier對象,
//并設置執行完一組5個線程的并發任務后,再執行MainTask任務
CyclicBarrier cb = new CyclicBarrier(5, new MainTask());
new SubTask("A", cb).start();
new SubTask("B", cb).start();
new SubTask("C", cb).start();
new SubTask("D", cb).start();
new SubTask("E", cb).start();
}
}
/**
- 最后執行的任務
*/
class MainTask implements Runnable {
public void run() {
System.out.println("......終于要執行最后的任務了......");
}
}
/**
一組并發任務
*/
class SubTask extends Thread {
private String name;
private CyclicBarrier cb;
SubTask(String name, CyclicBarrier cb) {
this.name = name;
this.cb = cb;
}
public void run() {
System.out.println("[并發任務" + name + "] 開始執行");
for (int i = 0; i < 999999; i++) ; //模擬耗時的任務
System.out.println("[并發任務" + name + "] 開始執行完畢,通知障礙器");
try {
//每執行完一項任務就通知障礙器
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}</code></pre>
Smeaphore
Java并發包中的信號量Semaphore實際上是一個功能完畢的計數信號量,從概念上講,它維護了一個許可集合, 對控制一定資源的消費與回收有著很重要的意義。Semaphore可以控制某個資源被同時訪問的任務數, 它通過acquire()獲取一個許可,release()釋放一個許可。如果被同時訪問的任務數已滿,則其他acquire的任務進入等待狀態,直到有一個任務被release掉,它才能得到許可。 可以看出,Semaphore允許并發訪問的任務數一直為5,當然,這里還很容易看出一點, 就是Semaphore僅僅是對資源的并發訪問的任務數進行監控,而不會保證線程安全,因此,在訪問的時候,要自己控制線程的安全訪問。
public class SemaphoreTest {
public static void main(String[] args) {
//采用新特性來啟動和管理線程——內部使用線程池
ExecutorService exec = Executors.newCachedThreadPool();
//只允許5個線程同時訪問
final Semaphore semp = new Semaphore(5);
//模擬10個客戶端訪問
for (int index = 0; index < 10; index++) {
final int num = index;
Runnable run = new Runnable() {
public void run() {
try {
//獲取許可
semp.acquire();
System.out.println("線程" +
Thread.currentThread().getName() + "獲得許可:" + num);
//模擬耗時的任務
for (int i = 0; i < 999999; i++) ;
//釋放許可
semp.release();
System.out.println("線程" +
Thread.currentThread().getName() + "釋放許可:" + num);
System.out.println("當前允許進入的任務個數:" +
semp.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(run);
}
//關閉線程池
exec.shutdown();
}
}
來自:http://www.jianshu.com/p/c57a29e036ab