【Skynet】Socket源码剖析二

socket_server 层使用简单封装后的 epoll ,向上提供一些列 socket_server_*** 的API。html


skynet_socket封装:node

为了进一步适用于Skynet框架,又进行一步对socket_server进行了封装,全部经常使用的接口都封装在skynet_socket.c 。相比 socket_server.c 的1600多行代码,skynet_socket.c 则只有200多行,其主要功能都由 socket_server 提供的API来完成。
git


先来看看 skynet_socket.hgithub

#ifndef skynet_socket_h
#define skynet_socket_h

struct skynet_context;

// 能够看作是对 socket_server 里返回值类型的封装
#define SKYNET_SOCKET_TYPE_DATA 1
#define SKYNET_SOCKET_TYPE_CONNECT 2
#define SKYNET_SOCKET_TYPE_CLOSE 3
#define SKYNET_SOCKET_TYPE_ACCEPT 4
#define SKYNET_SOCKET_TYPE_ERROR 5
#define SKYNET_SOCKET_TYPE_UDP 6
#define SKYNET_SOCKET_TYPE_WARNING 7

struct skynet_socket_message {
	int type;
	int id;
	int ud;
	char * buffer;
};

void skynet_socket_init();
void skynet_socket_exit();
void skynet_socket_free();
int skynet_socket_poll();

int skynet_socket_send(struct skynet_context *ctx, int id, void *buffer, int sz);
void skynet_socket_send_lowpriority(struct skynet_context *ctx, int id, void *buffer, int sz);
int skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog);
int skynet_socket_connect(struct skynet_context *ctx, const char *host, int port);
int skynet_socket_bind(struct skynet_context *ctx, int fd);
void skynet_socket_close(struct skynet_context *ctx, int id);
void skynet_socket_shutdown(struct skynet_context *ctx, int id);
void skynet_socket_start(struct skynet_context *ctx, int id);
void skynet_socket_nodelay(struct skynet_context *ctx, int id);

int skynet_socket_udp(struct skynet_context *ctx, const char * addr, int port);
int skynet_socket_udp_connect(struct skynet_context *ctx, int id, const char * addr, int port);
int skynet_socket_udp_send(struct skynet_context *ctx, int id, const char * address, const void *buffer, int sz);
const char * skynet_socket_udp_address(struct skynet_socket_message *, int *addrsz);

#endif


对比一下以前 socket_server.hsession

#ifndef skynet_socket_server_h
#define skynet_socket_server_h

#include <stdint.h>

#define SOCKET_DATA 0
#define SOCKET_CLOSE 1
#define SOCKET_OPEN 2
#define SOCKET_ACCEPT 3
#define SOCKET_ERROR 4
#define SOCKET_EXIT 5
#define SOCKET_UDP 6

struct socket_server;

struct socket_message {
	int id;
	uintptr_t opaque;
	int ud;	// for accept, ud is new connection id ; for data, ud is size of data 
	char * data;
};

struct socket_server * socket_server_create();
void socket_server_release(struct socket_server *);
int socket_server_poll(struct socket_server *, struct socket_message *result, int *more);

void socket_server_exit(struct socket_server *);
void socket_server_close(struct socket_server *, uintptr_t opaque, int id);
void socket_server_shutdown(struct socket_server *, uintptr_t opaque, int id);
void socket_server_start(struct socket_server *, uintptr_t opaque, int id);

// return -1 when error
int64_t socket_server_send(struct socket_server *, int id, const void * buffer, int sz);
void socket_server_send_lowpriority(struct socket_server *, int id, const void * buffer, int sz);

// ctrl command below returns id
int socket_server_listen(struct socket_server *, uintptr_t opaque, const char * addr, int port, int backlog);
int socket_server_connect(struct socket_server *, uintptr_t opaque, const char * addr, int port);
int socket_server_bind(struct socket_server *, uintptr_t opaque, int fd);

// for tcp
void socket_server_nodelay(struct socket_server *, int id);

struct socket_udp_address;

int socket_server_udp(struct socket_server *, uintptr_t opaque, const char * addr, int port);
int socket_server_udp_connect(struct socket_server *, int id, const char * addr, int port);
int64_t socket_server_udp_send(struct socket_server *, int id, const struct socket_udp_address *, const void *buffer, int sz);
const struct socket_udp_address * socket_server_udp_address(struct socket_server *, struct socket_message *, int *addrsz);

