java并發編程實戰一之基礎篇

mitunovnume187 8年前發布 | 11K 次閱讀 并發 Java Java開發

緩存一致性問題

計算機在執行程序時,每條指令都是在CPU中執行的,而執行指令過程中,勢必涉及到數據的讀取和寫入。由于程序運行過程中的臨時數據是存放在主存(物理內存)當中的,這時就存在一個問題,由于CPU執行速度很快,而從內存讀取數據和向內存寫入數據的過程跟CPU執行指令的速度比起來要慢的多,因此如果任何時候對數據的操作都要通過和內存的交互來進行,會大大降低指令執行的速度。因此在CPU里面就有了高速緩存。

也就是,當程序在運行過程中,會將運算需要的數據從主存復制一份到CPU的高速緩存當中,那么CPU進行計算時就可以直接從它的高速緩存讀取數據和向其中寫入數據,當運算結束之后,再將高速緩存中的數據刷新到主存當中。

在多核CPU中,每條線程可能運行于不同的CPU中,因此每個線程運行時有自己的高速緩存。被多個線程訪問的共享變量在多個CPU中都存在緩存,這里那么就可能存在緩存不一致的問題

所以就出現了緩存一致性協議。最出名的就是Intel 的MESI協議,MESI協議保證了每個緩存中使用的共享變量的副本是一致的。它核心的思想是:當CPU寫數據時,如果發現操作的變量是共享變量,即在其他CPU中也存在該變量的副本,會發出信號通知其他CPU將該變量的緩存行置為無效狀態,因此當其他CPU需要讀取這個變量時,發現自己緩存中緩存該變量的緩存行是無效的,那么它就會從內存重新讀取。

