KCP 源码剖析

概述

项目路径:https://github.com/skywind3000/kcp
代码大概不到两千行,主要文件是ikcp.h 和 ikcp.cpp。
test.h和test.cpp为测试代码。按照我自己的理解顺序做了排列

随机数生成器

// 均匀分布的随机数
class Random
{
public:
    Random(int size) {
        this->size = 0;
        seeds.resize(size);
    }

    int random() {
        int x, i;
        if (seeds.size() == 0) return 0;
        if (size == 0) { 
            for (i = 0; i < (int)seeds.size(); i++) {
                seeds[i] = i;
            }
            size = (int)seeds.size();
        }
        i = rand() % size;
        x = seeds[i];
        seeds[i] = seeds[--size];
        return x;
    }

protected:
    int size;
    std::vector<int> seeds;
};

用法就是

Random R(100);
R.random();

不断地生成100内的随机数。至于均匀分布看代码也好理解。
比如丢包率10%,那门random() < 10 这个包就假装没收到

网络延迟模拟器

class LatencySimulator
{
public:

    virtual ~LatencySimulator() {
        clear();
    }

    // lostrate: 往返一周丢包率的百分比,默认 10%
    // rttmin:rtt最小值,默认 60
    // rttmax:rtt最大值,默认 125
    LatencySimulator(int lostrate = 10, int rttmin = 60, int rttmax = 125, int nmax = 1000);

    // 清除数据
    void clear();

    // 发送数据
    // peer - 端点0/1,从0发送,从1接收;从1发送从0接收
    void send(int peer, const void *data, int size);

    // 接收数据
    int recv(int peer, void *data, int maxsize);

public:
    int tx1;
    int tx2;

protected:
    IUINT32 current;
    int lostrate;
    int rttmin;
    int rttmax;
    int nmax;
    typedef std::list<DelayPacket*> DelayTunnel;
    DelayTunnel p12;
    DelayTunnel p21;
    Random r12;
    Random r21;
};

这个项目没有创建socket。而用了一个类来模拟网络。
typedef std::list<DelayPacket*> DelayTunnel; 这是一个list的定义
DelayTunnel p12, p21; 是两块内存
我们两端有两个连接对象互相发包。1发包到网络,往p12里面塞数据。2发包往p21塞数据。收报的时候2从p12那,1从p21拿。
如果要模拟丢包,那random一下,流程断掉即可。
其他成员:
current 是发包的时间戳
lostrate 丢包率,和random一起用。random() < lostrate 就丢包
rttmin,rttmax。rtt区间。random一个时间位于这之间,然后模拟收到包的时间戳。
nmax 表示list大小,超了也算丢包。用来测试拥塞控制的应该。
构造,析构,clear 不解释。核心主要是send和recv

发送数据

    // 发送数据
    // peer - 端点0/1,从0发送,从1接收;从1发送从0接收
    void send(int peer, const void *data, int size) {
        if (peer == 0) {
            tx1++;
            if (r12.random() < lostrate) return;
            if ((int)p12.size() >= nmax) return;
        }   else {
            tx2++;
            if (r21.random() < lostrate) return;
            if ((int)p21.size() >= nmax) return;
        }
        DelayPacket *pkt = new DelayPacket(size, data);
        current = iclock();
        IUINT32 delay = rttmin;
        if (rttmax > rttmin) delay += rand() % (rttmax - rttmin);
        pkt->setts(current + delay);
        if (peer == 0) {
            p12.push_back(pkt);
        }   else {
            p21.push_back(pkt);
        }
    }

data里面的数据发向了网络,相当于socket->send。由于是模拟的,直接放到p12, p21 中。在这过程中要通过 random() < lostrate 和 size() >= nmax 模拟丢包。
另一端之间从内存里面拿包就假装收到了
并模拟计算这个包的发送时间,接收时间。

接收数据

    // 接收数据
    int recv(int peer, void *data, int maxsize) {
        DelayTunnel::iterator it;
        if (peer == 0) {
            it = p21.begin();
            if (p21.size() == 0) return -1;
        }   else {
            it = p12.begin();
            if (p12.size() == 0) return -1;
        }
        DelayPacket *pkt = *it;
        current = iclock();
        if (current < pkt->ts()) return -2;
        if (maxsize < pkt->size()) return -3;
        if (peer == 0) {
            p21.erase(it);
        }   else {
            p12.erase(it);
        }
        maxsize = pkt->size();
        memcpy(data, pkt->ptr(), maxsize);
        delete pkt;
        return maxsize;
    }

