OkHttp3 源碼淺析

DiaCeh 8年前發布 | 18K 次閱讀 OkHttp Android開發 移動開發

之前的底層網絡庫基本就是Apache HttpClient和HttpURLConnection。由于HttClient比較難用,官方在Android2.3以后就不建議用了,并且在Android5.0以后廢棄了HttpClient,在Android6.0更是刪除了HttpClient。

HttpURLConnection是一種多用途、輕量極的HTTP客戶端,使用它來進行HTTP操作可以適用于大多數的應用程序,但是在Android 2.2版本之前存在一些bug,所以官方建議在Android2.3以后替代HttpClient,Volley就是按版本分區使用這兩個網絡庫。

然而隨著開源屆扛把子Square的崛起,OkHttp的開源,這兩個網絡庫只能被淹沒在歷史洪流中。Android4.4以后HttpURLConnection的底層已經替換成OkHttp實現。OkHttp配合同樣是Square開源的Retrofit,網絡請求變得更簡便,功能更強大。

OkHttp

OkHttp是一個現代,快速,高效的網絡庫,OkHttp 庫的設計和實現的首要目標是高效。

  • 支持 HTTP/2和SPDY,這使得對同一個主機發出的所有請求都可以共享相同的套接字連接;
  • 如果 HTTP/2和SPDY不可用,OkHttp會使用連接池來復用連接以提高效率。
  • 支持Gzip降低傳輸內容的大小
  • 支持Http緩存
  • 會從很多常用的連接問題中自動恢復。如果服務器配置了多個IP地址,OkHttp 會自動重試一個主機的多個 IP 地址。
  • 使用Okio來大大簡化數據的訪問與存儲,提高性能

簡單使用

簡單的異步請求

    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
            .url(url)
            .build();

    client.newCall(request).enqueue(new Callback() {
        public void onFailure(Request request, IOException e)  {

    }

        public void onResponse(Response response) throws IOException {
            System.out.println(response.body().string());
        }
});

使用非常的簡答,發送請求,拿到異步結果。

OkHttpClient

跟下源碼,OkHttpClient.newCall實現

public class OkHttpClient implements Cloneable, Call.Factory{
  public static final class Builder {
    Dispatcher dispatcher;
    Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs;
    final List<Interceptor> interceptors = new ArrayList<>();
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    ProxySelector proxySelector;
    CookieJar cookieJar;
    Cache cache;
    InternalCache internalCache;
    SocketFactory socketFactory;
    SSLSocketFactory sslSocketFactory;
    CertificateChainCleaner certificateChainCleaner;
    HostnameVerifier hostnameVerifier;
    CertificatePinner certificatePinner;
    Authenticator proxyAuthenticator;
    Authenticator authenticator;
    ConnectionPool connectionPool;
    Dns dns;
    boolean followSslRedirects;
    boolean followRedirects;
    boolean retryOnConnectionFailure;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    }
    ...
    ...
    @Override public Call newCall(Request request) {
    return new RealCall(this, request);
    ...
    ...
  }
}

OkHttpClient通過Builder實例化,實現了Call.Factory接口創建了一個RealCall的實例,而RealCall是Call接口的實現。

public interface Call {
  Request request();
  Response execute() throws IOException;
  void enqueue(Callback responseCallback);
  void cancel();
  boolean isExecuted();
  boolean isCanceled();
  interface Factory {
    Call newCall(Request request);
  }
}

RealCall

RealCall中封裝了OKHttpClient和Request

  protected RealCall(OkHttpClient client, Request originalRequest) {
    this.client = client;
    this.originalRequest = originalRequest;
  }

  @Override public void enqueue(Callback responseCallback) {
    enqueue(responseCallback, false);
  }

  void enqueue(Callback responseCallback, boolean forWebSocket) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
  }

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    private final boolean forWebSocket;

    private AsyncCall(Callback responseCallback, boolean forWebSocket) {
      super("OkHttp %s", redactedUrl().toString());
      this.responseCallback = responseCallback;
      this.forWebSocket = forWebSocket;
    }
    ...

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain(forWebSocket);
        if (canceled) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        //注意這一句代碼
        client.dispatcher().finished(this);
      }
    }
  }

調用enqueue封裝成AsyncCall交給OKHttpClient的dispatcher線程池執行。

Dispatcher線程池

OkHttp的dispatcher參數是直接new出來的。先看下enqueue方法,將AsyncCall當做參數傳遞進來

public final class Dispatcher {
  /** 最大并發請求數為64 */
  private int maxRequests = 64;
  /** 每個主機最大請求數為5 */
  private int maxRequestsPerHost = 5;

  /** 線程池 */
  private ExecutorService executorService;

  /** 準備執行的請求 */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** 正在執行的異步請求,包含已經取消但未執行完的請求 */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** 正在執行的同步請求,包含已經取消單未執行完的請求 */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

    public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
}

構造一個線程池ExecutorService:

