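OkHttp (Part 2): Interceptors

The previous article covered OkHttp's dispatcher.

This one covers OkHttp's interceptors.

As the previous article showed, a synchronous execute call obtains its response through getResponseWithInterceptorChain.
For an asynchronous enqueue, execute in AsyncCall is eventually invoked, and it too obtains the response through getResponseWithInterceptorChain.

Both paths call the getResponseWithInterceptorChain function in RealCall.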

getResponseWithInterceptorChain

Response getResponseWithInterceptorChain() throws IOException {

// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); // add the user's custom (application) interceptors to the list
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());

return chain.proceed(originalRequest);
}

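This builds a list and adds each interceptor to it,
then creates an Interceptor.Chain object. Note that the 2nd, 3rd, and 4th arguments (streamAllocation, httpCodec, connection) are all null, and the 5th argument, index, is 0.
Finally it calls proceed on that chain.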
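
To make the resulting order easier to see, here is a minimal sketch that mirrors the list-building steps above with plain names instead of real interceptor objects. The class InterceptorOrderSketch and its build method are hypothetical and are not part of OkHttp.

```java
import java.util.ArrayList;
import java.util.List;

final class InterceptorOrderSketch {
    // Mirrors the order used in getResponseWithInterceptorChain, with names instead of objects.
    static List<String> build(List<String> applicationInterceptors,
                              List<String> networkInterceptors,
                              boolean forWebSocket) {
        List<String> names = new ArrayList<>(applicationInterceptors);
        names.add("RetryAndFollowUpInterceptor");
        names.add("BridgeInterceptor");
        names.add("CacheInterceptor");
        names.add("ConnectInterceptor");
        if (!forWebSocket) {
            // Network interceptors are skipped for web socket calls.
            names.addAll(networkInterceptors);
        }
        names.add("CallServerInterceptor");
        return names;
    }
}
```

With no custom interceptors, RetryAndFollowUpInterceptor sits at index 0 and CallServerInterceptor at index 4. Every application interceptor added shifts the built-in ones down by one slot.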

RealInterceptorChain.java

RealInterceptorChain implements the Chain interface nested in the Interceptor interface.

public final class RealInterceptorChain implements Interceptor.Chain {
private final List<Interceptor> interceptors;
private final StreamAllocation streamAllocation;
private final HttpCodec httpCodec;
private final RealConnection connection;
private final int index;
private final Request request;
private final Call call;
private final EventListener eventListener;
private final int connectTimeout;
private final int readTimeout;
private final int writeTimeout;
private int calls;

public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
this.call = call;
this.eventListener = eventListener;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
}

@Override public Connection connection() {
return connection;
}

@Override public int connectTimeoutMillis() {
return connectTimeout;
}

@Override public Interceptor.Chain withConnectTimeout(int timeout, TimeUnit unit) {
int millis = checkDuration("timeout", timeout, unit);
return new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index,
request, call, eventListener, millis, readTimeout, writeTimeout);
}

@Override public int readTimeoutMillis() {
return readTimeout;
}

@Override public Interceptor.Chain withReadTimeout(int timeout, TimeUnit unit) {
int millis = checkDuration("timeout", timeout, unit);
return new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index,
request, call, eventListener, connectTimeout, millis, writeTimeout);
}

@Override public int writeTimeoutMillis() {
return writeTimeout;
}

@Override public Interceptor.Chain withWriteTimeout(int timeout, TimeUnit unit) {
int millis = checkDuration("timeout", timeout, unit);
return new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index,
request, call, eventListener, connectTimeout, readTimeout, millis);
}

public StreamAllocation streamAllocation() {
return streamAllocation;
}

public HttpCodec httpStream() {
return httpCodec;
}

@Override public Call call() {
return call;
}

public EventListener eventListener() {
return eventListener;
}

@Override public Request request() {
return request;
}

@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;

// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}

// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}

// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);

// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}

if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}

return response;
}
}

That is the complete code of RealInterceptorChain. Pay attention to this fragment:

// index starts at 0. Note that the new RealInterceptorChain is constructed with index + 1.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// Take the interceptor at the current index. If no custom interceptors were added to the OkHttpClient, this is RetryAndFollowUpInterceptor.
Interceptor interceptor = interceptors.get(index);
// Then call that interceptor's intercept function.
Response response = interceptor.intercept(next);

The comments may be a little confusing at this point, so read on. Interceptor is just an interface and can be skipped; go straight to RetryAndFollowUpInterceptor.
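
The index handoff can be reproduced in a toy model. The sketch below is a deliberately simplified stand-in, assuming string requests and responses; ToyInterceptor, ToyChain and ToyChainDemo are hypothetical names, not OkHttp classes. It only shows how proceed creates the next chain with index + 1 and hands it to the interceptor at index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Interceptor and RealInterceptorChain.
interface ToyInterceptor {
    String intercept(ToyChain chain);
}

final class ToyChain {
    private final List<ToyInterceptor> interceptors;
    private final int index;      // position of the interceptor this chain will invoke
    private final String request;
    final List<String> trace;     // shared log of the call order

    ToyChain(List<ToyInterceptor> interceptors, int index, String request, List<String> trace) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
        this.trace = trace;
    }

    String request() {
        return request;
    }

    String proceed(String request) {
        if (index >= interceptors.size()) throw new AssertionError();
        // The next chain points one slot further along the same list.
        ToyChain next = new ToyChain(interceptors, index + 1, request, trace);
        // The interceptor at the current index receives the next chain, not this one.
        return interceptors.get(index).intercept(next);
    }
}

final class ToyChainDemo {
    // Builds an interceptor that logs on the way down and again on the way back up.
    static ToyInterceptor passThrough(String name) {
        return chain -> {
            chain.trace.add(name + " before");
            String response = chain.proceed(chain.request());
            chain.trace.add(name + " after");
            return response;
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<ToyInterceptor> interceptors = new ArrayList<>();
        interceptors.add(passThrough("retry"));
        interceptors.add(passThrough("bridge"));
        // The last interceptor produces the response and never calls proceed().
        interceptors.add(chain -> {
            chain.trace.add("callServer");
            return "response to " + chain.request();
        });
        String response = new ToyChain(interceptors, 0, "GET /", trace).proceed("GET /");
        trace.add(response);
        return trace;
    }
}
```

ToyChainDemo.run() records the interceptors in the order retry, bridge, callServer on the way down, then bridge and retry again on the way back up, which is the same nesting the real chain produces.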

RetryAndFollowUpInterceptor: the retry and redirect interceptor

RetryAndFollowUpInterceptor implements the Interceptor interface.

Look at its intercept function:

@Override
public Response intercept(Chain chain) throws IOException {

Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

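/**
* Manager class that maintains the relationship between the server connection, the data stream, and the request. It is created here but actually used by ConnectInterceptor.
*/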
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

int followUpCount = 0;
Response priorResponse = null;

// Loop forever; only a thrown exception or a returned response breaks out.
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}

Response response;
boolean releaseConnection = true;
try {
// If proceed() throws, releaseConnection is still true.
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// Route exception: the attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
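// The request went out, but communication with the server failed (for example, the socket dropped while reading or writing).
// Only HTTP/2 throws ConnectionShutdownException, so for HTTP/1 requestSendStarted is always true.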
//An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
// Neither of the two failures above: close and clean up all resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// If success came only after a retry or redirect, record the previous response on this one.
//Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(
priorResponse.newBuilder()
.body(null)
.build()
)
.build();
}
// Handle certain 3xx and 4xx status codes, such as 301/302 redirects.
Request followUp = followUpRequest(response, streamAllocation.route());
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}

closeQuietly(response.body());

// Cap the number of follow-ups at 20.
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
// Check whether the follow-up can reuse the same connection.
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}

request = followUp;
priorResponse = response;
}
}

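The main logic here is:
cast the incoming chain to RealInterceptorChain;
loop forever, calling chain.proceed inside the loop to obtain the response;
the loop exists mainly to catch exceptions: a RouteException or IOException triggers a retry when recover() allows it, and any other exception ends the call. The same loop also issues follow-up requests such as redirects, up to MAX_FOLLOW_UPS times.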
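
The shape of that loop can be sketched without any OkHttp types. The code below is an illustration under stated assumptions: Downstream is a hypothetical stand-in for the rest of the chain, recovery is reduced to a simple retry budget (the real recover() checks much more), and a redirect is simulated by appending to the URL instead of reading a Location header.

```java
import java.io.IOException;
import java.net.ProtocolException;

final class RetryLoopSketch {
    static final int MAX_FOLLOW_UPS = 20;

    // Hypothetical stand-in for "the rest of the chain": returns a status code or throws.
    interface Downstream {
        int call(String url) throws IOException;
    }

    // Loops until a final status is returned or an unrecoverable condition is hit.
    static int execute(String url, Downstream downstream, int maxRetries) throws IOException {
        int followUpCount = 0;
        int retries = 0;
        while (true) {
            int code;
            try {
                code = downstream.call(url);
            } catch (IOException e) {
                // Assumption: recovery is just a retry budget in this sketch.
                if (++retries > maxRetries) throw e;
                continue;
            }
            if (code != 301 && code != 302) return code; // no follow-up needed
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            url = url + "/moved"; // pretend the redirect pointed here
        }
    }

    // Scripted run: one I/O failure, one redirect, then success.
    static String demo() {
        int[] calls = {0};
        try {
            int code = execute("http://example.test", url -> {
                calls[0]++;
                if (calls[0] == 1) throw new IOException("connection reset");
                return calls[0] == 2 ? 302 : 200;
            }, 1);
            return code + " after " + calls[0] + " calls";
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }
}
```

In the scripted run, the first call fails and is retried, the second returns a redirect and is followed, and the third returns the final status.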

chain.proceed in turn leads back into RealInterceptorChain, shown above:


// Arriving from RetryAndFollowUpInterceptor, index = 1 and index + 1 = 2.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// Take the interceptor at index 1. If no custom interceptors were added to the OkHttpClient, this is BridgeInterceptor.
Interceptor interceptor = interceptors.get(index);
// Then call that interceptor's intercept function.
Response response = interceptor.intercept(next);

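BridgeInterceptor turns the user's request into the request actually sent to the server, and turns the server's response into a user-friendly response

BridgeInterceptor also implements the Interceptor interface.

Again, go straight to its intercept function: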

@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also
// decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

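The logic of this function is:
first fill in the request headers, then call chain.proceed to obtain networkResponse,
then re-wrap networkResponse: save any cookies and, if the bridge itself asked for gzip, transparently decompress the body.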
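
The request-side half of that logic amounts to "set a default only where the user set nothing". Here is a minimal sketch, assuming headers are held in a plain case-sensitive Map (real HTTP header names are case-insensitive). BridgeHeadersSketch and the User-Agent value are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BridgeHeadersSketch {
    // Returns a copy of the user's headers with defaults filled in where the user set nothing.
    static Map<String, String> withDefaults(Map<String, String> userHeaders, String host) {
        Map<String, String> headers = new LinkedHashMap<>(userHeaders);
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");
        if (transparentGzip(userHeaders)) {
            headers.put("Accept-Encoding", "gzip");
        }
        headers.putIfAbsent("User-Agent", "toy-client/1.0"); // hypothetical value
        return headers;
    }

    // True when the bridge itself would add "Accept-Encoding: gzip",
    // in which case it is also responsible for decompressing the response.
    static boolean transparentGzip(Map<String, String> userHeaders) {
        return !userHeaders.containsKey("Accept-Encoding") && !userHeaders.containsKey("Range");
    }
}
```

The transparentGzip check matters on the way back: the body is only unwrapped when the bridge added the header itself, so a caller who sets Accept-Encoding explicitly receives the compressed bytes untouched.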

Like RetryAndFollowUpInterceptor, it calls chain.proceed,
which again leads back into RealInterceptorChain, shown above:

// Arriving from BridgeInterceptor, index = 2 and index + 1 = 3.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// Take the interceptor at index 2. If no custom interceptors were added to the OkHttpClient, this is CacheInterceptor.
Interceptor interceptor = interceptors.get(index);
// Then call that interceptor's intercept function.
Response response = interceptor.intercept(next);

Now look at the CacheInterceptor code.

CacheInterceptor reads from and updates the cache

CacheInterceptor also implements the Interceptor interface.

@Override
public Response intercept(Chain chain) throws IOException {
// Look up the file cache using the MD5 of the URL (only GET requests are cached).
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();

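// Cache strategy: based on various conditions (request headers), decide which of the network request and the cached response to use.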
CacheStrategy strategy =
new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
//
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}

if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// Neither a network request nor a cached response is available.
//If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// No network request, so the cache must be used.
//If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

// Issue the network request.
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// The server answered 304 Not Modified: reuse the cached response, with headers and timestamps updated, as the response to this request.
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

// Reaching this point means the cache is unusable, so use the network response.
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
// Store it in the cache.
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response,
networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}

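In short, the function obtains a networkRequest and a cached cacheResponse:
if both networkRequest and cacheResponse are null, it returns a synthetic 504 response (Unsatisfiable Request);
if only networkRequest is null, it returns cacheResponse;
if networkRequest is not null, it performs the network request and obtains networkResponse;
if cacheResponse is not null and networkResponse has status 304 (not modified), the cached response is updated and returned.

After that, networkResponse is processed and, if it is cacheable, written to the cache.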
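
Those branches can be condensed into a small decision function. This is a sketch of the branching only, assuming the two booleans stand for strategy.networkRequest != null and strategy.cacheResponse != null. CacheDecisionSketch and Outcome are hypothetical names.

```java
final class CacheDecisionSketch {
    enum Outcome { FAIL_504, USE_CACHE, USE_CACHE_AFTER_304, USE_NETWORK }

    // networkCode is only consulted when a network request is actually made.
    static Outcome decide(boolean hasNetworkRequest, boolean hasCacheResponse, int networkCode) {
        // Network forbidden and nothing usable in the cache.
        if (!hasNetworkRequest && !hasCacheResponse) return Outcome.FAIL_504;
        // Network not needed: the cached response is enough.
        if (!hasNetworkRequest) return Outcome.USE_CACHE;
        // Conditional request confirmed that the cached copy is still valid.
        if (hasCacheResponse && networkCode == 304) return Outcome.USE_CACHE_AFTER_304;
        // Otherwise the network response wins (and may be written to the cache).
        return Outcome.USE_NETWORK;
    }
}
```

For example, decide(true, true, 304) yields USE_CACHE_AFTER_304, while decide(true, true, 200) falls through to USE_NETWORK.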

Note: networkResponse is also obtained by calling chain.proceed, so, as before, execution continues into RealInterceptorChain.

// Arriving from CacheInterceptor, index = 3 and index + 1 = 4.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// Take the interceptor at index 3. If no custom interceptors were added to the OkHttpClient, this is ConnectInterceptor.
Interceptor interceptor = interceptors.get(index);
// Then call that interceptor's intercept function.
Response response = interceptor.intercept(next);

This time the intercept function of ConnectInterceptor is called.

ConnectInterceptor

ConnectInterceptor also implements the Interceptor interface.

public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;

public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}

@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}

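The code above is the entire source of the ConnectInterceptor class.
Unlike the Interceptor implementations above, its intercept function obtains fresh arguments: an HttpCodec and a RealConnection from the StreamAllocation.
The proceed overload it calls at the end takes four arguments, and execution still ends up in RealInterceptorChain.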

// Arriving from ConnectInterceptor, index = 4 and index + 1 = 5.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// Take the interceptor at index 4. If neither custom interceptors nor network interceptors were added to the OkHttpClient, this is CallServerInterceptor.
Interceptor interceptor = interceptors.get(index);
// Then call that interceptor's intercept function.
Response response = interceptor.intercept(next);

CallServerInterceptor

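CallServerInterceptor also implements the Interceptor interface.
Its intercept function is what talks to the server directly.
The real response is obtained here and returned to the layer above, the intercept function of ConnectInterceptor, and then passed back up layer by layer in the same way.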

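The chain-of-responsibility pattern

As seen above, the interceptors call down layer by layer, one Interceptor implementation invoking the next.
Each interceptor does its own job and then hands off to the next one.
This design is the chain-of-responsibility pattern.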
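
One property of the pattern worth seeing in isolation is that a handler may answer by itself and skip everything after it, as CacheInterceptor does on a cache hit. The sketch below is a generic illustration with hypothetical names (ShortCircuitSketch, Handler, Next) and string requests. It is not OkHttp code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ShortCircuitSketch {
    interface Handler {
        String handle(String request, Next next);
    }

    interface Next {
        String proceed(String request);
    }

    // Wires handler i to handler i + 1; past the last handler, a pretend server answers.
    static String dispatch(List<Handler> handlers, int index, String request, List<String> visited) {
        if (index == handlers.size()) {
            visited.add("server");
            return "fresh:" + request;
        }
        return handlers.get(index).handle(request, r -> dispatch(handlers, index + 1, r, visited));
    }

    // Returns the handlers visited, followed by the final response.
    static List<String> run(Map<String, String> cache, String request) {
        List<String> visited = new ArrayList<>();
        List<Handler> handlers = new ArrayList<>();
        // Cache-like handler: on a hit it answers directly and never calls next.
        handlers.add((req, next) -> {
            visited.add("cache");
            String hit = cache.get(req);
            return hit != null ? hit : next.proceed(req);
        });
        // Pass-through handler standing in for the later stages.
        handlers.add((req, next) -> {
            visited.add("connect");
            return next.proceed(req);
        });
        visited.add(dispatch(handlers, 0, request, visited));
        return visited;
    }
}
```

On a cache hit only the first handler runs. On a miss the request travels through every handler to the pretend server, and the response comes back along the same path.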