从网络中取数据,放到data中。都是一些合法性判断。

KCP主要流程和收发包

入口main,然后到了位于test.cpp 的 test(0)


// 测试用例
void test(int mode)
{
    // 创建模拟网络:丢包率10%,Rtt 60ms~125ms
    vnet = new LatencySimulator(10, 60, 125);

    // 创建两个端点的 kcp对象,第一个参数 conv是会话编号,同一个会话需要相同
    // 最后一个是 user参数,用来传递标识
    ikcpcb *kcp1 = ikcp_create(0x11223344, (void*)0);
    ikcpcb *kcp2 = ikcp_create(0x11223344, (void*)1);

    kcp1->output = udp_output;
    kcp2->output = udp_output;

    // 。。。
}

LatencySimulator 是模拟网络
ikcpcb 是kcp对象的上下文,可以理解为客户端,服务器的两个连接。
ikcp_create 用于分配内存,两个参数是 IUINT32 conv, void *user。
conv是连接ID,相同代表两个客户端在互相发包。user在例子用的01是用于逻辑区分是kcp1还是kcp2的。
output作用是,KCP处理好包了,准备向网络发送数据了

// 模拟网络:模拟发送一个 udp包
int udp_output(const char *buf, int len, ikcpcb *kcp, void *user)
{
    union { int id; void *ptr; } parameter;
    parameter.ptr = user;
    vnet->send(parameter.id, buf, len);
    return 0;
}

这里的回调可以接到你的socket->send里面。这里是模拟网络,就接到了vnet里面。这里的user最后会变成上面模拟网络的peer。
决定数据塞到p12还是p21
中间一段设置初始值直接跳过,到while(1) 的地方。直接看收发包逻辑

首先我们要大概了解一下API

发包

  • 发包

我们要发一个"Hello"的包,调用ikcp_send,把数据传给协议。会存到 ikcpcb 上下文对象里面的的 send_queue 里面。
再由 ikcp_update -> ikcp_flush 把send_queue的数据挪到send_buf 里面,然后发向网络
对用户来说就想这样

    // 每隔 20ms,kcp1发送数据
    for (; current >= slap; slap += 20) {
        ((IUINT32*)buffer)[0] = index++;
        ((IUINT32*)buffer)[1] = current;

        // 发送上层协议包
        ikcp_send(kcp1, buffer, 8);
    }

具体一点 ikcp_send

//---------------------------------------------------------------------
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
    IKCPSEG *seg;
    int count, i;
    int sent = 0;

    // 合法性判断
    assert(kcp->mss > 0);
    if (len < 0) return -1;

    // 由于没搜到 stream 的赋值,所以这一块之间跳过
    if (kcp->stream != 0) {
    }

    // 发了 buffer 数据 len 这么长,要按照mss的长度进行分包。else 里面是除法向上取整的写法
    // udp的mtu一般是1500,超过就会分包。所以这里冗余一点mtu=1400,在减去kcp包头24就是mss
    if (len <= (int)kcp->mss) count = 1;
    else count = (len + kcp->mss - 1) / kcp->mss;

    // 合法性验证,一次性发这么多包???
    if (count >= (int)IKCP_WND_RCV) {
        // 先跳过stream逻辑
        return -2;
    }

    // 合法性验证
    if (count == 0) count = 1;

    // 数据包分片
    for (i = 0; i < count; i++) {
        int size = len > (int)kcp->mss ? (int)kcp->mss : len;
        seg = ikcp_segment_new(kcp, size); // 创建一个数据包
        assert(seg);
        if (seg == NULL) {
            return -2;
        }
        if (buffer && len > 0) {
            memcpy(seg->data, buffer, size); // 拷贝数据到数据包里面
        }
        seg->len = size;

        // 分包的时候,加上5个包,frg会是例如43210,最后收到0的时候,就相当于一整个包收完了
        seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;

        // 这个是C语言的队列写法,把数据包seg塞到snd_queue的末尾,然后数量++
        iqueue_init(&seg->node);
        iqueue_add_tail(&seg->node, &kcp->snd_queue);
        kcp->nsnd_que++;

        // 指针偏移,继续分下一个包
        if (buffer) {
            buffer += size;
        }
        len -= size;
        sent += size;
    }

    return sent;
}

KCP要求我们一定要在tick里面调用update。iclock是毫秒时间戳

