From 67a169ce0a5fa317d332554721a6c6dc40a6beec Mon Sep 17 00:00:00 2001 From: Cooldude2606 <25043174+Cooldude2606@users.noreply.github.com> Date: Tue, 1 Oct 2024 22:05:07 +0100 Subject: [PATCH] Improve Async Module --- exp_util/module/async.lua | 331 ++++++++++++++++++++++--------------- exp_util/module/common.lua | 7 +- 2 files changed, 202 insertions(+), 136 deletions(-) diff --git a/exp_util/module/async.lua b/exp_util/module/async.lua index 487f975f..1f166493 100644 --- a/exp_util/module/async.lua +++ b/exp_util/module/async.lua @@ -1,42 +1,55 @@ ---[[-- Util Module - Async -- Provides a method of spreading work across multiple ticks and running functions at a later time -@core Async -@alias Async +--[[-- ExpUtil - Async +Provides a method of spreading work across multiple ticks and running functions at a later time -@usage-- Bypass permission groups +--- Bypass permission groups -- This is a simple example, you should have some kind of validation to prevent security flaws local function setAdmin(player, state) player.admin = state end -local setAdminAsync = Async.register(setAdmin) -setAdminAsync(game.players[1], true) +local set_admin_async = Async.register(setAdmin) +set_admin_async(game.players[1], true) -@usage-- Functions stored in storage table --- This can be used to create run time configurable callbacks, although this is not recommended -storage.myCallback = Async.register(function() - game.print("I got called!") -end) +--- Functions stored in storage table +-- Async functions and return values are safe to store in storage +-- However they must be registered during the control stage +local function say_hello(name) + game.print("Hello " .. name) +end + +storage.say_hello_async = Async.register(say_hello) -- The function can be called just like any other function -storage.myCallback() +storage.say_hello_async("John") -@usage-- Creating singleton tasks (best used with storage data) +-- Run the function this tick rather than the default of next tick +storage.say_hello_async:start_now("Dave") + +-- Call the function after 60 ticks +storage.say_hello_async:start_after(60, "Steve") + +-- You can cancel any task or function call that hasn't returned +-- You can store this task in storage to cancel at any time, or poll if it returned +local task = storage.say_hello_async:start_after(30, "Kevin") +task:cancel() + +--- Creating multi tick tasks (best used with storage data) -- This allows you to split large tasks across multiple ticks to prevent lag -local myTask = Async.register(function(remainingWork) - game.print("Working... " .. remainingWork) - if remainingWork > 0 then - return Async.status.continue(remainingWork - 1) +local my_task = Async.register(function(words) + game.print(table.remove(words)) + if #words > 0 then + return Async.status.continue(words) end end) -myTask:start_task(10) -- Queues the task -myTask:start_task(10) -- Does nothing, task is already running -myTask:start_now(10) -- Ignores the already running instance and starts a second one +my_task:start_task{ "foo", "bar", "baz" } -- Queues the task +my_task:start_task{ "A", "B", "C" } -- Does nothing, task is already running +my_task:start_soon{ "1", "2", "3" } -- Ignores the already running instance and starts a second one +my_task:start_now{ "X", "Y", "Z" } -- Same as start_soon but will run once this tick then queues the remainder -@usage-- Actions with variable delays +--- Actions with variable delays -- on_nth_tick is great for consistent delays, but tasks allow for variable delays -local linearBackoff = Async.register(function(startingDelay, remainingWork) +local linear_backoff = Async.register(function(startingDelay, remainingWork) game.print("Working... " .. remainingWork) if remainingWork > 0 then local newDelay = startingDelay + 1 @@ -44,11 +57,12 @@ local linearBackoff = Async.register(function(startingDelay, remainingWork) end end) -linearBackoff(1, 10) +linear_backoff(1, 10) -@usage-- Getting return values --- you can capture the return values of an async function using the event -local fillTableAsync = Async.register(function(tbl, val, remainingWork) +--- Getting return values +-- You can capture the return values of an async function using another async function +-- Note that you can not chain calls to return_to only one return capture is allowed +local fill_table_async = Async.register(function(tbl, val, remainingWork) table.insert(tbl, val) if remainingWork > 0 then return Async.status.continue(tbl, val, remainingWork - 1) @@ -57,67 +71,108 @@ local fillTableAsync = Async.register(function(tbl, val, remainingWork) end end) -local function on_function_complete(event) - if event.async_id ~= fillTableAsync.id then return end - local filledTable = table.unpack(event.return_values) - game.print("Table has length of " .. #filledTable) +local function print_table_size(tbl) + game.print("Table has length of " .. #tbl) end -fillTableAsync({}, "foo", 10) -- Puts 10 lots of foo into the table +local print_table_size_async = Async.register(print_table_size) +fill_table_async({}, "foo", 10):return_to(print_table_size_async) ]] +local ExpUtil = require("modules/exp_util") local Clustorio = require("modules/clusterio/api") -local ExpUtil = require("modules/exp_util/common") local Async = { status = {}, -- Stores the allowed return types from a async function events = {}, -- Stores all event handlers for this module - _prototype = {}, -- Prototype of the async function type _queue_pressure = {}, -- Stores the count of each function in the queue to avoid queue iteration during start_task _functions = {}, -- Stores a reference to all registered functions - --- Raised when any async function has finished execution - -- @event on_function_complete - -- @tparam AsyncFunction async_id The function which finished execution, comparable to the return of register - -- @tparam table return_values An array representing the values returned by the completed function - on_function_complete = script.generate_event_name(), } -Async._metatable = { - __call = function(self, ...) Async._prototype.start_soon(self, ...) end, - __index = Async._prototype, +--- @class Async.AsyncFunction +--- @field id number The id of this async function +--- @operator call: function +Async._function_prototype = {} + +Async._function_metatable = { + __call = nil, -- Async._function_prototype.start_soon, + __index = Async._function_prototype, __class = "AsyncFunction", } -script.register_metatable("AsyncFunction", Async._metatable) +--- @class Async.AsyncReturn +--- @field func_id number The id of the async function to be called +--- @field args any[] The arguments to call the function with +--- @field tick number? If present, the function will be called on this game tick +--- @field next_id number? The id of the async function to be called with the return value +--- @field canceled boolean? True if the call has been canceled +--- @field returned any[]? The return values of the function call +Async._return_prototype = {} -- Prototype of the async return type ---- Globals -local async_next -- Stores a queue of async functions to be executed on the next tick -local async_queue -- Stores a queue of async functions to be executed on a later tick -local on_tick_mutex = false -- It is not safe to modify the globals while this value is true +Async._return_metatable = { + __index = Async._return_prototype, + __class = "AsyncReturn", +} + +script.register_metatable("AsyncFunction", Async._function_metatable) +script.register_metatable("AsyncReturn", Async._return_metatable) + +--- Storage Variables + +local resolve_next --- @type Async.AsyncReturn[] Stores a queue of async functions to be executed on the next tick +local resolve_queue --- @type Async.AsyncReturn[] Stores a queue of async functions to be executed on a later tick --- Insert an item into the priority queue -local function add_to_queue(pending) +--- @param pending Async.AsyncReturn +--- @return Async.AsyncReturn +local function add_to_next_tick(pending) + resolve_next[#resolve_next + 1] = pending + return pending +end + +--- Insert an item into the priority queue +--- @param pending Async.AsyncReturn +--- @return Async.AsyncReturn +local function add_to_resolve_queue(pending) local tick = pending.tick - for index = #async_queue, 1, -1 do - if async_queue[index].tick >= tick then - async_queue[index + 1] = pending - return + for index = #resolve_queue, 1, -1 do + if resolve_queue[index].tick >= tick then + resolve_queue[index + 1] = pending + return pending else - async_queue[index + 1] = async_queue[index] + resolve_queue[index + 1] = resolve_queue[index] end end - async_queue[1] = pending + resolve_queue[1] = pending + return pending end ---- Static Methods. --- Static methods of the class --- @section async-static +--- Async Return. +-- Similar to a JS promise, it is returned after starting a task and allows awaiting and cancellation +-- Because it would result inefficient code, it is not possible to chain calls to after + +--- Cancel an async function from being called +function Async._return_prototype:cancel() + self.canceled = true +end + +--- Assign an async function to be called on completion of this function +--- @param async_func Async.AsyncFunction The function which will be called using start_soon +function Async._return_prototype:return_to(async_func) + self.next_id = async_func.id + if self.returned then + async_func(table.unpack(self.returned)) + end +end + +--- Async Function. +-- Functions which can be put in storage and used as tasks to be completed over multiple ticks --- Register a new async function --- @tparam function func The function which becomes the async function --- @treturn AsyncFunction The newly registered async function +--- @param func function The function which becomes the async function +--- @return Async.AsyncFunction # The newly registered async function function Async.register(func) ExpUtil.assert_not_runtime() ExpUtil.assert_argument_type(func, "function", 1, "func") @@ -126,69 +181,62 @@ function Async.register(func) Async._functions[id] = func Async._queue_pressure[id] = 0 - return setmetatable({ id = id }, Async._metatable) + return setmetatable({ id = id }, Async._function_metatable) end ---- Prototype Methods. --- Prototype methods of the class instances --- @section async-prototype - --- Run an async function on the next tick, this is the default and can be used to bypass permission groups --- @param ... The arguments to call the function with -function Async._prototype:start_soon(...) - assert(not on_tick_mutex, "Cannot queue new async call during execution of another") +--- @param ... any The arguments to call the function with +--- @return Async.AsyncReturn +function Async._function_prototype:start_soon(...) assert(Async._functions[self.id], "Async function is not registered") Async._queue_pressure[self.id] = Async._queue_pressure[self.id] + 1 - - async_next[#async_next + 1] = { - id = self.id, + return add_to_next_tick(setmetatable({ + func_id = self.id, args = { ... }, - } + }, Async._return_metatable)) end --- Run an async function after the given number of ticks --- @tparam number ticks The number of ticks to call the function after --- @param ... The arguments to call the function with -function Async._prototype:start_after(ticks, ...) +--- @param ticks number The number of ticks to call the function after +--- @param ... any The arguments to call the function with +--- @return Async.AsyncReturn +function Async._function_prototype:start_after(ticks, ...) ExpUtil.assert_argument_type(ticks, "number", 1, "ticks") - assert(not on_tick_mutex, "Cannot queue new async call during execution of another") + assert(ticks > 0, "Ticks must be a positive number") assert(Async._functions[self.id], "Async function is not registered") Async._queue_pressure[self.id] = Async._queue_pressure[self.id] + 1 - - add_to_queue{ - id = self.id, + return add_to_resolve_queue(setmetatable({ + func_id = self.id, args = { ... }, tick = game.tick + ticks, - } + }, Async._return_metatable)) end --- Run an async function on the next tick if the function is not already queued, allows singleton task/thread behaviour --- @param ... The arguments to call the function with -function Async._prototype:start_task(...) - assert(not on_tick_mutex, "Cannot queue new async call during execution of another") +--- @param ... any The arguments to call the function with +--- @return Async.AsyncReturn | nil +function Async._function_prototype:start_task(...) assert(Async._functions[self.id], "Async function is not registered") if Async._queue_pressure[self.id] > 0 then return end - self:start_soon(...) + return self:start_soon(...) end --- Run an async function on this tick, then queue it based on its return value --- @param ... The arguments to call the function with -function Async._prototype:start_now(...) - assert(not on_tick_mutex, "Cannot queue new async call during execution of another") +--- @param ... any The arguments to call the function with +--- @return Async.AsyncReturn +function Async._function_prototype:start_now(...) assert(Async._functions[self.id], "Async function is not registered") local status, rtn1, rtn2 = Async._functions[self.id](...) if status == Async.status.continue then - self:start_soon(table.unpack(rtn1)) + return self:start_soon(table.unpack(rtn1)) elseif status == Async.status.delay then - self:start_after(rtn1, table.unpack(rtn2)) + return self:start_after(rtn1, table.unpack(rtn2)) elseif status == Async.status.complete or status == nil then - -- The function has finished execution, raise the custom event - script.raise_event(Async.on_function_complete, { - event = Async.on_function_complete, - tick = game.tick, - async_id = self.id, + return setmetatable({ + func_id = self.id, + args = { ... }, returned = rtn1, - }) + }, Async._return_metatable) else error("Async function " .. self.id .. " returned an invalid status: " .. table.inspect(status)) end @@ -196,11 +244,11 @@ end --- Status Returns. -- Return values used by async functions --- @section async-status local empty_table = setmetatable({}, { __index = function() error("Field 'Returned' is Immutable") end, -}) -- File scope to allow for reuse + __newindex = function() error("Field 'Returned' is Immutable") end, +}) --- Default status, will raise on_function_complete -- @param ... The return value of the async call @@ -224,97 +272,113 @@ end -- @param ... The arguments to call the function with function Async.status.delay(ticks, ...) ExpUtil.assert_argument_type(ticks, "number", 1, "ticks") + assert(ticks > 0, "Ticks must be a positive number") if ... == nil then return Async.status.continue, ticks, empty_table end return Async.status.delay, ticks, { ... } end +--- Status Returns. + +--- @type Async.AsyncReturn[], Async.AsyncReturn[] +local new_next, new_queue = {}, {} -- File scope to allow for reuse + --- Executes an async function and processes the return value -local function exec(pending, tick, new_next, new_queue) +local function exec(pending, tick) + if pending.cancelled then return end local status, rtn1, rtn2 = Async._functions[pending.id](table.unpack(pending.args)) if status == Async.status.continue then - new_next[#new_next + 1] = pending + resolve_next[#resolve_next + 1] = pending pending.tick = nil pending.args = rtn1 elseif status == Async.status.delay then - new_queue[#new_queue + 1] = pending + resolve_queue[#resolve_queue + 1] = pending pending.tick = tick + rtn1 pending.args = rtn2 elseif status == Async.status.complete or status == nil then -- The function has finished execution, raise the custom event Async._queue_pressure[pending.id] = Async._queue_pressure[pending.id] - 1 - script.raise_event(Async.on_function_complete, { - event = Async.on_function_complete, - tick = tick, - async_id = pending.id, - returned = rtn1, - }) + pending.returned = rtn1 + if pending.next_id then + resolve_next[#resolve_next + 1] = setmetatable({ + func_id = pending.next_id, + args = rtn1, + }, Async._return_metatable) + end else error("Async function " .. pending.id .. " returned an invalid status: " .. table.inspect(status)) end end -local new_next, new_queue = {}, {} -- File scope to allow for reuse --- Each tick, run all next tick functions, then check if any in the queue need to be executed local function on_tick() - if async_next == nil then return end + if resolve_next == nil then return end local tick = game.tick - -- Execute all pending functions - for index = 1, #async_next, 1 do - exec(async_next[index], tick, new_next, new_queue) - async_next[index] = nil + -- Swap the references around so it is safe to iterate the arrays + local real_next, real_queue = resolve_next, resolve_queue + resolve_next, resolve_queue = new_next, new_queue + + -- Execute all pending async functions + for index = 1, #real_next, 1 do + exec(real_next[index], tick) + real_next[index] = nil end - for index = #async_queue, 1, -1 do - local pending = async_queue[index] - if pending.tick > tick then + for index = #real_queue, 1, -1 do + local pending = real_queue[index] + if pending.tick > tick and not pending.canceled then break end - exec(pending, tick, new_next, new_queue) - async_queue[index] = nil + exec(pending, tick) + real_queue[index] = nil end - -- Queue any functions that did not complete + -- Swap the references back to normal + resolve_next, resolve_queue = real_next, real_queue + + -- Queue any functions that were added during the execution of the others for index = 1, #new_next, 1 do - async_next[index] = new_next[index] + resolve_next[index] = new_next[index] new_next[index] = nil end for index = 1, #new_queue, 1 do - add_to_queue(new_next[index]) - new_next[index] = nil + add_to_resolve_queue(new_queue[index]) + new_queue[index] = nil end end --- On load, check the queue status and update the pressure values function Async.on_load() if storage.exp_async_next == nil then return end - async_next = storage.exp_async_next - async_queue = storage.exp_async_queue - for _, pending in ipairs(async_next) do + resolve_next = storage.exp_async_next + resolve_queue = storage.exp_async_queue + + -- Rebuild the queue pressure table + for _, pending in ipairs(resolve_next) do local count = Async._queue_pressure[pending.id] - if count == nil then + if count then + Async._queue_pressure[pending.id] = count + 1 + else log("Warning: Pending async function missing after load: " .. pending.id) - Async._functions[pending.id] = function() end -- NOP - count = 0 + pending.canceled = true end - Async._queue_pressure[pending.id] = count + 1 end - for _, pending in ipairs(async_queue) do + for _, pending in ipairs(resolve_queue) do local count = Async._queue_pressure[pending.id] - if count == nil then + if count then + Async._queue_pressure[pending.id] = count + 1 + else log("Warning: Pending async function missing after load: " .. pending.id) - Async._functions[pending.id] = function() end -- NOP - count = 0 + pending.canceled = true end - Async._queue_pressure[pending.id] = count + 1 end end ---- On server startup initialise the storage data +--- On init and server startup initialise the storage data function Async.on_init() if storage.exp_async_next == nil then storage.exp_async_next = {} @@ -325,4 +389,5 @@ end Async.events[defines.events.on_tick] = on_tick Async.events[Clustorio.events.on_server_startup] = Async.on_init +Async._function_metatable.__call = Async._function_prototype.start_soon return Async diff --git a/exp_util/module/common.lua b/exp_util/module/common.lua index f48c0888..2c3c8638 100644 --- a/exp_util/module/common.lua +++ b/exp_util/module/common.lua @@ -122,8 +122,9 @@ end --- @param level number? The level of the stack to get the file of, a value of 1 is the caller of this function --- @return string # The relative filepath of the given stack frame function Common.safe_file_path(level) - level = level or 1 - return getinfo(level + 1, "S").short_src:sub(10, -5) + local debug_info = getinfo((level or 1) + 1, "Sn") + local safe_source = debug_info.source:find("__level__") + return safe_source == 1 and debug_info.short_src:sub(10, -5) or debug_info.source end --- Returns the name of your module, this assumes your module is stored within /modules (which it is for clustorio) @@ -141,7 +142,7 @@ end --- Returns the name of a function in a safe and consistent format --- @param func number | function The level of the stack to get the name of, a value of 1 is the caller of this function ---- @param raw boolean When true there will not be any < > around the name +--- @param raw boolean? When true there will not be any < > around the name --- @return string # The name of the function at the given stack frame or provided as an argument function Common.get_function_name(func, raw) local debug_info = getinfo(func, "Sn")