Advertisement

OkHttp 的 IO 操作和进度监听

阅读量:
  • Utilize this whenever you write near a sink for ergonomic and efficient data access.
    */
    fun Sink.buffer(): BufferedSink = RealBufferedSinkthis

1.3、请求体的写入

至此为止, 我们现在已经掌握了 请求行 请求头的写入 以及 socket 的来源. 接下来就是 请求体 的编写, 请让我们再次回到 CallServerInterceptor 继续观看.

类CallServerInterceptor实现了一种拦截器逻辑。
重写了函数intercept(),用于处理链式拦截器链中的响应。
如果(HTTP方法允许接受请求体)并且(requestBody不为空)则:
创建一个缓冲后的响应体容器并将其赋值给requestBody。
将请求体流到缓冲后的响应体容器中。
缓冲完成后关闭该容器。
否则:
调用exchange:noRequestBody()

class Exchange {
fun createRequestBody(request: Request, duplex: Boolean): Sink {
this.isDuplex = duplex
val contentLength = request.body!!.contentLength()
eventListener.requestBodyStart(call)
// 拿到 codec 的用于写入请求体的 sink
val rawRequestBody = codec.createRequestBody(request, contentLength)
// 包装一下,增加了长度相关的检查和出错的事件处理,不深入理解
return RequestBodySink(rawRequestBody, contentLength)
}
}

class Http1ExchangeCodec {
override fun createRequestBody(request: Request, contentLength: Long): Sink {
return when {
// HTTP/1 不支持
request.body != null && request.body.isDuplex() -> throw ProtocolException(
“Duplex connections are not supported for HTTP/1”)
// chunked 类型的请求体,长度未知
request.isChunked -> newChunkedSink() // Stream a request body of unknown length.
// 长度固定的请求体
contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.
// 非法情况
else -> // Stream a request body of a known length.
throw IllegalStateException(
“Cannot stream a request body without chunked encoding or a known content length!”)
}
}
}

可以理解为 Http1ExchangeCodec 支持了一个专门用于写入 RequestBody 的 Sink。经过封装处理后,该接口可以通过 RequestBody.writeTo 方法将数据注入到 RequestBody 中。在实际应用中发现的一类常见场景是上传文件或处理大量数据时的性能瓶颈主要集中在 RequestBody 大小的问题上。这就要求我们在实现时特别注意优化这部分代码逻辑以提升整体性能。

RequestBody.writeTo 到底干了啥呢,来看下一个 File 的实现

abstract class RequestBody {
/** Creates a new request body that transfers the content of this instance. */
@JvmStatic
@JvmName("createRequestBody")
fun File.asRequestBody(contentType: MediaType? = null): RequestBody {
return object : RequestBody() {
override fun contentType() = contentType
}
}

override fun contentLength() = length()

override fun writeTo(sink: BufferedSink) {
source().use {
source -> sink.writeAll(source)
}
}

此处将 \texttt{File} 转换为 \texttt{Source} ,随后使用 \texttt{writeAll} 进行全量输出?特别是当文件体积较大时 ,这可能会导致内存占用过高 。说实话 ,初次接触这类操作时 ,我确实感到有些困惑 !

让我们关注一下用于接收数据的 Sink 实例。具体来说,在文档中提到了这是一个实现过的类型:RealBufferedSink。其中最后一步调用了该实例的 buffer() 方法来处理请求数据。接下来我们将查看该方法的具体实现细节以及它如何影响整个流程。

internal actual class RealBufferedSink actual constructor(
@JvmField实际接收一个Sink类型的对象sink
) : BufferedSink {
override覆盖方法writeAll接收Source源并调用公共方法commonWriteAll
override覆盖方法emitCompleteSegments执行公共方法commonEmitCompleteSegments
}

internal inline fun RealBufferedSink.commonWriteAll(source: Source): Long {
var totalBytesRead = 0L
while (true) {
// 每次从 source 中读取 Segment.SIZE(8192)字节到缓存 buffer 中
val readCount: Long = source.read(buffer, Segment.SIZE.toLong())
// 读完了就返回
if (readCount == -1L) break
// 更新读到的字节数
totalBytesRead += readCount
// 这里才写入 sink 中
emitCompleteSegments()
}
return totalBytesRead
}

private internal function commonEmitCompleteSegments(): BufferedSink {
check(!closed) {
if (false) {
// 是否关闭
}
}
// 计算已发送的数据字节数
val byteCount = buffer.completeSegmentByteCount()
// 将数据发送给 target sink
if (byteCount > 0L) {
sink.write(buffer, byteCount)
}
return this
}

其中核心原理体现在方法 RealBufferedSink.commonWriteAll 中。可以观察到每次操作均向 buffer 写入固定大小的块数据;这些数据块依次被发送至 sink 直至 source 中的内容全部读取完毕。整个流程与 Java IO 在文件/网络数据读写的常见模式相似;而 okio 已经为我们实现了高效可靠的数据传输机制。

回到FileRequestBody的方式是这样的:这种方式每次只从文件中读取8KB的数据块,并将其发送给socket进行处理;这种方式并非是一次性将整个文件内容加载到内存中再通过socket发送;这样一来,在整个请求处理过程中只需要完成一次数据传输的IO操作即可完成任务。

1.4、请求的响应处理

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
override fun intercept(chain: Interceptor.Chain): Response {
// 读取响应行和响应头
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
// 构建 Response
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
// 有响应体时提供 ResponseBody
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
// …
return response
}
}