ikcp_update(kcp1, iclock());
ikcp_update(kcp2, iclock());

这里面核心逻辑是为了调用 ikcp_flush。其他的逻辑都是为了计算 ikcp_flush 的调用时机

void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
    IINT32 slap;

    kcp->current = current;

    // 是否第一次调用 ikcp_update
    if (kcp->updated == 0) {
        kcp->updated = 1;
        kcp->ts_flush = kcp->current;
    }

    // 计算距离上一次调用 ikcp_flush的时间差
    slap = _itimediff(kcp->current, kcp->ts_flush);

    if (slap >= 10000 || slap < -10000) {
        kcp->ts_flush = kcp->current;
        slap = 0;
    }

    // 每间隔 interval 这么长时间调用一次 ikcp_flush。默认100ms
    if (slap >= 0) {
        kcp->ts_flush += kcp->interval;
        if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
            kcp->ts_flush = kcp->current + kcp->interval;
        }
        ikcp_flush(kcp);
    }
}

至于 ikcp_flush有点复杂。对于发送来说,处理了数据从 send_queue 拷贝到send_buf。又发向网络。
后面再看说吧

收包

ikcp_input

从网络中取了 buffer 的数据,hr 大于0为收到的字节数

    // 处理虚拟网络:检测是否有udp包从p1->p2
    while (1) {
        hr = vnet->recv(1, buffer, 2000);
        if (hr < 0) break;
        // 如果 p2收到udp,则作为下层协议输入到kcp2
        ikcp_input(kcp2, buffer, hr);
    }

在看ikcp_input之前,先过一下包体结构

//=====================================================================
// SEGMENT
//=====================================================================
struct IKCPSEG
{
    struct IQUEUEHEAD node;
    IUINT32 conv; // 连接ID
    IUINT32 cmd; // 包的类型标志位 IKCP_CMD_PUSH 【普通数据包】 IKCP_CMD_ACK【ACK包】 IKCP_CMD_WASK【问对方接收窗口多大】 IKCP_CMD_WINS 【回应接收窗口多大】
    IUINT32 frg; // 数据包分片序号 10000字节,每次发1000字节,分10片. 那这个就是 9, 8, 7, 6, 5, 4, 3, 2, 1, 0。等于0的时候表示包都到了
    IUINT32 wnd; // 接收窗口大小,用于流量控制
    IUINT32 ts; // 时间戳
    IUINT32 sn; // 包序号
    IUINT32 una; // 如果A->B发了个包,B->A发ACK,这个ACK也可能会丢,那么A->B还会重发,就造成了重复包。所以发送的是una, 比如123456,我收到了1234,那么我就发ACK 4,表示1234之前的都收到了。具体解析看下面代码
    IUINT32 len; // 下面data的长度
    IUINT32 resendts; // 重发时间戳
    IUINT32 rto; // 超时重传时间
    IUINT32 fastack; // 快速重传。比如发包123456,我收到了23包,1还没到,那1就被过了两次,累计到阈值就会触发快速重传
    IUINT32 xmit; // 发送次数
    char data[1];
};
//---------------------------------------------------------------------
// input data
//---------------------------------------------------------------------
int ikcp_input(ikcpcb *kcp, const char *data, long size)
{
    IUINT32 prev_una = kcp->snd_una;
    IUINT32 maxack = 0, latest_ts = 0;
    int flag = 0;

    // 合法性判断
    if (ikcp_canlog(kcp, IKCP_LOG_INPUT)) {
        ikcp_log(kcp, IKCP_LOG_INPUT, "[RI] %d bytes", (int)size);
    }

    // 合法性判断
    if (data == NULL || (int)size < (int)IKCP_OVERHEAD) return -1;

    while (1) {
        IUINT32 ts, sn, len, una, conv;
        IUINT16 wnd;
        IUINT8 cmd, frg;
        IKCPSEG *seg;

        // 合法性判断
        if (size < (int)IKCP_OVERHEAD) break;

        // 开始解析KCP包头
        data = ikcp_decode32u(data, &conv); // 连接ID
        if (conv != kcp->conv) return -1;
        data = ikcp_decode8u(data, &cmd);
        data = ikcp_decode8u(data, &frg);
        data = ikcp_decode16u(data, &wnd);
        data = ikcp_decode32u(data, &ts);
        data = ikcp_decode32u(data, &sn);
        data = ikcp_decode32u(data, &una);
        data = ikcp_decode32u(data, &len);

        // 去掉包头后属下包的长度
        size -= IKCP_OVERHEAD;

        // 合法性判断
        if ((long)size < (long)len || (int)len < 0) return -2;

        // 合法性判断
        if (cmd != IKCP_CMD_PUSH && cmd != IKCP_CMD_ACK &&
            cmd != IKCP_CMD_WASK && cmd != IKCP_CMD_WINS) 
            return -3;

        kcp->rmt_wnd = wnd; // 每一个KCP包都会带上发送窗口的大小,设置当前上下文

        // 如果A->B发了个包,B->A发单个ACK,这个ACK也可能会丢,那么A->B还会重发,就造成了重复包。
        // 所以发送的是una, 比如发包123456,我收到了4的ACK,就认为1234之前的都收到了 ikcp_parse_una 做的就是把1234从send_buf里面去掉
        // IKCPCB上下文维护了,send_una, snd_nxt, rcv_nxt 几个变量。snd_nxt每发一个包都会++,写在ikcp_flush里面
        // 对于send_buf来说  【send_una】__【A】______【send_nxt】这是一个滑动窗口。如果收到一个A位置的ACK,那门send_una~A都收到了,send_una挪到a的位置,send_nxt是最新要发的包ID
        // 对于rcv_queue .......【rcv_nxt】表示包已经收到了rcv_nxt这里的,那么发包的时候una=rcv_nxt。假设就是上一行的A位置。表示这段包收到了
        ikcp_parse_una(kcp, una);
        ikcp_shrink_buf(kcp); // 更新一下新的snd_una。由于上面send_una~A已经从队列删掉了,如果队列非空,snd_una就是队列头的ID,否则就是send_nxt

        // 然后开始根据不同的包做不同的事情了
        if (cmd == IKCP_CMD_ACK) {
            // 。。。
        }
        else if (cmd == IKCP_CMD_PUSH) {
            // 。。。
        }
        else if (cmd == IKCP_CMD_WASK) {
            // 。。。
        }
        else if (cmd == IKCP_CMD_WINS) {
            // 。。。
        }
        else {
            return -3;
        }

        data += len;
        size -= len;
    }

    // 快速重传和拥塞控制。。。

    return 0;
}

