坏蛋Dan
知乎@坏蛋Dan
发布时间:2024.1.4

前言

昨天我们看完了附录

坏蛋Dan:rust基础学习--day38

今天我们来实战了!


我把代码放github上了,和文档给出的稍微有些不同,如果不想跟着敲,可以直接拉下来

多线程web服务器

下面是实现这个web服务器的计划

  1. 了解TCPHTTP(八股文受害者:?)
  2. 基于socket监听TCP链接
  3. 解析一部分HTTP请求
  4. 创建适当的响应内容
  5. 使用线程池来改善我们服务器的“吞吐量”

社区里已经有很多可直接用于生产环境的包,它们有的可以直接创建一个比我们的demo更完善的项目框架。

这个项目可能也不是很符合真实开发场景,因为它会涉及到我们学到的大部分知识。

毕竟只是个demo用来验收学习知识的。

创建一个单线程web server

在开始实现一个单线程web服务器之前,我们来快速的了解下里面涉及到的协议。

对这一块早已熟悉到爆的大佬可以绕过了,以下内容仅提供给我这样的小白。

HTTP*Hypertext Transfer Protocol*的缩写也就是超文本传输协议,而TCP则是*Transmission Control Protocol*即传输控制协议。

它俩都是*request-response protocol*即请求-响应协议,这也就意味着客户端(client)发起请求需要服务端(server)监听这个请求并返回响应信息给客户端。请求的内容和响应的内容都由这些协议约定。

TCP是低级协议(low-level protocol),它描述信息如何从一端到另一端的细节,但是没有指定信息的内容。

HTTP基于TCP,定义请求和响应的内容。技术上来说,将HTTP和其它协议搭配是有可能的,不过大多数场景中,HTTP都是通过TCP来传递数据。

我们将处理TCPHTTP请求和响应的原始字节(raw bytes)。


监听TCP连接

我们的web服务器需要监听TCP的链接,所以我们第一步就是这个。

标准库中提供了std::net这个模块,它能帮助我们创建连接。

不过在这之前,我们需要创建一个新的项目。

然后进入到main.rs

我们调用std::net::TcpListener监听了127.0.0.1这个本机地址的7878端口,现在可以接收TCP流了。

当有TCP流进入的时候我们的服务器会打印出Connection established

bind这个函数在这个场景中类似new,它会返回一个TcpListener实例。不过在web中还是叫bind更贴切一些。它返回的实例是包裹在Result中的,因为有可能因为端口已经被占用而创建监听失败。

我们调用unwrap把里面的值拿出来,失败的话直接panic在这里也是合理的。

incoming这个方法返回一个迭代器,它包含一些列字节流,类型是TcpStream。它代表着客户端和服务器建立了连接。

但这不是连接的全部,一个完整的连接还包含着响应以及连接的关闭。

同样,迭代器里的也都是用Result包裹的,因为可能接收到的数据是错的,这个时候断开连接也是没问题的。

其中一种incoimg会接收到错误的场景是我们建立的连接并不是真的建立成功了,而是connection attempts即连接尝试。

Ok,现在我们来cargo run运行下。

然后浏览器访问locahost:7878

此时浏览器表示没法访问,那是因为我们并没有返回东西。

另外你应该注意到了这里会输出多个Connection established,因为浏览器发送了多次,因为你的页面请求资源不可能只有一种。还有可能是浏览器retry多次请求导致的。当stream超出作用域之后,这次连接也就close了。


读取request信息

接下来我们来读取来自浏览器请求的信息。这么做也体现了关注点分离的理念,连接是一回事,连接的内容又是另一回事。

我们声明了一个handle_connection的函数,用来解析incoming stream获取request的信息。

我们引入了std::io::{ prelude::*, BufReader }这俩module,这样我们就能用iotraitstype来读/写字节流。

然后我们创建了streamBufReader实例。BufReader通过为我们对std::io::Read这个trait的方法的调用管理来添加buffer。(。。。我把原话发出来吧:BufReader adds buffering by managing calls to the std::io::Read trait methods for us.)