struct socket_object_interface {
	void * (*buffer)(void *);
	int (*size)(void *);
	void (*free)(void *);
};

void socket_server_userobject(struct socket_server *, struct socket_object_interface *soi);

#endif

非常类似吧。


核心是 skynet_socket_poll(),该函数调用了socket_server 层的核心 socket_server_poll(),写法相似 socket-server 中 test.c 的 _poll() 函数,用于处理 socket_server_poll() 的返回值。框架

int 
skynet_socket_poll() {
	struct socket_server *ss = SOCKET_SERVER;
	assert(ss);
	struct socket_message result;
	int more = 1;
	int type = socket_server_poll(ss, &result, &more);
	switch (type) {
	case SOCKET_EXIT:
		return 0;
	case SOCKET_DATA:
		forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
		break;
	case SOCKET_CLOSE:
		forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
		break;
	case SOCKET_OPEN:
		forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
		break;
	case SOCKET_ERROR:
		forward_message(SKYNET_SOCKET_TYPE_ERROR, true, &result);
		break;
	case SOCKET_ACCEPT:
		forward_message(SKYNET_SOCKET_TYPE_ACCEPT, true, &result);
		break;
	case SOCKET_UDP:
		forward_message(SKYNET_SOCKET_TYPE_UDP, false, &result);
		break;
	default:
		skynet_error(NULL, "Unknown socket message type %d.",type);
		return -1;
	}
	if (more) {
		return -1;
	}
	return 1;
}

绝大部分的返回值都交给了 forward_message() 函数处理,并把以前获得的 result 传入。这里重点提出三种用于消息传输的结构体:

// 对应于 socket_server 服务中的消息传输
struct socket_message {  
    int id;  
    uintptr_t opaque;   // 在skynet中对应一个Actor实体的handle句柄  
    int ud;         	// 对于accept链接来讲, ud是新链接的id;对于数据(data)来讲, ud是数据的大小  
    char * data;  	// 对于数据收发事件,存放数据;对于socket链接事件,存放地址;其余状况下还能够存放打印日志。 
};  

// 对应 skynet_socket_server 服务中消息传输
struct skynet_socket_message {
	int type;
	int id;
	int ud;
	char * buffer;
};

// 在skynet不一样服务(Actor)间进行通讯
struct skynet_message {
	uint32_t source;
	int session;
	void * data;
	size_t sz;
};


再来看看 forward_message():
socket

// mainloop thread
static void
forward_message(int type, bool padding, struct socket_message * result) {
	struct skynet_socket_message *sm;
	size_t sz = sizeof(*sm);
	if (padding) {
		if (result->data) {
			size_t msg_sz = strlen(result->data);
			if (msg_sz > 128) {
				msg_sz = 128;
			}
			sz += msg_sz;
		} else {
			result->data = "";
		}
	}
	sm = (struct skynet_socket_message *)skynet_malloc(sz);				// skynet_malloc() 内部使用了 jemalloc 分配内存
	sm->type = type;
	sm->id = result->id;
	sm->ud = result->ud;
	if (padding) {
		sm->buffer = NULL;
		memcpy(sm+1, result->data, sz - sizeof(*sm));
	} else {
		sm->buffer = result->data;
	}

	struct skynet_message message;
	message.source = 0;
	message.session = 0;
	message.data = sm;
	message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);			// 颇有意思的写法,其实就是将类型 PTYPE_SOCKET 值置于 sz 的高8bit,再赋值给 message.sz
											// sz的值最大不超过sizeof(struct skynet_socket_message) + 128,该值并不大,高八位并无值,没有数据覆盖问题。
	if (skynet_context_push((uint32_t)result->opaque, &message)) {			// 看到 opaque 的做用了吧,其实就是上层handle的标记,按这个标记将信息向上层传递
		// don't call skynet_socket_close here (It will block mainloop)
		skynet_free(sm->buffer);
		skynet_free(sm);
	}
}


skynet_context_push() 已经涉及到 skynet 的消息调度,关于 skynet 消息调度机制,后面将有博文专门学习。这里咱们就不探寻 skynet_socket_push() 以后的步骤了。tcp




lua_socket与lua封装:函数


lua_socket 这一层将 C 代码封装成 lua 接口,最后在进行一次lua封装完成最终提供给用户的lua API。oop

