NPS 多路复用

什么是多路复用?

把一条 TCP 连接看作单车道公路,多路复用就是在这条路上规划出虚拟车道,让多股数据 “车” 并行传输。

npc mux 解决的问题

nps 是一个自带管理后台的内网穿透项目,nps mux 是其中的多路复用实现,该项目实现了:

  • 连接复用
  • 滑动窗口流控
  • 心跳保活
  • 优先级队列

优先级队列

在高速公路上,右侧有一条应急车道,可以让特殊车辆快速行进。优先级队列也是类似解决这个问题的,为避免控制类指令堵塞。

priorityQueue 有高中低 3 个队列,结合防饥饿机制确保低优先级的数据流也能得到处理,底层采用无锁环形缓冲区链表。

  • 高优先级,初始容量 4,处理消息类型有 muxPingFlag,muxPingReturn,设计意图确保心跳包能最快被处理,防止因网络拥堵导致的意外断连。
  • 中优先级,初始容量 32,处理消息类型有 muxNewConn, muxNewConnOk, muxNewConnFail,设计意图优先处理连接建立的握手过程,提高新连接的响应速度。
  • 低优先级,初始容量 256,通常是数据传输包,用于承载大流量的数据传输。
1
2
3
4
5
6
7
8
type priorityQueue struct {
    highestChain *bufChain // 高优先级链表
    middleChain  *bufChain // 中优先级链表
    lowestChain  *bufChain // 低优先级链表
    starving     uint8     // 饥饿计数器
    stop         bool      // 停止标志
    cond         *sync.Cond // 用于阻塞/唤醒消费者的条件变量
}

Push() 函数中负责根据消息类型进行优先级分配,来了消息通过 cond.Broadcast() 通知。

Pop() 函数逻辑稍微复杂一些:

  1. 最高优先级优先

  2. 中优先级检测是否满足条件 starving < 8,满足尝试消费,存在数据则递增 starving

  3. 如果上述都未命中,尝试消费低优先级队列,存在数据则 starving/2,处理低优先级数据会降低饥饿值,从而允许后续再次处理中优先级数据。

  4. 最后的中优先级尝试,当满足 starving>0 时(说明之前因为额度限制跳过了中优先级队列) 且低级队列也没数据,则再次尝试消费中优先级。

  5. 以上的流程尝试两次都没有数据的情况下,则使用 cond.L.Lock() 加锁和 cond.Wait() 等待,直到获取数据为止,这里用了一个巧妙的实现。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
    	// 加锁
      Self.cond.L.Lock()
    	defer Self.cond.L.Unlock()
      // 再次尝试
    	for packager = Self.TryPop(); packager == nil; {
    		if Self.stop {
    			return
    		}
        // 确实没有数据,等待 `Broadcast`
    		Self.cond.Wait()
    		packager = Self.TryPop()
    	}
    

还有一个实现很有意思,执行两次。for range 2 和 以下方式都是表达执行两次。

1
2
3
4
5
6
7
for {
		// 业务代码
		if iter {
			break
		}
		iter = true
	}

滑动窗口

TCP 协议中的滑动窗口,由发送方维护一个"发送窗口",表示可以连续发送但尚未被确认的数据范围,接收方通过 TCP 报文头中的 16 位"窗口字段" 告诉发送方自己还能接收多少字节。每收到一个 ACK 窗口就向右滑动,允许发送新数据,若接收方缓冲区满(窗口=0),发送发暂停发送,但会定期发送 “窗口探测包” 询问状态。

image-20251230165650971

TCP 的滑动窗口是全局的,只看整条路现在的拥堵情况。如果 A 货车在终点卸货太慢,堆积在出口,TCP 就会告诉收费站临时禁止通行,这时候 B,C 明明因为卸货很快,完全畅通,但是也必须被迫停下来。通过在多路复用上实现滑动窗口,可以精细化管理。

具体的交互流程如下:

  1. 建立连接时,会给对方一个初始的窗口大小,例如 4096*30
  2. 发送方每次发送数据前,先检测是否有配额,有则扣除配额,没有则调用 waitReceiveWindow() 阻塞直到收到对方的"新配额通知"
  3. 接收方 Read() 将缓冲区里的数据独走后,接收窗口的缓冲区变空了,这时候重新计算新的剩余空间,并通知发送方。

心跳

从代码实现看,心跳函数中启动了两个协程,一个用于每 5 秒发送一次心跳,一个用于收到心跳则记录延迟,心跳包里面是有时间的,拿当前时间和心跳包里的发送时间做减法,就能知道延迟情况。

数据包协议

flag(1B) id(4B) length(2B) content(可变长度)
包类型 连接 ID 内容长度 实际数据

如果是窗口大小

flag(1B) id(4B) windowSize(8B)
包类型 连接 ID 窗口大小

包类型有

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const (
	muxPingFlag uint8 = iota   // 0: 心跳请求
	muxNewConnOk      // 1: 新连接确认
	muxNewConnFail    // 2: 新连接失败
	muxNewMsg         // 3: 完整消息
	muxNewMsgPart     // 4: 消息分片(大消息被分割)
	muxMsgSendOk      // 5: 窗口确认(告诉发送方可以继续发)
	muxNewConn        // 6: 创建新连接请求
	muxConnClose      // 7: 关闭连接
	muxPingReturn     // 8: 心跳响应
)

主要流程

写数据,从 writeQueue 优先级队列获取数据,对每个数据打包封装好 header,发送到对方,每个包切割成最大不超过 4096 个字节。

前面高速公路的举例其实不够恰当,这里再用一个快递包裹传送带的概念,在多路复用中,读写决不允许并发,否则包裹黏在一起(数据错乱),对每个数据打包,接收方根据 id 识别不同的包裹,根据长度识别包裹的大小(还要接收多少数据)。

读数据,启动两个协程,一个负责处理新连接,一个流程就复杂了。

  • 首先尝试从连接中解析出完整的数据包,根据包的类型处理,新连接就丢到队列里,由前一个协程处理。心跳探测则立即响应。
  • 如果包的 id 找不到,释放资源
  • 业务数据写入接收者,ACK 则更新窗口大小,连接建立的确认与失败,连接关闭等,继续往细节看,业务消息分两种类型
    • muxNewMsgPart 表示消息的部分内容
    • muxNewMsg 表示消息的完整内容,可作为消息的结束标志

一方使用 NewConn() 将会在连接上创建一个虚拟连接,将 ID : conn 键值对存储起来,当读取到该 ID 的消息,则写入 conn 的读缓冲区,外部发来的消息则写入 conn 的写缓冲区,conn 是实现了 net.Conn 接口的实例,对于调用者来说,几乎与控制单连接一样简单。

参考

nps mux github

Licensed under CC BY-NC-SA 4.0
本文阅读量 次, 总访问量 ,总访客数
Built with Hugo .   Theme Stack designed by Jimmy