然后我们创建一个名为http_request的变量,类型是Vec,我们用它来存储request的信息。_占位符表示我们并不关心vector里面元素的类型是什么。

BufReader实现了std::io::BufRead这个trait,它提供了lines这个方法。

这个lines返回一个迭代器,迭代器的元素是Result类型的,它们来自requeststream,然后被切割成一项一项的。切割的过程自然是有可能出错的,比如需要转换成字符串,如果这个数据不符合utf-8的编码,那就会出问题了。

然后我们用map迭代这个迭代器,用unwrapResult里的string拿出来。

最后转换成一个集合,元素类型是String

浏览器表示HTTP请求结束是通过在行的末尾加上两个换行符。

现在我们打印下请求的信息

(其实直接用dbg!()stream打印出来也是这个效果,但是对于数据来说类型完全不同,会影响到后面使用)。

各浏览器的请求信息会有些许不同,但是大致还是一样的。


HTTP请求进一步分析

HTTP是基于文本的协议,所以一个请求有以下的格式

第一行是request line,它带有客户端想请求的东西的信息。

然后这个request line还可以分割成几部分:

  1. method,请求的方式,比如GET、POST等,我们用的GET,一般意味着向服务器请求数据信息。
  2. 请求的URI,也就是*Uniform Resource Identifier*, 翻译过来即统一资源标识符。也就是什么资源客户端想要的。这里插一嘴和它挺像的兄弟*Uniform Resource Locator(URL)*也就是统一资源路径。他俩很像,但不完全一样。在HTTP中的术语就叫URI,所以我们在这里可以直接把URI当作URL
  3. HTTP协议的版本,比如HTTP1.1或者HTTP2.0等。
  4. *(carriage return and line feed)CRLF sequence*也就是回车和换行符序列。CRLF是来自打字机时代的术语。它也可以直接写成\r\n。它是用来分割请求行用的。

然后回看下我们request第一行的数据,methodGET, URI/versionHTTP/1.1

request line下面从Host:开始的是headers也就是俗话中的请求头。

然后就没了,我们是GET请求,没有请求的body也就是俗话中的请求体。


实现响应

现在我们已经可以知道浏览器这次请求想要什么东西了,然后我们该返回对应的信息了。

来而不往非礼也~

不过在这之前,我们得知道response的格式

第一行是status line,它包含下面几部分

  1. HTTP的版本
  2. 状态码,比如200/404/502等,表示请求的结果
  3. reason phrase即原因短语,一小段文本描述这个状态码。
  4. CRLF

接着就是headers即响应头,它也用CRLF作为结束的标识符。

最后就是message-body即响应体。

来看一个非常小的response

没了,只有响应的状态行,没有响应头和响应体。

我们来改下我们的handle_connection方法,让它发送响应信息

我们调用as_bytes把响应信息从字符串转换成字节流。然后调用stream自身的write_all方法,它接收u8类型的切片引用,然后将字节流发送出去。由于发送的过程可能会出错,所以我们用unwrap兜底,而实际开发我们应该对错误信息进行分析而不是直接panic

然后我们来重新运行下代码

现在不再是无法访问了。

可以看到我们的response确实获取到了。


返回html

现在我们的页面还是空空的,没有内容的,我们应该返回html。让我们在根目录创建一个index.html文件

注意这里不要放在src下。

然后我们来修改代码,让我们的html可以作为响应内容返回。

突击检查:我们之前是怎么获取文件的内容的?

答:用fs::read_to_string这个方法,它返回一个Result的类型。

读取完文件内容之后,我们还需要拼接到响应体里。

我们用format!这宏直接拼接到一起,这样方便点(突然好想念前端的${}写法)。

另外响应头需要指明响应体也就是内容的长度Content-Length,确保我们的响应是有效的。

现在让我们重新运行下

现在返回的内容我们就拿到了。

不过目前我们并没有对请求的信息进行分析,而是直接返回html

也就是说当你输入localhost:7878/xxx又或者localhost:7878/aaa时,返回的内容都是一样的。

这显然是不符合我们的预期的,我们希望能在/时返回html,但是在/xxx或者/aaa的时候返回其他内容。

我们接下来就来实现这一块功能。