ikcp_recv

用户从kcp协议里面取一个包出来

//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
    struct IQUEUEHEAD *p;
    int ispeek = (len < 0)? 1 : 0;
    int peeksize;
    int recover = 0;
    IKCPSEG *seg;
    assert(kcp);

    if (iqueue_is_empty(&kcp->rcv_queue))
        return -1;

    if (len < 0) len = -len;

    // 上面发包的时候,我们提到了分片,frg从543210,0代表整个包,这里根据这个规则,算出整个包的peeksize
    peeksize = ikcp_peeksize(kcp);

    if (peeksize < 0) 
        return -2;

    if (peeksize > len) 
        return -3;

    // 接收队列 >= 接收窗口。允许接收。拥塞控制相关
    if (kcp->nrcv_que >= kcp->rcv_wnd)
        recover = 1;

    // 合并数据包
    for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
        int fragment;
        seg = iqueue_entry(p, IKCPSEG, node);
        p = p->next;

        if (buffer) {
            memcpy(buffer, seg->data, seg->len);
            buffer += seg->len;
        }

        len += seg->len;
        fragment = seg->frg;

        if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
            ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
        }

        if (ispeek == 0) {
            iqueue_del(&seg->node);
            ikcp_segment_delete(kcp, seg);
            kcp->nrcv_que--;
        }

        if (fragment == 0) 
            break;
    }

    assert(len == peeksize);

    // 移动数据 rcv_buf -> rcv_queue
    // 
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }   else {
            break;
        }
    }

    // fast recover 拥塞控制
    if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
        // ready to send back IKCP_CMD_WINS in ikcp_flush
        // tell remote my window size
        kcp->probe |= IKCP_ASK_TELL;
    }

    return len;
}

ikcp_flush

