mun/core/reactor.lua

199 lines
5.6 KiB
Lua

local socket = require('socket')
local coroutine = require('coroutine')
reactor = {}
function reactor:Initialize(quantum)
self._read_sockets = {}
self._write_sockets = {}
self._quantum = quantum or 0.1
self._quit = false
self._co_sleep = {}
self._co_event = {}
self._coroutines = {}
self._ev_timeout = {}
self._timers = {}
self._events = {}
end
function reactor:Quit()
self._quit = true
end
function reactor:Sleep(time)
local co = coroutine.running()
if co == nil then
socket.sleep(time)
return
end
self._co_sleep[co] = os.time() + time
coroutine.yield()
end
reactor.Event = {}
function reactor.Event:Wait()
if coroutine.running() == nil then
error("Main thread waiting for event... wtf?")
end
self.Waiting[#self.Waiting + 1] = coroutine.running()
local Status, Data = coroutine.yield()
if Status == false then
-- exception!
error(Data)
end
return unpack(Data)
end
function reactor.Event:WaitTimeout(Timeout)
if coroutine.running() == nil then
error("Main thread waiting for event... wtf?")
end
self.Waiting[#self.Waiting + 1] = coroutine.running()
reactor._ev_timeout[self.Identifier] = {self, os.time() + Timeout, coroutine.running()}
local Status, Data = coroutine.yield()
print("Event result")
print(Status)
print(Data)
return Status, unpack(Data)
end
function reactor.Event:Fire(...)
local Data = {...}
-- remove any timeout events
reactor._ev_timeout[self.Identifier] = nil
-- make a local copy of the old waiters, and clear the new ones
local Waiters = {}
for i, Coroutine in pairs(self.Waiting) do
Waiters[#Waiters + 1] = Coroutine
end
self.Waiting = {}
for i, Coroutine in pairs(Waiters) do
coroutine.resume(Coroutine, true, Data)
end
end
function reactor.Event:Destroy()
-- destroy all waiters
for i, Coroutine in pairs(self.Waiting) do
coroutine.resume(Coroutine, false, "Coroutine destroyed.")
end
self.Reactor._events[self.Identifier] = nil
end
function reactor:NewEvent()
local Table = {}
Table.Identifier = #self._events + 1
Table.Reactor = self
Table.Waiting = {}
setmetatable(Table, reactor.Event)
reactor.Event.__index = reactor.Event
self._events[Table.Identifier] = Identifier
print("New event " .. tostring(Table.Identifier))
return Table
end
function reactor:Spawn(f, ...)
local Args = {...}
local co = coroutine.create(function() f(unpack(Args)) end)
self._coroutines[#self._coroutines + 1] = co
coroutine.resume(co)
end
function reactor:Run()
local read = {}
for Socket, V in pairs(self._read_sockets) do
read[#read+1] = Socket
end
local write = {}
for Socket, V in pairs(self._write_sockets) do
write[#write+1] = Socket
end
while true do
if self._quit then
hook.Call('ReactorQuit')
break
end
local r, w, e = socket.select(read, write, self._quantum)
if e == nil then
-- we actually got something on our sockets
for Socket, Data in pairs(self._read_sockets) do
if r[Socket] ~= nil then
--Socket:settimeout(3)
local Line, Error = Socket:receive('*l')
if Error then
error('Could not receive line: ' .. Error)
end
local Callback = Data[1]
local Args = Data[2]
self:Spawn(Callback, Line, unpack(Args))
end
end
for Socket, Data in pairs(self._write_sockets) do
if w[Socket] ~= nil then
local Callback = Data[1]
local Args = Data[2]
self:Spawn(Callback, Socket, unpack(Args))
end
end
end
-- See if we should wake up any sleepers
for Coroutine, Timeout in pairs(self._co_sleep) do
if os.time() > Timeout then
self._co_sleep[Coroutine] = nil
coroutine.resume(Coroutine)
end
end
-- See if any event timeouts expired
for k, v in pairs(self._ev_timeout) do
local Event = v[1]
local Timeout = v[2]
local Coroutine = v[3]
if Timeout < os.time() then
print("Timeout firing for event " .. tostring(Event.Identifier))
self._ev_timeout[k] = nil
self._events[Event.Identifier] = nil
coroutine.resume(Coroutine, false, "Timeout")
end
end
hook.Call('ReactorTick')
local Time = os.time()
for TimerName, Data in pairs(self._timers) do
if Time >= Data.NextTick then
local Result = Data.Callback()
if Data.Period ~= nil and Result ~= false then
Data.NextTick = Data.NextTick + Data.Period
else
self._timers[TimerName] = nil
end
end
end
end
end
function reactor:SetTimer(name, tick_at, callback, periodic)
local Data = {}
Data.Callback = callback
if periodic then
Data.Period = tick_at
end
Data.NextTick = os.time() + tick_at
self._timers[name] = Data
end
function reactor:RemoveTimer(name)
self._timers[name] = nil
end
function reactor:TCPConnect(host, port, receive_callback, ...)
local Socket = socket.connect(host, port)
local Args = {...}
local SocketStructure = { receive_callback, Args }
self._read_sockets[Socket] = SocketStructure
return Socket
end