上一节讨论了c服务的建立,如今来讨论消息的派发和消费,本节会讨论skynet的消息派发和消费,以及它如何实现线程安全,要完全弄清楚这些内容,须要先理解如下四种锁。安全
消费消息流程服务器
// skynet_start.c
static void
start(int thread) {
pthread_t pid[thread+3];
struct monitor *m = skynet_malloc(sizeof(*m));
memset(m, 0, sizeof(*m));
m->count = thread;
m->sleep = 0;
m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
int i;
for (i=0;i<thread;i++) {
m->m[i] = skynet_monitor_new();
}
if (pthread_mutex_init(&m->mutex, NULL)) {
fprintf(stderr, "Init mutex error");
exit(1);
}
if (pthread_cond_init(&m->cond, NULL)) {
fprintf(stderr, "Init cond error");
exit(1);
}
create_thread(&pid[0], thread_monitor, m);
create_thread(&pid[1], thread_timer, m);
create_thread(&pid[2], thread_socket, m);
static int weight[] = {
-1, -1, -1, -1, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, };
struct worker_parm wp[thread];
for (i=0;i<thread;i++) {
wp[i].m = m;
wp[i].id = i;
if (i < sizeof(weight)/sizeof(weight[0])) {
wp[i].weight= weight[i];
} else {
wp[i].weight = 0;
}
create_thread(&pid[i+3], thread_worker, &wp[i]);
}
for (i=0;i<thread+3;i++) {
pthread_join(pid[i], NULL);
}
free_monitor(m);
}
skynet全部的线程都在这里被建立,在建立完monitor线程,timer线程和socket线程之后,就开始建立worker线程。每条worker线程会被指定一个权重值,这个权重值决定一条线程一次消费多少条次级消息队列里的消息,当权重值< 0,worker线程一次消费一条消息(从次级消息队列中pop一个消息);当权重==0的时候,worker线程一次消费完次级消息队列里全部的消息;当权重>0时,假设次级消息队列的长度为mq_length,将mq_length转成二进制数值之后,向右移动weight(权重值)位,结果N则是,该线程一次消费次级消息队列的消息数。在多条线程,同时运做时,每条worker线程都要从global_mq中pop一条次级消息队列出来,对global_mq进行pop和push操做的时候,会用自旋锁锁住临界区, // skynet_mq.c
void
skynet_globalmq_push(struct message_queue * queue) {
struct global_queue *q= Q;
SPIN_LOCK(q)
assert(queue->next == NULL);
if(q->tail) {
q->tail->next = queue;
q->tail = queue;
} else {
q->head = q->tail = queue;
}
SPIN_UNLOCK(q)
}
struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q;
SPIN_LOCK(q)
struct message_queue *mq = q->head;
if(mq) {
q->head = mq->next;
if(q->head == NULL) {
assert(mq == q->tail);
q->tail = NULL;
}
mq->next = NULL;
}
SPIN_UNLOCK(q)
return mq;
}
这样出队操做,只能同时在一条worker线程里进行,而其余worker线程只可以进入阻塞状态,在开的worker线程不少的状况下,始终有必定数量线程处于阻塞状态,下降服务器的并发处理效率,这里这么作第1-4条worker线程,每次只消费一个次级消息队列的消息,第5-8条线程一次消费整个次级消息队列的消息,第9-16条worker线程一次消费的消息数目大约是整个次级消息队列长度的一半,第17-24条线程一次消费的消息数大约是整个次级消息队列长度的四分之一,而第25-32条worker线程,则大约是次级消息总长度的八分之一。这样作的目的,大概是但愿避免过多的worker线程为了等待spinlock解锁,而陷入阻塞状态(由于一些线程,一次消费多条甚至所有次级消息队列的消息,所以在消费期间,不会对global_mq进行入队和出队操做,入队和出队操做时加自旋锁的,所以就不会尝试去访问spinlock锁住的临界区,该线程就在至关一段时间内不会陷入阻塞),进而提高服务器的并发处理能力。这里还有一个细节值得注意,就是前四条线程,每次只是pop一个次级消息队列的消息出来,这样作也在必定程度上保证了没有服务会被饿死。// skynet_start.c
static void
wakeup(struct monitor *m, int busy) {
if (m->sleep >= m->count - busy) {
// signal sleep worker, "spurious wakeup" is harmless
pthread_cond_signal(&m->cond);
}
}
static void *
thread_timer(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_TIMER);
for (;;) {
skynet_updatetime();
CHECK_ABORT
wakeup(m,m->count-1);
usleep(2500);
}
// wakeup socket thread
skynet_socket_exit();
// wakeup all worker thread
pthread_mutex_lock(&m->mutex);
m->quit = 1;
pthread_cond_broadcast(&m->cond);
pthread_mutex_unlock(&m->mutex);
return NULL;
}
static void *
thread_worker(void *p) {
struct worker_parm *wp = p;
int id = wp->id;
int weight = wp->weight;
struct monitor *m = wp->m;
struct skynet_monitor *sm = m->m[id];
skynet_initthread(THREAD_WORKER);
struct message_queue * q = NULL;
while (!m->quit) {
q = skynet_context_message_dispatch(sm, q, weight);
if (q == NULL) {
if (pthread_mutex_lock(&m->mutex) == 0) {
++ m->sleep;
// "spurious wakeup" is harmless,
// because skynet_context_message_dispatch() can be call at any time.
if (!m->quit)
pthread_cond_wait(&m->cond, &m->mutex);
-- m->sleep;
if (pthread_mutex_unlock(&m->mutex)) {
fprintf(stderr, "unlock mutex error");
exit(1);
}
}
}
}
return NULL;
}
// skynet_server.c
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}
uint32_t handle = skynet_mq_handle(q);
struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop();
}
int i,n=1;
struct skynet_message msg;
for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}
skynet_monitor_trigger(sm, msg.source , handle);
if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}
skynet_monitor_trigger(sm, 0,0);
}
assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);
return q;
}
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
if (ctx->logfile) {
skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
}
if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
整个worker线程的消费流程是:struct skynet_context *
skynet_handle_grab(uint32_t handle) {
struct handle_storage *s = H;
struct skynet_context * result = NULL;
rwlock_rlock(&s->lock);
uint32_t hash = handle & (s->slot_size-1);
struct skynet_context * ctx = s->slot[hash];
if (ctx && skynet_context_handle(ctx) == handle) {
result = ctx;
skynet_context_grab(result);
}
rwlock_runlock(&s->lock);
return result;
}
这里加上读写锁的意义在于,多个worker线程,同时从skynet_context列表中获取context指针时,没有一条线程是会被阻塞的,这样提升了并发的效率,而此时,尝试往skyent_context里表中,添加新的服务的线程将会被阻塞住,由于添加新的服务可能会致使skynet_context列表(也就是代码里的slot列表)可能会被resize,所以读的时候不容许写入,写的时候不容许读取,保证了线程安全。