Linux C实现纯用户态抢占式多线程!

温州皮鞋厂老板,只有皮鞋!web


2010年的陈年旧事:
纯用户空间抢占式多线程的设计: https://blog.csdn.net/dog250/article/details/5735512api

纯用户空间的抢占式多线程库实际上是很麻烦的一件事!bash

嗯,九年前的事情了。当时一直想作一个纯用户态的多线程,然而最终没有找到优雅的方法。数据结构

五一放假前的周六单休,在家带娃,想到一个办法。今夜做本文记下。多线程


若是要找 纯用户态多线程 的实现,那太多了,可是几乎都是 非抢占式 的!都是某种协程的形式,须要指令流本身放弃CPU,将执行权主动交给其它的执行流。为何不多有人研究 纯用户态抢占式多线程 呢?缘由我想大概以下吧:tcp

  • 这种多线程严重依赖操做系统内核自己提供的机制,好比Solaris的call up机制这种,每个系统不必定提供相同的相似机制,甚至不必定提供该机制。
  • 当前线程库很是多,且Linux也早就实现了纯内核多线程,再去实现这个用户态的多线程没有意义。
  • 即使实现了纯用户态线程,每个线程也会受制于操做系统,好比一个线程在操做系统内核阻塞了,其它的线程也要跟着等…
  • 没有固定的规范化的性能评估体系,很难度量性能开销。这个和基于OS内核的线程不一样,由于后者是有严格的软硬件接口profile的。
  • 作这个事情于工做无益,只适合本身玩玩,可是谁会没事了玩这个呢?

我也知道,作这个事情没有意义,可是什么是意义?svg

直接说吧,作完这个事情,至少对Linux内核对信号处理的理解更深了一步吧,或者退一万步,至少,经过这事的练习,你对sigcontext和sigframe的结构体了解了是吧。函数

还以为没有意义?那就尽管喷吧。性能


所谓 抢占式多线程调度 ,就是不依靠线程本身来放弃CPU从而将执行权交给别的线程,而是靠一种外部主动干扰模式的调度机制,在须要调度的时刻,强行剥夺当前线程的执行权,依靠策略选择另外一个线程来运行。测试

当时之因此没有找到优雅的方案,是由于我没有找到什么地方能够同时作到两件事:

  1. 中断当前的线程执行,进入一个handler来根据调度策略实施调度和切换。
  2. 在这个handler中修改该进程的寄存器上下文,剥夺当前线程的执行权交给另外一个线程。

首先,上述第1点是能够用信号完成的,好比用alarm函数,能够实现分时中断。然而在中断处理函数中,我没有找到修改寄存器的方法,曾经想过用setjmp/longjmp,然而失败,最终使用PTRACE机制实现了一个无比丑陋和粗糙的。

在九年前的文章的中,开篇我就说 纯用户空间的抢占式多线程库实际上是很麻烦的一件事! 确实麻烦,之因此这么认为就是由于上面的难题没有解决。

当时确实是术业不精啊。后面的几年,本身也没怎么看过Linux内核信号处理相关的东西。

周六恰逢正则喝完奶睡着了以后,我一我的又不能出去浪,忽然就又想到了这个问题。我发誓要找一个优雅的方案出来,毕竟九年过去了,我想本身的内功应该比那时强太多了。

确实,这个方案也真的是信手拈来。


我知道,Linux进程在执行流返回用户态前处理信号的时候,要调用信号处理函数,而这个信号处理函数是定义在用户态的,因此Linux进程为了能够执行这个handler函数,便须要本身setup一下用户态堆栈。

而这个机制,偏偏给了咱们修改寄存器的机会。

九年前,我一直觉得用户态的寄存器上下文在彻底返回用户态以前,始终是保存在内核栈上,没法修改。但事实上,当执行信号处理函数的时候,内核会把该进程内核栈上的寄存器上下文sigcontext拷贝到用户态的堆栈中,再压入一个sigreturn系统调用做为返回地址,而后等信号处理函数完成后,sigreturn将会自动陷入内核,再将用户态的sigcontext拷贝回内核栈,以完全完成信号处理,恢复进程的寄存器上下文。