校验request并选择性响应

我们之前已经知道请求想要什么东西是放在URI里的,所以我们可以根据它来实现选择性返回。

这里我直接用split_ascii_whitespace来分割request_line,然后获取第二个元素也就是URI的所有权,然后用来match对应的路径。

然后我们来重新运行下

然后我们再来拓展下404场景,一般404我们会返回一个404页面。

根目录创建404.html

然后我们再来修改下代码,支持404页面。

现在我们来重新运行下代码

这样就ok


稍微重构下代码

可以看到上面我们的status_lineres_body实际上是有重复部分的(这里指的是官网给的例子,我自己写的其实也没差多少)。

我们来稍微重构下代码减少这块的重复量。

我们去掉了match,因为我们只有两个场景,/404,然后我们把status_line和文件名都用元组返回,这样减少了fs::read_to_string的重复编写以及status_line的覆盖。

不过这么看确实优雅了一些。

现在我们的项目就差不多完成了,是时候拓展成多线程的了。


将单线程服务器转换成多线程服务器

我们的项目目前还是单线程的,所以如果一次有多个请求存在,那么它只能按顺序一个一个响应。

量少还好说,如果量大了就难受了。(nodejs好一些,但是量大了也顶不住)

所以为了提升用户体验感,我们来把项目拓展成多线程的。

不过在这之前,我们得复现一下场景


模拟一个慢请求

那要怎么做呢?

很简单,多一个匹配路径sleep,匹配到就暂停5秒。

这不还是改回match了么,害~

现在访问localhost:7878/sleep会慢五秒再展示html内容。

当访问/sleep的之后立即访问/,也是还会延迟5秒的,因为目前还是单线程,上一次请求的处理会阻塞后面请求的处理。


使用线程池改善吞吐量的问题

线程池指的是一堆空闲的子线程,它们随时都可以被用来执行任务。当任务来了,程序会随便挑一个他们之中的一个来执行,当执行完了之后那个线程就会回归线程池。

当然,线程不是越多越好的,我们的项目只搞几个就好。真实项目中如果线程太多,容易被DoS攻击。

另外我们还需要搞个队列(queue),用来存放需要响应的请求,为什么要这么做呢?

因为我们的线程池有可能被占用完了而请求还有一堆,这个时候就得有个东西存储它们了,并且得是按顺序的,不然客户端可能会出错,等到有线程空闲之后再按顺序分配请求,先进先出可不就是队列吗?

线程池只是一种改善吞吐量的方案,还有其它方案比如:fork/join modelsingle-threaded async I/O modelmulti-threaded async I/O model 等。

扯远了,回到我们的项目中。

不过在这之前,我们得设计下多线程的代码。

我们使用类似测试驱动开发的编译器驱动开发模式,先写结果,然后由编译器报错一步步引导实现过程。

不过在这之前的之前,我们还得来探索一个我们这项目用不到的技术,它是后面一切的起点。


给请求创建线程

现在我们先来创建线程,不考虑线程个数等,实现了即可。

目前是有几个请求就有几个线程。

现在访问/sleep的之后立即访问/或者404是不会有5秒的请求的。

另外请不要测试当前代码太多次,不然操作系统容易崩溃。


限制线程个数

rust标准库里没有提供ThreadPool相关的, 我们得自己写或者引用大佬们的依赖。

不过目前我们是编译器驱动开发,所以我们先来实现结果。

ThreadPool创建一个线程池,线程个数限制为4个。

然后线程池实例调用execute把请求存放到队列中,然后队列再分配给闲置的线程。


实现ThreadPool

编译器目前提示我们没有找到ThreadPool这东西,所以我们下一步就是实现它。

我们先创建lib.rs文件,我们的ThreadPool准备放到lib.rs中,这样到时候可以打包成library crate给其他项目直接使用。

创建完后我们来分析下,这个ThreadPool目前有一个new的关联函数,它接收一个整数,返回一个线程池实例。

然后它还有一个executemethod,它接受一个闭包并且会存储到线程池的队列中。有空闲的线程时才会分配。不过队列这一步我们先不实现,先写一个简单的即可。

先来写个框架