線程安全性

  • 原子性

    即一個操作或者多個操作 要么全部執行并且執行的過程不會被任何因素打斷,要么就都不執行

    思考:?int long double讀寫操作的原子性

    思考:?int i++的原子性

  • 可見性

    可見性是指當多個線程訪問同一個變量時,一個線程修改了這個變量的值,其他線程能夠立即看得到修改的值

    與緩存相關,某線程改變了數據,其他線程沒有立即看到修改后的值

  • 有序性

    即程序執行的順序按照代碼的先后順序執行

    與指令重排序有關。一般來說,處理器為了提高程序運行效率,可能會對輸入代碼進行優化,它不保證程序中各個語句的執行先后順序同代碼中的順序一致,但是它會保證程序最終執行結果和代碼順序執行的結果是一致的。

    Java內存模型具備一些先天的“有序性”,即不需要通過任何手段就能夠得到保證的有序性,這個通常也稱為 happens-before 原則。如果兩個操作的執行次序無法從happens-before原則推導出來,那么它們就不能保證它們的有序性,虛擬機可以隨意地對它們進行重排序。

    • 程序次序規則:一個線程內,按照代碼順序,書寫在前面的操作先行發生于書寫在后面的操作
    • 鎖定規則:一個unLock操作先行發生于后面對同一個鎖額lock操作
    • volatile變量規則:對一個變量的寫操作先行發生于后面對這個變量的讀操作
    • 傳遞規則:如果操作A先行發生于操作B,而操作B又先行發生于操作C,則可以得出操作A先行發生于操作C
    • 線程啟動規則:Thread對象的start()方法先行發生于此線程的每個一個動作
    • 線程中斷規則:對線程interrupt()方法的調用先行發生于被中斷線程的代碼檢測到中斷事件的發生
    • 線程終結規則:線程中所有的操作都先行發生于線程的終止檢測,我們可以通過Thread.join()方法結束、Thread.isAlive()的返回值手段檢測到線程已經終止執行
    • 對象終結規則:一個對象的初始化完成先行發生于他的finalize()方法的開始
    </li> </ul>

    當多個線程訪問某個類時,不管運行時環境采用何種調度方式或者這些線程將如何交替執行,并且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那么就稱這個類是線程安全的。

    對象的共享

    加鎖與volatile

    加鎖機制既可以確保可見性,又可以確保原子性,而volatile變量只能確保可見性。

    發布與逸出

    發布一個對象的意思是指,使對象能夠在當前作用域之外的代碼中使用。

    發布內部狀態可能會破壞封裝性,并使得程序難以維持不變性條件。

    當某個不應該發布的對象被發布時,這種情況就被稱之為逸出。

    當一個對象發布時,在該對象的非私有域中引用的所有對象同樣會被發布。一般來說,如果一個已經發布的對象能夠通過非私有的變量引用和方法調用到達其他的對象,那么這些對象也都會被發布。

    不要在構造過程中使this引用逸出。

    線程封閉

    如果僅在單線程內訪問數據,就不需要同步。

    • Ad-hoc線程封閉(脆弱)

    • 棧封閉

    • ThreadLocal類

    不變性

    滿足同步需求的另一種方法是使用不可變對象(Immutable Object)

    不可變對象:

    • 對象創建以后其狀態就不能修改
    • 對象的所有域都是final類型
    • 對象是正確創建的

    安全發布

    • 在靜態初始化函數中初始化一個對象引用
    • 將對象的引用保存到volatile類型的域或者AtomicReferance對象中
    • 將對象的引用保存到某個正確構造對象的final類型域中
    • 將對象的引用保存到一個由鎖保護的域中

    線程安全庫的容器類:

    HashTable、synchronizedMap、ConcurrentMap

    Vector、CopyOnWriteArrayList、CopyOnWriteArraySet、synchronizedList、synchronizedSet

    BlockingQueue、ConcurrentLinkedQueue

    事實不可變對象(Effectively Immutable Object):如果對象從技術上看是可變的,但其狀態在發布后不會再改變,那么把這種對象稱為事實不可變對象。

    • 不可變對象可以通過任意機制發布
    • 事實不可變對象必須通過安全方式發布
    • 可變對象必須通過安全方式來發布,并且必須是線程安全的或者由某個鎖保護起來

    安全地共享對象

    • 線程封閉
    • 只讀共享
    • 線程安全共享
    • 保護共享

    對象的組合

    設計線程安全的類

    在設計線程安全類的過程中,需要包含以下三個基本要素:

    • 找出構成對象狀態的所有變量
    • 找出約束狀態變量的不變性條件
    • 建立對象狀態的并發訪問管理策略

    Java監視器模式:對于任何一種鎖對象,自始至終都使用該鎖對象,都可以用來保護對象的狀態

    @NotThreadSafe
    public class MutablePoint {
        public int x,y;

    public MutablePoint() {
        x = 0;
        y = 0;
    }
    
    public MutablePoint(MutablePoint p) {
        this.x = p.x;
        this.y = p.y;
    }
    

    }</code></pre>

    @ThreadSafe
    public class MonitorVehicleTracker {
        @GuardedBy("this")
        private final Map<String,MutablePoint> locations;

    public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
        this.locations = deepCopy(locations);
    }
    
    public synchronized Map<String,MutablePoint> getLocations(){
        return deepCopy(locations);
    }
    
    public synchronized MutablePoint getLocation(String id){
        MutablePoint loc = locations.get(id);
        return loc == null ? null : new MutablePoint(loc);
    }
    
    public synchronized void setLocation(String id,int x,int y){
        MutablePoint loc = locations.get(id);
        if(loc == null){
            throw new IllegalArgumentException("No such ID:" + id);
        }
        loc.x = x;
        loc.y = y;
    }
    
    
    private static Map<String,MutablePoint> deepCopy(Map<String,MutablePoint> m){
        Map<String,MutablePoint> result = new HashMap<>();
        for(String id:m.keySet()){
            result.put(id,new MutablePoint(m.get(id)));
        }
        return Collections.unmodifiableMap(result);
    }
    

    }</code></pre>

    線程安全性的委托

    如果一個類是由多個獨立且線程安全的狀態變量組成,并且在所有的操作中都不包含無效狀態轉換,那么可以將線程安全性委托給底層的狀態變量。

    public class VisualComponent {
        private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<>();
        private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<>();

    public void addKeyListener(KeyListener listener){
        keyListeners.add(listener);
    }
    
    public void addMouseListener(MouseListener listener){
        mouseListeners.add(listener);
    }
    
    public void removeKeyListener(KeyListener listener){
        keyListeners.remove(listener);
    }
    
    public void removeMouseListener(MouseListener listener){
        mouseListeners.remove(listener);
    }
    
    

    }</code></pre>

    Java里的基礎構建模塊

    同步容器類

    Vector HashTable Collections.synchronizedXxx工廠方法

    • 同步容器的線程安全問題
    public static <T> T getLast(Vector<T> vector){
            int lastIndex = vector.size() - 1;
            return vector.get(lastIndex);
        }

    public static <T> T deleteLast(Vector<T> vector){
        int lastIndex = vector.size() - 1;
        return vector.remove(lastIndex);
    }</code></pre> 
    

    在多線程中上述方法是不安全的,雖然Vector是安全的容器,但size()方法和get()或者remove()同時使用,存在“先檢查再運行”操作,就會拋出異常(ArrayIndexOutOfBoundsException),所以需要在客戶端加鎖

    public static <T> T getLast(Vector<T> vector) {
            synchronized (vector) {
                int lastIndex = vector.size() - 1;
                return vector.get(lastIndex);
            }
        }
    
        public static <T> T deleteLast(Vector<T> vector) {
            synchronized (vector) {
                int lastIndex = vector.size() - 1;
                return vector.remove(lastIndex);
            }
        }
    • 迭代器與ConcurrentModificationException

      在設計同步容器的迭代器時并沒有考慮并發修改的問題,它們表現出的行為是及時失敗(fail-fast)。

      List<Widget> widgeList = Collections.synchronizedList(new ArrayList<Widget>());
            //可能拋出ConcurrentModificationException
            for(Widget w:widgeList){
                doSomeThing(w);
            }

      解決方法有兩種:一是加鎖,但可能會產生死鎖;二是克隆,這里的性能開銷也很大。

    • 隱藏迭代器

      容器的toString()、hashCode()、equals()、containsAll()、removeAll()、retainAll()以及把容器作為參數的構造方法,都會對容器進行迭代。這些操作都有可能拋出ConcurrentModificationException

    并發容器

    • ConcurrentHashMap 替代同步Map
      使用分段鎖
      迭代器具有弱一致性(可以容忍并發修改,但并不能保證在迭代器被構造后將修改操作反映給容器,所以size()和isEmpty()的語義被略微減弱了。
    • CopyOnWriteArrayList/CopyOnWriteArraySet

      "寫入時復制"容器,每次修改時,都會創建并重新發布一個新的容器副本

      /**
         * Replaces the element at the specified position in this list with the
         * specified element.
         *
         * @throws IndexOutOfBoundsException {@inheritDoc}
         */
        public E set(int index, E element) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                E oldValue = get(elements, index);
      
                if (oldValue != element) {
                    int len = elements.length;
                    Object[] newElements = Arrays.copyOf(elements, len);
                    newElements[index] = element;
                    setArray(newElements);
                } else {
                    // Not quite a no-op; ensures volatile write semantics
                    setArray(elements);
                }
                return oldValue;
            } finally {
                lock.unlock();
            }
        }
      
        /**
         * Appends the specified element to the end of this list.
         *
         * @param e element to be appended to this list
         * @return <tt>true</tt> (as specified by {@link Collection#add})
         */
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();
            }
        }

      容器規模較大時,底層復制需要一定的開銷。僅當迭代操作遠遠多于修改操作時,才使用"寫入時復制"容器。

    • ConcurrentSkipListMap 替代同步的SortedMap
    • ConcurrentSkipListSet 替代同步的SortedSet
    • Queue ConcurrentLinkedQueue(先進先出) PriorityQueue(優先隊列)
    • BlockingQueue 阻塞隊列 LinkedBlockingQueue/ArrayBlockingQueue(FIFO) PriorityBlockingQueue(優先隊列) SynchronousQueue(維護一組線程,不維護存儲空間,直接交付)
      put()和take()是可阻塞的
      支持生產者消費者模式
      支持有界或者無界隊列
    • Deque BlockDeque 雙端隊列和工作密取
      每個消費者都有自己的雙端隊列,消費完自己的任務,就去其他隊列的末尾秘密的獲取工作

    阻塞方法中斷方法

    某方法拋出InterruptedException時,表示該方法是一個阻塞方法。

    捕獲異常,恢復中斷

    try {
                processTask(fileQueue.take())
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

    同步工具類

    • CountDownLatch

      是一個或多個線程等待一組事件發生

    public class TestHarness {
    
        public static long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
            final CountDownLatch startGate = new CountDownLatch(1);
            final CountDownLatch endGate = new CountDownLatch(nThreads);
    
            for(int i = 0;i < nThreads;i++){
    //            Runnable t = new Runnable() {
    //                @Override
    //                public void run() {
    //                    try {
    //                        startGate.await();
    //                        try {
    //                            task.run();
    //                        } finally {
    //                            endGate.countDown();
    //                        }
    //                    } catch (InterruptedException e) {
    //                        e.printStackTrace();
    //                    }
    //                }
    //            };
                Thread t = new Thread(){
                    @Override
                    public void run() {
                        try {
                            startGate.await();
                            try {
                                task.run();
                            } finally {
                                endGate.countDown();
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                t.start();
            }
    
            long start = System.nanoTime();
    
            startGate.countDown();
    
            endGate.await();
    
            long end = System.nanoTime();
    
            return end - start;
    
        }
    
        public static void main(String[] args) throws InterruptedException {
            Runnable a = new Runnable() {
                @Override
                public void run() {
                    int sum = 0;
                    for(int i = 0; i < 1000000; i++){
                        sum += i;
                    }
                    System.out.println(sum);
                }
            };
    
           System.out.println( timeTasks(100,a) );
        }
    
    }
    • FutureTask
      異步獲取執行的結果
    • 信號量 Semaphore
      控制同時訪問某個特定資源的操作數量或者同時執行某個制定操作的數量
      實現資源池
      對容器施加邊界
    • 柵欄 CyclicBarrier Exchanger

    簡單的可伸縮性緩存

    public class Momoizerl<A,V> implements Computable<A,V> {
        private final ConcurrentMap<A,Future<V>> cache = new ConcurrentHashMap<>();
        private final Computable<A,V> c;
    
        public Momoizerl(Computable<A, V> c) {
            this.c = c;
        }
    
        @Override
        public V compute(final A arg) throws InterruptedException {
            while (true){
                Future<V> f = cache.get(arg);
                if(f == null){
                    Callable<V> eval = new Callable<V>() {
                        @Override
                        public V call() throws Exception {
                            return c.compute(arg);
                        }
                    };
                    FutureTask<V> ft = new FutureTask<V>(eval);
                    f = cache.putIfAbsent(arg,ft);
                    if(f == null){
                        f = ft;
                        ft.run();
                    }
                }
                try {
                    return f.get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    基礎小結

    • 可變狀態越少,越容易確保線程安全性
    • 盡量將域聲明為final類型
    • 不可變對象一定是線程安全的
    • 使用所來保護可變變量
    • 當保護同一個不變性條件中的所有變量時,使用同一個鎖
    • 復合操作,使用鎖
    • 安全的適當的使用并發容器和同步工具

     

    來自:http://www.jianshu.com/p/f83222cfce11

     

 本文由用戶 mitunovnume187 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!