Lua层消息处理机制在lualib/skynet.lua,提供大部分Lua层的api(最终会调用到c层的api),包括启动一个snlua服务时Lua层的处理,建立新服务,注册服务协议,如何发送消息,如何处理对方发过来的消息等。本篇主要介绍消息处理机制,从而理解skynet如何实现高并发。node
为了简化,代码里用到的coroutine_resume,coroutine_yield当作coroutine.resume,coroutine.yield便可。api
local coroutine_resume = profile.resume local coroutine_yield = profile.yield
coroutine.create,建立一个co,惟一的参数是co要执行的闭包f,此时是不会执行闭包f的缓存
coroutine.resume,执行一个co,第一个参数是co的句柄,若是是第一次执行,其余参数是传递给闭包f的。co启动后,一直执行直到它终止或让出。正常终止,返回true和闭包f的返回值;发生错误异常终止,则返回false和错误信息session
coroutine.yield,使co暂停,让出执行权。对应最近的resume会马上返回,返回true和yield的参数。下一次resume同一个co时,会从让出点继续执行,此时,yield的调用会马上返回,返回值为resume除第一个参数以外的其余参数闭包
引用Lua文档介绍协程coroutine(简称co)的经典例子,能够看出,co能够被不断的暂停和重启。skynet普遍使用co,当发送一个rpc请求时会暂停当前co,等对方返回时又重启co。并发
先阐述下skynet建立协程(co)的方式,经过co_create(f)这个api建立一个协程,这段代码很是有意思。为了性能,skynet会把建立的co放到缓存里(第9行),当协程执行完流程(闭包f)后不会终止,而是暂停(第10行)。当调用者调用co_create这个api时,若是缓存里没有,经过coroutine.create建立一个co,此时是不会执行闭包f,而后在某个时刻(一般是收到消息调用消息分发skynet.dispatch_message)会重启(附带须要的参数)这个co,co接着执行闭包f(第6行),最后暂停以等待下一次使用,对应最近的resume返回true和“EXIT”(第10行);若是是一个复用的co,重启co(第15行,参数是将要执行的闭包f),yield会马上返回把闭包赋值给f(第10行),在11行又暂停,一样在某个时刻会重启(附带须要的参数)这个co,co接着执行闭包f(第11行),最后又在第10行暂停等待下一次使用。函数
1 -- lualib/skynet.lua 2 local function co_create(f) 3 local co = table.remove(coroutine_pool) 4 if co == nil then 5 co = coroutine.create(function(...) 6 f(...) 7 while true do 8 f = nil 9 coroutine_pool[#coroutine_pool+1] = co 10 f = coroutine_yield "EXIT" 11 f(coroutine_yield()) 12 end 13 end) 14 else 15 coroutine_resume(co, f) 16 end 17 return co 18 end
了解了co_create的原理后,接下来以服务A向服务B发一条消息为例说明skynet是如何处理Lua层消息:高并发
-- A.lua local skynet = require "skynet" skynet.start(function() print(skynet.call("B", "lua", "aaa")) end)
-- B.lua local skynet = require "skynet" require "skynet.manager" skynet.start(function() skynet.dispatch("lua", function(session, source, ...) skynet.ret(skynet.pack("OK")) end) skynet.register "B" end)
在服务启动最后会调用skynet.start,skynet.start调用skynet.timeout,在timeout里会建立一个co(12行),称之为服务的主协程co1,此时co1不会执行性能
1 -- lualib/skynet.lua 2 function skynet.start(start_func) 3 c.callback(skynet.dispatch_message) 4 skynet.timeout(0, function() 5 skynet.init_service(start_func) 6 end) 7 end 8 9 function skynet.timeout(ti, func) 10 local session = c.intcommand("TIMEOUT",ti) 11 assert(session) 12 local co = co_create(func) 13 assert(session_id_coroutine[session] == nil) 14 session_id_coroutine[session] = co 15 end
定时器被触发(由于定时器设置是0,因此下一帧就触发)会向服务发送一条“RESPONSE”类型(PTYPE_RESPONSE=1)的消息ui
// skynet-src/skynet_timer.c
static inline void dispatch_list(struct timer_node *current) { ... message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT; ... }
服务收到消息后,调用消息分发api,因为消息类型是RESPONSE,最终会执行到第7行。重启主协程co1,执行co1的闭包f(这里是skynet.init_service(start_func)),若是闭包f里没有暂停的操做,待闭包f成功运行完,co1暂停,resume会返回true和"EXIT",接下来,第7行就变成,suspend(co, true, "EXIT")
1 -- luablib/skynet.lua 2 local function raw_dispatch_message(prototype, msg, sz, session, source) 3 -- skynet.PTYPE_RESPONSE = 1, read skynet.h 4 if prototype == 1 then 5 local co = session_id_coroutine[session] 6 ... 7 suspend(co, coroutine_resume(co, true, msg, sz)) 8 ... 9 end
而后,调用suspend,因为类型是"EXIT",作一些清理工做便可。
-- lualib/skynet.lua function suspend(co, result, command, param, size) ... elseif command == "EXIT" then -- coroutine exit local address = session_coroutine_address[co] if address then release_watching(address) session_coroutine_id[co] = nil session_coroutine_address[co] = nil session_response[co] = nil end ... end
当闭包f里有暂停操做,好比A服务向B服务发送消息skynet.call("B", "lua", "aaa"),这里分别讲解A服务和B服务是如何处理的:
对于A服务:
首先在c层把消息发送出去(第14行,把消息push到目的服务的次级消息队列),而后暂停co1,resume返回true,"CALL"和session值
1 -- lualib/skynet.lua 2 local function yield_call(service, session) 3 watching_session[session] = service 4 local succ, msg, sz = coroutine_yield("CALL", session) 5 watching_session[session] = nil 6 if not succ then 7 error "call failed" 8 end 9 return msg,sz 10 end 11 12 function skynet.call(addr, typename, ...) 13 local p = proto[typename] 14 local session = c.send(addr, p.id , nil , p.pack(...)) 15 if session == nil then 16 error("call to invalid address " .. skynet.address(addr)) 17 end 18 return p.unpack(yield_call(addr, session)) 19 end
而后调用suspend(co, true, "CALL", session),类型是"CALL",以session为key,co为value保存在session_id_coroutine里,以便当B服务对A的请求返回后,根据session找到对应的co,从而能够重启co
1 -- lualib/skynet.lua 2 function suspend(co, result, command, param, size) 3 ... 4 if command == "CALL" then 5 session_id_coroutine[param] = co 6 ... 7 end
当A收到B的返回消息时,调用消息分发api,根据session找到对应的co(即主协程co1),从上一次暂停点重启它,下面这一行代码yield会马上返回,打印出B返回的结果print(...)(A.lua),此时执行完co1整个流程,返回true和“EXIT”给suspend,对co1作一些清理工做。
local succ, msg, sz = coroutine_yield("CALL", session)
稍微改一下A.lua,co1执行闭包f流程中经过fork建立一个协程(称为co2),因为co1没有暂停,会一直执行完整个流程。此时co2并无执行。
1 -- A.lua 2 local skynet = require "skynet" 3 4 skynet.start(function() 5 skynet.fork(function() 6 print(skynet.call("B", "lua", "aaa")) 7 end) 8 end)
1 -- lualib/skynet.lua 2 function skynet.fork(func,...) 3 local args = table.pack(...) 4 local co = co_create(function() 5 func(table.unpack(args,1,args.n)) 6 end) 7 table.insert(fork_queue, co) 8 return co 9 end
消息分发api作的第二件事是处理fork_queue里的co。因此收到定时器发送回来的消息后作的第二件事是重启co2,向B服务发送消息后暂停co2,直到B返回时再重启co2。
1 -- lualib/skynet.lua 2 function skynet.dispatch_message(...) 3 ... 4 local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co)) 5 ... 6 end
对于B服务:
收到A服务的消息后调用消息分发api,建立一个co(第12行),co要执行的闭包f是已注册的消息回调函数p.dispatch(第4行),而后经过resume重启它(第15行)
1 -- lualib/skynet.lua 2 local function raw_dispatch_message(prototype, msg, sz, session, source) 3 ... 4 local f = p.dispatch 5 if f then 6 local ref = watching_service[source] 7 if ref then 8 watching_service[source] = ref + 1 9 else 10 watching_service[source] = 1 11 end 12 local co = co_create(f) 13 session_coroutine_id[co] = session 14 session_coroutine_address[co] = source 15 suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) 16 ... 17 end
执行skynet.ret(skynet.pack("OK")),调用yield暂停它(第4行),最近的resume返回,上面第15行变成suspend(co, true, "RETURN", msg, sz)
1 -- lualib/skynet.lua 2 function skynet.ret(msg, sz) 3 msg = msg or "" 4 return coroutine_yield("RETURN", msg, sz) 5 end
当command=="RETURN"时,作两件事:1. 向源地址(即A服务)发送返回消息(第5行);2. 重启co(第7行),co从skynet.ret返回,而后B服务的消息回调函数(p.dispatch)执行完,co的闭包f所有执行完放入缓存中,返回true和“EXIT“给suspend
1 -- lualib/skynet.lua 2 function suspend(co, result, command, param, size) 3 ... 4 elseif command == "RETURN" then 5 ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil 6 ... 7 return suspend(co, coroutine_resume(co, ret)) 8 ... 9 end
至此,就是Lua层消息处理的整个流程。
在一些状况下须要作异常处理,好比没有注册对应消息类型的协议,没有提供消息回调函数,执行co过程当中发生错误等。当一个服务处理一条消息的过程发生异常,必需要作两件事:1. 异常终止当前co;2. 通知消息发送方,而不是让对方一直忙等待。
当执行co过程当中发生错误时,resume第一个返回值是false,调用suspend,向对方发送一条PTYPE_ERROR类型消息(第9行),而后抛出异常,终止当前co(第14行)。
1 -- lualib/skynet.lua 2 function suspend(co, result, command, param, size) 3 if not result then 4 local session = session_coroutine_id[co] 5 if session then -- coroutine may fork by others (session is nil) 6 local addr = session_coroutine_address[co] 7 if session ~= 0 then 8 -- only call response error 9 c.send(addr, skynet.PTYPE_ERROR, session, "") 10 end 11 session_coroutine_id[co] = nil 12 session_coroutine_address[co] = nil 13 end 14 error(debug.traceback(co,tostring(command))) 15 end 16 ... 17 end
大部分异常状况下,都会向对方发送一条PTYPE_ERROR类型消息通知对方,当收到PYTPE_ERROR类型消息,会调用_error_dispatch,把error_source记录在dead_service里,把error_session记录在error_queue里
1 -- lualib/skynet.lua 2 local function _error_dispatch(error_session, error_source) 3 if error_session == 0 then 4 -- service is down 5 -- Don't remove from watching_service , because user may call dead service 6 if watching_service[error_source] then 7 dead_service[error_source] = true 8 end 9 for session, srv in pairs(watching_session) do 10 if srv == error_source then 11 table.insert(error_queue, session) 12 end 13 end 14 else 15 -- capture an error for error_session 16 if watching_session[error_session] then 17 table.insert(error_queue, error_session) 18 end 19 end 20 end
在suspend最后会调用dispatch_error_queue处理error_queue,经过session查找到正在等待的co,而后强制终止它,保证co不会一直忙等待。
1 -- lualib/skynet.lua 2 local function dispatch_error_queue() 3 local session = table.remove(error_queue,1) 4 if session then 5 local co = session_id_coroutine[session] 6 session_id_coroutine[session] = nil 7 return suspend(co, coroutine_resume(co, false)) 8 end 9 end
一次同步的rpc请求的流程以下图。当一个服务当前co暂停时,能够去执行服务里其余co的流程,N个co之间能够交叉执行,一个co暂停并不会影响其余co的执行,最大化提供计算能力,实现高并发。