new就不说了,execute为什么会是一个FnOnce() + Send + 'static呢?

因为它长得像thread::spawn,都是接收闭包并且多线程。

来看下thread::spawn的源码类型

看不懂正常,我也看不懂~ 不过类型我们认识。

FnOnce是闭包实现的trait的最基本要求,它并不需要改变数据或者拥有数据的所有权,我们只是想执行这个闭包并且只会执行一次。

另外我们这里是FnOnce()而不是FnOnce,表示这个闭包没有东西返回,是个空元组。如果一个函数指针是有参数限制的,它的类型限制大概是这样的fn(param: type) -> return_type

Send就更不用说了,从一个线程传递数据到另一个线程就一定要先实现Send这个trait才行,这样才安全。

至于'static。。。spawn使用'static是因为它返回的内容和外部是没有关联的,但是他又要返回出去(为了让主线程等待子线程执行完成),这样编译器就会报错:悬浮指针,所以得用'static来标记它。

既然spawn这么做,那我们也顺便保留了,因为我们可能也得用JoinHandle包裹内容来保证主线程等待子线程执行完成之后再结束线程。

现在我们的代码已经不会报错了。

可以运行cago check来看下编译器还有没有报错。


校验new的参数

我们前面说线程的个数要限制,但是实际开发也得限制范围,负数那当然不可以。

直接用assert!给它断言。

顺便写下文档注释

另外我们也可以把错误传递出去,不过我们这里没必要,因为如果线程创建个数有问题,那就是开发者自己参数传递过来有问题,是开发者的问题。


存储线程的空间

既然是线程池有多个线程,那就得有个空间来存储线程,另外这些线程自然得是内部私有的。

还记得spawnJoinHandle么?

我们是通过thread::spawn来创建线程的,所以自然拿到的线程就是JoinHandle类型的。

JoinHandle的泛型类型是()也就是空元组,因为我们的闭包并没有返回值。

with_capacityVec::new的功能差不多,但是它比new多了一点就是预分配内存空间,因为我们的size是已知的,所以直接预分配空间能减少runtime的损耗。


一个worker struct负责将代码从线程池发送给线程

我们上面确定了threads的类型,但是我们并没有实现线程的创建,为什么呢?因为thread::spawn需要我们传入闭包在它初始化的时候,而new创建实例的过程我们是没有任何的任务的,而任务来自execute这个方法,那这个时候就没法创建线程实例了。

这个时候就轮到Worker这一种数据结构登场了。

Worker会收集task,然后在Worker's的线程中执行这个task

不过这也就意味着我们的threads类型需要改变了。

直接来看下改动之后的代码

我们创建了一个Workerstruct,它有俩字段,一个是id,一个是线程,它有一个关联函数new,会创建一个worker实例。

我们在ThreadPoolnew关联函数中创建worker实例。

线程池的threads改成workers,每个worker存储一个单独的thread

实际上我们还是没解决spawn会立即执行的问题,这个点不着急,接下来就来实现这个。


通过channels将请求传递给线程

现在我们来实现将闭包传递给worker的线程这一过程。

还记得之前学的多线程里的channel吗?通过这种方法,我们可以安全的在多个线程之间传递数据。

我们把channel当做队列来使用,通过它我们可以把闭包传递给worker里的线程。

我们准备将channel的发射器放在ThreadPool里面,当execute被调用的时候就可以触发这个发射器。

而接收器自然放在worker里,然后我们遍历workers,如果存在worker是空闲的,我们就可以将闭包发送给这个线程。

老规矩先上代码

我们先是拓展了ThreadPool的字段,新增一个sender

这个sender自然是channel的发射器,它传递的数据类型是Job,而这个Job是准备用来存储我们的闭包的。

接下来我们来实现接收器相关的

但是在这之前,我们有一个问题需要解决:只能有一个receiver

如果按照上面的想法直接把接收器放到worker里则会引起报错

上面的代码我们把接收器包裹到线程的闭包里,这样到时接收器就能直接调用闭包。

但是这样是会引起报错的

它只有一个,也没有实现Copy这个trait,那么它就只能有一个,而线程闭包里引用数据又必须是拥有所有权的,这就冲突了。