void ikcp_flush(ikcpcb *kcp)
{
    IUINT32 current = kcp->current; // 时间戳,上层update的时候会更新一次
    char *buffer = kcp->buffer; // 只是一块临时内存
    char *ptr = buffer;
    int count, size, i;
    IUINT32 resent, cwnd;
    IUINT32 rtomin;
    struct IQUEUEHEAD *p;
    int change = 0;
    int lost = 0;
    IKCPSEG seg;

    // ikcp_flush 前必须调用一次 update
    if (kcp->updated == 0) return;

    // 初始一个包
    seg.conv = kcp->conv;
    seg.cmd = IKCP_CMD_ACK;
    seg.frg = 0;
    seg.wnd = ikcp_wnd_unused(kcp); // 接收窗口的最大大小 - 当前接收窗口的大小
    seg.una = kcp->rcv_nxt; // 上面4.2有提到una干吗用
    seg.len = 0;
    seg.sn = 0;
    seg.ts = 0;

    // ...

}

CMD_PUSH & ACK

跳到 ikcp_input的 IKCP_CMD_PUSH 表示一个普通的数据包

    else if (cmd == IKCP_CMD_PUSH) {

        // LOG 不管
        if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
            ikcp_log(kcp, IKCP_LOG_IN_DATA, 
                "input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
        }

        // 包在滑动窗口范围内
        if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {

            // 把这个包sn, ts这个时间放到acklist里面。等下一次update的时候,即ikcp_flush。会把这些包向对面发送ACK
            ikcp_ack_push(kcp, sn, ts);

            // 合法性判断
            if (_itimediff(sn, kcp->rcv_nxt) >= 0) {

                // 创建一个新的包
                seg = ikcp_segment_new(kcp, len);

                // 包头
                seg->conv = conv;
                seg->cmd = cmd;
                seg->frg = frg;
                seg->wnd = wnd;
                seg->ts = ts;
                seg->sn = sn;
                seg->una = una;
                seg->len = len;

                // 包体数据拷贝过来
                if (len > 0) {
                    memcpy(seg->data, data, len);
                }

                // 把包拷贝到recv_buf,可能会添加到recv_queue中
                ikcp_parse_data(kcp, seg);
            }
        }
    }
//---------------------------------------------------------------------
// parse data
//---------------------------------------------------------------------
void ikcp_parse_data(ikcpcb *kcp, IKCPSEG *newseg)
{
    struct IQUEUEHEAD *p, *prev;
    IUINT32 sn = newseg->sn;
    int repeat = 0;

    // 合法性判断
    if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) >= 0 ||
        _itimediff(sn, kcp->rcv_nxt) < 0) {
        ikcp_segment_delete(kcp, newseg);
        return;
    }

    // 判断包是不是重复收到了
    for (p = kcp->rcv_buf.prev; p != &kcp->rcv_buf; p = prev) {
        IKCPSEG *seg = iqueue_entry(p, IKCPSEG, node);
        prev = p->prev;
        if (seg->sn == sn) {
            repeat = 1;
            break;
        }
        if (_itimediff(sn, seg->sn) > 0) {
            break;
        }
    }

    // 包没有重复,加到rcv_buf
    if (repeat == 0) {
        iqueue_init(&newseg->node);
        iqueue_add(&newseg->node, p);
        kcp->nrcv_buf++;
    }   else {
        ikcp_segment_delete(kcp, newseg);
    }

    // 移动数据 rcv_buf -> rcv_queue
    while (! iqueue_is_empty(&kcp->rcv_buf)) {
        IKCPSEG *seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);

        // 
        if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
            iqueue_del(&seg->node);
            kcp->nrcv_buf--;
            iqueue_add_tail(&seg->node, &kcp->rcv_queue);
            kcp->nrcv_que++;
            kcp->rcv_nxt++;
        }   else {
            break;
        }
    }
}

回顾一下 收到 IKCP_CMD_PUSH 包的时候。通过 ikcp_ack_push 把包放到 acklist里面。
下一次 ikcp_flush 的时候会发送这些包的ack到对面。
所以我们回到ikcp_flush 的下一段。

    // flush acknowledges
    count = kcp->ackcount;
    for (i = 0; i < count; i++) {
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
        ptr = ikcp_encode_seg(ptr, &seg);
    }

    kcp->ackcount = 0;

这里在发送这些包的ACK

探测发送窗口

