概述
项目路径:https://github.com/skywind3000/kcp
代码大概不到两千行,主要文件是ikcp.h 和 ikcp.cpp。
test.h和test.cpp为测试代码。按照我自己的理解顺序做了排列
随机数生成器
// 均匀分布的随机数
class Random
{
public:
Random(int size) {
this->size = 0;
seeds.resize(size);
}
int random() {
int x, i;
if (seeds.size() == 0) return 0;
if (size == 0) {
for (i = 0; i < (int)seeds.size(); i++) {
seeds[i] = i;
}
size = (int)seeds.size();
}
i = rand() % size;
x = seeds[i];
seeds[i] = seeds[--size];
return x;
}
protected:
int size;
std::vector<int> seeds;
};
用法就是
Random R(100);
R.random();
不断地生成100内的随机数。至于均匀分布看代码也好理解。
比如丢包率10%,那门random() < 10 这个包就假装没收到
网络延迟模拟器
class LatencySimulator
{
public:
virtual ~LatencySimulator() {
clear();
}
// lostrate: 往返一周丢包率的百分比,默认 10%
// rttmin:rtt最小值,默认 60
// rttmax:rtt最大值,默认 125
LatencySimulator(int lostrate = 10, int rttmin = 60, int rttmax = 125, int nmax = 1000);
// 清除数据
void clear();
// 发送数据
// peer - 端点0/1,从0发送,从1接收;从1发送从0接收
void send(int peer, const void *data, int size);
// 接收数据
int recv(int peer, void *data, int maxsize);
public:
int tx1;
int tx2;
protected:
IUINT32 current;
int lostrate;
int rttmin;
int rttmax;
int nmax;
typedef std::list<DelayPacket*> DelayTunnel;
DelayTunnel p12;
DelayTunnel p21;
Random r12;
Random r21;
};
这个项目没有创建socket。而用了一个类来模拟网络。
typedef std::list<DelayPacket*> DelayTunnel; 这是一个list的定义
DelayTunnel p12, p21; 是两块内存
我们两端有两个连接对象互相发包。1发包到网络,往p12里面塞数据。2发包往p21塞数据。收报的时候2从p12那,1从p21拿。
如果要模拟丢包,那random一下,流程断掉即可。
其他成员:
current 是发包的时间戳
lostrate 丢包率,和random一起用。random() < lostrate 就丢包
rttmin,rttmax。rtt区间。random一个时间位于这之间,然后模拟收到包的时间戳。
nmax 表示list大小,超了也算丢包。用来测试拥塞控制的应该。
构造,析构,clear 不解释。核心主要是send和recv
发送数据
// 发送数据
// peer - 端点0/1,从0发送,从1接收;从1发送从0接收
void send(int peer, const void *data, int size) {
if (peer == 0) {
tx1++;
if (r12.random() < lostrate) return;
if ((int)p12.size() >= nmax) return;
} else {
tx2++;
if (r21.random() < lostrate) return;
if ((int)p21.size() >= nmax) return;
}
DelayPacket *pkt = new DelayPacket(size, data);
current = iclock();
IUINT32 delay = rttmin;
if (rttmax > rttmin) delay += rand() % (rttmax - rttmin);
pkt->setts(current + delay);
if (peer == 0) {
p12.push_back(pkt);
} else {
p21.push_back(pkt);
}
}
data里面的数据发向了网络,相当于socket->send。由于是模拟的,直接放到p12, p21 中。在这过程中要通过 random() < lostrate 和 size() >= nmax 模拟丢包。
另一端之间从内存里面拿包就假装收到了
并模拟计算这个包的发送时间,接收时间。
接收数据
// 接收数据
int recv(int peer, void *data, int maxsize) {
DelayTunnel::iterator it;
if (peer == 0) {
it = p21.begin();
if (p21.size() == 0) return -1;
} else {
it = p12.begin();
if (p12.size() == 0) return -1;
}
DelayPacket *pkt = *it;
current = iclock();
if (current < pkt->ts()) return -2;
if (maxsize < pkt->size()) return -3;
if (peer == 0) {
p21.erase(it);
} else {
p12.erase(it);
}
maxsize = pkt->size();
memcpy(data, pkt->ptr(), maxsize);
delete pkt;
return maxsize;
}
从网络中取数据,放到data中。都是一些合法性判断。
KCP主要流程和收发包
入口main,然后到了位于test.cpp 的 test(0)
// 测试用例
void test(int mode)
{
// 创建模拟网络:丢包率10%,Rtt 60ms~125ms
vnet = new LatencySimulator(10, 60, 125);
// 创建两个端点的 kcp对象,第一个参数 conv是会话编号,同一个会话需要相同
// 最后一个是 user参数,用来传递标识
ikcpcb *kcp1 = ikcp_create(0x11223344, (void*)0);
ikcpcb *kcp2 = ikcp_create(0x11223344, (void*)1);
kcp1->output = udp_output;
kcp2->output = udp_output;
// 。。。
}
LatencySimulator 是模拟网络
ikcpcb 是kcp对象的上下文,可以理解为客户端,服务器的两个连接。
ikcp_create 用于分配内存,两个参数是 IUINT32 conv, void *user。
conv是连接ID,相同代表两个客户端在互相发包。user在例子用的01是用于逻辑区分是kcp1还是kcp2的。
output作用是,KCP处理好包了,准备向网络发送数据了
// 模拟网络:模拟发送一个 udp包
int udp_output(const char *buf, int len, ikcpcb *kcp, void *user)
{
union { int id; void *ptr; } parameter;
parameter.ptr = user;
vnet->send(parameter.id, buf, len);
return 0;
}
这里的回调可以接到你的socket->send里面。这里是模拟网络,就接到了vnet里面。这里的user最后会变成上面模拟网络的peer。
决定数据塞到p12还是p21
中间一段设置初始值直接跳过,到while(1) 的地方。直接看收发包逻辑
首先我们要大概了解一下API
发包
- 发包
我们要发一个"Hello"的包,调用ikcp_send,把数据传给协议。会存到 ikcpcb 上下文对象里面的的 send_queue 里面。
再由 ikcp_update -> ikcp_flush 把send_queue的数据挪到send_buf 里面,然后发向网络
对用户来说就想这样
// 每隔 20ms,kcp1发送数据
for (; current >= slap; slap += 20) {
((IUINT32*)buffer)[0] = index++;
((IUINT32*)buffer)[1] = current;
// 发送上层协议包
ikcp_send(kcp1, buffer, 8);
}
具体一点 ikcp_send
//---------------------------------------------------------------------
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
IKCPSEG *seg;
int count, i;
int sent = 0;
// 合法性判断
assert(kcp->mss > 0);
if (len < 0) return -1;
// 由于没搜到 stream 的赋值,所以这一块之间跳过
if (kcp->stream != 0) {
}
// 发了 buffer 数据 len 这么长,要按照mss的长度进行分包。else 里面是除法向上取整的写法
// udp的mtu一般是1500,超过就会分包。所以这里冗余一点mtu=1400,在减去kcp包头24就是mss
if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;
// 合法性验证,一次性发这么多包???
if (count >= (int)IKCP_WND_RCV) {
// 先跳过stream逻辑
return -2;
}
// 合法性验证
if (count == 0) count = 1;
// 数据包分片
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size); // 创建一个数据包
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) {
memcpy(seg->data, buffer, size); // 拷贝数据到数据包里面
}
seg->len = size;
// 分包的时候,加上5个包,frg会是例如43210,最后收到0的时候,就相当于一整个包收完了
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
// 这个是C语言的队列写法,把数据包seg塞到snd_queue的末尾,然后数量++
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
// 指针偏移,继续分下一个包
if (buffer) {
buffer += size;
}
len -= size;
sent += size;
}
return sent;
}
KCP要求我们一定要在tick里面调用update。iclock是毫秒时间戳
ikcp_update(kcp1, iclock());
ikcp_update(kcp2, iclock());
这里面核心逻辑是为了调用 ikcp_flush。其他的逻辑都是为了计算 ikcp_flush 的调用时机
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
IINT32 slap;
kcp->current = current;
// 是否第一次调用 ikcp_update
if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}
// 计算距离上一次调用 ikcp_flush的时间差
slap = _itimediff(kcp->current, kcp->ts_flush);
if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}
// 每间隔 interval 这么长时间调用一次 ikcp_flush。默认100ms
if (slap >= 0) {
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp);
}
}
至于 ikcp_flush有点复杂。对于发送来说,处理了数据从 send_queue 拷贝到send_buf。又发向网络。
后面再看说吧
收包
ikcp_input
从网络中取了 buffer 的数据,hr 大于0为收到的字节数
// 处理虚拟网络:检测是否有udp包从p1->p2
while (1) {
hr = vnet->recv(1, buffer, 2000);
if (hr < 0) break;
// 如果 p2收到udp,则作为下层协议输入到kcp2
ikcp_input(kcp2, buffer, hr);
}
在看ikcp_input之前,先过一下包体结构
//=====================================================================
// SEGMENT
//=====================================================================
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv; // 连接ID
IUINT32 cmd; // 包的类型标志位 IKCP_CMD_PUSH 【普通数据包】 IKCP_CMD_ACK【ACK包】 IKCP_CMD_WASK【问对方接收窗口多大】 IKCP_CMD_WINS 【回应接收窗口多大】
IUINT32 frg; // 数据包分片序号 10000字节,每次发1000字节,分10片. 那这个就是 9, 8, 7, 6, 5, 4, 3, 2, 1, 0。等于0的时候表示包都到了
IUINT32 wnd; // 接收窗口大小,用于流量控制
IUINT32 ts; // 时间戳
IUINT32 sn; // 包序号
IUINT32 una; // 如果A->B发了个包,B->A发ACK,这个ACK也可能会丢,那么A->B还会重发,就造成了重复包。所以发送的是una, 比如123456,我收到了1234,那么我就发ACK 4,表示1234之前的都收到了。具体解析看下面代码
IUINT32 len; // 下面data的长度
IUINT32 resendts; // 重发时间戳
IUINT32 rto; // 超时重传时间
IUINT32 fastack; // 快速重传。比如发包123456,我收到了23包,1还没到,那1就被过了两次,累计到阈值就会触发快速重传
IUINT32 xmit; // 发送次数
char data[1];
};
//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
IUINT32 prev_una = kcp->snd_una;
IUINT32 maxack = 0, latest_ts = 0;
int flag = 0;
// 合法性判断
if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
}
// 合法性判断
if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;
while (1) {
IUINT32 ts, sn, len, una, conv;
IUINT16 wnd;
IUINT8 cmd, frg;
IKCPSEG *seg;
// 合法性判断
if (size < (int)IKCP_OVERHEAD) break;
// 开始解析KCP包头
data = ikcp_decode32u(data, &conv); // 连接ID
if (conv != kcp->conv) return -1;
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
// 去掉包头后属下包的长度
size -= IKCP_OVERHEAD;
// 合法性判断
if ((long)size < (long)len || (int)len < 0) return -2;
// 合法性判断
if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS)
return -3;
kcp->rmt_wnd = wnd; // 每一个KCP包都会带上发送窗口的大小,设置当前上下文
// 如果A->B发了个包,B->A发单个ACK,这个ACK也可能会丢,那么A->B还会重发,就造成了重复包。
// 所以发送的是una, 比如发包123456,我收到了4的ACK,就认为1234之前的都收到了 ikcp_parse_una 做的就是把1234从send_buf里面去掉
// IKCPCB上下文维护了,send_una, snd_nxt, rcv_nxt 几个变量。snd_nxt每发一个包都会++,写在ikcp_flush里面
// 对于send_buf来说 【send_una】__【A】______【send_nxt】这是一个滑动窗口。如果收到一个A位置的ACK,那门send_una~A都收到了,send_una挪到a的位置,send_nxt是最新要发的包ID
// 对于rcv_queue .......【rcv_nxt】表示包已经收到了rcv_nxt这里的,那么发包的时候una=rcv_nxt。假设就是上一行的A位置。表示这段包收到了
ikcp_parse_una(kcp, una);
ikcp_shrink_buf(kcp); // 更新一下新的snd_una。由于上面send_una~A已经从队列删掉了,如果队列非空,snd_una就是队列头的ID,否则就是send_nxt
// 然后开始根据不同的包做不同的事情了
if (cmd == IKCP_CMD_ACK) {
// 。。。
}
else if (cmd == IKCP_CMD_PUSH) {
// 。。。
}
else if (cmd == IKCP_CMD_WASK) {
// 。。。
}
else if (cmd == IKCP_CMD_WINS) {
// 。。。
}
else {
return -3;
}
data += len;
size -= len;
}
// 快速重传和拥塞控制。。。
return 0;
}
ikcp_recv
用户从kcp协议里面取一个包出来
//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
struct IQUEUEHEAD *p;
int ispeek = (len < 0)? 1 : 0;
int peeksize;
int recover = 0;
IKCPSEG *seg;
assert(kcp);
if (iqueue_is_empty(&kcp->rcv_queue))
return -1;
if (len < 0) len = -len;
// 上面发包的时候,我们提到了分片,frg从543210,0代表整个包,这里根据这个规则,算出整个包的peeksize
peeksize = ikcp_peeksize(kcp);
if (peeksize < 0)
return -2;
if (peeksize > len)
return -3;
// 接收队列 >= 接收窗口。允许接收。拥塞控制相关
if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1;
// 合并数据包
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;
if (buffer) {
memcpy(buffer, seg->data, seg->len);
buffer += seg->len;
}
len += seg->len;
fragment = seg->frg;
if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
}
if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
kcp->nrcv_que--;
}
if (fragment == 0)
break;
}
assert(len == peeksize);
// 移动数据 rcv_buf -> rcv_queue
//
while (! iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
// fast recover 拥塞控制
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
}
return len;
}
ikcp_flush
void ikcp_flush(ikcpcb *kcp)
{
IUINT32 current = kcp->current; // 时间戳,上层update的时候会更新一次
char *buffer = kcp->buffer; // 只是一块临时内存
char *ptr = buffer;
int count, size, i;
IUINT32 resent, cwnd;
IUINT32 rtomin;
struct IQUEUEHEAD *p;
int change = 0;
int lost = 0;
IKCPSEG seg;
// ikcp_flush 前必须调用一次 update
if (kcp->updated == 0) return;
// 初始一个包
seg.conv = kcp->conv;
seg.cmd = IKCP_CMD_ACK;
seg.frg = 0;
seg.wnd = ikcp_wnd_unused(kcp); // 接收窗口的最大大小 - 当前接收窗口的大小
seg.una = kcp->rcv_nxt; // 上面4.2有提到una干吗用
seg.len = 0;
seg.sn = 0;
seg.ts = 0;
// ...
}
CMD_PUSH & ACK
跳到 ikcp_input的 IKCP_CMD_PUSH 表示一个普通的数据包
else if (cmd == IKCP_CMD_PUSH) {
// LOG 不管
if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
ikcp_log(kcp, IKCP_LOG_IN_DATA,
"input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
}
// 包在滑动窗口范围内
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
// 把这个包sn, ts这个时间放到acklist里面。等下一次update的时候,即ikcp_flush。会把这些包向对面发送ACK
ikcp_ack_push(kcp, sn, ts);
// 合法性判断
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
// 创建一个新的包
seg = ikcp_segment_new(kcp, len);
// 包头
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
// 包体数据拷贝过来
if (len > 0) {
memcpy(seg->data, data, len);
}
// 把包拷贝到recv_buf,可能会添加到recv_queue中
ikcp_parse_data(kcp, seg);
}
}
}
//---------------------------------------------------------------------
// parse data
//---------------------------------------------------------------------
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
{
struct IQUEUEHEAD *p, *prev;
IUINT32 sn = newseg->sn;
int repeat = 0;
// 合法性判断
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
_itimediff(sn, kcp->rcv_nxt) < 0) {
ikcp_segment_delete(kcp, newseg);
return;
}
// 判断包是不是重复收到了
for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
prev = p->prev;
if (seg->sn == sn) {
repeat = 1;
break;
}
if (_itimediff(sn, seg->sn) > 0) {
break;
}
}
// 包没有重复,加到rcv_buf
if (repeat == 0) {
iqueue_init(&newseg->node);
iqueue_add(&newseg->node, p);
kcp->nrcv_buf++;
} else {
ikcp_segment_delete(kcp, newseg);
}
// 移动数据 rcv_buf -> rcv_queue
while (! iqueue_is_empty(&kcp->rcv_buf)) {
IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
//
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}
}
回顾一下 收到 IKCP_CMD_PUSH 包的时候。通过 ikcp_ack_push 把包放到 acklist里面。
下一次 ikcp_flush 的时候会发送这些包的ack到对面。
所以我们回到ikcp_flush 的下一段。
// flush acknowledges
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->ackcount = 0;
这里在发送这些包的ACK
探测发送窗口
ikcp_flush的下一段
// 对端发送窗口是0的时候,需要让对方告诉我,你的发送窗口是多大
if (kcp->rmt_wnd == 0) {
if (kcp->probe_wait == 0) {
// 没有设置过探测时间,那就设置一个 probe_wait 时间后探测一次
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
}
else {
// 如果当前时间大于探测时间,那就探测一次
// 做的事情包括,设置下一次探测时间,并且设置一个探测标志意为,当前需要探测
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
if (kcp->probe_wait < IKCP_PROBE_INIT)
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->probe_wait += kcp->probe_wait / 2;
if (kcp->probe_wait > IKCP_PROBE_LIMIT)
kcp->probe_wait = IKCP_PROBE_LIMIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
kcp->probe |= IKCP_ASK_SEND;
}
}
} else {
kcp->ts_probe = 0;
kcp->probe_wait = 0;
}
上面如果标记了 kcp->probe |= IKCP_ASK_SEND; 那这里就会发一个 IKCP_CMD_WASK 包。这个包没有data。只有cmd
// flush window probing commands
// 如果上面的探测标志被设置了,那么这里就会发送一个探测包
if (kcp->probe & IKCP_ASK_SEND) {
seg.cmd = IKCP_CMD_WASK; // 发一个这个命令,表示我想知道你的窗口大小
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
对面的 ikcp_input 收到 IKCP_CMD_WASK 包的时候,就会加一个 IKCP_ASK_TELL 标记。
else if (cmd == IKCP_CMD_WASK) {
// ready to send back IKCP_CMD_WINS in ikcp_flush
// tell remote my window size
kcp->probe |= IKCP_ASK_TELL;
if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
}
}
对面的ikcp_flush的时候,就会发一个 IKCP_CMD_WINS包
// flush window probing commands
// 接收的时候会标记这个,回一个这个包
if (kcp->probe & IKCP_ASK_TELL) {
seg.cmd = IKCP_CMD_WINS;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}
而收到IKCP_CMD_WINS不需要做任何事情。
else if (cmd == IKCP_CMD_WINS) {
// do nothing
if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
ikcp_log(kcp, IKCP_LOG_IN_WINS,
"input wins: %lu", (unsigned long)(wnd));
}
}
因为解包的时候,每个包头都带了。对端接收窗口大小。在上面
data = ikcp_decode16u(data, &wnd);
重传机制
数据流:
用户发送->send_queue->send_buf -> 网络 -> recv_queue -> recv_buf -> 用户接收
对于send_buf
【snd_una】【】【】【】【a】【】【snd_nxt】
我每发送一个包,就在最后添加一个,并且snd_nxt++,即挪到最后。那么这个就是我的发送滑动窗口。
对方的recv_buf
【】【】【】【】【】【rcv_nxt】
最后一个接收到的包是rcv_nxt。每次发包都会 una = rcv_nxt。塞在包头里面发给对面
回到ikcp_flush
这个判断和拥塞控制有关,后面讲。就是把合法包从send_queue挪到send_buf里面
// move data from snd_queue to snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}
对于 snd_buf。这部分包要发往网络了
// calculate resent
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;
// flush data segments
// 遍历
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
// xmit 是一个发送次数标记,这个包被发送了几次
// 首次发送
if (segment->xmit == 0) {
needsend = 1; // 标记为需要发送
segment->xmit++;
// 计算rto 和 resendts。resendts这么长时间后是下一次重发
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
// 这个包发过了,但是没被确认掉,又要重发了
needsend = 1;
segment->xmit++;
kcp->xmit++;
// 重发的情况下计算rto
if (kcp->nodelay == 0) {
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
} else {
IINT32 step = (kcp->nodelay < 2)?
((IINT32)(segment->rto)) : kcp->rx_rto;
segment->rto += step / 2;
}
segment->resendts = current + segment->rto;
lost = 1; // 标记丢包了
}
// 在ikcp_input的时候收到ACK会记录收到的最大ACK。然后调用 ikcp_parse_fastack(kcp, maxack, latest_ts);
// send_buf 1234, 我的maxack是3。那么1就被跳过两次,2被跳过一次。超过resent阈值,就触发快速重传
else if (segment->fastack >= resent) {
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;
}
}
// 需要发包
if (needsend) {
int need;
segment->ts = current;
segment->wnd = seg.wnd;
segment->una = kcp->rcv_nxt;
size = (int)(ptr - buffer);
need = IKCP_OVERHEAD + segment->len;
// 这里发包
if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, segment);
if (segment->len > 0) {
memcpy(ptr, segment->data, segment->len);
ptr += segment->len;
}
// 超时断开
if (segment->xmit >= kcp->dead_link) {
kcp->state = (IUINT32)-1;
}
}
}
在收到ack包的时候会调用ikcp_update_ack。计算出一个rx_rto。上面的重传时间与这个有关系
//---------------------------------------------------------------------
// parse ack
//---------------------------------------------------------------------
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt)
{
// rtt是当前时间减去发包时间,算出来的rtt输入参数
IINT32 rto = 0;
if (kcp->rx_srtt == 0) { // rx_srtt 平滑的rtt时间,==0表示之前没有设置过
kcp->rx_srtt = rtt;
kcp->rx_rttval = rtt / 2; // rx_rttval 是增量
} else {
// 平滑算法
long delta = rtt - kcp->rx_srtt;
if (delta < 0) delta = -delta;
kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4; // (3/4)*rx_rttval + (1/4)*delta
kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8; // (7/8)*rx_srtt + (1/8)rtt 这一次的rtt对于累计rtt的影响比例
if (kcp->rx_srtt < 1) kcp->rx_srtt = 1;
}
// 避免rto算超了
rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval);
kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX);
}
ikcp_input 收到ack的处理
ikcp_update_ack 收到ack包,把这段包从发送缓冲删掉
maxack计算这一波最大的ack号,用途是累加 fastack
if (cmd == IKCP_CMD_ACK) {
if (_itimediff(kcp->current, ts) >= 0) {
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
ikcp_parse_ack(kcp, sn);
ikcp_shrink_buf(kcp);
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
ikcp_log(kcp, IKCP_LOG_IN_ACK,
"input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn,
(long)_itimediff(kcp->current, ts),
(long)kcp->rx_rto);
}
}
拥塞控制
incr 最大可发送量,控制拥塞窗口大小
ssthresh,slow start threshold 慢启动阈值
ikcp_flush
计算拥塞窗口大小
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
数据流:
用户发送->send_queue->send_buf -> 网络 -> recv_queue -> recv_buf -> 用户接收
对于send_buf
【snd_una】【】【】【】【a】【】【snd_nxt】
我每发送一个包,就在最后添加一个,并且snd_nxt++,即挪到最后。那么这个就是我的发送滑动窗口。
对方的recv_buf
【】【】【】【】【】【rcv_nxt】
最后一个接收到的包是rcv_nxt。每次发包都会 una = rcv_nxt。塞在包头里面发给对面
change 是这一次更新发送了快速包,lost 是这意思丢过包
如果发送了快速重传,可能丢包了,就需要change,修改kcp->cwnd
// update ssthresh
if (change) {
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
// 计算慢启动阈值
kcp->ssthresh = inflight / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
// 重新计算拥塞窗口大小
kcp->cwnd = kcp->ssthresh + resent;
kcp->incr = kcp->cwnd * kcp->mss;
}
决定是否 snd_queue 移动数据到 snd_buf 的判断是
// move data from snd_queue to snd_buf
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
//
}
snd_nxt < snd_una + cwnd
snd_buf【snd_una】【】【】【】【】【】【snd_nxt】
那么change的改变kcp->cwnd就会影响这一行为。
如果真丢包了
if (lost) {
kcp->ssthresh = cwnd / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
if (kcp->cwnd < 1) {
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
那就直接是1,就发不出去了
慢启动
ikcp_input接收的时候
// 如果接收数据的时候 una 发生了改变,(因为收到了ack包)需要调整窗口的大小
// 需要增加拥塞窗口
if (_itimediff(kcp->snd_una, prev_una) > 0) {
// 可以增加拥塞窗口
if (kcp->cwnd < kcp->rmt_wnd) {
IUINT32 mss = kcp->mss;
if (kcp->cwnd < kcp->ssthresh) {
// 小于阈值就一直增加
kcp->cwnd++;
kcp->incr += mss;
} else {
// 否则做慢启动的逻辑,然后增速慢一点
if (kcp->incr < mss) kcp->incr = mss; // 最小边界
// 下面详解
kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
if ((kcp->cwnd + 1) * mss <= kcp->incr) {
#if 1
kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
#else
kcp->cwnd++;
#endif
}
}
// 拥塞窗口不能大于远端接收窗口
if (kcp->cwnd > kcp->rmt_wnd) {
kcp->cwnd = kcp->rmt_wnd;
kcp->incr = kcp->rmt_wnd * mss;
}
}
}
if (kcp->cwnd < kcp->ssthresh) {
// 小于阈值就一直增加
kcp->cwnd++;
kcp->incr += mss;
} else {
// 否则做慢启动的逻辑,然后增速慢一点
if (kcp->incr < mss) kcp->incr = mss; // 最小边界
kcp->incr += (mss * mss) / kcp->incr + (mss / 16);
if ((kcp->cwnd + 1) * mss <= kcp->incr) {
#if 1
kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
#else
kcp->cwnd++;
#endif
}
}
发送了丢包
kcp->cwnd = 1
kcp->incr = mss;
所以首次的增量是
(mss mss) / (kcp->cwnd mss) + (mss / 16)
= mss / kcp->cwnd + mss / 16
= mss (1/kcp->cwnd + 1/16)
随着 kcp->cwnd 增加这个增量会越来越慢
其他
到这里代码已经基本看完了,过一遍还差一个
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current)
看注释是为了减少不必要的update。
通过ikcp_check获得下次调用update的时间