int
luaopen_socketdriver(lua_State *L) {		// socketdriver,对应lua中 require “socketdriver”
	luaL_checkversion(L);			// 参见云风:https://blog.codingnow.com/2012/01/lua_link_bug.html
	luaL_Reg l[] = {
		{ "buffer", lnewbuffer },					// {"(lua调用时使用的函数名)",C定义函数名}
		{ "push", lpushbuffer },
		{ "pop", lpopbuffer },
		{ "drop", ldrop },
		{ "readall", lreadall },
		{ "clear", lclearbuffer },
		{ "readline", lreadline },
		{ "str2p", lstr2p },
		{ "header", lheader },

		{ "unpack", lunpack },
		{ NULL, NULL },
	};
	luaL_newlib(L,l);
	luaL_Reg l2[] = {
		{ "connect", lconnect },
		{ "close", lclose },
		{ "shutdown", lshutdown },
		{ "listen", llisten },
		{ "send", lsend },
		{ "lsend", lsendlow },
		{ "bind", lbind },
		{ "start", lstart },
		{ "nodelay", lnodelay },
		{ "udp", ludp },
		{ "udp_connect", ludp_connect },
		{ "udp_send", ludp_send },
		{ "udp_address", ludp_address },
		{ NULL, NULL },
	};	
	lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");			// 这两句话大体就是存在lua栈LUA_REGISTRYINDEX处的 skynet_context 变量压入栈顶,同时将其首地址交给 ctx 
	struct skynet_context *ctx = lua_touserdata(L,-1);			// 但是 LUA_REGISTRYINDEX 到底存放了什么,何时赋的值?
	if (ctx == NULL) {
		return luaL_error(L, "Init skynet context first");
	}

	luaL_setfuncs(L,l2,1);

	return 1;
}

向lua注册的函数能够参见表 l 和 l2。咱们以 llisten() 为例,它调用了 C 接口。

static int
llisten(lua_State *L) {
	const char * host = luaL_checkstring(L,1);					// 从栈获取 address 字符串
	int port = luaL_checkinteger(L,2);						// 从栈获取 port 整型
	int backlog = luaL_optinteger(L,3,BACKLOG);					// 从栈获取 backlog 整型,若没有该参数,使用默认值BACKLOG
	struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
	int id = skynet_socket_listen(ctx, host,port,backlog);
	if (id < 0) {
		return luaL_error(L, "Listen error");
	}

	lua_pushinteger(L,id);								// 将返回值压栈
	return 1;									// 返回值个数
}

接着是最后一次封装:

local driver = require "socketdriver"

function socket.listen(host, port, backlog)
	if port == nil then
		host, port = string.match(host, "([^:]+):(.+)$")
		port = tonumber(port)
	end
	return driver.listen(host, port, backlog)
end

有的函数可能没作封装,直接赋值,好比 socket.write = assert(driver.send),socket.lwrite = assert(driver.lsend) 的写法。最后看看云风wiki上向外提供的对应 lua API:

socket.listen(address, port) 监听一个端口,返回一个 id ,供 start 使用。			// 官方文档就是这样写的。可能有笔误,没写 backlog 参数。

这里有几个相关的宏,目前还不是很清楚其用法:

// ./3rd/lua/luaconfig.h
/*
@@ LUAI_MAXSTACK limits the size of the Lua stack.
** CHANGE it if you need a different limit. This limit is arbitrary;
** its only purpose is to stop Lua from consuming unlimited stack
** space (and to reserve some numbers for pseudo-indices).
*/
#if LUAI_BITSINT >= 32
#define LUAI_MAXSTACK		1000000
#else
#define LUAI_MAXSTACK		15000
#endif

// ./3rd/lua/lua.h
/*
** Pseudo-indices
** (-LUAI_MAXSTACK is the minimum valid index; we keep some free empty
** space after that to help overflow detection)
*/
#define LUA_REGISTRYINDEX	(-LUAI_MAXSTACK - 1000)
#define lua_upvalueindex(i)	(LUA_REGISTRYINDEX - (i))

其余接口相似。


接着看看 skynet 向外提供的 socket lua 接口,常见的接口以下,更多详细内容见 ./lualib/socket.lua,以及 skynet Socket WiKi

*   发起一个TCP链接:       	socket.open(address, port)
*   启动epoll监听:       	socket.start(id)
*   从socket读数据:    		socket.read(id, sz)
*   向socket写数据:     	socket.write(id, str)
*   开启一个TCP监听:          	socket.listen(address, port)
*   关闭socket:           	socket.close(id)