ikcp_flush的下一段

    // 对端发送窗口是0的时候,需要让对方告诉我,你的发送窗口是多大
    if (kcp->rmt_wnd == 0) {
        if (kcp->probe_wait == 0) {
            // 没有设置过探测时间,那就设置一个 probe_wait 时间后探测一次
            kcp->probe_wait = IKCP_PROBE_INIT;
            kcp->ts_probe = kcp->current + kcp->probe_wait;
        }   
        else {
            // 如果当前时间大于探测时间,那就探测一次
            // 做的事情包括,设置下一次探测时间,并且设置一个探测标志意为,当前需要探测
            if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
                if (kcp->probe_wait < IKCP_PROBE_INIT) 
                    kcp->probe_wait = IKCP_PROBE_INIT;
                kcp->probe_wait += kcp->probe_wait / 2;
                if (kcp->probe_wait > IKCP_PROBE_LIMIT)
                    kcp->probe_wait = IKCP_PROBE_LIMIT;
                kcp->ts_probe = kcp->current + kcp->probe_wait;
                kcp->probe |= IKCP_ASK_SEND;
            }
        }
    }   else {
        kcp->ts_probe = 0;
        kcp->probe_wait = 0;
    }

上面如果标记了 kcp->probe |= IKCP_ASK_SEND; 那这里就会发一个 IKCP_CMD_WASK 包。这个包没有data。只有cmd

    // flush window probing commands
    // 如果上面的探测标志被设置了,那么这里就会发送一个探测包
    if (kcp->probe & IKCP_ASK_SEND) {
        seg.cmd = IKCP_CMD_WASK; // 发一个这个命令,表示我想知道你的窗口大小
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ptr = ikcp_encode_seg(ptr, &seg);
    }

对面的 ikcp_input 收到 IKCP_CMD_WASK 包的时候,就会加一个 IKCP_ASK_TELL 标记。

    else if (cmd == IKCP_CMD_WASK) {
        // ready to send back IKCP_CMD_WINS in ikcp_flush
        // tell remote my window size
        kcp->probe |= IKCP_ASK_TELL;
        if (ikcp_canlog(kcp, IKCP_LOG_IN_PROBE)) {
            ikcp_log(kcp, IKCP_LOG_IN_PROBE, "input probe");
        }
    }

对面的ikcp_flush的时候,就会发一个 IKCP_CMD_WINS包

    // flush window probing commands
    // 接收的时候会标记这个,回一个这个包
    if (kcp->probe & IKCP_ASK_TELL) {
        seg.cmd = IKCP_CMD_WINS;
        size = (int)(ptr - buffer);
        if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
            ikcp_output(kcp, buffer, size);
            ptr = buffer;
        }
        ptr = ikcp_encode_seg(ptr, &seg);
    }

而收到IKCP_CMD_WINS不需要做任何事情。

    else if (cmd == IKCP_CMD_WINS) {
        // do nothing
        if (ikcp_canlog(kcp, IKCP_LOG_IN_WINS)) {
            ikcp_log(kcp, IKCP_LOG_IN_WINS,
                "input wins: %lu", (unsigned long)(wnd));
        }
    }

因为解包的时候,每个包头都带了。对端接收窗口大小。在上面
data = ikcp_decode16u(data, &wnd);

重传机制

数据流:
用户发送->send_queue->send_buf -> 网络 -> recv_queue -> recv_buf -> 用户接收
对于send_buf
【snd_una】【】【】【】【a】【】【snd_nxt】
我每发送一个包,就在最后添加一个,并且snd_nxt++,即挪到最后。那么这个就是我的发送滑动窗口。

对方的recv_buf
【】【】【】【】【】【rcv_nxt】
最后一个接收到的包是rcv_nxt。每次发包都会 una = rcv_nxt。塞在包头里面发给对面

回到ikcp_flush
这个判断和拥塞控制有关,后面讲。就是把合法包从send_queue挪到send_buf里面


    // move data from snd_queue to snd_buf
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
        IKCPSEG *newseg;
        if (iqueue_is_empty(&kcp->snd_queue)) break;

        newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

        iqueue_del(&newseg->node);
        iqueue_add_tail(&newseg->node, &kcp->snd_buf);
        kcp->nsnd_que--;
        kcp->nsnd_buf++;

        newseg->conv = kcp->conv;
        newseg->cmd = IKCP_CMD_PUSH;
        newseg->wnd = seg.wnd;
        newseg->ts = current;
        newseg->sn = kcp->snd_nxt++;
        newseg->una = kcp->rcv_nxt;
        newseg->resendts = current;
        newseg->rto = kcp->rx_rto;
        newseg->fastack = 0;
        newseg->xmit = 0;
    }