executorService = new ThreadPoolExecutor(
    0, //corePoolSize 最小并發線程數,如果是0的話,空閑一段時間后所有線程將全部被銷毀。
    Integer.MAX_VALUE, //maximumPoolSize: 最大線程數,當任務進來時可以擴充的線程最大值,當大于了這個值就會根據丟棄處理機制來處理
    60, //keepAliveTime: 當線程數大于corePoolSize時,多余的空閑線程的最大存活時間
    TimeUnit.SECONDS,//單位秒
    new SynchronousQueue<Runnable>(),//工作隊列,先進先出        Util.threadFactory("OkHttp Dispatcher", false));//單個線程的工廠

構建了一個最大線程數為Integer.MAX_VALUE的線程池,也就是說,是個不設最大上限的線程池(其實有限制64個),有多少任務添加進來就新建多少線程,以保證I/O任務中高阻塞低占用的過程中,不會長時間卡在阻塞上。當工作完成后,線程池會在60s內相繼關閉所有線程。

還記得剛才在AsyncCall.execute() finally中的內容嗎

finally {
    client.dispatcher().finished(this);
  }
  ...

  /** Used by {@code AsyncCall#run} to signal completion. */
  synchronized void finished(AsyncCall call) {
    if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
    promoteCalls();
  }


  //Dispatcher.java
  private void promoteCalls() {
  //超過閾值 返回
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

當AsyncCall執行完成后,調用Disptcher的finish()方法,調用promoteCalls()方法,如果超過閾值,繼續等待,否則取出緩存區的任務執行,順序是先進先出。

Dispatcher線程池總結

  • 調度線程池Disptcher實現了高并發,低阻塞的實現
  • 采用Deque作為緩存,先進先出的順序執行
  • 任務在try/finally中調用了finished函數,控制任務隊列的執行順序,而不是采用鎖,減少了編碼復雜性提高性能

Interceptor

調度基本整明白了,AsyncCall 中的execute具體內容還沒有分析,主要就一行代碼。

@Override protected void execute() {
boolean signalledCallback = false;
  try {
    ...
    Response response = getResponseWithInterceptorChain(forWebSocket);
    ...
  } finally {
    client.dispatcher().finished(this);
  }
}

private Response getResponseWithInterceptorChain(boolean     forWebSocket) throws IOException {
Interceptor.Chain chain = new         ApplicationInterceptorChain(0, originalRequest, forWebSocket);
return chain.proceed(originalRequest);
}

從方法名字基本可以猜到是干嘛的,調用 chain.proceed(originalRequest); 將request傳遞進來,從攔截器鏈里拿到返回結果。那么攔截器Interceptor是干嘛的,Chain是干嘛的呢?繼續往下看ApplicationInterceptorChain

class ApplicationInterceptorChain implements Interceptor.Chain {
    private final int index;
    private final Request request;

    ApplicationInterceptorChain(int index, Request request, boolean forWebSocket) {
      this.index = index;
      this.request = request;
      this.forWebSocket = forWebSocket;
    }

    @Override public Connection connection() {
      return null;
    }

    @Override public Request request() {
      return request;
    }

    @Override public Response proceed(Request request) throws IOException {
      // If there's another interceptor in the chain, call that.
      if (index < client.interceptors().size()) {
        Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
        Interceptor interceptor = client.interceptors().get(index);
        Response interceptedResponse = interceptor.intercept(chain);

        if (interceptedResponse == null) {
          throw new NullPointerException("application interceptor " + interceptor
              + " returned null");
        }

        return interceptedResponse;
      }

      // No more interceptors. Do HTTP.
      return getResponse(request, forWebSocket);
    }
  }

ApplicationInterceptorChain實現了Interceptor.Chain接口,持有Request的引用。

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

    Connection connection();
  }
}

proceed方法中判斷index(此時為0)是否小于client.interceptors(List )的大小,如果小于也就是說client.interceptors還有Interceptor,那么就再封裝一個ApplicationInterceptorChain,只不過index + 1,然后取出第index個Interceptor將chain傳遞進去。傳遞進去干嘛呢?我們看一個用法,以實際項目為例

HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor(new RetrofitLogger());
        interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient client = new OkHttpClient.Builder()
        .addInterceptor(interceptor)
        .retryOnConnectionFailure(true)
        .connectTimeout(15, TimeUnit.SECONDS)
        .addInterceptor(getCommonParameterInterceptor())
        .addNetworkInterceptor(getTokenInterceptor())
        .build();

@Override
protected Interceptor getCommonParameterInterceptor() {
    return new Interceptor() {
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request originalRequest = chain.request();
            Request request = originalRequest;
            if (!originalRequest.method().equalsIgnoreCase("POST")) {
                HttpUrl modifiedUrl = originalRequest.url().newBuilder()
                        .addQueryParameter("version_code", String.valueOf(AppUtils.getVersionCode()))
                        .addQueryParameter("app_key", "nicepro")
                        .addQueryParameter("app_device", "Android")
                        .addQueryParameter("app_version", AppUtils.getVersionName())
                        .addQueryParameter("token", AccountUtils.getToken())
                        .build();
                request = originalRequest.newBuilder().url(modifiedUrl).build();
            }
            return chain.proceed(request);
        }
    };
}