另外这里还有一个问题,那就是接收器重复触发的问题。

那么该怎么处理只能有一个接收器的问题呢?

聪明的你应该已经想起来我们有可以创建多个拥有者的类型和创建不可变数据的可变引用的类型。

也就是ArcMutex搭配的Arc>,即多线程中允许存在多个可变引用。

我们来改下代码

现在就不会报错了


实现execute

话不多说,直接上代码

没什么好说的,调用sender发送闭包给receiver

注意我们这里把Job的类型改成了Box

然后我们再来实现receiver那边的代码

我们的lock解决了另一个问题,send广播重复接收问题,这个我们没法避免,但是我们可以限制执行,通过lockunlock获取锁的权限。

然后注意这里的loop,我们没有办法阻止spawn里的闭包的立即执行,但是我们可以通过loop让它停不下来~。

这里就解决了我们之前遗漏下来的问题:spawn接收的闭包会立即执行,并不能等到闭包的出现再执行。

接着我们调用receiver.lock先尝试获取锁访问权限,然后unwrap没问题再接收recv

当然,也不是每次都能正常接收到数据的,所以我们还需要unwrap兜底(实际上也不应该是unwrap,需要特具体分析)。

注意这里不能用while let来循环,因为这里的job不会自己Drop,只有while let结束之后才会drop

所以这里锁一直没有解开,那其他线程也就没办法处理请求了。

现在这一块代码完成了,整合下

我们来运行下

测试正常。


优雅的关闭(shutdown)和清理(cleanup)

目前我们的代码已经完成了,接下来我们来优化下一些东西。

比如我们线程结束使用的是ctrl + c大法,这样虽然能立即结束所有线程,但是并不能保证线程里的任务已经执行完毕了,这很不优雅并且这种情况在实际开发中已经算是bug了。

我们来优化下

ThreadPool实现Drop

我们给ThreadPool实现Drop这个trait,当它被drop的时候得先保证它的worker们的线程都执行完才行。

还记得我们的JoinHandle类型么,我们可以调用它的join方法来保证主线程等待子线程执行完再结束。

那么按照我们的想法,我们来实现它

我们遍历了它,等到pooldrop的时候,我们遍历workers,给它们的thread调用join

看着好像没问题,但是这样是错的。

join方法需要线程的所有权才行,但是我们这里只是引用。

那么要怎么拿到线程的所有权呢?

还记得之前我们用过Optiontake方法吗?

我们通过take方法把Some(T)的数据拿了出来并让它变成了None

我们来改下

我们用Option包裹worker.thread,然后在drop方法中解构,这样就拿到线程的所有权了。

当线程被take出来后,原来worker里的thread就变成None了,然后在当前的for作用域结束之后这个thread变量也会被drop掉,剩下的就是等待线程都执行完被drop掉了。


向线程发送停止监听的job的信号

上面的代码实际上还有个大问题,就是线程们实际上不会被结束,为什么呢?

因为我们用的是循环并且没有结束条件,所以它会一直循环持续下去。

这就是一个严重的bug了。

我们来解决这个问题。

还记得我们给receiver用的recv方法后面加了一个unwrap吗?这个时候就派上用场了。

我们可以借用这个错误处理来处理掉线程,然后退出循环。

那么要怎么触发这个错误呢?那最简单的自然就是channelclose

还记得之前学的么?当channel两端任意一方被drop了之后,这个channelclose了。

我们用Option包裹sender,这样就能拿到所有权,然后在drop的时候把发射器drop掉,这样receiver也就被drop了。

这个drop方法是标准库里提供的手动清除方法。

as_ref是用来转换&Option变成Option<&T>的。

改完之后就能退出循环了。但是还有个问题,现在我们的代码会直接panic

所以我们得把recvunwrap方法替换成match,当match到非Ok的情况直接break即可。

现在就可以啦,我们来模拟下。

我们给incoming后面加上了take(2)表示迭代器只接收前两个就退出作用域了。

然后我们来运行下

可以看到执行了两次之后线程们都被杀死了。


总结

