skynet消息调度机制

上一节讨论了c服务的建立,如今来讨论消息的派发和消费,本节会讨论skynet的消息派发和消费,以及它如何实现线程安全,要完全弄清楚这些内容,须要先理解如下四种锁。安全

  • 互斥锁(mutex lock : mutual exclusion lock)
  1. 概念:互斥锁,一条线程加锁锁住临界区,另外一条线程尝试访问改临界区的时候,会发生阻塞,并进入休眠状态。临界区是锁lock和unlock之间的代码片断,通常是多条线程可以共同访问的部分。
  2. 具体说明:假设一台机器上的cpu有两个核心core0和core1,如今有线程A、B、C,此时core0运行线程A,core1运行线程B,此时线程B使用Mutex锁,锁住一个临界区,当线程A试图访问该临界区时,由于线程B已经将其锁住,所以线程A被挂起,进入休眠状态,此时core0进行上下文切换,将线程A放入休眠队列中,而后core0运行线程C,当线程B完成临界区的流程并执行解锁以后,线程A又会被唤醒,core0从新运行线程A

  • 自旋锁(spinlock)
  1. 概念:自旋锁,一条线程加锁锁住临界区,另外一条线程尝试访问该临界区的时候,会发生阻塞,可是不会进入休眠状态,而且不断轮询该锁,直至原来锁住临界区的线程解锁。
  2. 具体说明:假设一台机器上有两个核心core0和core1,如今有线程A、B、C,此时core0运行线程A,core1运行线程B,此时线程B调用spin lock锁住临界区,当线程A尝试访问该临界区时,由于B已经加锁,此时线程A会阻塞,而且不断轮询该锁,不会交出core0的使用权,当线程B释放锁时,A开始执行临界区逻辑


  • 读写锁(readers–writer lock)
  1. 概述:读写锁,一共三种状态
    • 读状态时加锁,此时为共享锁,当一个线程加了读锁时,其余线程若是也尝试以读模式进入临界区,那么不会发生阻塞,直接访问临界区
    • 写状态时加锁,此时为独占锁,当某个线程加了写锁,那么其余线程尝试访问该临界区(不管是读仍是写),都会阻塞等待
    • 不加锁
  2. 注意:
    • 某线程加读取锁时,容许其余线程以读模式进入,此时若是有一个线程尝试以写模式访问临界区时,该线程会被阻塞,而其后尝试以读方式访问该临界区的线程也会被阻塞
    • 读写锁适合在读远大于写的情形中使用

  • 条件变量(condition variables)
