接触skynet 大半年了 ,最近在研究 skynet 的源码 而且在仿写一个相似的服务程序。发现本身是在开着多个线程学习 。so 开始作一下简单的现场保护。。web
先大体记录一下socket模块源码文件的做用api
thread_socket(void *p) {
struct monitor * m = p; //这是框架的一个管理者
skynet_initthread(THREAD_SOCKET);
//这个线程其实就是在不停的 skynet_socket_poll
for (;;) {
int r = skynet_socket_poll();
if (r==0)
break;
if (r<0) {
CHECK_ABORT
continue;
}
wakeup(m,0);
}
return NULL;
}
//这个函数的主要工做是将 socket_server_poll 的数据 转换成 skynet 通讯机制中使用的格式 以便分发数据
// socket_sever_poll 返回的数据是 socket_message
// 在forward_message 中将数据变为 skynet_message 而且将消息压入 二级队列 (每一个服务模块的私有队列)
skynet_socket_poll() {
struct socket_server *ss = SOCKET_SERVER;
assert(ss);
struct socket_message result;
int more = 1;
int type = socket_server_poll(ss, &result, &more);
switch (type) {
case SOCKET_EXIT:
return 0;
case SOCKET_DATA:
forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
break;
/* 省略 */
default:
skynet_error(NULL, "Unknown socket message type %d.",type);
return -1;
}
if (more) {
return -1;
}
return 1;
}
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
//先判断有木有 cmd 须要处理 (这里懵逼的话 看完文章就明白了 )
if (ss->checkctrl) {
if (has_cmd(ss)) {
int type = ctrl_cmd(ss, result);
if (type != -1) {
clear_closed_event(ss, result, type);
return type;
} else
continue;
} else {
ss->checkctrl = 0;
}
}
}
/*
在这里看一下 socket_server 结构中部分数据的说明吧 下面的代码用得着
struct socket_server {
int recvctrl_fd; //接受管道
int sendctrl_fd; //发送管道
int checkctrl; //释放检测命令
poll_fd event_fd; //epoll 的fd
int alloc_id; //应用层分配id用的
int event_n; //epoll_wait 返回的事件数
int event_index; //当前处理的事件序号
struct socket_object_interface soi;
struct event ev[MAX_EVENT]; //epoll_wait 返回的事件集合
struct socket slot[MAX_SOCKET]; //每一个Socket server能够包含多个Socket,这是存储这些Socket的数组(应用层预先分配的)
char buffer[MAX_INFO]; //临时数据的保存,好比保存对方的地址信息等
uint8_t udpbuffer[MAX_UDP_PACKAGE];
fd_set rfds; //用于select的fd集
};
//这里开始处理socket 的读写
//event_index当前处理的事件序号==事件总数 等价于 以前epoll到的事件都处理完了
if (ss->event_index == ss->event_n)
{
//因此这里继续去 查询有没有要处理的事件
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
if (more) {
*more = 0;
}
ss->event_index = 0;
//发生错误
if (ss->event_n <= 0) {
ss->event_n = 0;
//忽略EINTR错误
if (errno == EINTR) {
continue;
}
return -1;
}
}
//取出一个事件来处理
struct event *e = &ss->ev[ss->event_index++];
struct socket *s = e->s;
if (s == NULL) {
// dispatch pipe message at beginning
continue;
}
struct socket_lock l;
socket_lock_init(s, &l);
//根据socket 的类型 处理
switch (s->type) { case SOCKET_TYPE_CONNECTING: return report_connect(ss, s, &l, result); case SOCKET_TYPE_LISTEN: { int ok = report_accept(ss, s, result); if (ok > 0) { return SOCKET_ACCEPT; } if (ok < 0 ) { return SOCKET_ERR; }
// when ok == 0, retry
break;
}
case SOCKET_TYPE_INVALID:
fprintf(stderr, "socket-server: invalid socket\n");
break;
//默认的是处理 socket 的读写操做
default:
//读
if (e->read) {
int type;
if (s->protocol == PROTOCOL_TCP) {
type = forward_message_tcp(ss, s, &l, result);
} else {
type = forward_message_udp(ss, s, &l, result);
if (type == SOCKET_UDP) { // try read again --ss->event_index; return SOCKET_UDP; }
}
if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) { // Try to dispatch write message next step if write flag set. e->read = false; --ss->event_index; }
if (type == -1)
break;
return type;
}
//写
if (e->write) {
int type = send_buffer(ss, s, &l, result);
if (type == -1)
break;
return type;
}
//错误处理
if (e->error) {
// close when error
int error;
socklen_t len = sizeof(error);
int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
const char * err = NULL;
if (code < 0) {
err = strerror(errno);
} else if (error != 0) {
err = strerror(error);
} else {
err = "Unknown error";
}
force_close(ss, s, &l, result);
result->data = (char *)err;
return SOCKET_ERR;
}
break;
}
}
每一个socket 服务都有 写缓存队列,因此 框架会异步的实现读写。数组
socket 的open close listen apect 等操做是经过给 socket_server 的管道写入请求信息,在server_poll循环中再去处理他。缓存
struct socket {
uintptr_t opaque; //这个其实就是每一个服务对应的handle
struct wb_list high; //写缓存
struct wb_list low;
int64_t wb_size;
int fd; //socket fd
int id; //为该socket 分配的id 供应用层座位标识使用
uint8_t protocol; //udp or tcp
uint8_t type; //该socket 目前的状态类型
uint16_t udpconnecting;
int64_t warn_size;
union {
int size;
uint8_t udp_address[UDP_ADDRESS_SIZE];
} p;
//直接发送数据的缓存
struct spinlock dw_lock; //direct_write_XXX
int dw_offset;
const void * dw_buffer;
size_t dw_size;
};
//这个结构体看起来很奇怪 。 其实这是利用 union 的特性。
//多个成员共用一块内存 u 中这些 结构体 在使用时 只是用其中的一种 这能够让这些类型 公用这块内存
struct request_package {
uint8_t header[8]; // 6 bytes dummy
union {
char buffer[256];
struct request_open open;
struct request_send send;
struct request_send_udp send_udp;
struct request_close close;
struct request_listen listen;
struct request_bind bind;
struct request_start start;
struct request_setopt setopt;
struct request_udp udp;
struct request_setudp set_udp;
} u;
uint8_t dummy[256];
};