socket_server 层使用简单封装后的 epoll ,向上提供一些列 socket_server_*** 的API。html
skynet_socket封装:node
为了进一步适用于Skynet框架,又进行一步对socket_server进行了封装,全部经常使用的接口都封装在skynet_socket.c 。相比 socket_server.c 的1600多行代码,skynet_socket.c 则只有200多行,其主要功能都由 socket_server 提供的API来完成。
git
先来看看 skynet_socket.hgithub
#ifndef skynet_socket_h #define skynet_socket_h struct skynet_context; // 能够看作是对 socket_server 里返回值类型的封装 #define SKYNET_SOCKET_TYPE_DATA 1 #define SKYNET_SOCKET_TYPE_CONNECT 2 #define SKYNET_SOCKET_TYPE_CLOSE 3 #define SKYNET_SOCKET_TYPE_ACCEPT 4 #define SKYNET_SOCKET_TYPE_ERROR 5 #define SKYNET_SOCKET_TYPE_UDP 6 #define SKYNET_SOCKET_TYPE_WARNING 7 struct skynet_socket_message { int type; int id; int ud; char * buffer; }; void skynet_socket_init(); void skynet_socket_exit(); void skynet_socket_free(); int skynet_socket_poll(); int skynet_socket_send(struct skynet_context *ctx, int id, void *buffer, int sz); void skynet_socket_send_lowpriority(struct skynet_context *ctx, int id, void *buffer, int sz); int skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog); int skynet_socket_connect(struct skynet_context *ctx, const char *host, int port); int skynet_socket_bind(struct skynet_context *ctx, int fd); void skynet_socket_close(struct skynet_context *ctx, int id); void skynet_socket_shutdown(struct skynet_context *ctx, int id); void skynet_socket_start(struct skynet_context *ctx, int id); void skynet_socket_nodelay(struct skynet_context *ctx, int id); int skynet_socket_udp(struct skynet_context *ctx, const char * addr, int port); int skynet_socket_udp_connect(struct skynet_context *ctx, int id, const char * addr, int port); int skynet_socket_udp_send(struct skynet_context *ctx, int id, const char * address, const void *buffer, int sz); const char * skynet_socket_udp_address(struct skynet_socket_message *, int *addrsz); #endif
对比一下以前 socket_server.hsession
#ifndef skynet_socket_server_h #define skynet_socket_server_h #include <stdint.h> #define SOCKET_DATA 0 #define SOCKET_CLOSE 1 #define SOCKET_OPEN 2 #define SOCKET_ACCEPT 3 #define SOCKET_ERROR 4 #define SOCKET_EXIT 5 #define SOCKET_UDP 6 struct socket_server; struct socket_message { int id; uintptr_t opaque; int ud; // for accept, ud is new connection id ; for data, ud is size of data char * data; }; struct socket_server * socket_server_create(); void socket_server_release(struct socket_server *); int socket_server_poll(struct socket_server *, struct socket_message *result, int *more); void socket_server_exit(struct socket_server *); void socket_server_close(struct socket_server *, uintptr_t opaque, int id); void socket_server_shutdown(struct socket_server *, uintptr_t opaque, int id); void socket_server_start(struct socket_server *, uintptr_t opaque, int id); // return -1 when error int64_t socket_server_send(struct socket_server *, int id, const void * buffer, int sz); void socket_server_send_lowpriority(struct socket_server *, int id, const void * buffer, int sz); // ctrl command below returns id int socket_server_listen(struct socket_server *, uintptr_t opaque, const char * addr, int port, int backlog); int socket_server_connect(struct socket_server *, uintptr_t opaque, const char * addr, int port); int socket_server_bind(struct socket_server *, uintptr_t opaque, int fd); // for tcp void socket_server_nodelay(struct socket_server *, int id); struct socket_udp_address; int socket_server_udp(struct socket_server *, uintptr_t opaque, const char * addr, int port); int socket_server_udp_connect(struct socket_server *, int id, const char * addr, int port); int64_t socket_server_udp_send(struct socket_server *, int id, const struct socket_udp_address *, const void *buffer, int sz); const struct socket_udp_address * socket_server_udp_address(struct socket_server *, struct socket_message *, int *addrsz); struct socket_object_interface { void * (*buffer)(void *); int (*size)(void *); void (*free)(void *); }; void socket_server_userobject(struct socket_server *, struct socket_object_interface *soi); #endif
核心是 skynet_socket_poll(),该函数调用了socket_server 层的核心 socket_server_poll(),写法相似 socket-server 中 test.c 的 _poll() 函数,用于处理 socket_server_poll() 的返回值。框架
int skynet_socket_poll() { struct socket_server *ss = SOCKET_SERVER; assert(ss); struct socket_message result; int more = 1; int type = socket_server_poll(ss, &result, &more); switch (type) { case SOCKET_EXIT: return 0; case SOCKET_DATA: forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result); break; case SOCKET_CLOSE: forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result); break; case SOCKET_OPEN: forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result); break; case SOCKET_ERROR: forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result); break; case SOCKET_ACCEPT: forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result); break; case SOCKET_UDP: forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result); break; default: skynet_error(NULL, "Unknown socket message type %d.",type); return -1; } if (more) { return -1; } return 1; }
// 对应于 socket_server 服务中的消息传输 struct socket_message { int id; uintptr_t opaque; // 在skynet中对应一个Actor实体的handle句柄 int ud; // 对于accept链接来讲, ud是新链接的id;对于数据(data)来讲, ud是数据的大小 char * data; // 对于数据收发事件,存放数据;对于socket链接事件,存放地址;其余状况下还能够存放打印日志。 }; // 对应 skynet_socket_server 服务中消息传输 struct skynet_socket_message { int type; int id; int ud; char * buffer; }; // 在skynet不一样服务(Actor)间进行通讯 struct skynet_message { uint32_t source; int session; void * data; size_t sz; };
再来看看 forward_message():
socket
// mainloop thread static void forward_message(int type, bool padding, struct socket_message * result) { struct skynet_socket_message *sm; size_t sz = sizeof(*sm); if (padding) { if (result->data) { size_t msg_sz = strlen(result->data); if (msg_sz > 128) { msg_sz = 128; } sz += msg_sz; } else { result->data = ""; } } sm = (struct skynet_socket_message *)skynet_malloc(sz); // skynet_malloc() 内部使用了 jemalloc 分配内存 sm->type = type; sm->id = result->id; sm->ud = result->ud; if (padding) { sm->buffer = NULL; memcpy(sm+1, result->data, sz - sizeof(*sm)); } else { sm->buffer = result->data; } struct skynet_message message; message.source = 0; message.session = 0; message.data = sm; message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT); // 颇有意思的写法,其实就是将类型 PTYPE_SOCKET 值置于 sz 的高8bit,再赋值给 message.sz // sz的值最大不超过sizeof(struct skynet_socket_message) + 128,该值并不大,高八位并无值,没有数据覆盖问题。 if (skynet_context_push((uint32_t)result->opaque, &message)) { // 看到 opaque 的做用了吧,其实就是上层handle的标记,按这个标记将信息向上层传递 // don't call skynet_socket_close here (It will block mainloop) skynet_free(sm->buffer); skynet_free(sm); } }
skynet_context_push() 已经涉及到 skynet 的消息调度,关于 skynet 消息调度机制,后面将有博文专门学习。这里咱们就不探寻 skynet_socket_push() 以后的步骤了。tcp
lua_socket与lua封装:函数
lua_socket 这一层将 C 代码封装成 lua 接口,最后在进行一次lua封装完成最终提供给用户的lua API。oop
int luaopen_socketdriver(lua_State *L) { // socketdriver,对应lua中 require “socketdriver” luaL_checkversion(L); // 参见云风:https://blog.codingnow.com/2012/01/lua_link_bug.html luaL_Reg l[] = { { "buffer", lnewbuffer }, // {"(lua调用时使用的函数名)",C定义函数名} { "push", lpushbuffer }, { "pop", lpopbuffer }, { "drop", ldrop }, { "readall", lreadall }, { "clear", lclearbuffer }, { "readline", lreadline }, { "str2p", lstr2p }, { "header", lheader }, { "unpack", lunpack }, { NULL, NULL }, }; luaL_newlib(L,l); luaL_Reg l2[] = { { "connect", lconnect }, { "close", lclose }, { "shutdown", lshutdown }, { "listen", llisten }, { "send", lsend }, { "lsend", lsendlow }, { "bind", lbind }, { "start", lstart }, { "nodelay", lnodelay }, { "udp", ludp }, { "udp_connect", ludp_connect }, { "udp_send", ludp_send }, { "udp_address", ludp_address }, { NULL, NULL }, }; lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context"); // 这两句话大体就是存在lua栈LUA_REGISTRYINDEX处的 skynet_context 变量压入栈顶,同时将其首地址交给 ctx struct skynet_context *ctx = lua_touserdata(L,-1); // 但是 LUA_REGISTRYINDEX 到底存放了什么,何时赋的值? if (ctx == NULL) { return luaL_error(L, "Init skynet context first"); } luaL_setfuncs(L,l2,1); return 1; }
向lua注册的函数能够参见表 l 和 l2。咱们以 llisten() 为例,它调用了 C 接口。
static int llisten(lua_State *L) { const char * host = luaL_checkstring(L,1); // 从栈获取 address 字符串 int port = luaL_checkinteger(L,2); // 从栈获取 port 整型 int backlog = luaL_optinteger(L,3,BACKLOG); // 从栈获取 backlog 整型,若没有该参数,使用默认值BACKLOG struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1)); int id = skynet_socket_listen(ctx, host,port,backlog); if (id < 0) { return luaL_error(L, "Listen error"); } lua_pushinteger(L,id); // 将返回值压栈 return 1; // 返回值个数 }
接着是最后一次封装:
local driver = require "socketdriver" function socket.listen(host, port, backlog) if port == nil then host, port = string.match(host, "([^:]+):(.+)$") port = tonumber(port) end return driver.listen(host, port, backlog) end
有的函数可能没作封装,直接赋值,好比 socket.write = assert(driver.send),socket.lwrite = assert(driver.lsend) 的写法。最后看看云风wiki上向外提供的对应 lua API:
socket.listen(address, port) 监听一个端口,返回一个 id ,供 start 使用。 // 官方文档就是这样写的。可能有笔误,没写 backlog 参数。
这里有几个相关的宏,目前还不是很清楚其用法:
// ./3rd/lua/luaconfig.h /* @@ LUAI_MAXSTACK limits the size of the Lua stack. ** CHANGE it if you need a different limit. This limit is arbitrary; ** its only purpose is to stop Lua from consuming unlimited stack ** space (and to reserve some numbers for pseudo-indices). */ #if LUAI_BITSINT >= 32 #define LUAI_MAXSTACK 1000000 #else #define LUAI_MAXSTACK 15000 #endif // ./3rd/lua/lua.h /* ** Pseudo-indices ** (-LUAI_MAXSTACK is the minimum valid index; we keep some free empty ** space after that to help overflow detection) */ #define LUA_REGISTRYINDEX (-LUAI_MAXSTACK - 1000) #define lua_upvalueindex(i) (LUA_REGISTRYINDEX - (i))
其余接口相似。
接着看看 skynet 向外提供的 socket lua 接口,常见的接口以下,更多详细内容见 ./lualib/socket.lua,以及 skynet Socket WiKi
* 发起一个TCP链接: socket.open(address, port) * 启动epoll监听: socket.start(id) * 从socket读数据: socket.read(id, sz) * 向socket写数据: socket.write(id, str) * 开启一个TCP监听: socket.listen(address, port) * 关闭socket: socket.close(id)