// 这里不在贴 Exchange 里的代码了

类 Http1ExchangeCodec

override函数 parseResponseHeaders(参数 expectContinue: Boolean): 返回 Response.Builder?

通过 headersReader 获取响应行

调用 StatusLine对象的parse方法来解析读取到的各行数据

创建一个responseBuilder变量并初始化为一个新的Response.Builder对象

调用protocol方法设置协议字段

调用code方法设置状态码字段

调用message方法设置消息字段

接下来是处理响应头部分的内容

重写函数opencasedBody源码获取响应源码的方法接受Response类型的响应参数并返回Source类型{
return when{
// 无body
!response.promisesBody() -> newFixedLengthSource(0)
// 响应为chunked类型且内容长度未知时使用newChunkedSource(response.request.url)
response.isChunked -> newChunkedSource(response.request.url)
else -> {
val contentLength = response.headersContentLength()
if (contentLength != -1L) {
// 响应体长度固定时获取对应固定长度源码
newFixedLengthSource(contentLength)
} else {
// 响应体长度未知时获取对应未知长度源码
newUnknownLengthSource()
}
}
}
}

类似地,在处理请求时

至此为止,在本章中我们已经完成了OkHttp请求与响应之间的IO操作。接下来就是搭建一个统一的上传与下载进度监控框架。

2、OkHttp 上传下载进度监听

2.1、如何实现

通过前面介绍的 IO 过程,我们已经了解了上传请求体最终会到达 RequestBody.writeTo。因此,在上传数据时我们可以在此处设置监听机制,在每次传输一定数量(如8K字节)时进行统计,并对传输进度进行跟踪;类似地,在下载过程中我们需要在 Callback.onResponse 里监听 Responsebody.source 的读取进度以确保数据完整性。

此方案虽然可行但存在多个问题:一是必须确保在所有涉及的地方都独立实现进度上传与下载的功能;二是此外不同类型的请求体和响应体之间的处理方式也存在差异;三是这会导致较高的业务侵入性。然而我们希望尽可能简化并统一对所有进程进行监控以便达到对整个系统进程的有效监控:为此就需要设计并实施一个专门的拦截器 来监控所有 incoming 的 request 和 response 对象 并且替换掉原有的数据操作流程 从而达到对整个系统进程的有效监控

2.2、想怎么用

为了深入解析该功能对外提供的接口及其使用方法,请说明您希望了解的具体内容

// 初始化客户端时增加下方拦截器
OkHttpClient.Builder()
.addInterceptor(ProgressIntercept)
.build()
// 实现上传与下载的进度监控
Request.Builder()
.uploadingProgress(object: OkUploadListener {
override fun upload(curr: Long, contentLength: Long) {
// 目前已上传的数据量及总数据量
Log.d(tag, "Uploading Progress - Current Status: curr/contentLength")
}
})
.downloadingProgress(object: OkDownloadListener {
override fun download(curr: Long, contentLength: Long) {
// 目前已下载的数据量及总数据量
Log.d(tag, "Downloading Progress - Current Status: curr/contentLength")
}
})

我们正在配置 OkHttpClient 时需要注册一个 ProgressIntercept 拦截器来处理上传相关的各种逻辑。在构建请求时可以使用 uploadProgress 方法来监听上传进度的变化情况,并同样提供 downloadProgress 方法用于监听下载进度的变化情况。通过 Request 对象发起请求即可完成整个流程。

