什么是多路复用?
把一条 TCP 连接看作单车道公路,多路复用就是在这条路上规划出虚拟车道,让多股数据 “车” 并行传输。
npc mux 解决的问题
nps 是一个自带管理后台的内网穿透项目,nps mux 是其中的多路复用实现,该项目实现了:
- 连接复用
- 滑动窗口流控
- 心跳保活
- 优先级队列
优先级队列
在高速公路上,右侧有一条应急车道,可以让特殊车辆快速行进。优先级队列也是类似解决这个问题的,为避免控制类指令堵塞。
priorityQueue 有高中低 3 个队列,结合防饥饿机制确保低优先级的数据流也能得到处理,底层采用无锁环形缓冲区链表。
- 高优先级,初始容量 4,处理消息类型有
muxPingFlag,muxPingReturn,设计意图确保心跳包能最快被处理,防止因网络拥堵导致的意外断连。 - 中优先级,初始容量 32,处理消息类型有
muxNewConn,muxNewConnOk,muxNewConnFail,设计意图优先处理连接建立的握手过程,提高新连接的响应速度。 - 低优先级,初始容量 256,通常是数据传输包,用于承载大流量的数据传输。
|
|
Push() 函数中负责根据消息类型进行优先级分配,来了消息通过 cond.Broadcast() 通知。
Pop() 函数逻辑稍微复杂一些:
-
最高优先级优先
-
中优先级检测是否满足条件
starving < 8,满足尝试消费,存在数据则递增starving。 -
如果上述都未命中,尝试消费低优先级队列,存在数据则
starving/2,处理低优先级数据会降低饥饿值,从而允许后续再次处理中优先级数据。 -
最后的中优先级尝试,当满足
starving>0时(说明之前因为额度限制跳过了中优先级队列) 且低级队列也没数据,则再次尝试消费中优先级。 -
以上的流程尝试两次都没有数据的情况下,则使用
cond.L.Lock()加锁和cond.Wait()等待,直到获取数据为止,这里用了一个巧妙的实现。1 2 3 4 5 6 7 8 9 10 11 12// 加锁 Self.cond.L.Lock() defer Self.cond.L.Unlock() // 再次尝试 for packager = Self.TryPop(); packager == nil; { if Self.stop { return } // 确实没有数据,等待 `Broadcast` Self.cond.Wait() packager = Self.TryPop() }
还有一个实现很有意思,执行两次。for range 2 和 以下方式都是表达执行两次。
|
|
滑动窗口
TCP 协议中的滑动窗口,由发送方维护一个"发送窗口",表示可以连续发送但尚未被确认的数据范围,接收方通过 TCP 报文头中的 16 位"窗口字段" 告诉发送方自己还能接收多少字节。每收到一个 ACK 窗口就向右滑动,允许发送新数据,若接收方缓冲区满(窗口=0),发送发暂停发送,但会定期发送 “窗口探测包” 询问状态。

TCP 的滑动窗口是全局的,只看整条路现在的拥堵情况。如果 A 货车在终点卸货太慢,堆积在出口,TCP 就会告诉收费站临时禁止通行,这时候 B,C 明明因为卸货很快,完全畅通,但是也必须被迫停下来。通过在多路复用上实现滑动窗口,可以精细化管理。
具体的交互流程如下:
- 建立连接时,会给对方一个初始的窗口大小,例如
4096*30 - 发送方每次发送数据前,先检测是否有配额,有则扣除配额,没有则调用
waitReceiveWindow()阻塞直到收到对方的"新配额通知" - 接收方
Read()将缓冲区里的数据独走后,接收窗口的缓冲区变空了,这时候重新计算新的剩余空间,并通知发送方。
心跳
从代码实现看,心跳函数中启动了两个协程,一个用于每 5 秒发送一次心跳,一个用于收到心跳则记录延迟,心跳包里面是有时间的,拿当前时间和心跳包里的发送时间做减法,就能知道延迟情况。
数据包协议
| flag(1B) | id(4B) | length(2B) | content(可变长度) |
|---|---|---|---|
| 包类型 | 连接 ID | 内容长度 | 实际数据 |
如果是窗口大小
| flag(1B) | id(4B) | windowSize(8B) | |
|---|---|---|---|
| 包类型 | 连接 ID | 窗口大小 |
包类型有
|
|
主要流程
写数据,从 writeQueue 优先级队列获取数据,对每个数据打包封装好 header,发送到对方,每个包切割成最大不超过 4096 个字节。
前面高速公路的举例其实不够恰当,这里再用一个快递包裹传送带的概念,在多路复用中,读写决不允许并发,否则包裹黏在一起(数据错乱),对每个数据打包,接收方根据 id 识别不同的包裹,根据长度识别包裹的大小(还要接收多少数据)。
读数据,启动两个协程,一个负责处理新连接,一个流程就复杂了。
- 首先尝试从连接中解析出完整的数据包,根据包的类型处理,新连接就丢到队列里,由前一个协程处理。心跳探测则立即响应。
- 如果包的 id 找不到,释放资源
- 业务数据写入接收者,ACK 则更新窗口大小,连接建立的确认与失败,连接关闭等,继续往细节看,业务消息分两种类型
muxNewMsgPart表示消息的部分内容muxNewMsg表示消息的完整内容,可作为消息的结束标志
一方使用 NewConn() 将会在连接上创建一个虚拟连接,将 ID : conn 键值对存储起来,当读取到该 ID 的消息,则写入 conn 的读缓冲区,外部发来的消息则写入 conn 的写缓冲区,conn 是实现了 net.Conn 接口的实例,对于调用者来说,几乎与控制单连接一样简单。