當前位置:首頁 > IT技術 > 其他 > 正文

時間輪定時器分析
2022-04-25 23:09:20


時間輪算法(Timing-Wheel)很早出現(xiàn)在linux kernel 2.6中。因效率非常高,很多應用框架都實現(xiàn)了這個算法。還有些定時器使用最小堆實現(xiàn),但總體來說,時間輪算法在插入性能上更高。

時間輪分成多個層級,每一層是一個圈,和時鐘類似

當個位的指針轉(zhuǎn)完一圈到達0這個刻度之后,十位的指針轉(zhuǎn)1格;當十位的轉(zhuǎn)完一圈,百位的轉(zhuǎn)1格,以此類推。這樣就能表示很長的度數(shù)。

時間輪能表達的時間長度,和圈的數(shù)量以及每一圈的長度成正比。假設有5圈,每個圈60個刻度,每個刻度表示1毫秒,那么這個時間輪可以表示這么長:

60 x 60 x 60 x 60 x 60 = 777,600,000?(ms) ~ 216小時

現(xiàn)在用程序表示這個指針可能這樣的:

int ptr[5];

假如有一個update函數(shù)驅(qū)動時間輪運轉(zhuǎn)起來,每調(diào)一次跳一格,那這個算法可能是這樣的:

  • ptr[0]加1,如果ptr[0]等于60,則使ptr[0]重置為0,可以寫為:ptr[0] = (ptr[0] + 1) % 60
  • 如果ptr[0]等于0的時候,說明轉(zhuǎn)了一圈,此時ptr[1]要加1:ptr[1] = (ptr[1] + 1) % 60
  • 就這樣一直轉(zhuǎn)到第5圈,然后又重新開始。

這樣處理有點麻煩,而且還需要一個數(shù)組來表示。其實可以用一個uint32變量來劃分,比如:

| 6bit | 6bit | 6bit | 6bit |  8bit  |
111111 111111 111111 111111 11111111

也分5個圈,第1個占8位即256個槽位,后面4個分別占6位即64個槽位。這個變量只需不斷自增就行,比如前面8位滿了會變成0,后面自動進1。這樣就可以定義一些宏:

// 第1個輪占的位數(shù)
#define TVR_BITS 8
// 第1個輪的長度
#define TVR_SIZE (1 << TVR_BITS)
// 第n個輪占的位數(shù)
#define TVN_BITS 6
// 第n個輪的長度
#define TVN_SIZE (1 << TVN_BITS)
// 掩碼:取?;蛘?br/>#define TVR_MASK (TVR_SIZE - 1)
#define TVN_MASK (TVN_SIZE - 1)

想取某一個圈的當前指針位置是:

// 第1個圈的當前指針位置
#define FIRST_INDEX(v) ((v) & TVR_MASK)
// 后面第n個圈的當前指針位置
#define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n) * TVN_BITS)) & TVN_MASK)

核心的數(shù)組結(jié)構(gòu)是這樣的:

// 第1個輪
typedef struct tvroot {
clinknode_t vec[TVR_SIZE];
} tvroot_t;
// 后面幾個輪
typedef struct tvnum {
clinknode_t vec[TVN_SIZE];
} tvnum_t;

// 時間輪定時器
typedef struct timerwheel {
tvroot_t tvroot; // 第1個輪
tvnum_t tv[4]; // 后面4個輪
uint64_t lasttime; // 上一次的時間毫秒
uint32_t currtick; // 當前的tick
uint16_t interval; // 每個時間點的毫秒間隔
uint16_t remainder; // 剩余的毫秒
} timerwheel_t;

這里要重點講一下clinknode_t,這是每一個圈的槽位的數(shù)據(jù),它其實是一個雙向循環(huán)鏈表:

時間輪定時器分析_結(jié)點

新結(jié)點可以往head的前面加,也可以往head的后面加,相當于加到鏈表頭和鏈表尾。初始情況下head的前后指針指向自己。鏈表中的結(jié)點就是定時器結(jié)點。

雙向鏈表的代碼如下:

#pragma once
/**
* 循環(huán)雙向鏈表
*/

// 鏈表結(jié)點
typedef struct clinknode {
struct clinknode *next;
struct clinknode *prev;
} clinknode_t;