对于 snd_buf。这部分包要发往网络了

    // calculate resent
    resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
    rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

    // flush data segments
    // 遍历
    for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
        IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
        int needsend = 0;
        // xmit 是一个发送次数标记,这个包被发送了几次
        // 首次发送
        if (segment->xmit == 0) {
            needsend = 1; // 标记为需要发送
            segment->xmit++;

            // 计算rto 和 resendts。resendts这么长时间后是下一次重发
            segment->rto = kcp->rx_rto;
            segment->resendts = current + segment->rto + rtomin;
        }
        else if (_itimediff(current, segment->resendts) >= 0) {
            // 这个包发过了,但是没被确认掉,又要重发了
            needsend = 1;
            segment->xmit++;
            kcp->xmit++;

            // 重发的情况下计算rto
            if (kcp->nodelay == 0) {
                segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
            }   else {
                IINT32 step = (kcp->nodelay < 2)? 
                    ((IINT32)(segment->rto)) : kcp->rx_rto;
                segment->rto += step / 2;
            }
            segment->resendts = current + segment->rto;

            lost = 1; // 标记丢包了
        }
        // 在ikcp_input的时候收到ACK会记录收到的最大ACK。然后调用 ikcp_parse_fastack(kcp, maxack, latest_ts);
        // send_buf 1234, 我的maxack是3。那么1就被跳过两次,2被跳过一次。超过resent阈值,就触发快速重传
        else if (segment->fastack >= resent) {
            if ((int)segment->xmit <= kcp->fastlimit || 
                kcp->fastlimit <= 0) {
                needsend = 1;
                segment->xmit++;
                segment->fastack = 0;
                segment->resendts = current + segment->rto;
                change++;
            }
        }

        // 需要发包
        if (needsend) {
            int need;
            segment->ts = current;
            segment->wnd = seg.wnd;
            segment->una = kcp->rcv_nxt;

            size = (int)(ptr - buffer);
            need = IKCP_OVERHEAD + segment->len;

            // 这里发包
            if (size + need > (int)kcp->mtu) {
                ikcp_output(kcp, buffer, size);
                ptr = buffer;
            }

            ptr = ikcp_encode_seg(ptr, segment);

            if (segment->len > 0) {
                memcpy(ptr, segment->data, segment->len);
                ptr += segment->len;
            }

            // 超时断开
            if (segment->xmit >= kcp->dead_link) {
                kcp->state = (IUINT32)-1;
            }
        }
    }

在收到ack包的时候会调用ikcp_update_ack。计算出一个rx_rto。上面的重传时间与这个有关系

//---------------------------------------------------------------------
// parse ack
//---------------------------------------------------------------------
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt)
{
    // rtt是当前时间减去发包时间,算出来的rtt输入参数

    IINT32 rto = 0;
    if (kcp->rx_srtt == 0) { // rx_srtt 平滑的rtt时间,==0表示之前没有设置过
        kcp->rx_srtt = rtt;
        kcp->rx_rttval = rtt / 2; // rx_rttval 是增量
    }   else {
        // 平滑算法
        long delta = rtt - kcp->rx_srtt;
        if (delta < 0) delta = -delta;
        kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4; // (3/4)*rx_rttval + (1/4)*delta
        kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8; // (7/8)*rx_srtt + (1/8)rtt 这一次的rtt对于累计rtt的影响比例
        if (kcp->rx_srtt < 1) kcp->rx_srtt = 1;
    }

    // 避免rto算超了
    rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval);
    kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX);
}

ikcp_input 收到ack的处理
ikcp_update_ack 收到ack包,把这段包从发送缓冲删掉
maxack计算这一波最大的ack号,用途是累加 fastack

    if (cmd == IKCP_CMD_ACK) {
        if (_itimediff(kcp->current, ts) >= 0) {
            ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
        }
        ikcp_parse_ack(kcp, sn);
        ikcp_shrink_buf(kcp);
        if (flag == 0) {
            flag = 1;
            maxack = sn;
            latest_ts = ts;
        }   else {
            if (_itimediff(sn, maxack) > 0) {
            #ifndef IKCP_FASTACK_CONSERVE
                maxack = sn;
                latest_ts = ts;
            #else
                if (_itimediff(ts, latest_ts) > 0) {
                    maxack = sn;
                    latest_ts = ts;
                }
            #endif
            }
        }
        if (ikcp_canlog(kcp, IKCP_LOG_IN_ACK)) {
            ikcp_log(kcp, IKCP_LOG_IN_ACK, 
                "input ack: sn=%lu rtt=%ld rto=%ld", (unsigned long)sn, 
                (long)_itimediff(kcp->current, ts),
                (long)kcp->rx_rto);
        }
    }