也就是说,当信号处理函数被执行时,是能够在当前堆栈上找到寄存器上下文的,咱们只须要在堆栈上找sigcontext结构体便可。这时,咱们对其进行修改,而后这些被修改过的寄存器上下文将会在信号处理完成返回内核时,更新内核栈上的寄存器上下文,从而达到咱们的目的。

那么,我先写一个信号处理函数,看看信号处理函数执行时,堆栈上都有什么:

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

int i, j, k = 0;
unsigned char *stack_buffer;
unsigned long *p;

void sig_start(int signo)
{
	unsigned long a = 0x1234567811223344;

	p = (unsigned char *)&a;
	stack_buffer = (unsigned char *)&a;

	// 如下按照8字节为一组,打印堆栈的内容
	printf("----begin stack----\n");
	for (i = 0; i < 32; i++) {
		for (j = 0; j < 8; j++) {
			printf(" %.2x", stack_buffer[k]);
			k++;
		}
		printf("\n");
	}
	printf("----end stack----\n");

	if (signo = SIGINT)
		signal(SIGINT, NULL);
	if (signo = SIGHUP)
		signal(SIGHUP, NULL);
	return;
}

int main()
{
	printf("process id is %d %p %p\n",getpid(), main, wait_start);

	signal(SIGINT, sig_start);
	signal(SIGHUP, sig_start);

	for (;;);
}

让咱们执行之,按下Ctrl-C给它一个SIGINT信号,看看打印的堆栈的内容:

[root@localhost ~]# ./a.out
process id is 3036  0x4007a1 0x40068d
^C----begin stack----
 44 33 22 11 78 56 34 12 # 这即是咱们的局部变量 a
 98 7b 98 fa f8 7f 00 00
 10 11 92 b5 fc 7f 00 00
 80 02 3d fa f8 7f 00 00 # 这个即是信号处理函数的返回地址,调用sigreturn的
 01 00 00 00 00 00 00 00 # 如下的内容,须要参见内核 rt_sigframe 结构体
 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
 02 00 00 00 01 00 00 00
 00 00 00 00 00 00 00 00
 60 10 92 b5 fc 7f 00 00
 10 0f 92 b5 fc 7f 00 00
 08 00 00 00 00 00 00 00
 06 02 00 00 00 00 00 00
 a0 05 40 00 00 00 00 00
 f0 11 92 b5 fc 7f 00 00
 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
 01 00 00 00 00 00 00 00
 70 0e 92 b5 fc 7f 00 00 # 这玩意儿一看就是堆栈上的地址,0x7ffc...
 10 11 92 b5 fc 7f 00 00
 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
 8d 03 3d fa f8 7f 00 00
 10 11 92 b5 fc 7f 00 00
 e3 07 40 00 00 00 00 00
 02 02 00 00 00 00 00 00
 33 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
 00 00 00 00 00 00 00 00
----end stack----
^C
[root@localhost ~]#

我是在x86_64平台上作的实验,因此咱们要看x86_64的rt_sigframe结构体,它位于:
arch/x86/include/asm/sigframe.h:

#ifdef CONFIG_X86_64

struct rt_sigframe {
    char __user *pretcode;
    struct ucontext uc;
    struct siginfo info;
    /* fp state follows here */
};
...
/* 一路追溯,看看rt_sigframe展开后的样子 */
// include/uapi/asm-generic/ucontext.h
struct ucontext {
    unsigned long     uc_flags;
    struct ucontext  *uc_link;
    stack_t       uc_stack;
    struct sigcontext uc_mcontext;  // 这个就是咱们要找的东西!
    sigset_t      uc_sigmask;   /* mask last for extensibility */
};

计算一下偏移位置,正好是处在 pretcode字段 的 58 字节处。也就是说,只要咱们找到信号处理函数的 pretcode 偏移,将其再加 58=40 字节就是sigcontext结构体了,这个结构体里所有都是寄存器:

struct sigcontext {
    __u64 r8;
    __u64 r9;
    __u64 r10;
    __u64 r11;
    __u64 r12;
    __u64 r13;
    __u64 r14;
    __u64 r15;
    __u64 rdi;
    __u64 rsi;
    __u64 rbp;
    __u64 rbx;
    __u64 rdx;
    __u64 rax;
    __u64 rcx;
    __u64 rsp;
    __u64 rip;
    __u64 eflags;       /* RFLAGS */
    __u16 cs;
    __u16 gs;
    __u16 fs;
    __u16 __pad0;
    __u64 err;
    __u64 trapno;
    __u64 oldmask;
    __u64 cr2;
    struct _fpstate *fpstate;   /* zero when no FPU context */
#ifdef __ILP32__
    __u32 __fpstate_pad;
#endif
    __u64 reserved1[8];
};

