class ACP::StdioTransport

Overview

Implements the ACP transport over a pair of IO objects. In the typical case these are the stdin (for writing) and stdout (for reading) of a spawned agent child process.

Incoming messages are read in a background fiber and placed into a buffered channel so that #receive never blocks the writer.

Thread safety: Crystal fibers are cooperatively scheduled on a single thread, so we don't need mutexes — just channels.

Direct Known Subclasses

Defined in:

acp/transport.cr

Constant Summary

DEFAULT_MAX_LINE_BYTES = (16 * 1024) * 1024

Hard cap on the size of a single incoming JSON-RPC line. ACP messages are typically a few KiB; 16 MiB is generous enough for an unusually large ContentBlock (e.g. embedded image data) while preventing a runaway / malicious agent from OOM-ing the client by streaming an unbounded line.

SENSITIVE_METHODS = Set {"authenticate"}

JSON-RPC method names whose params are treated as sensitive. The transport redacts these from DEBUG-level frame logs so a session with debug logging enabled does not write credentials/tokens to log files. Add to this set if new methods carry secrets.

Constructors

Instance Method Summary

Instance methods inherited from class ACP::Transport

close : Nil close, closed? : Bool closed?, receive : JSON::Any | Nil receive, send(message : Hash(String, JSON::Any)) : Nil send

Constructor Detail

def self.new(reader : IO, writer : IO, buffer_size : Int32 = 256, max_line_bytes : Int32 = DEFAULT_MAX_LINE_BYTES) #

Creates a new stdio transport.

  • reader — the IO to read incoming JSON-RPC messages from (typically the agent process's stdout).
  • writer — the IO to write outgoing JSON-RPC messages to (typically the agent process's stdin).
  • buffer_size — how many messages to buffer in the channel before back-pressuring the reader fiber. Default 256.
  • max_line_bytes — abort and log a warning if a single incoming line exceeds this many bytes. Default 16 MiB.

[View source]

Instance Method Detail

def close : Nil #

Closes the transport, signaling the reader to stop and closing the underlying IO objects.


[View source]
def closed? : Bool #

Returns true if the transport has been closed.


[View source]
def incoming : Channel(JSON::Any | Nil) #

The channel that the reader fiber pushes parsed messages into. A nil value signals that the reader has stopped (EOF or error).


[View source]
def receive(timeout : Time::Span) : JSON::Any | Nil #

Receives with a timeout. Returns nil if no message arrives within the given duration.


[View source]
def receive : JSON::Any | Nil #

Receives the next incoming JSON-RPC message. Blocks until a message is available. Returns nil if the transport is closed or the reader encountered EOF.


[View source]
def send(message : Hash(String, JSON::Any)) : Nil #

Sends a JSON-RPC message over the transport. The message is serialized as a single line of JSON followed by a newline.


[View source]