// 初始化鏈表頭:前后都指向自己
static inline void clinklist_init(clinknode_t *head) {
head->prev = head;
head->next = head;
}

// 插入結(jié)點到鏈表的前面,因為是循環(huán)鏈表,其實是在head的后面
static inline void clinklist_add_front(clinknode_t *head, clinknode_t *node) {
node->prev = head;
node->next = head->next;
head->next->prev = node;
head->next = node;
}

// 插入結(jié)點到鏈表的后面,因為是循環(huán)鏈表,所以其實是在head的前面
static inline void clinklist_add_back(clinknode_t *head, clinknode_t *node) {
node->prev = head->prev;
node->next = head;
node->prev->next = node;
head->prev = node;
}

// 判斷鏈表是否為空:循環(huán)鏈表為空是頭的下一個和上一個都指向自己
static inline int clinklist_is_empty(clinknode_t *head) {
return head == head->next;
}

// 從鏈表中移除自己,同時會重設結(jié)點
static inline void clinklist_remote(clinknode_t *node) {
node->next->prev = node->prev;
node->prev->next = node->next;
clinklist_init(node);
}

// 將鏈表1的結(jié)點取出來,放到鏈表2
static inline void clinklist_splice(clinknode_t *head1, clinknode_t *head2) {
if (!clinklist_is_empty(head1)) {
clinknode_t *first = head1->next; // 第1個結(jié)點
clinknode_t *last = head1->prev; // 最后1個結(jié)點
clinknode_t *at = head2->next; // 插在第2個鏈表的這個結(jié)點前面
first->prev = head2;
head2->next = first;
last->next = at;
at->prev = last;
clinklist_init(head1);
}
}

整個定時器的代碼如下:

timerwheel.h

#pragma once
/**
* 定時器模塊
*/
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include "clinklist.h"


// 時間輪定時器

// 第1個輪占的位數(shù)
#define TVR_BITS 8
// 第1個輪的長度
#define TVR_SIZE (1 << TVR_BITS)
// 第n個輪占的位數(shù)
#define TVN_BITS 6
// 第n個輪的長度
#define TVN_SIZE (1 << TVN_BITS)
// 掩碼:取模或整除用
#define TVR_MASK (TVR_SIZE - 1)
#define TVN_MASK (TVN_SIZE - 1)

// 定時器回調(diào)函數(shù)
typedef void (*timer_cb_t)(void*);

// 定時器結(jié)點
typedef struct timernode {
struct linknode *next; // 下一個結(jié)點
struct linknode *prev; // 上一個結(jié)點
void *userdata; // 用戶數(shù)據(jù)
timer_cb_t callback; // 回調(diào)函數(shù)
uint32_t expire; // 到期時間
} timernode_t;

// 第1個輪
typedef struct tvroot {
clinknode_t vec[TVR_SIZE];
} tvroot_t;

// 后面幾個輪
typedef struct tvnum {
clinknode_t vec[TVN_SIZE];
} tvnum_t;



// 時間輪定時器
typedef struct timerwheel {
tvroot_t tvroot; // 第1個輪
tvnum_t tv[4]; // 后面4個輪
uint64_t lasttime; // 上一次的時間毫秒
uint32_t currtick; // 當前的tick
uint16_t interval; // 每個時間點的毫秒間隔
uint16_t remainder; // 剩余的毫秒
} timerwheel_t;


// 初始化時間輪,interval為每幀的間隔,currtime為當前時間
void timerwheel_init(timerwheel_t *tw, uint16_t interval, uint64_t currtime);
// 初始化時間結(jié)點:cb為回調(diào),ud為用戶數(shù)據(jù)
void timerwheel_node_init(timernode_t *node, timer_cb_t cb, void *ud);
// 增加時間結(jié)點,ticks為觸發(fā)間隔(注意是以interval為單位)
void timerwheel_add(timerwheel_t *tw, timernode_t *node, uint32_t ticks);
// 刪除結(jié)點
int timerwheel_del(timerwheel_t *tw, timernode_t *node);
// 更新時間輪
void timerwheel_update(timerwheel_t *tw, uint64_t currtime);