咱们所谓的纯用户态线程调度,就是在信号处理函数里save/restore上述的结构体就行了,而上述的结构体的位置,咱们已经知道它在哪里了。

#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>

// 仅仅是测试demo,分配4096字节的stack足够了。
#define STACK_SIZE 4096
/* * 为何是72? * 由于咱们在信号处理中增长了一个局部变量,这样pretcode的偏移就是32字节了。 * 因而32+40=72! */
#define CONTEXT_OFFSET 72
// rip寄存器相对于局部变量a的偏移。注意rip在sigcontext中的偏移是16
#define PC_OFFSET 200

int wait_start()
{
	for (;;) {
		sleep(1000);
	}
}

// 线程1的处理函数
void thread1()
{
	int a = 1, ret = 0;
	char buf[64];
	int fd = open("./file", O_RDWR);
	for (;;) {
		// 线程1持续往一个文件里写内容。
		snprintf(buf, 32, "user thread 1 stack: %p value:%d\n", &a, a++);
		ret = write(fd, buf, 32);
		printf("write buffer to file:%s size=%d\n", buf, ret);
		sleep(1);
	}
}

// 线程2的处理函数
void thread2()
{
	int a = 2;
	for (;;) {
		// 线程2随便打印些本身栈上的什么东西。
		printf("tcp user cong 2 stack: %p value:%d\n", &a, a++);
		sleep(1);
	}
}

unsigned char *buf;
int start = 0;
struct sigcontext context[2];
struct sigcontext *curr_con;
unsigned long pc[2];
int idx = 0;
unsigned char *stack1, *stack2;

// SIGINT用来启动全部线程,每次信号启动一个。
void sig_start(int dunno)
{
	unsigned long a = 0, *p;
	if (start == 0) {  // 启动第一个线程
		// 首先定位到sigcontext的rip,启动线程仅仅修改rip便可,目标是跳入到thread1线程处理函数
		p = (unsigned long*)((unsigned char *)&a + PC_OFFSET);
		*p = pc[0];
		// 定位到sigcontext
		p = (unsigned long *)((unsigned char *)&a + CONTEXT_OFFSET);
		curr_con = (struct sigcontext *)p;
		// 初始化其堆栈寄存器为为该线程分配的独立堆栈空间。
		curr_con->rsp = curr_con->rbp = (unsigned long)((unsigned char *)stack1 + STACK_SIZE);
		start++;
	} else if (start == 1) { // 启动第二个线程
		// 定位线程1的sigcontext,保存其上下文,由于立刻就要schedule线程2了。
		p = (unsigned long *)((unsigned char *)&a + CONTEXT_OFFSET);
		curr_con = (struct sigcontext *)p;
		memcpy((void *)&context[0], (const void *)curr_con, sizeof(struct sigcontext));

		// 保存第一个线程的上下文后再定位到sigcontext的rip并修改之,同线程1
		p = (unsigned long *)((char*)&a + PC_OFFSET);
		idx = 1;
		*p = pc[1];
		p = (unsigned long *)((unsigned char *)&a + CONTEXT_OFFSET);
		curr_con = (struct sigcontext *)p;
		// 初始化其堆栈寄存器为为该线程分配的独立堆栈空间。
		curr_con->rsp = curr_con->rbp = (unsigned long)((unsigned char *)stack2 + STACK_SIZE);
		start++;
		// 两个线程均启动完毕,开启时间片轮转调度吧。
		alarm(2);
		signal(SIGINT, NULL);
	}

	return;
}

void sig_schedule(int unused)
{
	unsigned long a = 0;
	unsigned char *p;
	
	// 保存当前线程的上下文
	p = (unsigned char *)((unsigned char *)&a + CONTEXT_OFFSET);
	curr_con = (struct sigcontext *)p;
	memcpy((void *)&context[idx%2], curr_con, sizeof(struct sigcontext));
	
	// 轮转调度下一个线程,恢复其上下文。
	idx++;
	memcpy(curr_con, (void *)&context[idx%2], sizeof(struct sigcontext));

	// 2秒后再调度
	alarm(2);

	return;
}

