Command Sink
The Command sink executes a CLI command for every event that matches its filter. It runs commands sequentially from an internal queue, ensuring that one command finishes before the next one starts.
This is ideal for integrating with existing scripts, legacy systems, or performing local actions like sending notifications via a custom CLI tool, updating local files, or triggering deployments.
Getting Started
Add a Command sink to your config.yaml with the command you want to run:
sink:
notify_admin:
type: command
command: "mail -s 'New event: #root.event_id' admin@example.com"The sink will execute this command for every event. It uses the templating engine for customizing the command string.
Core Concepts
Real-time Delivery and Retries
The Command sink listens for new events in real-time. When a matching event arrives, it is added to an internal queue and processed sequentially.
- Success: A command is considered successful if it returns an exit code of
0. - Failure: Any non-zero exit code is treated as a failure. The sink will retry failed commands up to
max_retries(default: 3). - Circuit Breaker: To prevent runaway processes or system overload during persistent failures, the sink includes a circuit breaker. If 5 commands fail in a row, the sink will stop executing any commands for 10 minutes (the "cool-off" period).
Batch Processing
If many events accumulate in the queue (controlled by batch_threshold), the sink can switch to "batch mode". In this mode, it executes a single command for all events in the current batch.
sink:
bulk_process:
type: command
command: "process_one.sh --id #root.event_id"
batch_command: "process_batch.sh --payloads '$root'"
batch_threshold: 10- Implicit Batching: If
batch_commandis not provided, it uses the regularcommandtemplate, but therootobject in the template context becomes a list of events instead of a single event.
Note
If you enable batching (batch_threshold > 1) and do not provide a batch_command, your primary command must be compatible with both a single object and a list of objects. For example, echo #root.event_id will fail when root is a list. In most cases, it is recommended to provide an explicit batch_command when using batching.
- Explicit Batching: Providing a
batch_commandallows you to use a different CLI tool or different flags optimized for bulk operations.
TTL (Time-To-Live)
TTL is enabled by default with a default_ttl of 1h. Events older than their TTL are skipped and marked as processed with a skip message. This prevents the sink from running thousands of stale commands after a long downtime.
Persistence
All executions are recorded in the command_sink_deliveries table. Every received event has an entry with a processed flag. If the command fails, the error is logged, and the sink will periodically retry the event.
Automatic Shell Quoting
The Command sink automatically quotes all interpolated values using shlex.quote(). This ensures that even if your event data contains spaces, quotes, or other special shell characters, the command will remain safe and valid. You do not need to manually add quotes around #root or $root placeholders in your command or batch_command configuration.
Configuration
Robust List Format (Recommended)
To avoid shell interpretation issues and reliably handle special characters, use the list-based command format. This method passes arguments directly to the operating system, bypassing the shell entirely.
sink:
openclaw:
type: command
# Use a list for direct-argv execution (bypasses shell)
command:
- "/usr/local/bin/openclaw"
- "agent"
- "--session-id"
- "main"
- "--message"
# $root will be safely interpolated as JSON
- "Event just happened: $root"Minimal Configuration (Shell)
Using a string will trigger shell execution, which is convenient for piping or redirection but requires careful quoting.
sink:
my_cmd:
type: command
command: "echo #root.event_id >> events.log"Defaults: match: "*", batch_threshold: 10, max_retries: 3, ttl_enabled: true, default_ttl: "1h".
Full Configuration
sink:
advanced_cmd:
type: command
command: ["python", "process.py", "--data", "$root.data"]
batch_command: ["python", "process_batch.py", "--json", "$root"]
batch_threshold: 50
max_retries: 5
retry_interval: "5m"
match:
- "user.auth.*"
- "payment.processed"
ttl_enabled: true
default_ttl: "12h"
event_ttl:
"user.auth.*": "1h"Configuration Reference
| Parameter | Type | Default | Description |
|---|---|---|---|
type | string | — | Must be command. |
command | `string | list` | Required |
batch_command | `string | list` | — |
batch_threshold | int | 10 | Number of events in queue required to trigger batch processing. |
max_retries | int | 3 | Maximum number of retries for a failed command. |
retry_interval | string | "10s" | Minimum wait between retries. |
match | `string | list` | "*" |
ttl_enabled | bool | true | Whether to skip events older than their TTL. |
default_ttl | string | "1h" | Default TTL for events without a specific rule. |
event_ttl | dict | {} | Per-type TTL overrides. Keys use the same matching patterns as match. |
Template Interpolation
The Command sink uses the templating engine to dynamically construct commands.
#root.path: Raw string interpolation (e.g.,evt_123).$root.path: JSON string interpolation (e.g.,{"key":"value"}).
When using the list format, values are interpolated as-is without extra shell quoting. When using the string format, values are automatically quoted using shlex.quote() for shell safety.
Single Event Context
When processing one by one, root is a single event object:
#root.event_id->evt_123$root.data->{"key":"value"}
Batch Context
When processing a batch, root is a list of event objects:
$root->[{"event_id": "evt_1", ...}, {"event_id": "evt_2", ...}]#root.0.event_id->evt_1(Accessing by index)
