Actor pattern library for message-passing concurrency

Apache-2.0 997 downloads
Updated 4 months ago Repository
actorconcurrencymessage-passing

Run

wippy run wippy/actor

Actor

Actor pattern library for message-passing concurrency with topic-based routing.

Installation

entries:
  - name: actor
    kind: ns.dependency
    component: wippy/actor
    version: "*"

Basic Usage

local actor = require("actor")

local initial_state = {
    counter = 0
}

local handlers = {
    increment = function(state, payload, topic, from)
        state.counter = state.counter + (payload.amount or 1)
    end,

    get_count = function(state, payload, topic, from)
        process.send(from, "count_result", {count = state.counter})
    end,

    stop = function(state)
        return actor.exit({final_count = state.counter})
    end
}

local a = actor.new(initial_state, handlers)
return a:run()

Handlers

Topic Handlers

Functions without __ prefix handle messages by topic name:

local handlers = {
    ping = function(state, payload, topic, from)
        process.send(from, "pong", {})
    end,

    echo = function(state, payload, topic, from)
        process.send(from, "echo_reply", payload)
    end
}

Special Handlers

local handlers = {
    __init = function(state)
        -- Called once when actor starts
        state.start_time = os.time()
    end,

    __default = function(state, payload, topic, from)
        -- Handles unmatched topics
    end,

    __on_event = function(state, event, kind, from)
        -- Handles system events (CANCEL, etc.)
    end,

    __on_cancel = function(state, event, kind, from)
        -- Handles cancellation specifically
        return actor.exit({reason = "cancelled"})
    end,

    __on_internal_message = function(state, payload, msg_type, from)
        -- Handles internal async results
    end
}

Control Flow

Exit Actor

function handlers.shutdown(state, payload)
    return actor.exit({status = "done", data = state.data})
end

Chain to Another Handler

function handlers.validate(state, payload)
    if payload.valid then
        return actor.next("process", payload)
    end
    return actor.next("reject", {reason = "invalid"})
end

function handlers.process(state, payload)
    -- Process validated payload
end

function handlers.reject(state, payload)
    -- Handle rejection
end

State Methods

Available on the state object:

Dynamic Handler Registration

function handlers.__init(state)
    state.add_handler("custom_topic", function(s, payload, topic, from)
        -- Handle custom topic
    end)
end

function handlers.cleanup(state)
    state.remove_handler("custom_topic")
end

Channel Registration

function handlers.__init(state)
    local my_channel = channel.new(10)

    state.register_channel(my_channel, function(s, value, ok, channel_id)
        if ok then
            -- Process channel value
        else
            -- Channel closed
        end
    end)

    state.my_channel = my_channel
end

function handlers.stop_channel(state)
    state.unregister_channel(state.my_channel)
end

Async Operations

function handlers.start_background(state, payload)
    state.async(function()
        -- Long running operation
        local result = do_work()
        return actor.next("work_done", result)
    end)
end

function handlers.work_done(state, payload)
    state.result = payload
end

Wait for Message

function handlers.request_and_wait(state, payload)
    process.send(payload.target, "request", {id = 123})

    local response, err = state.wait("response", 5000)  -- 5 second timeout
    if err then
        return actor.next("timeout", {})
    end

    state.response = response
end

Process Events

The actor automatically handles process inbox and system events channel:

local handlers = {
    __on_event = function(state, event, kind, from)
        if kind == process.event.CANCEL then
            return actor.exit({reason = "cancelled"})
        end
    end
}

Complete Example

local actor = require("actor")

local handlers = {
    __init = function(state)
        state.items = {}
        state.async(function()
            return actor.next("ready", {})
        end)
    end,

    ready = function(state)
        process.send(state.parent, "actor_ready", {pid = process.pid()})
    end,

    add_item = function(state, payload)
        table.insert(state.items, payload.item)
        return actor.next("notify_change", {})
    end,

    notify_change = function(state)
        if state.subscriber then
            process.send(state.subscriber, "items_changed", {count = #state.items})
        end
    end,

    subscribe = function(state, payload, topic, from)
        state.subscriber = from
    end,

    get_items = function(state, payload, topic, from)
        process.send(from, "items_list", {items = state.items})
    end,

    __on_cancel = function(state)
        return actor.exit({items = state.items})
    end
}

local a = actor.new({parent = process.parent()}, handlers)
return a:run()