int main()
{
	printf("process id is %d %p %p\n",getpid(), thread1, thread2);
	
	// 为两个线程分配stack空间。
	// 注意,线程的stack空间必定要独立,否则函数调用会冲突的。
	stack1 = (unsigned char *)calloc(1, 4096);
	stack2 = (unsigned char *)calloc(1, 4096);
	signal(SIGINT, sig_start);
	signal(SIGALRM, sig_schedule);
	pc[0] = (unsigned long)thread1;
	pc[1] = (unsigned long)thread2;

	wait_start();
}

效果以下:

[root@localhost ~]# ./a.out
process id is 2994  0x4007cd 0x400869
0x1191010 0x1192020
^Cwrite buffer to file:user thread 1 stack: 0x1191ffc  value:1
  size=32
^Ctcp user cong 2 stack: 0x1193014  value:2
tcp user cong 2 stack: 0x1193014  value:3
write buffer to file:user thread 1 stack: 0x1191ffc  value:2
  size=32
write buffer to file:user thread 1 stack: 0x1191ffc  value:3
  size=32
tcp user cong 2 stack: 0x1193014  value:4
tcp user cong 2 stack: 0x1193014  value:5
write buffer to file:user thread 1 stack: 0x1191ffc  value:4
  size=32
write buffer to file:user thread 1 stack: 0x1191ffc  value:5
  size=32
tcp user cong 2 stack: 0x1193014  value:6
^C

能够看出,两个线程完美交替执行!

第一个例子有点复杂了,咱们换个简单的:

void thread1()
{
    int i = 1;
    while (1) {
        printf("I am thread:%d\n", i);
        sleep(1);
    }
}
void thread2()
{
    int i = 2;
    while (1) {
        printf("I am thread:%d\n", i);
        sleep(1);
    }
}

效果以下:

[root@localhost ~]# ./a.out
process id is 3085  0x4006fd 0x40072c
0x11f3010 0x11f4020
^CI am thread:1 # Ctrl-C 启动线程1
^CI am thread:2 # Ctrl-C 启动线程2
I am thread:2
I am thread:1
I am thread:1
I am thread:2
I am thread:2
I am thread:1
I am thread:1
I am thread:2
I am thread:2
I am thread:1
I am thread:1
I am thread:2
I am thread:2
I am thread:1
I am thread:1
^C

这个例子清晰多了。


以上的 纯用户态 多线程设计中,没有使用任何操做系统进程级别之外的数据结构存储线程上下文,这就是 纯用户态 的含义。咱们看到全部的线程上下文以及线程调度相关的数据结构都存储在单独的一个进程地址空间。

换句话说, 在单独的该进程以外,没人意识获得这个多线程的存在! 这个容纳用户态多线程的进程容器,就是一个 虚拟机 实例,它完成了线程硬件上下文的save/restore,调度,切换,就像Linux内核之于Linux进程所作的那般。

我说,个人这个纯用户态抢占式多线程,使用了信号处理机制。

有人会问,不是说 “纯” 用户态吗?干吗用信号?信号不是内核机制吗?

是的,信号是内核机制,但 这里的关注点不在信号是否是内核机制,而是“须要一个第三方来实施抢占式调度” 为何须要 “第三方”?

由于抢占式多线程不是协做式多线程,既然线程本身不参与调度决策,那就必然须要第三方来决策。使用信号只是一种方式,因为我在Linux系统作这个用户态多线程,信号偏偏是能够知足需求的。固然,也能够不用信号,若是你能找到等价的机制也是能够的。

注意⚠️: 采用信号机制来抢占的开销确实有点大,可是这只是一种可实现的方式,并非惟一方式,此外,这种抢占式调度彻底是能够和协做式调度好比协程协同工做的,只有在发生 某种不得不抢占 的事件后,才实施信号抢占。

操做系统内核调度不也依赖第三方的时钟中断吗?时钟晶振可不是内核的一部分,它是硬件。


此时4月29日凌晨2:13,合上电脑,或许还能再睡几个钟头。

浙江温州皮鞋?湿,下雨进水不会胖!