拥塞控制

incr 最大可发送量,控制拥塞窗口大小
ssthresh,slow start threshold 慢启动阈值

ikcp_flush
计算拥塞窗口大小

    cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
    if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

数据流:
用户发送->send_queue->send_buf -> 网络 -> recv_queue -> recv_buf -> 用户接收
对于send_buf
【snd_una】【】【】【】【a】【】【snd_nxt】
我每发送一个包,就在最后添加一个,并且snd_nxt++,即挪到最后。那么这个就是我的发送滑动窗口。

对方的recv_buf
【】【】【】【】【】【rcv_nxt】
最后一个接收到的包是rcv_nxt。每次发包都会 una = rcv_nxt。塞在包头里面发给对面

change 是这一次更新发送了快速包,lost 是这意思丢过包
如果发送了快速重传,可能丢包了,就需要change,修改kcp->cwnd

    // update ssthresh
    if (change) {
        IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;

        // 计算慢启动阈值
        kcp->ssthresh = inflight / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;

        // 重新计算拥塞窗口大小
        kcp->cwnd = kcp->ssthresh + resent;
        kcp->incr = kcp->cwnd * kcp->mss;
    }

决定是否 snd_queue 移动数据到 snd_buf 的判断是

    // move data from snd_queue to snd_buf
    while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
        // 
    }

snd_nxt < snd_una + cwnd
snd_buf【snd_una】【】【】【】【】【】【snd_nxt】
那么change的改变kcp->cwnd就会影响这一行为。

如果真丢包了

    if (lost) {
        kcp->ssthresh = cwnd / 2;
        if (kcp->ssthresh < IKCP_THRESH_MIN)
            kcp->ssthresh = IKCP_THRESH_MIN;
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }

    if (kcp->cwnd < 1) {
        kcp->cwnd = 1;
        kcp->incr = kcp->mss;
    }

那就直接是1,就发不出去了

慢启动

ikcp_input接收的时候

    // 如果接收数据的时候 una 发生了改变,(因为收到了ack包)需要调整窗口的大小
    // 需要增加拥塞窗口
    if (_itimediff(kcp->snd_una, prev_una) > 0) {

        // 可以增加拥塞窗口
        if (kcp->cwnd < kcp->rmt_wnd) {
            IUINT32 mss = kcp->mss;
            if (kcp->cwnd < kcp->ssthresh) {
                // 小于阈值就一直增加
                kcp->cwnd++;
                kcp->incr += mss;
            }   else {
                // 否则做慢启动的逻辑,然后增速慢一点

                if (kcp->incr < mss) kcp->incr = mss; // 最小边界

                // 下面详解
                kcp->incr += (mss * mss) / kcp->incr + (mss / 16); 
                if ((kcp->cwnd + 1) * mss <= kcp->incr) {
                #if 1
                    kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
                #else
                    kcp->cwnd++;
                #endif
                }
            }

            // 拥塞窗口不能大于远端接收窗口
            if (kcp->cwnd > kcp->rmt_wnd) {
                kcp->cwnd = kcp->rmt_wnd;
                kcp->incr = kcp->rmt_wnd * mss;
            }
        }
    }
if (kcp->cwnd < kcp->ssthresh) {
    // 小于阈值就一直增加
    kcp->cwnd++;
    kcp->incr += mss;
} else {
    // 否则做慢启动的逻辑,然后增速慢一点

    if (kcp->incr < mss) kcp->incr = mss; // 最小边界

    kcp->incr += (mss * mss) / kcp->incr + (mss / 16); 
    if ((kcp->cwnd + 1) * mss <= kcp->incr) {
    #if 1
        kcp->cwnd = (kcp->incr + mss - 1) / ((mss > 0)? mss : 1);
    #else
        kcp->cwnd++;
    #endif
    }
}

发送了丢包
kcp->cwnd = 1
kcp->incr = mss;
所以首次的增量是
(mss mss) / (kcp->cwnd mss) + (mss / 16)
= mss / kcp->cwnd + mss / 16
= mss (1/kcp->cwnd + 1/16)
随着 kcp->cwnd 增加这个增量会越来越慢

其他

到这里代码已经基本看完了,过一遍还差一个
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current)
看注释是为了减少不必要的update。
通过ikcp_check获得下次调用update的时间

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