昨天我们看完了附录
今天我们来实战了!
我把代码放github
上了,和文档给出的稍微有些不同,如果不想跟着敲,可以直接拉下来
web
服务器下面是实现这个web
服务器的计划
TCP
和HTTP
(八股文受害者:?)socket
监听TCP
链接HTTP
请求社区里已经有很多可直接用于生产环境的包,它们有的可以直接创建一个比我们的demo
更完善的项目框架。
这个项目可能也不是很符合真实开发场景,因为它会涉及到我们学到的大部分知识。
毕竟只是个demo用来验收学习知识的。
web server
在开始实现一个单线程web
服务器之前,我们来快速的了解下里面涉及到的协议。
对这一块早已熟悉到爆的大佬可以绕过了,以下内容仅提供给我这样的小白。
HTTP
是*Hypertext Transfer Protocol*
的缩写也就是超文本传输协议,而TCP
则是*Transmission Control Protocol*
即传输控制协议。
它俩都是*request-response protocol*
即请求-响应协议,这也就意味着客户端(client
)发起请求需要服务端(server
)监听这个请求并返回响应信息给客户端。请求的内容和响应的内容都由这些协议约定。
TCP
是低级协议(low-level protocol
),它描述信息如何从一端到另一端的细节,但是没有指定信息的内容。
HTTP
基于TCP
,定义请求和响应的内容。技术上来说,将HTTP
和其它协议搭配是有可能的,不过大多数场景中,HTTP
都是通过TCP
来传递数据。
我们将处理TCP
和HTTP
请求和响应的原始字节(raw bytes
)。
TCP
连接我们的web
服务器需要监听TCP
的链接,所以我们第一步就是这个。
标准库中提供了std::net
这个模块,它能帮助我们创建连接。
不过在这之前,我们需要创建一个新的项目。
然后进入到main.rs
中
我们调用std::net::TcpListener
监听了127.0.0.1
这个本机地址的7878
端口,现在可以接收TCP
流了。
当有TCP
流进入的时候我们的服务器会打印出Connection established
。
bind
这个函数在这个场景中类似new
,它会返回一个TcpListener
实例。不过在web
中还是叫bind
更贴切一些。它返回的实例是包裹在Result
中的,因为有可能因为端口已经被占用而创建监听失败。
我们调用unwrap
把里面的值拿出来,失败的话直接panic
在这里也是合理的。
incoming
这个方法返回一个迭代器,它包含一些列字节流,类型是TcpStream
。它代表着客户端和服务器建立了连接。
但这不是连接的全部,一个完整的连接还包含着响应以及连接的关闭。
同样,迭代器里的也都是用Result
包裹的,因为可能接收到的数据是错的,这个时候断开连接也是没问题的。
其中一种incoimg
会接收到错误的场景是我们建立的连接并不是真的建立成功了,而是connection attempts
即连接尝试。
Ok
,现在我们来cargo run
运行下。
然后浏览器访问locahost:7878
此时浏览器表示没法访问,那是因为我们并没有返回东西。
另外你应该注意到了这里会输出多个Connection established
,因为浏览器发送了多次,因为你的页面请求资源不可能只有一种。还有可能是浏览器retry
多次请求导致的。当stream
超出作用域之后,这次连接也就close
了。
request
信息接下来我们来读取来自浏览器请求的信息。这么做也体现了关注点分离的理念,连接是一回事,连接的内容又是另一回事。
我们声明了一个handle_connection
的函数,用来解析incoming stream
获取request
的信息。
我们引入了std::io::{ prelude::*, BufReader }
这俩module
,这样我们就能用io
的traits
和type
来读/写字节流。
然后我们创建了stream
的BufReader
实例。BufReader
通过为我们对std::io::Read
这个trait
的方法的调用管理来添加buffer
。(。。。我把原话发出来吧:BufReader
adds buffering by managing calls to the std::io::Read
trait methods for us.)
然后我们创建一个名为http_request
的变量,类型是Vec
,我们用它来存储request
的信息。_
占位符表示我们并不关心vector
里面元素的类型是什么。
BufReader
实现了std::io::BufRead
这个trait
,它提供了lines
这个方法。
这个lines
返回一个迭代器,迭代器的元素是Result
类型的,它们来自request
的stream
,然后被切割成一项一项的。切割的过程自然是有可能出错的,比如需要转换成字符串,如果这个数据不符合utf-8
的编码,那就会出问题了。
然后我们用map
迭代这个迭代器,用unwrap
将Result
里的string
拿出来。
最后转换成一个集合,元素类型是String
。
浏览器表示HTTP
请求结束是通过在行的末尾加上两个换行符。
现在我们打印下请求的信息
(其实直接用dbg!()
把stream
打印出来也是这个效果,但是对于数据来说类型完全不同,会影响到后面使用)。
各浏览器的请求信息会有些许不同,但是大致还是一样的。
HTTP
请求进一步分析HTTP
是基于文本的协议,所以一个请求有以下的格式
第一行是request line
,它带有客户端想请求的东西的信息。
然后这个request line
还可以分割成几部分:
method
,请求的方式,比如GET、POST
等,我们用的GET
,一般意味着向服务器请求数据信息。URI
,也就是*Uniform Resource Identifier*
, 翻译过来即统一资源标识符。也就是什么资源客户端想要的。这里插一嘴和它挺像的兄弟*Uniform Resource Locator(URL)*
也就是统一资源路径。他俩很像,但不完全一样。在HTTP
中的术语就叫URI
,所以我们在这里可以直接把URI
当作URL
。HTTP
协议的版本,比如HTTP1.1
或者HTTP2.0
等。(carriage return and line feed)CRLF sequence
*也就是回车和换行符序列。CRLF
是来自打字机时代的术语。它也可以直接写成\r\n
。它是用来分割请求行用的。然后回看下我们request
第一行的数据,method
是GET
, URI
是/
,version
是HTTP/1.1
。
在request line
下面从Host:
开始的是headers
也就是俗话中的请求头。
然后就没了,我们是GET
请求,没有请求的body
也就是俗话中的请求体。
现在我们已经可以知道浏览器这次请求想要什么东西了,然后我们该返回对应的信息了。
来而不往非礼也~
不过在这之前,我们得知道response
的格式
第一行是status line
,它包含下面几部分
HTTP
的版本200/404/502
等,表示请求的结果reason phrase
即原因短语,一小段文本描述这个状态码。CRLF
接着就是headers
即响应头,它也用CRLF
作为结束的标识符。
最后就是message-body
即响应体。
来看一个非常小的response
没了,只有响应的状态行,没有响应头和响应体。
我们来改下我们的handle_connection
方法,让它发送响应信息
我们调用as_bytes
把响应信息从字符串转换成字节流。然后调用stream
自身的write_all
方法,它接收u8
类型的切片引用,然后将字节流发送出去。由于发送的过程可能会出错,所以我们用unwrap
兜底,而实际开发我们应该对错误信息进行分析而不是直接panic
。
然后我们来重新运行下代码
现在不再是无法访问了。
可以看到我们的response
确实获取到了。
html
现在我们的页面还是空空的,没有内容的,我们应该返回html
。让我们在根目录创建一个index.html
文件
注意这里不要放在src
下。
然后我们来修改代码,让我们的html
可以作为响应内容返回。
突击检查:我们之前是怎么获取文件的内容的?
答:用fs::read_to_string
这个方法,它返回一个Result
的类型。
读取完文件内容之后,我们还需要拼接到响应体里。
我们用format!
这宏直接拼接到一起,这样方便点(突然好想念前端的${}
写法)。
另外响应头需要指明响应体也就是内容的长度Content-Length
,确保我们的响应是有效的。
现在让我们重新运行下
现在返回的内容我们就拿到了。
不过目前我们并没有对请求的信息进行分析,而是直接返回html
。
也就是说当你输入localhost:7878/xxx
又或者localhost:7878/aaa
时,返回的内容都是一样的。
这显然是不符合我们的预期的,我们希望能在/
时返回html
,但是在/xxx
或者/aaa
的时候返回其他内容。
我们接下来就来实现这一块功能。
request
并选择性响应我们之前已经知道请求想要什么东西是放在URI
里的,所以我们可以根据它来实现选择性返回。
这里我直接用split_ascii_whitespace
来分割request_line
,然后获取第二个元素也就是URI
的所有权,然后用来match
对应的路径。
然后我们来重新运行下
然后我们再来拓展下404
场景,一般404
我们会返回一个404
页面。
根目录创建404.html
然后我们再来修改下代码,支持404
页面。
现在我们来重新运行下代码
这样就ok
啦
可以看到上面我们的status_line
和res_body
实际上是有重复部分的(这里指的是官网给的例子,我自己写的其实也没差多少)。
我们来稍微重构下代码减少这块的重复量。
我们去掉了match
,因为我们只有两个场景,/
和404
,然后我们把status_line
和文件名都用元组返回,这样减少了fs::read_to_string
的重复编写以及status_line
的覆盖。
不过这么看确实优雅了一些。
现在我们的项目就差不多完成了,是时候拓展成多线程的了。
我们的项目目前还是单线程的,所以如果一次有多个请求存在,那么它只能按顺序一个一个响应。
量少还好说,如果量大了就难受了。(nodejs
好一些,但是量大了也顶不住)
所以为了提升用户体验感,我们来把项目拓展成多线程的。
不过在这之前,我们得复现一下场景
那要怎么做呢?
很简单,多一个匹配路径sleep
,匹配到就暂停5
秒。
这不还是改回match
了么,害~
现在访问localhost:7878/sleep
会慢五秒再展示html
内容。
当访问/sleep
的之后立即访问/
,也是还会延迟5
秒的,因为目前还是单线程,上一次请求的处理会阻塞后面请求的处理。
线程池指的是一堆空闲的子线程,它们随时都可以被用来执行任务。当任务来了,程序会随便挑一个他们之中的一个来执行,当执行完了之后那个线程就会回归线程池。
当然,线程不是越多越好的,我们的项目只搞几个就好。真实项目中如果线程太多,容易被DoS
攻击。
另外我们还需要搞个队列(queue
),用来存放需要响应的请求,为什么要这么做呢?
因为我们的线程池有可能被占用完了而请求还有一堆,这个时候就得有个东西存储它们了,并且得是按顺序的,不然客户端可能会出错,等到有线程空闲之后再按顺序分配请求,先进先出可不就是队列吗?
线程池只是一种改善吞吐量的方案,还有其它方案比如:fork/join model
、single-threaded async I/O model
、multi-threaded async I/O model
等。
扯远了,回到我们的项目中。
不过在这之前,我们得设计下多线程的代码。
我们使用类似测试驱动开发的编译器驱动开发模式,先写结果,然后由编译器报错一步步引导实现过程。
不过在这之前的之前,我们还得来探索一个我们这项目用不到的技术,它是后面一切的起点。
现在我们先来创建线程,不考虑线程个数等,实现了即可。
目前是有几个请求就有几个线程。
现在访问/sleep
的之后立即访问/
或者404
是不会有5
秒的请求的。
另外请不要测试当前代码太多次,不然操作系统容易崩溃。
rust
标准库里没有提供ThreadPool
相关的, 我们得自己写或者引用大佬们的依赖。
不过目前我们是编译器驱动开发,所以我们先来实现结果。
ThreadPool
创建一个线程池,线程个数限制为4
个。
然后线程池实例调用execute
把请求存放到队列中,然后队列再分配给闲置的线程。
ThreadPool
编译器目前提示我们没有找到ThreadPool
这东西,所以我们下一步就是实现它。
我们先创建lib.rs
文件,我们的ThreadPool
准备放到lib.rs
中,这样到时候可以打包成library crate
给其他项目直接使用。
创建完后我们来分析下,这个ThreadPool
目前有一个new
的关联函数,它接收一个整数,返回一个线程池实例。
然后它还有一个execute
的method
,它接受一个闭包并且会存储到线程池的队列中。有空闲的线程时才会分配。不过队列这一步我们先不实现,先写一个简单的即可。
先来写个框架
new
就不说了,execute
为什么会是一个FnOnce() + Send + 'static
呢?
因为它长得像thread::spawn
,都是接收闭包并且多线程。
来看下thread::spawn
的源码类型
看不懂正常,我也看不懂~ 不过类型我们认识。
FnOnce
是闭包实现的trait
的最基本要求,它并不需要改变数据或者拥有数据的所有权,我们只是想执行这个闭包并且只会执行一次。
另外我们这里是FnOnce()
而不是FnOnce
,表示这个闭包没有东西返回,是个空元组。如果一个函数指针是有参数限制的,它的类型限制大概是这样的fn(param: type) -> return_type
。
Send
就更不用说了,从一个线程传递数据到另一个线程就一定要先实现Send
这个trait
才行,这样才安全。
至于'static
。。。spawn
使用'static
是因为它返回的内容和外部是没有关联的,但是他又要返回出去(为了让主线程等待子线程执行完成),这样编译器就会报错:悬浮指针,所以得用'static
来标记它。
既然spawn
这么做,那我们也顺便保留了,因为我们可能也得用JoinHandle
包裹内容来保证主线程等待子线程执行完成之后再结束线程。
现在我们的代码已经不会报错了。
可以运行cago check
来看下编译器还有没有报错。
new
的参数我们前面说线程的个数要限制,但是实际开发也得限制范围,负数那当然不可以。
直接用assert!
给它断言。
顺便写下文档注释
另外我们也可以把错误传递出去,不过我们这里没必要,因为如果线程创建个数有问题,那就是开发者自己参数传递过来有问题,是开发者的问题。
既然是线程池有多个线程,那就得有个空间来存储线程,另外这些线程自然得是内部私有的。
还记得spawn
的JoinHandle
么?
我们是通过thread::spawn
来创建线程的,所以自然拿到的线程就是JoinHandle
类型的。
JoinHandle
的泛型类型是()
也就是空元组,因为我们的闭包并没有返回值。
with_capacity
和Vec::new
的功能差不多,但是它比new
多了一点就是预分配内存空间,因为我们的size
是已知的,所以直接预分配空间能减少runtime
的损耗。
worker struct
负责将代码从线程池发送给线程我们上面确定了threads
的类型,但是我们并没有实现线程的创建,为什么呢?因为thread::spawn
需要我们传入闭包在它初始化的时候,而new
创建实例的过程我们是没有任何的任务的,而任务来自execute
这个方法,那这个时候就没法创建线程实例了。
这个时候就轮到Worker
这一种数据结构登场了。
Worker
会收集task
,然后在Worker's
的线程中执行这个task
。
不过这也就意味着我们的threads
类型需要改变了。
直接来看下改动之后的代码
我们创建了一个Worker
的struct
,它有俩字段,一个是id
,一个是线程,它有一个关联函数new
,会创建一个worker
实例。
我们在ThreadPool
的new
关联函数中创建worker
实例。
线程池的threads
改成workers
,每个worker
存储一个单独的thread
。
实际上我们还是没解决spawn
会立即执行的问题,这个点不着急,接下来就来实现这个。
channels
将请求传递给线程现在我们来实现将闭包传递给worker
的线程这一过程。
还记得之前学的多线程里的channel
吗?通过这种方法,我们可以安全的在多个线程之间传递数据。
我们把channel
当做队列来使用,通过它我们可以把闭包传递给worker
里的线程。
我们准备将channel
的发射器放在ThreadPool
里面,当execute
被调用的时候就可以触发这个发射器。
而接收器自然放在worker
里,然后我们遍历workers
,如果存在worker
是空闲的,我们就可以将闭包发送给这个线程。
老规矩先上代码
我们先是拓展了ThreadPool
的字段,新增一个sender
。
这个sender
自然是channel
的发射器,它传递的数据类型是Job
,而这个Job
是准备用来存储我们的闭包的。
接下来我们来实现接收器相关的
但是在这之前,我们有一个问题需要解决:只能有一个receiver
。
如果按照上面的想法直接把接收器放到worker
里则会引起报错
上面的代码我们把接收器包裹到线程的闭包里,这样到时接收器就能直接调用闭包。
但是这样是会引起报错的
它只有一个,也没有实现Copy
这个trait
,那么它就只能有一个,而线程闭包里引用数据又必须是拥有所有权的,这就冲突了。
另外这里还有一个问题,那就是接收器重复触发的问题。
那么该怎么处理只能有一个接收器的问题呢?
聪明的你应该已经想起来我们有可以创建多个拥有者的类型和创建不可变数据的可变引用的类型。
也就是Arc
和Mutex
搭配的Arc>
,即多线程中允许存在多个可变引用。
我们来改下代码
现在就不会报错了
execute
话不多说,直接上代码
没什么好说的,调用sender
发送闭包给receiver
。
注意我们这里把Job
的类型改成了Box
。
然后我们再来实现receiver
那边的代码
我们的lock
解决了另一个问题,send
广播重复接收问题,这个我们没法避免,但是我们可以限制执行,通过lock
和unlock
获取锁的权限。
然后注意这里的loop
,我们没有办法阻止spawn
里的闭包的立即执行,但是我们可以通过loop
让它停不下来~。
这里就解决了我们之前遗漏下来的问题:spawn
接收的闭包会立即执行,并不能等到闭包的出现再执行。
接着我们调用receiver.lock
先尝试获取锁访问权限,然后unwrap
没问题再接收recv
。
当然,也不是每次都能正常接收到数据的,所以我们还需要unwrap
兜底(实际上也不应该是unwrap
,需要特具体分析)。
注意这里不能用while let
来循环,因为这里的job
不会自己Drop
,只有while let
结束之后才会drop
。
所以这里锁一直没有解开,那其他线程也就没办法处理请求了。
现在这一块代码完成了,整合下
我们来运行下
测试正常。
shutdown
)和清理(cleanup
)目前我们的代码已经完成了,接下来我们来优化下一些东西。
比如我们线程结束使用的是ctrl + c
大法,这样虽然能立即结束所有线程,但是并不能保证线程里的任务已经执行完毕了,这很不优雅并且这种情况在实际开发中已经算是bug
了。
我们来优化下
ThreadPool
实现Drop
我们给ThreadPool
实现Drop
这个trait
,当它被drop
的时候得先保证它的worker
们的线程都执行完才行。
还记得我们的JoinHandle
类型么,我们可以调用它的join
方法来保证主线程等待子线程执行完再结束。
那么按照我们的想法,我们来实现它
我们遍历了它,等到pool
被drop
的时候,我们遍历workers
,给它们的thread
调用join
。
看着好像没问题,但是这样是错的。
join
方法需要线程的所有权才行,但是我们这里只是引用。
那么要怎么拿到线程的所有权呢?
还记得之前我们用过Option
的take
方法吗?
我们通过take
方法把Some(T)
的数据拿了出来并让它变成了None
。
我们来改下
我们用Option
包裹worker.thread
,然后在drop
方法中解构,这样就拿到线程的所有权了。
当线程被take
出来后,原来worker
里的thread
就变成None
了,然后在当前的for
作用域结束之后这个thread
变量也会被drop
掉,剩下的就是等待线程都执行完被drop
掉了。
job
的信号上面的代码实际上还有个大问题,就是线程们实际上不会被结束,为什么呢?
因为我们用的是循环并且没有结束条件,所以它会一直循环持续下去。
这就是一个严重的bug
了。
我们来解决这个问题。
还记得我们给receiver
用的recv
方法后面加了一个unwrap
吗?这个时候就派上用场了。
我们可以借用这个错误处理来处理掉线程,然后退出循环。
那么要怎么触发这个错误呢?那最简单的自然就是channel
被close
。
还记得之前学的么?当channel
两端任意一方被drop
了之后,这个channel
就close
了。
我们用Option
包裹sender
,这样就能拿到所有权,然后在drop
的时候把发射器drop
掉,这样receiver
也就被drop
了。
这个drop
方法是标准库里提供的手动清除方法。
as_ref
是用来转换&Option
变成Option<&T>
的。
改完之后就能退出循环了。但是还有个问题,现在我们的代码会直接panic
。
所以我们得把recv
的unwrap
方法替换成match
,当match
到非Ok
的情况直接break
即可。
现在就可以啦,我们来模拟下。
我们给incoming
后面加上了take(2)
表示迭代器只接收前两个就退出作用域了。
然后我们来运行下
可以看到执行了两次之后线程们都被杀死了。
那么我们这个demo
就完成了,不过还有些地方可以优化下的:
library
增加tests
library
增加文档注释unwrap
编辑于 2023-01-15 16:31・IP 属地广东