2.3、具体实现

首先分析 Request 的构建过程。这一过程相对简单,并非复杂的技术实现。值得注意的是,在 Request 对象中嵌入了自定义数据注入功能。具体而言,在 Request.Builder 中实现了两个进度监听的 listener 来处理回调事件

fun RequestBuilder.uploadingProcess(listener: OkUploadListener?) = be tagged with (OkUploadListener::class.java and the provided listener)
fun RequestFooter.downloadingProcess(listener: OkDownloadListener?) = be tagged with (OkDownloadListener::class.java and the provided listener)

拦截器的实现也不算太复杂,在请求处理开始时就需要将原有的 request body 参数替换为带有上传进度回传机制的新类型 request body(即 ProgressRequestBody)。当响应回来后也需要相应地将原有的 response body 替换为带有下载进度回传功能的新类型 response body(即 ProgressResponseBody)。至于具体如何通过这两个自定义类实现实际的进度反馈,则可一并完成。

object ProgressIntercept: Interceptor {

override fun intercept(chain: Interceptor_chain): Response { val raw_request = chain.request() val up_listener = raw_request.tag(OkUploadListener.class.java) val down_listener = raw_request.tag(OkDownloadListener.class.java) // 替换请求 body 进行上传进度监控 val request = progressUploadBody(raw_request, up_listener) val response = handleRequest(request) // 替换响应 body 进行下载进度监控 return progressDownloadBody(response, down_listener) }

private val replaceRequestBody = fun (request: Request, listener: OkUploadListener?)?: Request {
var body = request.body
when {
body.isNull() || listener.isNull()
} {
return null
}
return request builder()
.method(request.method, ProgressRequestBody(body, listener))
.build()
}

private val replaceResponseBody = fun(response: Response, listener?: OkDownloadListener) -> Response {
val body = response.body
if (body == null || listener == null) {
return response
}
return response.builder().
body(ProgressStreamReader(
body?.letOrNull { it },
listener))
.build()
}

}

在介绍 ProgressRequestBody 和 ProgressResponseBody 之前,请允许我们先探讨 Sink 和 Source 实现进度监听的具体方法。让我们看一下 Sink 接口的特点:它仅提供了一个 write 方法用于写入数据。如果我们对 Sink 进行适当的装饰,在每次 write 操作后统计 byteCount 以获得已写入数据的长度,则可轻松实现进度监听功能。

实际接口 Sink 定义为可关闭和可排空类型

这个想法虽然非常理想 但现实却是我们所获得的却是带有缓冲功能的 BufferedSink 而不是单纯的 Sink 实际上我们所获得的却是带有缓冲功能的 BufferedSink 而不是单纯的 Sink 这种设备为了能够高效地处理不同数据类型的输入数据 提供了大量实用的接口 按照上述想法执行时 就必须编写一系列新的接口来处理这些缓冲操作 并在每次调用时精确计算出可写的字节数量

「1.2」部分末尾的位置上之前已经提到过:当Sink.buffer()方法返回的是一个RealBufferedSink对象时,则它还有一个重要用途

internal actual class RealBufferedSink actual constructor( @JvmField actual val sink: Sink ) : BufferedSink

我们能够观察到,在本节中我们将重点讨论的是带有重叠事件处理机制的缓冲流媒体 sink 类型。这种设计思路源于我们希望在不引入额外资源的情况下实现对现有缓冲流媒体库的支持这一目标。具体而言,在这种设计模式下,默认情况下会将所有的事件处理逻辑集中于 sink 对象本身,并且通过这种方式实现了与现有缓冲流媒体库的有效集成。

/**

该Progressive Sink具备持续跟踪数据累积量的能力,在每一次数据读取完成后会调用指定的累计已写的长度(自定初值起)。该功能允许用户通过传递可选初始值及自定义的数据收集逻辑来实现个性化的数据处理流程。

init {
// 回调一次初始值
writeCallback(curr)
}

重载函数write接受一个Buffer和一个表示字符数量的Long参数,并执行以下操作:

  1. 调用父类的方法进行数据输出
  2. // 调整已读数据并触发回调
  3. 如果当前可读数据量大于零:
    a. 将当前可读数据量累加上新读取的数据量
    b. 将新的已读数据量传递给回调函数

}

首先设计一个扩展功能模块来实现带有进度回调功能的Sink类,并将其作为继承自ForwardingSink的对象进行开发。由于其逻辑极为简单主要承担着装饰功能的作用我们采用了重写的方式并实现了write函数的功能使其能够在每次数据写入时进行计数并通过回调机制同步进度信息以达到实时监控的目的。

class ProgressRequestBody(
private val body: RequestBody,
private val listener: OkUploadListener
): RequestBody() {
// …
override fun writeTo(sink: BufferedSink) {
val contentLength = contentLength()
// 带进度的 sink,套上 buffer 保证所有的写入都经过 Sink.write 方法而不是 BufferedSink 的其他方法
val progressSink = sink.progress {
listener.upload(it, contentLength)
}.buffer()
body.writeTo(progressSink)
// *** 注意 ***
// progressSink 是个 buffer,走到这里 body 写完了,但是 buffer 里的不一定完全写入 sink
// 所以要手动 flush 一下,等待数据写入完毕
progressSink.flush()
}
// …
}

ProgressRequestBody 实现了一个类似于 writeTo 方法的功能。通过使用 Sink.progress 来实现对 buffered sink 的进度监听,并在外部加上一个 buffer() 处理器来确保所有的外部 write 操作都会被路由到 Sink.progress 内部的 write 方法中。由于存在缓存机制,在提交数据前必须执行 flush 操作。

为此,在请求体上传的过程中实现了进度监听任务。相应地,在响应体下载过程中采用了类似的监控机制。我们需要在SourceRequestBody中分别实施进度封装功能。

class ProgressResponseBody(
private val body: ResponseBody,
private val listener: OkDownloadListener
): ResponseBody() {
// …

  • 带有进度控制的 source 被包裹一个 buffer 以确保所有读取操作都使用 [Source.read] 方法实现,并非通过 [BufferedSource] 的其他功能。
    */
    private val progress Source = body.source().progress { listener.download(it, contentLength) }.buffer() // 使用 .buffer() 封装该源码。
    // …
    override fun source(): Buffered Source = progress Source // 重写函数 source() 返回 Buffered Source 类型。
    // …

简单介绍一下自己。我于1999年获得上海交通大学的本科毕业文凭,并在此后的工作中积累了丰富的经验。曾经在中小型公司工作过一段时间,并曾在各大企业如华为、OPPO等有过短期接触。自2018年起,在阿里巴巴工作至今。

清楚地认识到这一现象:相当一部分初中级Android工程师在职业发展过程中面临双重选择——要么通过自主学习摸索式成长(即自行探索并逐步提升技能),要么选择参加培训课程进行系统化学习。尽管机构提供的高学费水平确实带来了较大的挑战性(如经济压力),许多从业者仍难以突破自身能力的限制(即遇到技术瓶颈导致无法进一步提升)。

为了收集整理一份《2024年Android移动开发全套学习资料》,其初衷也非常简单——那就是希望帮助那些想通过自学来提升自己却又不知道该如何开始的朋友,并能让大家的学习负担稍微减轻一些。

img
img
img
img

涵盖针对新手打造的基础学习材料和为资深开发者精心编排的系统性拓展课程。全面覆盖包含超过95%的专业 Android 开发知识点,并且内容安排非常有条理!

因为文件较大, 仅提供部分目录截取展示, 每个节点中均包含大厂面经. 学习笔记. 源码讲义. 实战项目. 讲解视频, 并会持续更新

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注:Android)

总结

本文阐述了我的对Android开发现状的看法几点见解或许有人会对我的观点提出质疑但我坚信没有绝对正确的或错误的观点让时间来最终检验吧!期待与各位志同道合的朋友携手共进!

《互联网巨头公司面试真题解析、深入开发技术核心知识点讲解课程、完整讲解视频课程、实际项目案例代码解析》点击传送门即可获取相关资源!**

文件较大,在此仅展示部分目录的截图。每个节点内部均包含大厂面经、学习笔记、源码讲义、实战项目以及讲解视频等资源,并将不断补充和完善!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注:Android)

总结

我对Android开发现状的看法进行了阐述。或许有人会对我的观点提出异议,但我并不认为存在绝对正确的判断标准。我坚信时间会给出最公正的答案吧!希望与每一位坚守者共同进步!

《互联网巨头公司面试真题解析、进阶开发技术核心课程学习笔记、完整讲解视频课程、实战项目源代码分析讲义》点击传送门即可获取!

全部评论 (0)

还没有任何评论哟~