timerwheel.c

#include "timerwheel.h"

#define FIRST_INDEX(v) ((v) & TVR_MASK)
#define NTH_INDEX(v, n) (((v) >> (TVR_BITS + (n) * TVN_BITS)) & TVN_MASK)

void timerwheel_init(timerwheel_t *tw, uint16_t interval, uint64_t currtime) {
memset(tw, 0, sizeof(*tw));
tw->interval = interval;
tw->lasttime = currtime;
int i, j;
for (i = 0; i < TVR_SIZE; ++i) {
clinklist_init(tw->tvroot.vec + i);
}
for (i = 0; i < 4; ++i) {
for (j = 0; j < TVN_SIZE; ++j)
clinklist_init(tw->tv[i].vec + j);
}
}

void timerwheel_node_init(timernode_t *node, timer_cb_t cb, void *ud) {
node->next = 0;
node->prev = 0;
node->userdata = ud;
node->callback = cb;
node->expire = 0;
}

static void _timerwheel_add(timerwheel_t *tw, timernode_t *node) {
uint32_t expire = node->expire;
uint32_t idx = expire - tw->currtick;
clinknode_t *head;
if (idx < TVR_SIZE) {
head = tw->tvroot.vec + FIRST_INDEX(expire);
} else {
int i;
uint64_t sz;
for (i = 0; i < 4; ++i) {
sz = (1ULL << (TVR_BITS + (i+1) * TVN_BITS));
if (idx < sz) {
idx = NTH_INDEX(expire, i);
head = tw->tv[i].vec + idx;
break;
}
}
}
clinklist_add_back(head, (clinknode_t*)node);
}

void timerwheel_add(timerwheel_t *tw, timernode_t *node, uint32_t ticks) {
node->expire = tw->currtick + ((ticks > 0) ? ticks : 1);
_timerwheel_add(tw, node);
}

int timerwheel_del(timerwheel_t *tw, timernode_t *node) {
if (!clinklist_is_empty((clinknode_t*)node)) {
clinklist_remote((clinknode_t*)node);
return 1;
}
return 0;
}

void _timerwheel_cascade(timerwheel_t *tw, tvnum_t *tv, int idx) {
clinknode_t head;
clinklist_init(&head);
clinklist_splice(tv->vec + idx, &head);
while (!clinklist_is_empty(&head)) {
timernode_t *node = (timernode_t*)head.next;
clinklist_remote(head.next);
_timerwheel_add(tw, node);
}
}

void _timerwheel_tick(timerwheel_t *tw) {
++tw->currtick;

uint32_t currtick = tw->currtick;
int index = (currtick & TVR_MASK);
if (index == 0) {
int i = 0;
int idx;
do {
idx = NTH_INDEX(tw->currtick, i);
_timerwheel_cascade(tw, &(tw->tv[i]), idx);
} while (idx == 0 && ++i < 4);
}

clinknode_t head;
clinklist_init(&head);
clinklist_splice(tw->tvroot.vec + index, &head);
while (!clinklist_is_empty(&head)) {
timernode_t *node = (timernode_t*)head.next;
clinklist_remote(head.next);
if (node->callback) {
node->callback(node->userdata);
}
}
}

void timerwheel_update(timerwheel_t *tw, uint64_t currtime) {
if (currtime > tw->lasttime) {
int diff = currtime - tw->lasttime + tw->remainder;
int intv = tw->interval;
tw->lasttime = currtime;

while (diff >= intv) {
diff -= intv;
_timerwheel_tick(tw);
}
tw->remainder = diff;
}
}

核心函數(shù)就兩個:

  • _timerwheel_add 把定時器結(jié)點加到時間輪里。
  • _timerwheel_tick 指針往前走一步,如果指針到達0,則上一個圈的指針也會跳一步,此時要把上一個圈的鏈表取出來,重新加到當前圈里(_timerwheel_cascade)

這份代碼參考了2.6內(nèi)核的timer,其中涉及到一些位運算,不過通過上面的講解,應該不難理解。有興趣了解的還是看代碼吧,代碼才是最好的文檔:)




本文摘自 :https://blog.51cto.com/u

開通會員,享受整站包年服務立即開通 >