那么我们这个demo就完成了,不过还有些地方可以优化下的:

  • library增加tests
  • library增加文档注释
  • 优化错误场景,去掉unwrap
  • 使用第三方依赖替换我们的线程池

参考

  1. ^rust-multithreaded-web-server https://doc.rust-lang.org/book/ch20-00-final-project-a-web-server.html#final-project-building-a-multithreaded-web-server
  2. ^rust-creating-a-single-thread-web-server https://doc.rust-lang.org/book/ch20-01-single-threaded.html#building-a-single-threaded-web-server
  3. ^rust-listen-to-TCP-connections https://doc.rust-lang.org/book/ch20-01-single-threaded.html#listening-to-the-tcp-connection
  4. ^rust-reading-Request https://doc.rust-lang.org/book/ch20-01-single-threaded.html#reading-the-request
  5. ^rust-a-closer-to-HTTP-request https://doc.rust-lang.org/book/ch20-01-single-threaded.html#a-closer-look-at-an-http-request
  6. ^rust-writing-response https://doc.rust-lang.org/book/ch20-01-single-threaded.html#writing-a-response
  7. ^rust-returning-real-html https://doc.rust-lang.org/book/ch20-01-single-threaded.html#returning-real-html
  8. ^rust-validating-request-and-selectivly-response https://doc.rust-lang.org/book/ch20-01-single-threaded.html#validating-the-request-and-selectively-responding
  9. ^rust-a-touch-of-refactors https://doc.rust-lang.org/book/ch20-01-single-threaded.html#a-touch-of-refactoring
  10. ^rust-turn-out-single-threaded-sever-into-multithreaded-server https://doc.rust-lang.org/book/ch20-02-multithreaded.html#turning-our-single-threaded-server-into-a-multithreaded-server
  11. ^rust-simulate-slow-request-in-current-server-implementation https://doc.rust-lang.org/book/ch20-02-multithreaded.html#simulating-a-slow-request-in-the-current-server-implementation
  12. ^rust-improve-throughput-with-thread-pool https://doc.rust-lang.org/book/ch20-02-multithreaded.html#improving-throughput-with-a-thread-pool
  13. ^rust-spawning-a-thread-to-request https://doc.rust-lang.org/book/ch20-02-multithreaded.html#spawning-a-thread-for-each-request
  14. ^rust-creating-finite-number-thread https://doc.rust-lang.org/book/ch20-02-multithreaded.html#creating-a-finite-number-of-threads
  15. ^rust-implement-ThreadPool https://doc.rust-lang.org/book/ch20-02-multithreaded.html#building-threadpool-using-compiler-driven-development
  16. ^rust-validating-associated-function-new-parameter https://doc.rust-lang.org/book/ch20-02-multithreaded.html#validating-the-number-of-threads-in-new
  17. ^rust-way-to-store-threads https://doc.rust-lang.org/book/ch20-02-multithreaded.html#creating-space-to-store-the-threads
  18. ^rust-a-worker-struct https://doc.rust-lang.org/book/ch20-02-multithreaded.html#a-worker-struct-responsible-for-sending-code-from-the-threadpool-to-a-thread
  19. ^rust-sending-requests-to-thread-via-channels https://doc.rust-lang.org/book/ch20-02-multithreaded.html#sending-requests-to-threads-via-channels
  20. ^rust-implement-execute-method https://doc.rust-lang.org/book/ch20-02-multithreaded.html#implementing-the-execute-method
  21. ^rust-graceful-shutdown-and-cleanup https://doc.rust-lang.org/book/ch20-03-graceful-shutdown-and-cleanup.html#graceful-shutdown-and-cleanup
  22. ^rust-implement-drop-trait-on-ThreadPool https://doc.rust-lang.org/book/ch20-03-graceful-shutdown-and-cleanup.html#implementing-the-drop-trait-on-threadpool
  23. ^rust-signal-to-threads-to-stop-listening-jobs https://doc.rust-lang.org/book/ch20-03-graceful-shutdown-and-cleanup.html#signaling-to-the-threads-to-stop-listening-for-jobs

编辑于 2023-01-15 16:31・IP 属地广东