Workflows & Plugins API — Front-End Integration Guide

This document covers every workflow, plugin, and event subscription endpoint, request/response shapes, the execution engine internals, and the UI patterns you will need to implement.

All endpoints require the admin, provider, or integration role (unless noted otherwise) and accept either Authorization: Bearer <token> or X-API-KEY: <key>.


Table of Contents

Event Subscriptions (Workflow Definitions)

  1. Create a Subscription
  2. List Subscriptions
  3. Get a Single Subscription
  4. Update a Subscription
  5. Delete a Subscription

Workflow Graphs (Nodes & Edges)

  6. Get Workflow Graph
  7. Replace Workflow Graph

Simulation

  8. Simulate a Workflow Run

Workflow Runs (Execution History)

  9. List Workflow Runs
  10. Get a Single Workflow Run
  11. List Run Nodes

Debugging & Introspection

  12. Reconstruct Node Context
  13. Evaluate JSONLogic Expression

Plugins

  14. List Plugin Definitions
  15. List Plugin Instances
  16. Create a Plugin Instance
  17. Get a Single Plugin Instance
  18. Update a Plugin Instance
  19. Delete a Plugin Instance

Supporting Endpoints

  20. List Event Definitions
  21. List Event Delivery Tasks

Architecture & Concepts

  22. Core Concepts
  23. Node Types Reference
  24. Execution Engine
  25. JSONLogic Context
  26. Event Processing Pipeline
  27. Important UI Considerations

1. Create a Subscription

POST /api/v1/event_subscription

Request Body

| Field | Type | Required | Default | Notes |
| --- | --- | --- | --- | --- |
| name | string | yes | | Human-friendly label |
| event_type | string | yes | | CloudEvent type to trigger on, e.g. medipal.submission.created |
| description | string | no | null | Optional human-readable description |
| enabled | boolean | no | true | Whether the subscription is active |
| source_filter | string | no | null | Exact match on CloudEvent source field |
| subject_filter | string | no | null | Exact match on CloudEvent subject field |
| condition_jsonlogic | object | no | null | JSONLogic tree; event is routed only when it evaluates to truthy |

Response (200)

json
{
  "id": "a1b2c3d4-...",
  "name": "High Heart Rate Alert",
  "event_type": "medipal.vital.heart_rate.high",
  "description": "Triggers when heart rate exceeds threshold",
  "enabled": true,
  "source_filter": null,
  "subject_filter": null,
  "condition_jsonlogic": null,
  "created_at": "2026-02-28T10:00:00Z",
  "updated_at": null,
  "deleted_at": null
}
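
As a rough illustration of the request shape above, the following sketch builds (but does not send) a create request using only the Python standard library. The host in BASE_URL, the token value, and the helper name build_create_subscription_request are placeholders invented for this example, and the Content-Type header is an assumption rather than something this guide specifies.

```python
import json
import urllib.request

BASE_URL = "https://api.example.com"  # hypothetical host, replace with your server

# Optional fields taken from the Request Body table above.
OPTIONAL_FIELDS = {
    "description",
    "enabled",
    "source_filter",
    "subject_filter",
    "condition_jsonlogic",
}


def build_create_subscription_request(token, name, event_type, **optional):
    """Return an unsent urllib Request for POST /api/v1/event_subscription."""
    unknown = set(optional) - OPTIONAL_FIELDS
    if unknown:
        raise ValueError(f"unknown fields: {sorted(unknown)}")
    body = {"name": name, "event_type": event_type, **optional}
    return urllib.request.Request(
        url=f"{BASE_URL}/api/v1/event_subscription",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",  # assumed, not stated in the guide
        },
    )


req = build_create_subscription_request(
    "TOKEN",
    "High Heart Rate Alert",
    "medipal.vital.heart_rate.high",
    condition_jsonlogic={">": [{"var": "event.data.heart_rate"}, 120]},
)
# urllib.request.urlopen(req) would send it; omitted so the sketch runs offline.
```

Rejecting unknown field names client-side is a design choice of this sketch; the server may be more or less strict.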

2. List Subscriptions

GET /api/v1/event_subscription

Query Parameters

| Param | Type | Default | Notes |
| --- | --- | --- | --- |
| limit | integer | 25 | Min 1, max 250 |
| offset | integer | 0 | Min 0 |
| event_type | string | | Filter by exact event type |
| enabled | boolean | | Filter by active flag |
| name | string | | Case-insensitive partial match on name |
| search | string | | Case-insensitive partial match on name |
| sort_by | string | | Column to sort by |
| sort_dir | string | | asc or desc |

Response (200)

json
{
  "event_subscriptions": [
    {
      "id": "a1b2c3d4-...",
      "name": "High Heart Rate Alert",
      "event_type": "medipal.vital.heart_rate.high",
      "description": null,
      "enabled": true,
      "source_filter": null,
      "subject_filter": null,
      "condition_jsonlogic": null,
      "created_at": "2026-02-28T10:00:00Z"
    }
  ],
  "total": 1,
  "limit": 25,
  "offset": 0,
  "has_next": false,
  "has_previous": false
}

3. Get a Single Subscription

GET /api/v1/event_subscription/{id}

Response (200)

Same shape as the Create a Subscription response.

Errors

| Status | When |
| --- | --- |
| 404 | Subscription not found |

4. Update a Subscription

PATCH /api/v1/event_subscription/{id}

Partial update — only send the fields you want to change.

Request Body (all fields optional)

| Field | Type | Notes |
| --- | --- | --- |
| name | string | |
| description | string | |
| enabled | boolean | Use to enable/disable without deleting |
| event_type | string | |
| source_filter | string | |
| subject_filter | string | |
| condition_jsonlogic | object | |

To update the workflow graph (nodes/edges) use the Replace Workflow Graph endpoint instead.

Response (200)

Same shape as Get a Single Subscription.

Errors

| Status | When |
| --- | --- |
| 400 | Invalid payload |
| 404 | Subscription not found |

5. Delete a Subscription

DELETE /api/v1/event_subscription/{id}

Response (200)

json
{
  "status": "deleted"
}

This is a soft delete (deleted_at is set). Already-running workflow runs will complete, but no new runs will be triggered.

Errors

| Status | When |
| --- | --- |
| 404 | Subscription not found |

6. Get Workflow Graph

GET /api/v1/event_subscription/{id}/graph

Returns the subscription metadata together with its complete workflow graph (all non-deleted nodes and edges).

Response (200)

json
{
  "subscription": {
    "id": "sub-001",
    "name": "High Heart Rate Alert",
    "event_type": "medipal.vital.heart_rate.high",
    "enabled": true,
    "created_at": "2026-02-28T10:00:00Z"
  },
  "nodes": [
    {
      "id": "node-aaa",
      "event_subscription_id": "sub-001",
      "key": "compute_band",
      "name": "Compute Risk Band",
      "type": "COMPUTE",
      "config": {
        "logic": {
          "if": [
            { ">": [{ "var": "event.data.heart_rate" }, 120] },
            "HIGH",
            "NORMAL"
          ]
        }
      },
      "retry_policy": null,
      "timeout_ms": null,
      "continue_on_error": false,
      "created_at": "2026-02-28T10:00:00Z"
    },
    {
      "id": "node-bbb",
      "event_subscription_id": "sub-001",
      "key": "send_alert",
      "name": "Send Alert Email",
      "type": "ACTION",
      "config": {
        "plugin_instance_id": "pi-smtp-001",
        "plugin_action": "send_email",
        "input_mapping": {
          "to": "alerts@example.com",
          "subject": "High heart rate detected",
          "body": { "var": "nodes.compute_band.output.result" }
        }
      },
      "continue_on_error": false,
      "created_at": "2026-02-28T10:00:00Z"
    }
  ],
  "edges": [
    {
      "id": "edge-001",
      "event_subscription_id": "sub-001",
      "from_node_id": "node-aaa",
      "to_node_id": "node-bbb",
      "label": "score is HIGH",
      "condition_jsonlogic": {
        "==": [{ "var": "nodes.compute_band.output.result" }, "HIGH"]
      }
    }
  ]
}

Errors

| Status | When |
| --- | --- |
| 404 | Subscription not found |

7. Replace Workflow Graph

PUT /api/v1/event_subscription/{id}/graph

Atomically replaces the entire workflow graph in a single database transaction. The old nodes and edges are soft-deleted and new ones are created. Node configs are validated at save time.

Request Body

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| subscription | object | no | Subscription metadata fields to update simultaneously |
| nodes | array | no | Array of node objects (see below) |
| edges | array | no | Array of edge objects (see below) |

Node Object

| Field | Type | Required | Default | Notes |
| --- | --- | --- | --- | --- |
| id | string | no | | Client-side ID; used to wire edges in the same request |
| key | string | yes | | Stable key, unique within subscription (used in JSONLogic refs) |
| name | string | no | key | Human-friendly label |
| description | string | no | null | Optional description |
| type | string | yes | | One of: ACTION, COMPUTE, SWITCH, JOIN, DELAY, END |
| config | object | varies | null | Type-specific configuration (see Node Types) |
| retry_policy | object | no | null | { max_attempts, backoff, base_seconds } |
| timeout_ms | integer | no | null | Hard timeout for node execution |
| continue_on_error | boolean | no | false | If true, workflow continues even if this node fails |

Edge Object

| Field | Type | Required | Default | Notes |
| --- | --- | --- | --- | --- |
| from_node_id | string | yes | | Must match a node id in the same request |
| to_node_id | string | yes | | Must match a node id in the same request |
| label | string | no | null | UI label for this branch (e.g. "score < 50") |
| condition_jsonlogic | object | no | null | JSONLogic tree; edge is taken only when truthy |

Response (200)

Same shape as Get Workflow Graph, reflecting the newly-created nodes/edges with their server-generated IDs.

Errors

| Status | When |
| --- | --- |
| 400 | Missing required node fields, unknown node ID in edge, invalid config |
| 404 | Subscription not found |

Important Notes

  • Client-side IDs: Provide temporary id values on each node so edges can reference them via from_node_id / to_node_id. The server remaps these to real DB-generated UUIDs and returns the mapping in the response.
  • Config validation: Node configs are validated against type-specific Pydantic models at save time. ACTION and COMPUTE nodes require a non-null config; other types accept null.
  • Atomicity: All old nodes/edges are soft-deleted and new ones created within a single transaction. If any validation fails, the entire operation is rolled back.
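
A graph editor can catch the most common 400 causes before calling this endpoint. The following is a minimal client-side sketch based only on the rules listed above (required key and type, non-null config for ACTION and COMPUTE, edges pointing at node ids in the same request). The function name build_graph_payload is hypothetical, and the server's Pydantic validation remains the authority.

```python
VALID_TYPES = {"ACTION", "COMPUTE", "SWITCH", "JOIN", "DELAY", "END"}
CONFIG_REQUIRED = {"ACTION", "COMPUTE"}


def build_graph_payload(nodes: list, edges: list) -> dict:
    """Validate nodes/edges locally and return the PUT request body."""
    seen_keys = set()
    client_ids = set()
    for node in nodes:
        key = node.get("key")
        if not key:
            raise ValueError("every node needs a key")
        if key in seen_keys:
            raise ValueError(f"duplicate node key: {key}")
        if node.get("type") not in VALID_TYPES:
            raise ValueError(f"node {key}: unknown type {node.get('type')!r}")
        if node["type"] in CONFIG_REQUIRED and node.get("config") is None:
            raise ValueError(f"node {key}: {node['type']} requires a config")
        seen_keys.add(key)
        if "id" in node:
            client_ids.add(node["id"])
    for edge in edges:
        for end in ("from_node_id", "to_node_id"):
            if edge.get(end) not in client_ids:
                raise ValueError(f"edge references unknown node id: {edge.get(end)!r}")
    return {"nodes": nodes, "edges": edges}


payload = build_graph_payload(
    nodes=[
        {"id": "tmp-1", "key": "compute_band", "type": "COMPUTE",
         "config": {"logic": {">": [{"var": "event.data.heart_rate"}, 120]}}},
        {"id": "tmp-2", "key": "done", "type": "END"},
    ],
    edges=[{"from_node_id": "tmp-1", "to_node_id": "tmp-2"}],
)
```

The temporary ids (tmp-1, tmp-2) only need to be unique within the request; the server replaces them with its own UUIDs.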

8. Simulate a Workflow Run

POST /api/v1/event_subscription/{id}/test

Dry-runs the workflow graph without persisting state or invoking plugins. Use this for previewing and debugging workflows in the UI.

Request Body

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| event | object | yes | A full CloudEvent envelope (see below) |

CloudEvent Envelope

json
{
  "id": "test-evt-001",
  "source": "medipal/vitals",
  "type": "medipal.vital.heart_rate.high",
  "subject": "patient/123",
  "time": "2026-02-28T10:00:00Z",
  "specversion": "1.0",
  "datacontenttype": "application/json",
  "data": {
    "heart_rate": 135,
    "patient_id": "patient-123"
  }
}

Response (200)

json
{
  "subscription_id": "sub-001",
  "event_id": "test-evt-001",
  "nodes": [
    {
      "node_id": "node-aaa",
      "key": "compute_band",
      "type": "COMPUTE",
      "status": "completed",
      "input": {
        "logic": {
          "if": [
            { ">": [{ "var": "event.data.heart_rate" }, 120] },
            "HIGH",
            "NORMAL"
          ]
        }
      },
      "output": { "result": "HIGH" },
      "error": null,
      "context": {
        "event": {
          "id": "test-evt-001",
          "type": "medipal.vital.heart_rate.high",
          "data": { "heart_rate": 135 }
        },
        "nodes": {}
      },
      "available_paths": [
        "event",
        "event.data",
        "event.data.heart_rate",
        "event.id",
        "event.type"
      ]
    },
    {
      "node_id": "node-bbb",
      "key": "send_alert",
      "type": "ACTION",
      "status": "completed",
      "input": {
        "to": "alerts@example.com",
        "subject": "High heart rate detected",
        "body": "HIGH"
      },
      "output": { "simulated": true, "input": { "...": "..." } },
      "error": null,
      "context": {
        "event": {
          "id": "test-evt-001",
          "type": "medipal.vital.heart_rate.high",
          "data": { "heart_rate": 135 }
        },
        "nodes": { "compute_band": { "output": { "result": "HIGH" } } }
      },
      "available_paths": [
        "event",
        "event.data",
        "event.data.heart_rate",
        "event.id",
        "event.type",
        "nodes.compute_band.output.result"
      ]
    }
  ],
  "edges_traversed": ["edge-001"]
}

Context Snapshots

In simulation mode, every node in the response includes two extra fields:

| Field | Type | Description |
| --- | --- | --- |
| context | object | The full data object available to this node at execution time |
| available_paths | string[] | All dot-notation paths usable in {"var": "..."} expressions at this node |

These are only present in simulation responses. Use available_paths to power autocomplete or path pickers in your JSONLogic expression editor.
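
One possible way to drive a path picker from available_paths is a simple prefix filter, sketched below. The function name suggest_paths and the max_results cap are choices made for this example, not part of the API.

```python
def suggest_paths(available_paths: list, typed: str, max_results: int = 10) -> list:
    """Return the available paths that start with what the user has typed."""
    prefix = typed.strip().lower()
    matches = [p for p in available_paths if p.lower().startswith(prefix)]
    return sorted(matches)[:max_results]


paths = [
    "event",
    "event.data",
    "event.data.heart_rate",
    "event.id",
    "event.type",
    "nodes.compute_band.output.result",
]
suggestions = suggest_paths(paths, "event.d")
```

A fuzzy or substring match may suit some editors better; the prefix match is used here only because it is the simplest to reason about.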

Simulation Behaviour

| Aspect | Real Execution | Simulation |
| --- | --- | --- |
| DB writes | Creates WorkflowRun + node records | Skipped entirely |
| Plugin calls | Invokes real plugin | Returns { "simulated": true, "input": {...} } |
| Delays | Sleeps for delay_ms | Skipped (instant) |
| Context snapshots | Not included | context + available_paths per node |
| Response | Same summary shape | Same shape + context fields |

Errors

| Status | When |
| --- | --- |
| 400 | Missing event field or invalid payload |
| 404 | Subscription not found |

9. List Workflow Runs

GET /api/v1/workflow_run

Roles required: admin or integration only.

Query Parameters

| Param | Type | Default | Notes |
| --- | --- | --- | --- |
| limit | integer | 25 | Min 1 |
| offset | integer | 0 | Min 0 |
| event_subscription_id | string | | Filter by subscription |
| status | string | | Filter: pending, running, completed, failed |
| event_type | string | | Filter by event type |
| started_at_gte | string | | ISO-8601 lower bound |
| started_at_lte | string | | ISO-8601 upper bound |
| completed_at_gte | string | | ISO-8601 lower bound |
| completed_at_lte | string | | ISO-8601 upper bound |
| sort_by | string | | Column to sort by |
| sort_dir | string | | asc or desc |

Response (200)

json
{
  "runs": [
    {
      "id": "run-001",
      "event_subscription_id": "sub-001",
      "event_id": "evt-abc",
      "event_type": "medipal.vital.heart_rate.high",
      "trigger_event": { "...full CloudEvent JSON..." },
      "status": "completed",
      "started_at": "2026-02-28T10:00:00Z",
      "completed_at": "2026-02-28T10:00:02Z",
      "error": null,
      "created_at": "2026-02-28T10:00:00Z"
    }
  ],
  "total": 1,
  "limit": 25,
  "offset": 0,
  "has_next": false,
  "has_previous": false
}

Workflow Run Status Values

| Status | Meaning |
| --- | --- |
| pending | Created but not yet started |
| running | Actively executing nodes |
| completed | All nodes finished successfully (or reached END) |
| failed | A node failed and continue_on_error was false |

10. Get a Single Workflow Run

GET /api/v1/workflow_run/{id}

Roles required: admin or integration only.

Response (200)

Same shape as a single item from List Workflow Runs.

Errors

| Status | When |
| --- | --- |
| 404 | Workflow run not found |

11. List Run Nodes

GET /api/v1/workflow_run/{id}/node

Roles required: admin or integration only.

Returns all node execution records for a workflow run, ordered by created_at ascending (traversal order).

Response (200)

json
{
  "nodes": [
    {
      "id": "rn-001",
      "workflow_run_id": "run-001",
      "node_id": "node-aaa",
      "node_key": "compute_band",
      "node_type": "COMPUTE",
      "status": "completed",
      "input": {
        "logic": {
          "if": [
            { ">": [{ "var": "event.data.heart_rate" }, 120] },
            "HIGH",
            "NORMAL"
          ]
        }
      },
      "output": { "result": "HIGH" },
      "error": null,
      "started_at": "2026-02-28T10:00:00Z",
      "completed_at": "2026-02-28T10:00:01Z",
      "attempts": 0,
      "created_at": "2026-02-28T10:00:00Z"
    },
    {
      "id": "rn-002",
      "workflow_run_id": "run-001",
      "node_id": "node-bbb",
      "node_key": "send_alert",
      "node_type": "ACTION",
      "status": "completed",
      "input": {
        "to": "alerts@example.com",
        "subject": "High heart rate detected",
        "body": "HIGH"
      },
      "output": { "email_id": "msg-xyz" },
      "error": null,
      "started_at": "2026-02-28T10:00:01Z",
      "completed_at": "2026-02-28T10:00:02Z",
      "attempts": 0,
      "created_at": "2026-02-28T10:00:00Z"
    }
  ]
}

Run Node Status Values

| Status | Meaning |
| --- | --- |
| pending | Created, waiting for predecessors |
| running | Currently executing |
| completed | Finished successfully |
| failed | Node execution raised an error |
| skipped | All incoming edge conditions evaluated to false |

Node Input by Type

Every node records the data it received as input. The shape depends on the node type:

| Node Type | input Shape |
| --- | --- |
| COMPUTE | { "logic": <JSONLogic expression> } (the expression that was evaluated) |
| ACTION | The resolved input_mapping dict sent to the plugin |
| DELAY | { "delay_ms": <number> } (the configured delay) |
| SWITCH | null (routing is handled by edge conditions) |
| JOIN | null (synchronisation only) |
| END | null (terminal node) |

Input is captured before execution, so even failed nodes will have their input recorded for debugging purposes.

Errors

| Status | When |
| --- | --- |
| 404 | Workflow run not found |

12. Reconstruct Node Context

POST /api/v1/workflow_run/{id}/context

Reconstructs the full data context that was available to a specific node during a past workflow run. Use this to inspect what variables and paths were accessible when debugging expression failures.

Request Body

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| node_key | string | yes | The key of the target node |

Response (200)

json
{
  "context": {
    "event": {
      "id": "evt-001",
      "type": "medipal.submission.created",
      "data": {
        "submission_id": "sub-001",
        "questionnaire_id": "q-001",
        "answers": { "q1": "yes", "q2": 5 }
      }
    },
    "nodes": {
      "compute_score": {
        "output": { "result": 85 }
      }
    }
  },
  "available_paths": [
    "event",
    "event.data",
    "event.data.answers",
    "event.data.answers.q1",
    "event.data.answers.q2",
    "event.data.questionnaire_id",
    "event.data.submission_id",
    "event.id",
    "event.type",
    "nodes.compute_score.output.result"
  ]
}

The context contains two namespaces:

  • event — the trigger event that started the run
  • nodes — outputs from all nodes that completed before the target node

The available_paths list contains every dot-notation path usable in {"var": "..."} expressions at this node's position. Use it to power autocomplete in your expression editor.

Errors

| Status | When |
| --- | --- |
| 404 | Workflow run or node key not found |

13. Evaluate JSONLogic Expression

POST /api/v1/workflow/evaluate

A sandbox for testing JSONLogic expressions without running a full workflow. Supports two modes:

  • Direct mode — provide the expression and a data dict
  • From-run mode — provide the expression, a past workflow_run_id, and a node_key; the server reconstructs the context automatically

Request Body

| Field | Type | Required | Notes |
| --- | --- | --- | --- |
| expression | object | yes | A JSONLogic expression |
| data | object | no | Direct mode: the data dict to evaluate against |
| workflow_run_id | string | no | From-run mode: ID of the past workflow run |
| node_key | string | no | From-run mode: target node key (required with workflow_run_id) |

You must provide either data (direct mode) or workflow_run_id + node_key (from-run mode), but not both.
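
The either/or rule above maps directly onto the 400 cases listed under Errors, so a client can enforce it before sending. A minimal sketch follows; the helper name build_evaluate_body is hypothetical.

```python
def build_evaluate_body(expression, data=None, workflow_run_id=None, node_key=None):
    """Build the request body for POST /api/v1/workflow/evaluate."""
    direct = data is not None
    from_run = workflow_run_id is not None or node_key is not None
    if direct and from_run:
        raise ValueError("provide data OR workflow_run_id + node_key, not both")
    if not direct and not from_run:
        raise ValueError("provide data or workflow_run_id + node_key")
    if direct:
        return {"expression": expression, "data": data}
    if workflow_run_id is None or node_key is None:
        raise ValueError("from-run mode needs both workflow_run_id and node_key")
    return {
        "expression": expression,
        "workflow_run_id": workflow_run_id,
        "node_key": node_key,
    }


expr = {">": [{"var": "score"}, {"var": "threshold"}]}
direct_body = build_evaluate_body(expr, data={"score": 85, "threshold": 80})
from_run_body = build_evaluate_body(expr, workflow_run_id="run-001", node_key="send_alert")
```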

Response (200) — Direct Mode

json
{
  "result": { "value": "HIGH" },
  "data_used": {
    "score": 85,
    "threshold": 80
  },
  "available_paths": ["score", "threshold"]
}

Response (200) — From-Run Mode

json
{
  "result": { "value": true },
  "data_used": {
    "event": { "data": { "heart_rate": 135 } },
    "nodes": { "compute_band": { "output": { "result": "HIGH" } } }
  },
  "available_paths": [
    "event",
    "event.data",
    "event.data.heart_rate",
    "nodes.compute_band.output.result"
  ]
}

Errors

| Status | When |
| --- | --- |
| 400 | Both modes specified, neither mode specified, or missing node_key |
| 404 | Workflow run or node key not found (from-run mode) |

14. List Plugin Definitions

GET /api/v1/plugin_definition

Plugin definitions are auto-discovered at startup from Python entry points in the mp_server.plugins group. They are read-only via the API.

Query Parameters

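| Param | Type | Default | Notes |
| --- | --- | --- | --- |
| limit | integer | 25 | |
| offset | integer | 0 | |
| name | string | | Case-insensitive partial match on name |
| search | string | | Case-insensitive partial match on name |
| sort_by | string | | Column to sort by |
| sort_dir | string | | asc or desc |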

Response (200)

json
{
  "plugin_definitions": [
    {
      "id": "pd-001",
      "name": "mp.smtp_email",
      "version": "v1.0.0",
      "runtime": "package",
      "entrypoint": "mp_plugins.smtp:SmtpPlugin",
      "instance_config_schema": {
        "type": "object",
        "properties": {
          "host": { "type": "string" },
          "port": { "type": "integer" },
          "username": { "type": "string" },
          "password": { "type": "string" }
        },
        "required": ["host", "port"]
      },
      "actions": [
        {
          "name": "send_email",
          "title": "Send Email",
          "description": "Send an email via SMTP",
          "input_schema": {
            "type": "object",
            "properties": {
              "to": { "type": "string" },
              "subject": { "type": "string" },
              "body": { "type": "string" }
            },
            "required": ["to", "subject", "body"]
          },
          "output_schema": {
            "type": "object",
            "properties": {
              "email_id": { "type": "string" }
            }
          },
          "idempotent": false,
          "timeout_seconds": 30
        }
      ],
      "dist_name": "mp-plugins-smtp",
      "dist_commit": "abc123...",
      "enabled": true,
      "created_at": "2026-02-28T10:00:00Z"
    }
  ],
  "total": 1,
  "limit": 25,
  "offset": 0,
  "has_next": false,
  "has_previous": false
}

Key Fields

| Field | Description |
| --- | --- |
| runtime | "package" (in-process Python) or "service" (external service) |
| entrypoint | Python module:class path for in-process plugins |
| instance_config_schema | JSON Schema describing what config each instance requires |
| actions | Declared plugin actions with input/output schemas |
| actions[].input_schema | JSON Schema used to validate payloads before invoking the plugin |
| actions[].idempotent | Whether the action is safe to retry |

15. List Plugin Instances

GET /api/v1/plugin_instance

Query Parameters

| Param | Type | Default | Notes |
| --- | --- | --- | --- |
| limit | integer | 25 | |
| offset | integer | 0 | |
| plugin_definition_id | string | | Filter by parent definition |
| display_name | string | | Exact match on display name |
| enabled | boolean | | Filter by active flag |
| search | string | | Case-insensitive partial match on display name |
| sort_by | string | | Column to sort by |
| sort_dir | string | | asc or desc |

Response (200)

json
{
  "plugin_instances": [
    {
      "id": "pi-001",
      "plugin_definition_id": "pd-001",
      "display_name": "Primary SMTP",
      "config_json": { "host": "smtp.example.com", "port": 587 },
      "config_hash": "e3b0c44298fc1c149afb...",
      "enabled": true,
      "created_at": "2026-02-28T10:00:00Z"
    }
  ],
  "total": 1,
  "limit": 25,
  "offset": 0,
  "has_next": false,
  "has_previous": false
}

16. Create a Plugin Instance

POST /api/v1/plugin_instance

Request Body

| Field | Type | Required | Default | Notes |
| --- | --- | --- | --- | --- |
| plugin_definition_id | string | yes | | ID of the plugin definition to instantiate |
| display_name | string | yes | | Human-friendly label (e.g. "Primary SMTP") |
| config_json | object | no | null | Instance-specific config; must match definition's instance_config_schema |
| enabled | boolean | yes | | Whether the instance is active |

Response (200)

json
{
  "id": "pi-001",
  "plugin_definition_id": "pd-001",
  "display_name": "Primary SMTP",
  "config_json": { "host": "smtp.example.com", "port": 587 },
  "config_hash": "e3b0c44298fc1c149afb...",
  "enabled": true,
  "created_at": "2026-02-28T10:00:00Z"
}

config_hash is automatically computed as a SHA-256 of the JSON-serialised config_json. It is used for change detection and idempotency.
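
For intuition, a hash of this kind could be computed as in the sketch below. The exact serialisation the server uses (key ordering, whitespace, encoding) is not specified in this guide, so the sort_keys and separators choices here are assumptions, and the output should not be expected to match the server's config_hash.

```python
import hashlib
import json


def config_hash(config_json) -> str:
    """SHA-256 hex digest of a canonical JSON serialisation (assumed form)."""
    # Assumption: sorted keys and compact separators make the hash
    # independent of dict ordering; the server may serialise differently.
    serialised = json.dumps(config_json, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(serialised.encode("utf-8")).hexdigest()


h1 = config_hash({"host": "smtp.example.com", "port": 587})
h2 = config_hash({"port": 587, "host": "smtp.example.com"})
h3 = config_hash({"host": "smtp.example.com", "port": 25})
```

On the client, the practical use is comparing config_hash values returned by the API to detect that an instance's configuration has changed, rather than recomputing the hash locally.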


17. Get a Single Plugin Instance

GET /api/v1/plugin_instance/{id}

Response (200)

Same shape as Create a Plugin Instance.

Errors

| Status | When |
| --- | --- |
| 404 | Plugin instance not found |

18. Update a Plugin Instance

PATCH /api/v1/plugin_instance/{id}

Partial update — only send the fields you want to change.

Request Body (all fields optional)

| Field | Type | Notes |
| --- | --- | --- |
| display_name | string | |
| config_json | object | config_hash is recomputed automatically |
| enabled | boolean | Use to enable/disable without deleting |

Response (200)

Same shape as Get a Single Plugin Instance.

Errors

| Status | When |
| --- | --- |
| 404 | Plugin instance not found |

19. Delete a Plugin Instance

DELETE /api/v1/plugin_instance/{id}

Response

204 No Content — empty body.

This is a soft delete (deleted_at is set). Any workflow nodes referencing this instance will fail at execution time if the instance is deleted or disabled.

Errors

| Status | When |
| --- | --- |
| 404 | Plugin instance not found |

20. List Event Definitions

GET /api/v1/event_definition

Returns the registry of known CloudEvent types that can be subscribed to. Use this to populate the event type picker in the UI.

Query Parameters

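| Param | Type | Default | Notes |
| --- | --- | --- | --- |
| limit | integer | 25 | Min 1 |
| offset | integer | 0 | Min 0 |
| event_type | string | | Exact match filter |
| deprecated | boolean | | Filter by deprecation status |
| search | string | | Case-insensitive partial match |
| sort_by | string | | Column to sort by |
| sort_dir | string | | asc or desc |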

Response (200)

json
{
  "event_definitions": [
    {
      "event_type": "medipal.submission.created",
      "schema_version": "v1",
      "description": "Fired when a questionnaire submission is created",
      "data_schema": { "..." },
      "subject_template": "submission/{id}",
      "examples": [],
      "deprecated": false
    }
  ],
  "total": 5,
  "limit": 25,
  "offset": 0,
  "has_next": false,
  "has_previous": false
}

21. List Event Delivery Tasks

GET /api/v1/event_delivery_task

Roles required: admin only.

Admin visibility into the event outbox queue. Use this to inspect failed deliveries, monitor queue depth, and debug event processing issues.

Query Parameters

| Param | Type | Default | Notes |
| --- | --- | --- | --- |
| limit | integer | 25 | Min 1 |
| offset | integer | 0 | Min 0 |
| status | string | | Filter: PENDING, IN_PROGRESS, DONE, FAILED |
| target | string | | Filter by delivery target (e.g. local-handlers) |
| created_at_gte | string | | ISO-8601 lower bound |
| created_at_lte | string | | ISO-8601 upper bound |
| next_attempt_at_gte | string | | ISO-8601 lower bound |
| next_attempt_at_lte | string | | ISO-8601 upper bound |
| done_at_gte | string | | ISO-8601 lower bound |
| done_at_lte | string | | ISO-8601 upper bound |
| sort_by | string | | Column to sort by |
| sort_dir | string | | asc or desc |

Response (200)

json
{
  "tasks": [
    {
      "id": "task-001",
      "cloudevent": { "...full CloudEvent JSON..." },
      "target": "local-handlers",
      "status": "DONE",
      "error": null,
      "attempts": 1,
      "max_attempts": 3,
      "next_attempt_at": null,
      "locked_until": null,
      "done_at": "2026-02-28T10:00:01Z",
      "created_at": "2026-02-28T10:00:00Z"
    }
  ],
  "total": 1,
  "limit": 25,
  "offset": 0,
  "has_next": false,
  "has_previous": false
}

Delivery Task Status Values

| Status | Meaning |
| --- | --- |
| PENDING | Queued, waiting for next worker cycle |
| IN_PROGRESS | Claimed by a worker, currently being processed |
| DONE | Successfully delivered |
| FAILED | Exhausted all max_attempts (default 3) |

22. Core Concepts

Event Subscriptions

An Event Subscription is a workflow definition that reacts to CloudEvents. It specifies:

  • Which events to listen for (event_type, plus optional source_filter, subject_filter, and condition_jsonlogic).
  • What to do when a matching event arrives — defined as a directed acyclic graph (DAG) of nodes connected by edges.

Workflow Graph (DAG)

Each subscription owns a graph of nodes and edges:

[COMPUTE: score_band] ──condition──> [ACTION: send_alert] ──> [END]
                       \
                        ──condition──> [ACTION: log_normal] ──> [END]
  • Nodes are computational units (evaluate logic, invoke plugins, branch, delay, join, or stop).
  • Edges are directed connections between nodes, optionally guarded by JSONLogic conditions that determine which path the workflow follows.

Plugins

Plugins are reusable integrations (email, SMS, HTTP, etc.) discovered at startup via Python entry points:

  • Plugin Definition: A read-only record describing a plugin's identity, capabilities (actions), and per-instance configuration schema.
  • Plugin Instance: A configured deployment of a definition (e.g. "Primary SMTP" with specific host/credentials). Workflow ACTION nodes reference a plugin instance by ID.

Workflow Runs

When a CloudEvent matches a subscription, the engine creates a Workflow Run that tracks the entire execution, and a Workflow Run Node per graph node tracking individual status, input, output, and errors.


23. Node Types Reference

ACTION

Invokes a plugin action with resolved input.

| Config Field | Type | Required | Description |
| --- | --- | --- | --- |
| plugin_instance_id | string | yes | ID of the plugin instance to invoke |
| plugin_action | string | yes | Action name (e.g. send_email) |
| input_mapping | object | no | Template for building the plugin payload |

input_mapping values are either:

  • Scalar literals (string, int, bool) — used as-is.
  • JSONLogic dicts — evaluated against the current context to produce the value. Example: {"var": "event.data.user_id"} resolves to the actual user ID from the triggering event.
json
{
  "plugin_instance_id": "pi-smtp-001",
  "plugin_action": "send_email",
  "input_mapping": {
    "to": { "var": "event.data.patient_email" },
    "subject": "Alert Notification",
    "body": { "var": "nodes.compute_message.output.result" }
  }
}
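
To make the two kinds of input_mapping values concrete, here is a small sketch of how a mapping might be resolved against a context. It handles only the string form of {"var": "..."} and rejects any other JSONLogic operator; the real engine evaluates full JSONLogic, and the names get_path and resolve_input_mapping are invented for this example.

```python
from typing import Any


def get_path(context: dict, path: str) -> Any:
    """Follow a dot-notation path; return None if any segment is missing."""
    current: Any = context
    for part in path.split("."):
        if isinstance(current, dict) and part in current:
            current = current[part]
        else:
            return None
    return current


def resolve_input_mapping(mapping: dict, context: dict) -> dict:
    """Scalars pass through; {"var": "<path>"} dicts are looked up in context."""
    resolved = {}
    for field, value in mapping.items():
        if isinstance(value, dict):
            if set(value) != {"var"} or not isinstance(value["var"], str):
                raise NotImplementedError("this sketch only supports {'var': '<path>'}")
            resolved[field] = get_path(context, value["var"])
        else:
            resolved[field] = value
    return resolved


context = {
    "event": {"data": {"patient_email": "pat@example.com"}},
    "nodes": {"compute_message": {"output": {"result": "Heart rate is HIGH"}}},
}
mapping = {
    "to": {"var": "event.data.patient_email"},
    "subject": "Alert Notification",
    "body": {"var": "nodes.compute_message.output.result"},
}
resolved = resolve_input_mapping(mapping, context)
```

A preview like this can be useful in a node editor to show what the plugin would receive, though the Simulate endpoint gives the authoritative answer.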

COMPUTE

Evaluates a JSONLogic expression and stores the result.

| Config Field | Type | Required | Description |
| --- | --- | --- | --- |
| logic | object | yes | JSONLogic expression to evaluate |

The result is normalised to a dict. If the expression returns a scalar, it is wrapped as {"result": <value>}.

json
{
  "logic": {
    "if": [{ ">": [{ "var": "event.data.score" }, 80] }, "HIGH", "NORMAL"]
  }
}
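
The wrapping rule explains why downstream references take the form nodes.<key>.output.result. A minimal sketch of the normalisation is shown below; treating every non-dict value (including lists) as a value to wrap is an assumption, since the guide only mentions scalars.

```python
def normalise_output(value):
    """Return dict results unchanged; wrap anything else as {"result": value}."""
    if isinstance(value, dict):
        return value
    return {"result": value}


wrapped = normalise_output("HIGH")
unchanged = normalise_output({"band": "HIGH", "score": 91})
```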

SWITCH

Conditional branching node. The node itself performs no computation — routing is controlled entirely by JSONLogic conditions on its outgoing edges.

| Config Field | Type | Required | Description |
| --- | --- | --- | --- |
| mode | string | no | FIRST_TRUE (default) or ALL_TRUE |

  • FIRST_TRUE: Only the first outgoing edge whose condition passes is taken.
  • ALL_TRUE: All outgoing edges whose conditions pass are taken (parallel branches).

Note: In the current engine implementation, edge conditions are evaluated during graph traversal regardless of mode. The mode field is advisory for UI rendering.

JOIN

Synchronises multiple incoming branches before continuing.

| Config Field | Type | Required | Description |
| --- | --- | --- | --- |
| mode | string | no | ALL, ANY, or N_OF_M |
| n_required | integer | when N_OF_M | Number of branches that must complete |
| timeout_ms | integer | no | Optional timeout before failing the join |

  • ALL: Wait for all incoming branches (default when mode is omitted).
  • ANY: Proceed when any branch completes.
  • N_OF_M: Proceed once n_required branches have completed.

If n_required is specified and fewer predecessors have completed, the node raises a RuntimeError and fails the workflow run.
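
The three join modes reduce to a simple predicate over how many incoming branches have completed. The sketch below mirrors the descriptions above; the function name and signature are hypothetical, and it returns a boolean rather than raising as the engine does.

```python
def join_satisfied(mode, completed: int, total: int, n_required=None) -> bool:
    """Decide whether a JOIN node may proceed, given completed branch counts."""
    mode = mode or "ALL"  # ALL is the default when mode is omitted
    if mode == "ALL":
        return completed == total
    if mode == "ANY":
        return completed >= 1
    if mode == "N_OF_M":
        if n_required is None:
            raise ValueError("n_required is mandatory for N_OF_M")
        return completed >= n_required
    raise ValueError(f"unknown join mode: {mode!r}")


ready_all = join_satisfied(None, completed=2, total=3)
ready_any = join_satisfied("ANY", completed=1, total=3)
ready_n = join_satisfied("N_OF_M", completed=2, total=3, n_required=2)
```

A graph editor could use the same predicate to warn when n_required exceeds the number of incoming edges, a configuration that can never be satisfied.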

DELAY

Pauses workflow execution for a fixed duration or until a specific time.

| Config Field | Type | Required | Description |
| --- | --- | --- | --- |
| delay_ms | integer | one of | Fixed delay in milliseconds (min 0) |
| until_template | string | one of | Template rendering to an RFC-3339 timestamp |

One of delay_ms or until_template must be provided.

Simulation mode: Delays are skipped entirely for instant previews.

END

Terminates workflow execution. Has no configuration fields.

When the executor reaches an END node, traversal stops immediately and the workflow run is marked completed. Any remaining unvisited nodes are not executed.

json
{
  "type": "END",
  "config": {}
}

24. Execution Engine

Lifecycle

CloudEvent published
  → EventBus.publish()
    → Outbox enqueue (at-least-once guarantee)
      → OutboxWorker.drain()
        ├─ Local handlers (best-effort)
        ├─ SubscriptionRouter.route()
        │   ├─ Load enabled subscriptions matching event.type
        │   ├─ Apply filters (source, subject, condition_jsonlogic)
        │   ├─ Load graph (nodes + edges)
        │   └─ For each matching subscription:
        │       └─ WorkflowExecutor.execute()
        └─ WebhookDispatcher.dispatch()

Execution Steps (WorkflowExecutor)

  1. Build adjacency structures from edges (in-edges, out-edges, in-degree).
  2. Topological sort using Kahn's algorithm. Raises ValueError if the graph contains a cycle.
  3. Create DB records — one WorkflowRun (status: running) and one WorkflowRunNode per node (status: pending). Skipped in simulation mode.
  4. Traverse nodes in topological order:
    • Evaluate incoming edge conditions via JSONLogic.
    • If ALL incoming edges fail → mark node skipped, continue.
    • Otherwise → mark node running, dispatch by type.
    • Store output in WorkflowContext for downstream nodes.
    • On success → mark completed with input/output data.
    • On failure → mark failed with error; if continue_on_error is false, fail the entire run.
    • On END node → stop traversal.
  5. Finalise run — mark WorkflowRun as completed or failed.
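
Steps 1 and 2 can be sketched as follows. This is a generic rendering of Kahn's algorithm over (from, to) pairs, not the executor's actual code; the function name topological_order is hypothetical. A UI can run the same check to reject cyclic graphs before saving.

```python
from collections import deque


def topological_order(node_ids: list, edges: list) -> list:
    """Order nodes so every edge points forward; raise ValueError on a cycle."""
    in_degree = {n: 0 for n in node_ids}
    out_edges = {n: [] for n in node_ids}
    for src, dst in edges:
        out_edges[src].append(dst)
        in_degree[dst] += 1

    # Start from source nodes (no incoming edges).
    queue = deque(n for n in node_ids if in_degree[n] == 0)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for successor in out_edges[node]:
            in_degree[successor] -= 1
            if in_degree[successor] == 0:
                queue.append(successor)

    # Any node never reaching in-degree 0 sits on a cycle.
    if len(order) != len(node_ids):
        raise ValueError("workflow graph contains a cycle")
    return order


order = topological_order(
    ["a", "b", "c", "d"],
    [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")],
)
```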

Edge Condition Evaluation

  • An edge with no condition_jsonlogic always passes (unconditional).
  • An edge with a condition passes only if the JSONLogic evaluates to truthy.
  • A node is skipped only when ALL incoming edges have conditions AND all of them fail.
  • Source nodes (no incoming edges) are always reachable.
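
Taken together, the four rules above amount to: a node runs if it has no incoming edges or if at least one incoming edge passes. The sketch below encodes that reading, representing each incoming edge by its evaluated condition result, with None standing for an unconditional edge. The representation and function name are assumptions made for illustration, and the sketch does not model whether the upstream node itself ran.

```python
def node_is_reachable(incoming_results: list) -> bool:
    """incoming_results holds one entry per incoming edge:
    None for an unconditional edge, otherwise the evaluated condition value."""
    if not incoming_results:
        return True  # source node, always reachable
    return any(result is None or bool(result) for result in incoming_results)


source_node = node_is_reachable([])
one_passes = node_is_reachable([False, True])
all_fail = node_is_reachable([False, 0])
unconditional = node_is_reachable([False, None])
```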

Error Handling

| Node Setting | Behaviour on Failure |
| --- | --- |
| continue_on_error: false (default) | Node fails → entire run fails immediately |
| continue_on_error: true | Node fails → logged, workflow continues |

Subscription-level failures are isolated — one subscription failing does not block others from executing for the same event.


25. JSONLogic Context

All JSONLogic expressions (edge conditions, compute logic, input mappings, subscription conditions) are evaluated against a context object with two namespaces:

json
{
  "event": {
    "id": "evt-001",
    "source": "medipal/vitals",
    "type": "medipal.vital.heart_rate.high",
    "subject": "patient/123",
    "time": "2026-02-28T10:00:00Z",
    "data": {
      "heart_rate": 135,
      "patient_id": "patient-123"
    }
  },
  "nodes": {
    "compute_band": {
      "output": { "result": "HIGH" }
    },
    "send_alert": {
      "output": { "email_id": "msg-xyz" }
    }
  }
}

Referencing Event Data

json
{ "var": "event.data.heart_rate" }
{ "var": "event.type" }
{ "var": "event.subject" }

Referencing Prior Node Outputs

json
{ "var": "nodes.compute_band.output.result" }
{ "var": "nodes.send_alert.output.email_id" }

Condition Examples

json
// Edge condition: only take this path if heart rate > 120
{ ">": [{ "var": "event.data.heart_rate" }, 120] }

// Edge condition: only take this path if compute node returned "HIGH"
{ "==": [{ "var": "nodes.compute_band.output.result" }, "HIGH"] }

// Subscription condition: only trigger for specific source
{ "==": [{ "var": "event.source" }, "medipal/vitals"] }

// Compute node logic: categorise into bands
{
  "if": [
    { ">": [{ "var": "event.data.score" }, 80] }, "HIGH",
    { ">": [{ "var": "event.data.score" }, 50] }, "MEDIUM",
    "LOW"
  ]
}
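
To make the semantics of these expressions concrete, here is a deliberately tiny evaluator covering only the four operators used above (`var`, `==`, `>`, `if`). It is an illustrative sketch, not the evaluator the server uses: real JSONLogic has many more operators, applies JavaScript-style loose comparison, and accepts a default value for `var`, none of which is modelled here.

```python
def get_var(context, path):
    """Resolve a dot-separated path; return None when any segment is missing."""
    current = context
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def evaluate(expression, context):
    """Evaluate a small JSONLogic subset: var, ==, > and if."""
    if not isinstance(expression, dict):
        return expression  # literal value
    operator, args = next(iter(expression.items()))
    if operator == "var":
        return get_var(context, args)
    if operator == "if":
        # Arguments are (condition, value) pairs followed by an optional default.
        for i in range(0, len(args) - 1, 2):
            if evaluate(args[i], context):
                return evaluate(args[i + 1], context)
        return evaluate(args[-1], context) if len(args) % 2 == 1 else None
    left, right = (evaluate(arg, context) for arg in args)
    if operator == "==":
        return left == right
    if operator == ">":
        return left > right
    raise ValueError(f"unsupported operator: {operator}")


bands = {
    "if": [
        {">": [{"var": "event.data.score"}, 80]}, "HIGH",
        {">": [{"var": "event.data.score"}, 50]}, "MEDIUM",
        "LOW",
    ]
}
print(evaluate(bands, {"event": {"data": {"score": 65}}}))  # prints MEDIUM
```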

Questionnaire Submission Enrichment

When a medipal.submission.created or medipal.submission.anonymous_created event is published, the answers payload is automatically enriched with name-keyed lookups for scoring data. This makes it possible to reference scoring variables and functions by human-readable name in JSONLogic expressions, instead of by UUID.

The enrichment adds two fields inside event.data.answers.scoring:

| Field | Source | Description |
|---|---|---|
| variables_by_name | Re-indexed from scoring.variables (UUID-keyed) | Dict keyed by variable name instead of UUID |
| functions_by_name | Re-indexed from scoring.functions (list) | Dict keyed by function name instead of index |

The original UUID-keyed variables and index-based functions are preserved unchanged. Events whose answers contain no scoring block, and all non-submission events, pass through unmodified.
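
The re-indexing can be pictured roughly as follows. This is a sketch of the idea, not the server's implementation: `enrich_scoring` is a hypothetical name, and the code assumes each variable and function entry carries a `name` field (entries without one are simply left out of the name-keyed dicts).

```python
import copy


def enrich_scoring(answers):
    """Return a copy of `answers` with name-keyed scoring lookups added."""
    enriched = copy.deepcopy(answers)
    scoring = enriched.get("scoring")
    if not isinstance(scoring, dict):
        return enriched  # no scoring block: nothing to do
    variables = scoring.get("variables") or {}  # UUID-keyed dict
    functions = scoring.get("functions") or []  # list
    scoring["variables_by_name"] = {
        variable["name"]: variable
        for variable in variables.values()
        if "name" in variable
    }
    scoring["functions_by_name"] = {
        function["name"]: function
        for function in functions
        if "name" in function
    }
    return enriched
```

If two entries share a name, the later one wins in this sketch; how the server resolves such collisions is not specified here.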

Example: Accessing a Scoring Variable by Name

Given a submission where the scoring engine computed a bmi variable (stored internally as variables["uuid-abc-123"]), the enriched event data looks like:

json
{
  "event": {
    "data": {
      "answers": {
        "scoring": {
          "variables": {
            "uuid-abc-123": {
              "id": "uuid-abc-123",
              "name": "bmi",
              "value": 22.5
            }
          },
          "variables_by_name": {
            "bmi": { "id": "uuid-abc-123", "name": "bmi", "value": 22.5 }
          }
        }
      }
    }
  }
}

To access the BMI value in a JSONLogic expression:

json
{ "var": "event.data.answers.scoring.variables_by_name.bmi.value" }

To use it in a condition (e.g., route to a different path if BMI > 30):

json
{
  ">": [{ "var": "event.data.answers.scoring.variables_by_name.bmi.value" }, 30]
}

To access a scoring function by name:

json
{ "var": "event.data.answers.scoring.functions_by_name.calc_bmi.formula" }

Important: The var operator uses dot notation only. Bracket notation like variables["uuid-abc-123"] is not supported by JSONLogic and will silently return null. Always use variables_by_name.<name> to access scoring variables in workflow expressions.

Discovering Available Paths

If you are unsure which paths are available at a given node, use one of these approaches:

  1. Simulation — Call POST /api/v1/event_subscription/{id}/test with a sample event. Each node in the response includes available_paths.

  2. Context reconstruction — For a past run, call POST /api/v1/workflow_run/{id}/context with { "node_key": "..." } to see the exact context and paths that were available.

  3. Expression sandbox — Use POST /api/v1/workflow/evaluate to test an expression against custom data or a past run's context.
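
If you already hold a context object on the client (for example from a context reconstruction response), you can also derive candidate paths locally. The helper below is a hypothetical sketch that flattens nested dicts into dot-notation paths; the exact format of the server's `available_paths` is not shown in this section and may differ.

```python
def list_paths(value, prefix=""):
    """Return every dot-notation path to a leaf value in a nested dict."""
    if not isinstance(value, dict) or not value:
        return [prefix] if prefix else []
    paths = []
    for key, child in value.items():
        child_prefix = f"{prefix}.{key}" if prefix else key
        paths.extend(list_paths(child, child_prefix))
    return paths


context = {
    "event": {"type": "medipal.vital.heart_rate.high", "data": {"heart_rate": 135}},
    "nodes": {"compute_band": {"output": {"result": "HIGH"}}},
}
for path in list_paths(context):
    print(path)
# prints:
# event.type
# event.data.heart_rate
# nodes.compute_band.output.result
```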


26. Event Processing Pipeline

End-to-End Flow

Application Code
  │
  ▼
EventBus.publish(CloudEvent)
  │
  ├─ 1. (Optional) Validate event type is registered
  ├─ 2. Enqueue to outbox (target: "local-handlers")
  └─ 3. (Optional) Fire local handlers immediately (best-effort)


OutboxWorker.drain() ◄── Scheduler / Background loop
  │
  ├─ Lock batch of pending messages (row-level locks)
  ├─ For each message:
  │   ├─ Parse CloudEvent
  │   ├─ Deliver to local-handlers
  │   ├─ Route to SubscriptionRouter ──────────┐
  │   ├─ Dispatch to WebhookDispatcher          │
  │   └─ Mark DONE (or FAILED for retry)        │
  │                                              │
  │         ┌────────────────────────────────────┘
  │         ▼
  │   SubscriptionRouter.route(event)
  │     ├─ Query subscriptions by event_type
  │     ├─ Filter: enabled, source_filter, subject_filter, condition_jsonlogic
  │     ├─ Load nodes + edges for each match
  │     └─ For each match:
  │         └─ WorkflowExecutor.execute()
  │             ├─ Topological sort
  │             ├─ Create WorkflowRun + RunNodes
  │             ├─ Traverse DAG
  │             │   ├─ ACTION → PluginService.invoke_instance()
  │             │   ├─ COMPUTE → jsonLogic()
  │             │   ├─ DELAY → sleep
  │             │   ├─ SWITCH → noop (edges handle routing)
  │             │   ├─ JOIN → verify predecessors
  │             │   └─ END → stop
  │             └─ Mark run completed/failed

  └─ Return processed count

Delivery Guarantees

| Component | Guarantee |
|---|---|
| Outbox | At-least-once (handlers must be idempotent) |
| Subscription routing | Best-effort (failures isolated per subscription) |
| Webhook dispatch | Best-effort (failures isolated per webhook) |
| Plugin invocation | At-most-once per node execution |
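
Because the outbox delivers at least once, a handler may see the same event more than once. One common way to make a handler idempotent is to deduplicate on the CloudEvent `id`, as sketched below. `IdempotentHandler` is a hypothetical wrapper, and the in-memory set stands in for whatever durable store a real handler would use.

```python
class IdempotentHandler:
    """Wrap a handler so that a redelivered event is processed only once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen_ids = set()  # in-memory only; a real handler would persist this

    def __call__(self, event):
        event_id = event["id"]
        if event_id in self._seen_ids:
            return False  # duplicate delivery: skip
        self._handler(event)
        # Record the ID only after success so a failed attempt can be retried.
        self._seen_ids.add(event_id)
        return True


received = []
handle = IdempotentHandler(received.append)
handle({"id": "evt-001", "type": "medipal.vital.heart_rate.high"})
handle({"id": "evt-001", "type": "medipal.vital.heart_rate.high"})  # redelivery
print(len(received))  # prints 1
```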

27. Important UI Considerations

Pagination

All list endpoints share the same pagination contract:

?limit=25&offset=0

Use has_next / has_previous booleans to enable/disable pagination controls. total gives the full count for "showing X of Y" labels.
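
A client can walk every page by advancing `offset` by `limit` until `has_next` is false. The sketch below keeps the HTTP call out of the picture: `fetch_page` is a caller-supplied function, and the `items` key for the page contents is an assumption, so substitute whatever key the list responses actually use.

```python
def iter_all_items(fetch_page, limit=25):
    """Yield every item from a paginated list endpoint.

    `fetch_page(limit, offset)` is a caller-supplied function that performs
    the HTTP request and returns the decoded JSON body as a dict.
    """
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        yield from page["items"]  # "items" is an assumed key name
        if not page["has_next"]:
            break
        offset += limit
```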

Workflow Run Status Badges

| Status | Colour |
|---|---|
| pending | grey |
| running | blue |
| completed | green |
| failed | red |

Run Node Status Badges

| Status | Colour |
|---|---|
| pending | grey |
| running | blue |
| completed | green |
| failed | red |
| skipped | yellow |

Node Type Icons

Suggested icon mapping for the graph editor:

| Type | Icon Suggestion |
|---|---|
| ACTION | Lightning bolt / plug |
| COMPUTE | Calculator / code brackets |
| SWITCH | Git branch / fork |
| JOIN | Merge arrows / funnel |
| DELAY | Clock / hourglass |
| END | Stop sign / circle with border |

Graph Editor

When building the visual graph editor:

  1. Node creation — Present the 6 node types as a palette. When a user selects a type, show the appropriate config form based on the type.
  2. Edge creation — Allow drawing connections between nodes. Offer an optional JSONLogic condition editor (or a simplified rule builder that generates JSONLogic).
  3. Save — Collect all nodes and edges, assign temporary client IDs, and call PUT /api/v1/event_subscription/{id}/graph. The server returns the same graph with real IDs.
  4. Simulate — After saving, offer a "Test" button that opens a panel where the user can paste a sample CloudEvent JSON and see which nodes would execute and what outputs they would produce.

Plugin Instance Picker

For ACTION node configuration:

  1. Fetch available plugin instances from GET /api/v1/plugin_instance.
  2. Once a user selects a plugin instance, load its parent definition from GET /api/v1/plugin_definition to show available actions.
  3. When the user selects an action, display the input_schema to help them build the input_mapping.

Event Type Picker

For subscription creation:

  1. Fetch registered event types from GET /api/v1/event_definition.
  2. Present as a searchable dropdown.
  3. Show description and data_schema (if available) to help users understand what data the event carries.

Enable / Disable Toggle

Use PATCH /api/v1/event_subscription/{id} with { "enabled": true|false } to toggle a subscription without deleting it.

Use PATCH /api/v1/plugin_instance/{id} with { "enabled": true|false } to toggle a plugin instance.
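
Both toggles share the same request shape, so a single helper can build either one. The sketch below uses Python's standard `urllib` and only constructs the request; `build_toggle_request`, the `base_url` parameter and the example host are assumptions for illustration.

```python
import json
import urllib.request


def build_toggle_request(base_url, resource, resource_id, enabled, token):
    """Build (but do not send) the PATCH request that toggles `enabled`.

    `resource` is "event_subscription" or "plugin_instance".
    """
    url = f"{base_url}/api/v1/{resource}/{resource_id}"
    body = json.dumps({"enabled": enabled}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PATCH",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


request = build_toggle_request(
    "https://api.example.test",  # hypothetical host
    "event_subscription",
    "a1b2c3d4",
    False,
    "my-token",
)
print(request.get_method(), request.full_url)
```

Pass the result to `urllib.request.urlopen(request)` to actually send it.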

Timestamps

All timestamps are ISO-8601 UTC. Convert to the user's local timezone for display.
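
A minimal conversion sketch using only the standard library is shown below. The helper names are hypothetical, and the fixed +01:00 offset is used purely to keep the example deterministic; in a real client you would use the user's actual time zone.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(timestamp):
    """Parse an ISO-8601 UTC timestamp such as 2026-02-28T10:00:00Z."""
    # Older Python versions do not accept a trailing "Z" in fromisoformat.
    parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assume UTC when no offset is given
    return parsed


def to_display(timestamp, local_zone):
    """Convert a UTC timestamp string to `local_zone` for display."""
    return parse_utc(timestamp).astimezone(local_zone).strftime("%Y-%m-%d %H:%M")


demo_zone = timezone(timedelta(hours=1))  # fixed offset for the demo
print(to_display("2026-02-28T10:00:00Z", demo_zone))  # prints 2026-02-28 11:00
```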