@Override
protected Interceptor getTokenInterceptor() {
    return new Interceptor() {
        @Override
        public Response intercept(Chain chain) throws IOException {
            Request originalRequest = chain.request();
            Request authorised = originalRequest.newBuilder()
                    .header("app-key", "nicepro")
                    .header("app-device", "Android")
                    .header("app-version", AppUtils.getVersionName())
                    .header("os", AppUtils.getOs())
                    .header("os-version", AppUtils.getAndroidVersion() + "")
                    .header("Accept", "application/json")
                    .header("User-Agent", "Android/retrofit")
                    .header("token", AccountUtils.getToken())
                    .build();
            return chain.proceed(authorised);
        }
    };
}

可以看到每個Interceptor的intercept方法中做了一些操作后,最后都會調用 chain.proceed(request) 方法,而這個chain就是每次prceed方法中生成的ApplicationInterceptorChain,用index+1的方式遞歸調用OkHttClient中的Interceptors,進行攔截操作,比如可以用來監控log,修改請求,修改結果,供開發者自定義參數添加等等,然后最終調用的還是最初的index=0的那個chain的proceed方法中的 getResponse(request, forWebSocket); 。

可以說OkHttp是用chain串聯起攔截器,而每個攔截器都有能力返回Response,返回Response即終止整個調用鏈,這種設計模式稱為 責任鏈模式 。這種模式為OkHttp提供了強大的裝配能力,極大的提高了OkHttp的擴展性和可維護性。

在Android系統中最典型的責任鏈模式就是View的Touch傳遞機制,一層一層傳遞直到被消費。

官方的一張圖就能很好的解釋Interceptor

整個流程很清晰。這種設計真是太棒了,值得學習!

連接池復用

我們知道進行一次tcp網絡請求,一般要三次握手連接,四次握手斷開連接。一次完整的http請求過程見下圖。

如果請求重復的地址,那么重復的連接和斷開連接就成了延長整個時間的的重要因素,特別是在復雜的網絡環境下,每次請求傳輸數據的大小將不再是請求速度的決定性因素。

http有一種 keepalive connections 的機制,可以在傳輸后仍然保持連接,當客戶端需要再次獲取數據時,直接使用剛剛空閑下來的連接而不需要再次握手。

Okhttp支持5個并發KeepAlive,默認鏈路生命為5分鐘(鏈路空閑后,保持存活的時間) 。

DNS解析

對比上一張圖的一次完整的Http請求,在復雜的天朝網絡環境下,相信大多數開發者都碰到過很奇怪的網絡問題,比如運營商動態插入辣雞html代碼嵌入廣告,比如運營商緩存請求數據導致用戶請求到的數據不是最新的問題,比如某些運營商只支持 put\post 請求,而不支持 delete 請求,比如運營商。。。這些問題大部分都跟DNS相關。

為了解決DNS劫持的問題,我們在薄荷app上做了很多優化工作,比如使用HTTP DNS(我們使用的DNSPod)代替系統自帶的libc庫去查詢運營商的DNS服務器,直接拿到IP地址進行IP直連,其中又做了一些緩存和選擇最優IP的一些操作。解決掉了很大一部分用戶反饋的網絡問題。

而在OkHttp中,可以直接配置DNS,默認是系統自帶的 Dns.SYSTEM 。

// Try each address for best behavior in mixed IPv4/IPv6 environments.
List<InetAddress> addresses = address.dns().lookup(socketHost);
for (int i = 0, size = addresses.size(); i < size; i++) {
  InetAddress inetAddress = addresses.get(i);
  inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort));
}

注意結果是數組,即一個域名可能會有多個IP,如果一個IP不通,會自動重連下一個IP。

開發者就可以新建一個Dns類復寫 lookup 方法通過HTTP DNS請求IP地址,其中新建一個 HttpDNSClient 來請求DNS,插入攔截器來配置緩存時間,容錯處理等等,然后在構建OkHttpClient時加入 dns 方法即可。

client = new OkHttpClient.Builder().addNetworkInterceptor(getLogger())
        .dispatcher(getDispatcher())
        //配置DNS查詢實現
        .dns(HTTP_DNS)
        .build();

這樣的全局HTTP DNS解析真是足夠簡單高效,并且完全是無侵入性的,絲毫不影響正常的網絡請求。

總結

本文基本講了下OkHttp3的大概流程,Interceptor的基本原理,DNS的可選配置等。涉及到socket和Okio流相關的都沒有講到,有興趣的讀者可以在參考文章自行搜索。總結來說,OkHttp基本可以滿足日常開發的需求,并且性能足夠強大,配合Retrofit + Rxjava更是效率翻倍。如果你在開發新的項目,強烈建議你扔掉Volley,擁抱Retrofit。

 

來自:http://w4lle.github.io/2016/12/06/OkHttp/

 

 本文由用戶 DiaCeh 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!