Improve Async Module

This commit is contained in:
Cooldude2606
2024-10-01 22:05:07 +01:00
parent f8c74c9dd3
commit 67a169ce0a
2 changed files with 202 additions and 136 deletions

View File

@@ -1,42 +1,55 @@
--[[-- Util Module - Async --[[-- ExpUtil - Async
- Provides a method of spreading work across multiple ticks and running functions at a later time Provides a method of spreading work across multiple ticks and running functions at a later time
@core Async
@alias Async
@usage-- Bypass permission groups --- Bypass permission groups
-- This is a simple example, you should have some kind of validation to prevent security flaws -- This is a simple example, you should have some kind of validation to prevent security flaws
local function setAdmin(player, state) local function setAdmin(player, state)
player.admin = state player.admin = state
end end
local setAdminAsync = Async.register(setAdmin) local set_admin_async = Async.register(setAdmin)
setAdminAsync(game.players[1], true) set_admin_async(game.players[1], true)
@usage-- Functions stored in storage table --- Functions stored in storage table
-- This can be used to create run time configurable callbacks, although this is not recommended -- Async functions and return values are safe to store in storage
storage.myCallback = Async.register(function() -- However they must be registered during the control stage
game.print("I got called!") local function say_hello(name)
end) game.print("Hello " .. name)
end
storage.say_hello_async = Async.register(say_hello)
-- The function can be called just like any other function -- The function can be called just like any other function
storage.myCallback() storage.say_hello_async("John")
@usage-- Creating singleton tasks (best used with storage data) -- Run the function this tick rather than the default of next tick
storage.say_hello_async:start_now("Dave")
-- Call the function after 60 ticks
storage.say_hello_async:start_after(60, "Steve")
-- You can cancel any task or function call that hasn't returned
-- You can store this task in storage to cancel at any time, or poll if it returned
local task = storage.say_hello_async:start_after(30, "Kevin")
task:cancel()
--- Creating multi tick tasks (best used with storage data)
-- This allows you to split large tasks across multiple ticks to prevent lag -- This allows you to split large tasks across multiple ticks to prevent lag
local myTask = Async.register(function(remainingWork) local my_task = Async.register(function(words)
game.print("Working... " .. remainingWork) game.print(table.remove(words))
if remainingWork > 0 then if #words > 0 then
return Async.status.continue(remainingWork - 1) return Async.status.continue(words)
end end
end) end)
myTask:start_task(10) -- Queues the task my_task:start_task{ "foo", "bar", "baz" } -- Queues the task
myTask:start_task(10) -- Does nothing, task is already running my_task:start_task{ "A", "B", "C" } -- Does nothing, task is already running
myTask:start_now(10) -- Ignores the already running instance and starts a second one my_task:start_soon{ "1", "2", "3" } -- Ignores the already running instance and starts a second one
my_task:start_now{ "X", "Y", "Z" } -- Same as start_soon but will run once this tick then queues the remainder
@usage-- Actions with variable delays --- Actions with variable delays
-- on_nth_tick is great for consistent delays, but tasks allow for variable delays -- on_nth_tick is great for consistent delays, but tasks allow for variable delays
local linearBackoff = Async.register(function(startingDelay, remainingWork) local linear_backoff = Async.register(function(startingDelay, remainingWork)
game.print("Working... " .. remainingWork) game.print("Working... " .. remainingWork)
if remainingWork > 0 then if remainingWork > 0 then
local newDelay = startingDelay + 1 local newDelay = startingDelay + 1
@@ -44,11 +57,12 @@ local linearBackoff = Async.register(function(startingDelay, remainingWork)
end end
end) end)
linearBackoff(1, 10) linear_backoff(1, 10)
@usage-- Getting return values --- Getting return values
-- you can capture the return values of an async function using the event -- You can capture the return values of an async function using another async function
local fillTableAsync = Async.register(function(tbl, val, remainingWork) -- Note that you can not chain calls to return_to only one return capture is allowed
local fill_table_async = Async.register(function(tbl, val, remainingWork)
table.insert(tbl, val) table.insert(tbl, val)
if remainingWork > 0 then if remainingWork > 0 then
return Async.status.continue(tbl, val, remainingWork - 1) return Async.status.continue(tbl, val, remainingWork - 1)
@@ -57,67 +71,108 @@ local fillTableAsync = Async.register(function(tbl, val, remainingWork)
end end
end) end)
local function on_function_complete(event) local function print_table_size(tbl)
if event.async_id ~= fillTableAsync.id then return end game.print("Table has length of " .. #tbl)
local filledTable = table.unpack(event.return_values)
game.print("Table has length of " .. #filledTable)
end end
fillTableAsync({}, "foo", 10) -- Puts 10 lots of foo into the table local print_table_size_async = Async.register(print_table_size)
fill_table_async({}, "foo", 10):return_to(print_table_size_async)
]] ]]
local ExpUtil = require("modules/exp_util")
local Clustorio = require("modules/clusterio/api") local Clustorio = require("modules/clusterio/api")
local ExpUtil = require("modules/exp_util/common")
local Async = { local Async = {
status = {}, -- Stores the allowed return types from a async function status = {}, -- Stores the allowed return types from a async function
events = {}, -- Stores all event handlers for this module events = {}, -- Stores all event handlers for this module
_prototype = {}, -- Prototype of the async function type
_queue_pressure = {}, -- Stores the count of each function in the queue to avoid queue iteration during start_task _queue_pressure = {}, -- Stores the count of each function in the queue to avoid queue iteration during start_task
_functions = {}, -- Stores a reference to all registered functions _functions = {}, -- Stores a reference to all registered functions
--- Raised when any async function has finished execution
-- @event on_function_complete
-- @tparam AsyncFunction async_id The function which finished execution, comparable to the return of register
-- @tparam table return_values An array representing the values returned by the completed function
on_function_complete = script.generate_event_name(),
} }
Async._metatable = { --- @class Async.AsyncFunction
__call = function(self, ...) Async._prototype.start_soon(self, ...) end, --- @field id number The id of this async function
__index = Async._prototype, --- @operator call: function
Async._function_prototype = {}
Async._function_metatable = {
__call = nil, -- Async._function_prototype.start_soon,
__index = Async._function_prototype,
__class = "AsyncFunction", __class = "AsyncFunction",
} }
script.register_metatable("AsyncFunction", Async._metatable) --- @class Async.AsyncReturn<F>
--- @field func_id number The id of the async function to be called
--- @field args any[] The arguments to call the function with
--- @field tick number? If present, the function will be called on this game tick
--- @field next_id number? The id of the async function to be called with the return value
--- @field canceled boolean? True if the call has been canceled
--- @field returned any[]? The return values of the function call
Async._return_prototype = {} -- Prototype of the async return type
--- Globals Async._return_metatable = {
local async_next -- Stores a queue of async functions to be executed on the next tick __index = Async._return_prototype,
local async_queue -- Stores a queue of async functions to be executed on a later tick __class = "AsyncReturn",
local on_tick_mutex = false -- It is not safe to modify the globals while this value is true }
script.register_metatable("AsyncFunction", Async._function_metatable)
script.register_metatable("AsyncReturn", Async._return_metatable)
--- Storage Variables
local resolve_next --- @type Async.AsyncReturn[] Stores a queue of async functions to be executed on the next tick
local resolve_queue --- @type Async.AsyncReturn[] Stores a queue of async functions to be executed on a later tick
--- Insert an item into the priority queue --- Insert an item into the priority queue
local function add_to_queue(pending) --- @param pending Async.AsyncReturn
--- @return Async.AsyncReturn
local function add_to_next_tick(pending)
resolve_next[#resolve_next + 1] = pending
return pending
end
--- Insert an item into the priority queue
--- @param pending Async.AsyncReturn
--- @return Async.AsyncReturn
local function add_to_resolve_queue(pending)
local tick = pending.tick local tick = pending.tick
for index = #async_queue, 1, -1 do for index = #resolve_queue, 1, -1 do
if async_queue[index].tick >= tick then if resolve_queue[index].tick >= tick then
async_queue[index + 1] = pending resolve_queue[index + 1] = pending
return return pending
else else
async_queue[index + 1] = async_queue[index] resolve_queue[index + 1] = resolve_queue[index]
end end
end end
async_queue[1] = pending resolve_queue[1] = pending
return pending
end end
--- Static Methods. --- Async Return.
-- Static methods of the class -- Similar to a JS promise, it is returned after starting a task and allows awaiting and cancellation
-- @section async-static -- Because it would result inefficient code, it is not possible to chain calls to after
--- Cancel an async function from being called
function Async._return_prototype:cancel()
self.canceled = true
end
--- Assign an async function to be called on completion of this function
--- @param async_func Async.AsyncFunction The function which will be called using start_soon
function Async._return_prototype:return_to(async_func)
self.next_id = async_func.id
if self.returned then
async_func(table.unpack(self.returned))
end
end
--- Async Function.
-- Functions which can be put in storage and used as tasks to be completed over multiple ticks
--- Register a new async function --- Register a new async function
-- @tparam function func The function which becomes the async function --- @param func function The function which becomes the async function
-- @treturn AsyncFunction The newly registered async function --- @return Async.AsyncFunction # The newly registered async function
function Async.register(func) function Async.register(func)
ExpUtil.assert_not_runtime() ExpUtil.assert_not_runtime()
ExpUtil.assert_argument_type(func, "function", 1, "func") ExpUtil.assert_argument_type(func, "function", 1, "func")
@@ -126,69 +181,62 @@ function Async.register(func)
Async._functions[id] = func Async._functions[id] = func
Async._queue_pressure[id] = 0 Async._queue_pressure[id] = 0
return setmetatable({ id = id }, Async._metatable) return setmetatable({ id = id }, Async._function_metatable)
end end
--- Prototype Methods.
-- Prototype methods of the class instances
-- @section async-prototype
--- Run an async function on the next tick, this is the default and can be used to bypass permission groups --- Run an async function on the next tick, this is the default and can be used to bypass permission groups
-- @param ... The arguments to call the function with --- @param ... any The arguments to call the function with
function Async._prototype:start_soon(...) --- @return Async.AsyncReturn
assert(not on_tick_mutex, "Cannot queue new async call during execution of another") function Async._function_prototype:start_soon(...)
assert(Async._functions[self.id], "Async function is not registered") assert(Async._functions[self.id], "Async function is not registered")
Async._queue_pressure[self.id] = Async._queue_pressure[self.id] + 1 Async._queue_pressure[self.id] = Async._queue_pressure[self.id] + 1
return add_to_next_tick(setmetatable({
async_next[#async_next + 1] = { func_id = self.id,
id = self.id,
args = { ... }, args = { ... },
} }, Async._return_metatable))
end end
--- Run an async function after the given number of ticks --- Run an async function after the given number of ticks
-- @tparam number ticks The number of ticks to call the function after --- @param ticks number The number of ticks to call the function after
-- @param ... The arguments to call the function with --- @param ... any The arguments to call the function with
function Async._prototype:start_after(ticks, ...) --- @return Async.AsyncReturn
function Async._function_prototype:start_after(ticks, ...)
ExpUtil.assert_argument_type(ticks, "number", 1, "ticks") ExpUtil.assert_argument_type(ticks, "number", 1, "ticks")
assert(not on_tick_mutex, "Cannot queue new async call during execution of another") assert(ticks > 0, "Ticks must be a positive number")
assert(Async._functions[self.id], "Async function is not registered") assert(Async._functions[self.id], "Async function is not registered")
Async._queue_pressure[self.id] = Async._queue_pressure[self.id] + 1 Async._queue_pressure[self.id] = Async._queue_pressure[self.id] + 1
return add_to_resolve_queue(setmetatable({
add_to_queue{ func_id = self.id,
id = self.id,
args = { ... }, args = { ... },
tick = game.tick + ticks, tick = game.tick + ticks,
} }, Async._return_metatable))
end end
--- Run an async function on the next tick if the function is not already queued, allows singleton task/thread behaviour --- Run an async function on the next tick if the function is not already queued, allows singleton task/thread behaviour
-- @param ... The arguments to call the function with --- @param ... any The arguments to call the function with
function Async._prototype:start_task(...) --- @return Async.AsyncReturn | nil
assert(not on_tick_mutex, "Cannot queue new async call during execution of another") function Async._function_prototype:start_task(...)
assert(Async._functions[self.id], "Async function is not registered") assert(Async._functions[self.id], "Async function is not registered")
if Async._queue_pressure[self.id] > 0 then return end if Async._queue_pressure[self.id] > 0 then return end
self:start_soon(...) return self:start_soon(...)
end end
--- Run an async function on this tick, then queue it based on its return value --- Run an async function on this tick, then queue it based on its return value
-- @param ... The arguments to call the function with --- @param ... any The arguments to call the function with
function Async._prototype:start_now(...) --- @return Async.AsyncReturn
assert(not on_tick_mutex, "Cannot queue new async call during execution of another") function Async._function_prototype:start_now(...)
assert(Async._functions[self.id], "Async function is not registered") assert(Async._functions[self.id], "Async function is not registered")
local status, rtn1, rtn2 = Async._functions[self.id](...) local status, rtn1, rtn2 = Async._functions[self.id](...)
if status == Async.status.continue then if status == Async.status.continue then
self:start_soon(table.unpack(rtn1)) return self:start_soon(table.unpack(rtn1))
elseif status == Async.status.delay then elseif status == Async.status.delay then
self:start_after(rtn1, table.unpack(rtn2)) return self:start_after(rtn1, table.unpack(rtn2))
elseif status == Async.status.complete or status == nil then elseif status == Async.status.complete or status == nil then
-- The function has finished execution, raise the custom event return setmetatable({
script.raise_event(Async.on_function_complete, { func_id = self.id,
event = Async.on_function_complete, args = { ... },
tick = game.tick,
async_id = self.id,
returned = rtn1, returned = rtn1,
}) }, Async._return_metatable)
else else
error("Async function " .. self.id .. " returned an invalid status: " .. table.inspect(status)) error("Async function " .. self.id .. " returned an invalid status: " .. table.inspect(status))
end end
@@ -196,11 +244,11 @@ end
--- Status Returns. --- Status Returns.
-- Return values used by async functions -- Return values used by async functions
-- @section async-status
local empty_table = setmetatable({}, { local empty_table = setmetatable({}, {
__index = function() error("Field 'Returned' is Immutable") end, __index = function() error("Field 'Returned' is Immutable") end,
}) -- File scope to allow for reuse __newindex = function() error("Field 'Returned' is Immutable") end,
})
--- Default status, will raise on_function_complete --- Default status, will raise on_function_complete
-- @param ... The return value of the async call -- @param ... The return value of the async call
@@ -224,97 +272,113 @@ end
-- @param ... The arguments to call the function with -- @param ... The arguments to call the function with
function Async.status.delay(ticks, ...) function Async.status.delay(ticks, ...)
ExpUtil.assert_argument_type(ticks, "number", 1, "ticks") ExpUtil.assert_argument_type(ticks, "number", 1, "ticks")
assert(ticks > 0, "Ticks must be a positive number")
if ... == nil then if ... == nil then
return Async.status.continue, ticks, empty_table return Async.status.continue, ticks, empty_table
end end
return Async.status.delay, ticks, { ... } return Async.status.delay, ticks, { ... }
end end
--- Status Returns.
--- @type Async.AsyncReturn[], Async.AsyncReturn[]
local new_next, new_queue = {}, {} -- File scope to allow for reuse
--- Executes an async function and processes the return value --- Executes an async function and processes the return value
local function exec(pending, tick, new_next, new_queue) local function exec(pending, tick)
if pending.cancelled then return end
local status, rtn1, rtn2 = Async._functions[pending.id](table.unpack(pending.args)) local status, rtn1, rtn2 = Async._functions[pending.id](table.unpack(pending.args))
if status == Async.status.continue then if status == Async.status.continue then
new_next[#new_next + 1] = pending resolve_next[#resolve_next + 1] = pending
pending.tick = nil pending.tick = nil
pending.args = rtn1 pending.args = rtn1
elseif status == Async.status.delay then elseif status == Async.status.delay then
new_queue[#new_queue + 1] = pending resolve_queue[#resolve_queue + 1] = pending
pending.tick = tick + rtn1 pending.tick = tick + rtn1
pending.args = rtn2 pending.args = rtn2
elseif status == Async.status.complete or status == nil then elseif status == Async.status.complete or status == nil then
-- The function has finished execution, raise the custom event -- The function has finished execution, raise the custom event
Async._queue_pressure[pending.id] = Async._queue_pressure[pending.id] - 1 Async._queue_pressure[pending.id] = Async._queue_pressure[pending.id] - 1
script.raise_event(Async.on_function_complete, { pending.returned = rtn1
event = Async.on_function_complete, if pending.next_id then
tick = tick, resolve_next[#resolve_next + 1] = setmetatable({
async_id = pending.id, func_id = pending.next_id,
returned = rtn1, args = rtn1,
}) }, Async._return_metatable)
end
else else
error("Async function " .. pending.id .. " returned an invalid status: " .. table.inspect(status)) error("Async function " .. pending.id .. " returned an invalid status: " .. table.inspect(status))
end end
end end
local new_next, new_queue = {}, {} -- File scope to allow for reuse
--- Each tick, run all next tick functions, then check if any in the queue need to be executed --- Each tick, run all next tick functions, then check if any in the queue need to be executed
local function on_tick() local function on_tick()
if async_next == nil then return end if resolve_next == nil then return end
local tick = game.tick local tick = game.tick
-- Execute all pending functions -- Swap the references around so it is safe to iterate the arrays
for index = 1, #async_next, 1 do local real_next, real_queue = resolve_next, resolve_queue
exec(async_next[index], tick, new_next, new_queue) resolve_next, resolve_queue = new_next, new_queue
async_next[index] = nil
-- Execute all pending async functions
for index = 1, #real_next, 1 do
exec(real_next[index], tick)
real_next[index] = nil
end end
for index = #async_queue, 1, -1 do for index = #real_queue, 1, -1 do
local pending = async_queue[index] local pending = real_queue[index]
if pending.tick > tick then if pending.tick > tick and not pending.canceled then
break break
end end
exec(pending, tick, new_next, new_queue) exec(pending, tick)
async_queue[index] = nil real_queue[index] = nil
end end
-- Queue any functions that did not complete -- Swap the references back to normal
resolve_next, resolve_queue = real_next, real_queue
-- Queue any functions that were added during the execution of the others
for index = 1, #new_next, 1 do for index = 1, #new_next, 1 do
async_next[index] = new_next[index] resolve_next[index] = new_next[index]
new_next[index] = nil new_next[index] = nil
end end
for index = 1, #new_queue, 1 do for index = 1, #new_queue, 1 do
add_to_queue(new_next[index]) add_to_resolve_queue(new_queue[index])
new_next[index] = nil new_queue[index] = nil
end end
end end
--- On load, check the queue status and update the pressure values --- On load, check the queue status and update the pressure values
function Async.on_load() function Async.on_load()
if storage.exp_async_next == nil then return end if storage.exp_async_next == nil then return end
async_next = storage.exp_async_next resolve_next = storage.exp_async_next
async_queue = storage.exp_async_queue resolve_queue = storage.exp_async_queue
for _, pending in ipairs(async_next) do
local count = Async._queue_pressure[pending.id]
if count == nil then
log("Warning: Pending async function missing after load: " .. pending.id)
Async._functions[pending.id] = function() end -- NOP
count = 0
end
Async._queue_pressure[pending.id] = count + 1
end
for _, pending in ipairs(async_queue) do -- Rebuild the queue pressure table
for _, pending in ipairs(resolve_next) do
local count = Async._queue_pressure[pending.id] local count = Async._queue_pressure[pending.id]
if count == nil then if count then
log("Warning: Pending async function missing after load: " .. pending.id)
Async._functions[pending.id] = function() end -- NOP
count = 0
end
Async._queue_pressure[pending.id] = count + 1 Async._queue_pressure[pending.id] = count + 1
else
log("Warning: Pending async function missing after load: " .. pending.id)
pending.canceled = true
end end
end end
--- On server startup initialise the storage data for _, pending in ipairs(resolve_queue) do
local count = Async._queue_pressure[pending.id]
if count then
Async._queue_pressure[pending.id] = count + 1
else
log("Warning: Pending async function missing after load: " .. pending.id)
pending.canceled = true
end
end
end
--- On init and server startup initialise the storage data
function Async.on_init() function Async.on_init()
if storage.exp_async_next == nil then if storage.exp_async_next == nil then
storage.exp_async_next = {} storage.exp_async_next = {}
@@ -325,4 +389,5 @@ end
Async.events[defines.events.on_tick] = on_tick Async.events[defines.events.on_tick] = on_tick
Async.events[Clustorio.events.on_server_startup] = Async.on_init Async.events[Clustorio.events.on_server_startup] = Async.on_init
Async._function_metatable.__call = Async._function_prototype.start_soon
return Async return Async

View File

@@ -122,8 +122,9 @@ end
--- @param level number? The level of the stack to get the file of, a value of 1 is the caller of this function --- @param level number? The level of the stack to get the file of, a value of 1 is the caller of this function
--- @return string # The relative filepath of the given stack frame --- @return string # The relative filepath of the given stack frame
function Common.safe_file_path(level) function Common.safe_file_path(level)
level = level or 1 local debug_info = getinfo((level or 1) + 1, "Sn")
return getinfo(level + 1, "S").short_src:sub(10, -5) local safe_source = debug_info.source:find("__level__")
return safe_source == 1 and debug_info.short_src:sub(10, -5) or debug_info.source
end end
--- Returns the name of your module, this assumes your module is stored within /modules (which it is for clustorio) --- Returns the name of your module, this assumes your module is stored within /modules (which it is for clustorio)
@@ -141,7 +142,7 @@ end
--- Returns the name of a function in a safe and consistent format --- Returns the name of a function in a safe and consistent format
--- @param func number | function The level of the stack to get the name of, a value of 1 is the caller of this function --- @param func number | function The level of the stack to get the name of, a value of 1 is the caller of this function
--- @param raw boolean When true there will not be any < > around the name --- @param raw boolean? When true there will not be any < > around the name
--- @return string # The name of the function at the given stack frame or provided as an argument --- @return string # The name of the function at the given stack frame or provided as an argument
function Common.get_function_name(func, raw) function Common.get_function_name(func, raw)
local debug_info = getinfo(func, "Sn") local debug_info = getinfo(func, "Sn")