skynet 笔记 :socket 模块

接触skynet 大半年了 ,最近在研究 skynet 的源码 而且在仿写一个相似的服务程序。发现本身是在开着多个线程学习 。so 开始作一下简单的现场保护。。web


先大体记录一下socket模块源码文件的做用api

  • socket_epoll.c // epoll 机制 相关api 的简单封装
  • socket_server.c //socket 的主要实现文件
  • skynet_socket.c // 对socket_server 的进一步封装 使他更加容易使用
  • skynet_start.c //这个主要是框架的启动代码 so socket 也是着这里启动的

运行流程
  1. 在skynet_start() 中 调用 skynet_socket_init() 初始化socket服务
  2. 接着在start()中 建立一个socket 服务线程
  3. 线程执行函数分析:
thread_socket(void *p) {
    struct monitor * m = p; //这是框架的一个管理者
    skynet_initthread(THREAD_SOCKET);
    //这个线程其实就是在不停的 skynet_socket_poll
    for (;;) {
        int r = skynet_socket_poll();
        if (r==0)
            break;
        if (r<0) {
            CHECK_ABORT
            continue;
        }
        wakeup(m,0);
    }
    return NULL;
}
  1. 进入 skynet_socket_poll()
//这个函数的主要工做是将 socket_server_poll 的数据 转换成 skynet 通讯机制中使用的格式 以便分发数据
// socket_sever_poll 返回的数据是 socket_message 
// 在forward_message 中将数据变为 skynet_message 而且将消息压入 二级队列 (每一个服务模块的私有队列)
skynet_socket_poll() {
    struct socket_server *ss = SOCKET_SERVER;
    assert(ss);
    struct socket_message result;
    int more = 1;
    int type = socket_server_poll(ss, &result, &more);
    switch (type) {
    case SOCKET_EXIT:
        return 0;
    case SOCKET_DATA:
        forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
        break;
        /* 省略 */
    default:
        skynet_error(NULL, "Unknown socket message type %d.",type);
        return -1;
    }
    if (more) {
        return -1;
    }
    return 1;
}
  1. 继续挖掘 进入 socket_server_poll 一趟究竟 !注意 这才是socket 的核心实现 。若是您有幸看到这里 ,请先大体看看 ,看完整个文章 在回过头来琢磨。
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
    for (;;) {
        //先判断有木有 cmd 须要处理 (这里懵逼的话 看完文章就明白了 )
        if (ss->checkctrl) {
            if (has_cmd(ss)) {
                int type = ctrl_cmd(ss, result);
                if (type != -1) {
                    clear_closed_event(ss, result, type);
                    return type;
                } else
                    continue;
            } else {
                ss->checkctrl = 0;
            }
        }


}
/*
        在这里看一下 socket_server  结构中部分数据的说明吧  下面的代码用得着
        struct socket_server {
        int recvctrl_fd;    //接受管道
        int sendctrl_fd;    //发送管道
        int checkctrl;      //释放检测命令
        poll_fd event_fd;   //epoll 的fd 
        int alloc_id;       //应用层分配id用的
        int event_n;        //epoll_wait 返回的事件数
        int event_index;    //当前处理的事件序号
        struct socket_object_interface soi; 
        struct event ev[MAX_EVENT];     //epoll_wait 返回的事件集合
        struct socket slot[MAX_SOCKET]; //每一个Socket server能够包含多个Socket,这是存储这些Socket的数组(应用层预先分配的)
        char buffer[MAX_INFO];          //临时数据的保存,好比保存对方的地址信息等
        uint8_t udpbuffer[MAX_UDP_PACKAGE];
        fd_set rfds;                    //用于select的fd集
    };
//这里开始处理socket 的读写 
//event_index当前处理的事件序号==事件总数  等价于  以前epoll到的事件都处理完了
if (ss->event_index == ss->event_n)
     {
            //因此这里继续去 查询有没有要处理的事件
            ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
            ss->checkctrl = 1;
            if (more) {
                *more = 0;
            }
            ss->event_index = 0;
            //发生错误
            if (ss->event_n <= 0) {
                ss->event_n = 0;
                //忽略EINTR错误
                if (errno == EINTR) {
                    continue;
                }
                return -1;
            }
        }
        //取出一个事件来处理
        struct event *e = &ss->ev[ss->event_index++];
        struct socket *s = e->s;
        if (s == NULL) {
            // dispatch pipe message at beginning
            continue;
        }
        struct socket_lock l;
        socket_lock_init(s, &l);
        //根据socket 的类型 处理
        switch (s->type) { case SOCKET_TYPE_CONNECTING: return report_connect(ss, s, &l, result); case SOCKET_TYPE_LISTEN: { int ok = report_accept(ss, s, result); if (ok > 0) { return SOCKET_ACCEPT; } if (ok < 0 ) { return SOCKET_ERR; }
            // when ok == 0, retry
            break;
        }
        case SOCKET_TYPE_INVALID:
            fprintf(stderr, "socket-server: invalid socket\n");
            break;
        //默认的是处理 socket 的读写操做
        default:
        //读
            if (e->read) {
                int type;
                if (s->protocol == PROTOCOL_TCP) {
                    type = forward_message_tcp(ss, s, &l, result);
                } else {
                    type = forward_message_udp(ss, s, &l, result);
                    if (type == SOCKET_UDP) { // try read again --ss->event_index; return SOCKET_UDP; }
                }
                if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) { // Try to dispatch write message next step if write flag set. e->read = false; --ss->event_index; }
                if (type == -1)
                    break;              
                return type;
            }
            //写
            if (e->write) {
                int type = send_buffer(ss, s, &l, result);
                if (type == -1)
                    break;
                return type;
            }
            //错误处理
            if (e->error) {
                // close when error
                int error;
                socklen_t len = sizeof(error);  
                int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);  
                const char * err = NULL;
                if (code < 0) {
                    err = strerror(errno);
                } else if (error != 0) {
                    err = strerror(error);
                } else {
                    err = "Unknown error";
                }
                force_close(ss, s, &l, result);
                result->data = (char *)err;
                return SOCKET_ERR;
            }
            break;
        }
    }

结构分析

这里写图片描述
每一个socket 服务都有 写缓存队列,因此 框架会异步的实现读写。数组

socket 的open close listen apect 等操做是经过给 socket_server 的管道写入请求信息,在server_poll循环中再去处理他。缓存


一些细节

  1. socket 在发送数据时 会尝试的直接发送数据!若是不能直接发送数据 才会把数据写入 socket 对应的写缓存 。
  2. 2.
struct socket {
    uintptr_t opaque;     //这个其实就是每一个服务对应的handle
    struct wb_list high;  //写缓存
    struct wb_list low;
    int64_t wb_size;
    int fd;              //socket fd
    int id;              //为该socket 分配的id 供应用层座位标识使用 
    uint8_t protocol;    //udp or tcp
    uint8_t type;        //该socket 目前的状态类型 
    uint16_t udpconnecting; 
    int64_t warn_size;    
    union {
        int size;
        uint8_t udp_address[UDP_ADDRESS_SIZE];
    } p;
    //直接发送数据的缓存 
    struct spinlock dw_lock;  //direct_write_XXX
    int dw_offset;
    const void * dw_buffer;
    size_t dw_size;
};
    3.
//这个结构体看起来很奇怪 。 其实这是利用 union 的特性。 
//多个成员共用一块内存 u 中这些 结构体 在使用时 只是用其中的一种 这能够让这些类型 公用这块内存 
struct request_package {
    uint8_t header[8];  // 6 bytes dummy
    union {
        char buffer[256];
        struct request_open open;
        struct request_send send;
        struct request_send_udp send_udp;
        struct request_close close;
        struct request_listen listen;
        struct request_bind bind;
        struct request_start start;
        struct request_setopt setopt;
        struct request_udp udp;
        struct request_setudp set_udp;
    } u;
    uint8_t dummy[256];
};