概述: 假设A,B,C三条线程,其中B,C线程加了cond_wait锁并投入睡眠,而A线程则在某个条件触发时,会经过signal通知B,C线程,从 而唤醒B和C线程。


  • 消费消息流程服务器

  1. 概述:
    skynet在启动时,会建立若干条worker线程(由配置指定),这些worker线程被建立之后,会不断得从global_mq里pop出一个次级消息队列来,每一个worker线程,每次只pop一个次级消息队列,而后再从次级消息队列中,pop一到若干条消息出来(授权重值影响),最后消息将做为参数传给对应服务的callback函数(每一个服务只有一个专属的次级消息队列),当callback执行完时,worker线程会将次级消息队列push回global_mq里,这样就完成了消息的消费。
    在这个过程当中,由于每一个worker线程会从global_mq里pop一个次级消息队列出来,此时其余worker线程就不能从global_mq里pop出同一个次级消息队列,也就是说,一个服务不能同时在多个worker线程内调用callback函数,从而保证了线程安全。
  2. worker线程的建立与运做
    要理解skynet的消息调度,首先要理解worker线程的建立流程,基本运做以及线程安全。worker线程的数量由配置的“thread”字段指定,skynet节点启动时,会建立配置指定数量的worker线程,咱们能够再skynet_start.c的start函数中找到这个建立流程:
    // skynet_start.c
    static void
    start(int thread) {
    	pthread_t pid[thread+3];
    
    	struct monitor *m = skynet_malloc(sizeof(*m));
    	memset(m, 0, sizeof(*m));
    	m->count = thread;
    	m->sleep = 0;
    
    	m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
    	int i;
    	for (i=0;i<thread;i++) {
    		m->m[i] = skynet_monitor_new();
    	}
    	if (pthread_mutex_init(&m->mutex, NULL)) {
    		fprintf(stderr, "Init mutex error");
    		exit(1);
    	}
    	if (pthread_cond_init(&m->cond, NULL)) {
    		fprintf(stderr, "Init cond error");
    		exit(1);
    	}
    
    	create_thread(&pid[0], thread_monitor, m);
    	create_thread(&pid[1], thread_timer, m);
    	create_thread(&pid[2], thread_socket, m);
    
    	static int weight[] = { 
    		-1, -1, -1, -1, 0, 0, 0, 0,
    		1, 1, 1, 1, 1, 1, 1, 1, 
    		2, 2, 2, 2, 2, 2, 2, 2, 
    		3, 3, 3, 3, 3, 3, 3, 3, };
    	struct worker_parm wp[thread];
    	for (i=0;i<thread;i++) {
    		wp[i].m = m;
    		wp[i].id = i;
    		if (i < sizeof(weight)/sizeof(weight[0])) {
    			wp[i].weight= weight[i];
    		} else {
    			wp[i].weight = 0;
    		}
    		create_thread(&pid[i+3], thread_worker, &wp[i]);
    	}
    
    	for (i=0;i<thread+3;i++) {
    		pthread_join(pid[i], NULL); 
    	}
    
    	free_monitor(m);
    }
    skynet全部的线程都在这里被建立,在建立完monitor线程,timer线程和socket线程之后,就开始建立worker线程。每条worker线程会被指定一个权重值,这个权重值决定一条线程一次消费多少条次级消息队列里的消息,当权重值< 0,worker线程一次消费一条消息(从次级消息队列中pop一个消息);当权重==0的时候,worker线程一次消费完次级消息队列里全部的消息;当权重>0时,假设次级消息队列的长度为mq_length,将mq_length转成二进制数值之后,向右移动weight(权重值)位,结果N则是,该线程一次消费次级消息队列的消息数。在多条线程,同时运做时,每条worker线程都要从global_mq中pop一条次级消息队列出来,对global_mq进行pop和push操做的时候,会用自旋锁锁住临界区,
    // skynet_mq.c
    void 
    skynet_globalmq_push(struct message_queue * queue) {
    	struct global_queue *q= Q;
    
    	SPIN_LOCK(q)
    	assert(queue->next == NULL);
    	if(q->tail) {
    		q->tail->next = queue;
    		q->tail = queue;
    	} else {
    		q->head = q->tail = queue;
    	}
    	SPIN_UNLOCK(q)
    }
    
    struct message_queue * 
    skynet_globalmq_pop() {
    	struct global_queue *q = Q;
    
    	SPIN_LOCK(q)
    	struct message_queue *mq = q->head;
    	if(mq) {
    		q->head = mq->next;
    		if(q->head == NULL) {
    			assert(mq == q->tail);
    			q->tail = NULL;
    		}
    		mq->next = NULL;
    	}
    	SPIN_UNLOCK(q)
    
    	return mq;
    }
    这样出队操做,只能同时在一条worker线程里进行,而其余worker线程只可以进入阻塞状态,在开的worker线程不少的状况下,始终有必定数量线程处于阻塞状态,下降服务器的并发处理效率,这里这么作第1-4条worker线程,每次只消费一个次级消息队列的消息,第5-8条线程一次消费整个次级消息队列的消息,第9-16条worker线程一次消费的消息数目大约是整个次级消息队列长度的一半,第17-24条线程一次消费的消息数大约是整个次级消息队列长度的四分之一,而第25-32条worker线程,则大约是次级消息总长度的八分之一。这样作的目的,大概是但愿避免过多的worker线程为了等待spinlock解锁,而陷入阻塞状态(由于一些线程,一次消费多条甚至所有次级消息队列的消息,所以在消费期间,不会对global_mq进行入队和出队操做,入队和出队操做时加自旋锁的,所以就不会尝试去访问spinlock锁住的临界区,该线程就在至关一段时间内不会陷入阻塞),进而提高服务器的并发处理能力。这里还有一个细节值得注意,就是前四条线程,每次只是pop一个次级消息队列的消息出来,这样作也在必定程度上保证了没有服务会被饿死。
    正如本节概述所说,一个worker线程被建立出来之后,则是不断尝试从global_mq中pop一个次级消息队列,并从次级消息队列中pop消息,进而经过服务的callback函数来消费该消息:
    // skynet_start.c
    static void
    wakeup(struct monitor *m, int busy) {
    	if (m->sleep >= m->count - busy) {
    		// signal sleep worker, "spurious wakeup" is harmless
    		pthread_cond_signal(&m->cond);
    	}
    }
    
    static void *
    thread_timer(void *p) {
    	struct monitor * m = p;
    	skynet_initthread(THREAD_TIMER);
    	for (;;) {
    		skynet_updatetime();
    		CHECK_ABORT
    		wakeup(m,m->count-1);
    		usleep(2500);
    	}
    	// wakeup socket thread
    	skynet_socket_exit();
    	// wakeup all worker thread
    	pthread_mutex_lock(&m->mutex);
    	m->quit = 1;
    	pthread_cond_broadcast(&m->cond);
    	pthread_mutex_unlock(&m->mutex);
    	return NULL;
    }
    
    static void *
    thread_worker(void *p) {
    	struct worker_parm *wp = p;
    	int id = wp->id;
    	int weight = wp->weight;
    	struct monitor *m = wp->m;
    	struct skynet_monitor *sm = m->m[id];
    	skynet_initthread(THREAD_WORKER);
    	struct message_queue * q = NULL;
    	while (!m->quit) {
    		q = skynet_context_message_dispatch(sm, q, weight);
    		if (q == NULL) {
    			if (pthread_mutex_lock(&m->mutex) == 0) {
    				++ m->sleep;
    				// "spurious wakeup" is harmless,
    				// because skynet_context_message_dispatch() can be call at any time.
    				if (!m->quit)
    					pthread_cond_wait(&m->cond, &m->mutex);
    				-- m->sleep;
    				if (pthread_mutex_unlock(&m->mutex)) {
    					fprintf(stderr, "unlock mutex error");
    					exit(1);
    				}
    			}
    		}
    	}
    	return NULL;
    }
    
    // skynet_server.c
    struct message_queue * 
    skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
    	if (q == NULL) {
    		q = skynet_globalmq_pop();
    		if (q==NULL)
    			return NULL;
    	}
    
    	uint32_t handle = skynet_mq_handle(q);
    
    	struct skynet_context * ctx = skynet_handle_grab(handle);
    	if (ctx == NULL) {
    		struct drop_t d = { handle };
    		skynet_mq_release(q, drop_message, &d);
    		return skynet_globalmq_pop();
    	}
    
    	int i,n=1;
    	struct skynet_message msg;
    
    	for (i=0;i<n;i++) {
    		if (skynet_mq_pop(q,&msg)) {
    			skynet_context_release(ctx);
    			return skynet_globalmq_pop();
    		} else if (i==0 && weight >= 0) {
    			n = skynet_mq_length(q);
    			n >>= weight;
    		}
    		int overload = skynet_mq_overload(q);
    		if (overload) {
    			skynet_error(ctx, "May overload, message queue length = %d", overload);
    		}
    
    		skynet_monitor_trigger(sm, msg.source , handle);
    
    		if (ctx->cb == NULL) {
    			skynet_free(msg.data);
    		} else {
    			dispatch_message(ctx, &msg);
    		}
    
    		skynet_monitor_trigger(sm, 0,0);
    	}
    
    	assert(q == ctx->queue);
    	struct message_queue *nq = skynet_globalmq_pop();
    	if (nq) {
    		// If global mq is not empty , push q back, and return next queue (nq)
    		// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
    		skynet_globalmq_push(q);
    		q = nq;
    	} 
    	skynet_context_release(ctx);
    
    	return q;
    }
    
    static void
    dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
    	assert(ctx->init);
    	CHECKCALLING_BEGIN(ctx)
    	pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
    	int type = msg->sz >> MESSAGE_TYPE_SHIFT;
    	size_t sz = msg->sz & MESSAGE_TYPE_MASK;
    	if (ctx->logfile) {
    		skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
    	}
    	if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) {
    		skynet_free(msg->data);
    	} 
    	CHECKCALLING_END(ctx)
    }
    整个worker线程的消费流程是:
    a) worker线程每次,从global_mq中弹出一个次级消息队列,若是次级消息队列为空,则该worker线程投入睡眠,timer线程每隔2.5毫秒会唤醒一条睡眠中的worker线程,并从新尝试从全局消息队列中pop一个次级消息队列出来,当次级消息队列不为空时,进入下一步
    b) 根据次级消息的handle,找出其所属的服务(一个skynet_context实例)指针,从次级消息队列中,pop出n条消息(受weight值影响),而且将其做为参数,传给skynet_context的cb函数,并调用它
    c) 当完成callback函数调用时,就从global_mq中再pop一个次级消息队列中,供下一次使用,并将本次使用的次级消息队列push回global_mq的尾部
    d) 返回第a步
  3. 线程安全
    • 整个消费流程,每条worker线程,从global_mq取出的次级消息队列都是惟一的,而且有且只有一个服务与之对应,取出以后,在该worker线程完成全部callback调用以前,不会push回global_mq中,也就是说,在这段时间内,其余worker线程不能拿到这个次级消息队列所对应的服务,并调用callback函数,也就是说一个服务不可能同时在多条worker线程内执行callback函数,从而保证了线程安全
      image
    • 不管是global_mq也好,次级消息队列也好,他们在入队和出队操做时,都有加上spinlock,这样多个线程同时访问mq的时候,第一个访问者会进入临界区并锁住,其余线程会阻塞等待,直至该锁解除,这样也保证了线程安全。global_mq会同时被多个worker线程访问,这个很好理解,由于worker线程老是在不断尝试驱动不一样的服务,要驱动服务,首先要取出至少一个消息,要得到消息,就要取出一个次级消息队列,而这个次级消息队列要从全局消息队列里取。虽然一个服务的callback函数,只能在一个worker线程内被调用,可是在多个worker线程中,能够向同一个次级消息队列push消息,即使是该次级消息队列所对应的服务正在执行callback函数,因为次级消息队列不是skynet_context的成员(skynet_context只是包含了次级消息队列的指针),所以改变次级消息队列不等于改变skynet_context上的数据,不会影响到该服务自身内存的数据,次级消息队列在进行push和pop操做的时候,会加上一个spinlock,当多个worker线程同时向同一个次级消息队列push消息时,第一个访问的worker线程,可以进入临界区,其余worker线程就阻塞等待,直至该临界区解锁,这样保证了线程安全。
    • 咱们在经过handle从skynet_context list里获取skynet_context的过程当中(好比派发消息时,要要先获取skynet_context指针,再调用该服务的callback函数),须要加上一个读写锁,由于在skynet运做的过程当中,获取skynet_context,比建立skynet_context的状况要多得多,所以这里用了读写锁:
      struct skynet_context * 
      skynet_handle_grab(uint32_t handle) {
      	struct handle_storage *s = H;
      	struct skynet_context * result = NULL;
      
      	rwlock_rlock(&s->lock);
      
      	uint32_t hash = handle & (s->slot_size-1);
      	struct skynet_context * ctx = s->slot[hash];
      	if (ctx && skynet_context_handle(ctx) == handle) {
      		result = ctx;
      		skynet_context_grab(result);
      	}
      
      	rwlock_runlock(&s->lock);
      
      	return result;
      }
      这里加上读写锁的意义在于,多个worker线程,同时从skynet_context列表中获取context指针时,没有一条线程是会被阻塞的,这样提升了并发的效率,而此时,尝试往skyent_context里表中,添加新的服务的线程将会被阻塞住,由于添加新的服务可能会致使skynet_context列表(也就是代码里的slot列表)可能会被resize,所以读的时候不容许写入,写的时候不容许读取,保证了线程安全。
  • 发送消息流程 向一个服务发送消息的本质,就是向该服务的次级消息队列里push消息,多个worker线程可能会同时向同一个服务的次级消息队列push一个消息,正如上节所说的那样,次级消息队列push和pop操做,都有加一个spinlock,从而保证了线程安全,上节已经说明了,这里再也不赘述。