Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions agentix/runtime/PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ instances are intentionally outside the callable boundary. Put remote
code behind an importable top-level function instead.

Args and kwargs travel separately as `arguments = pickle.dumps((args, kwargs))`.
Return values travel as `value = pickle.dumps(result)`.
Return values travel as `value = pack(result)` using
`agentix.runtime.shared.codec`.

```python
from my_project.tasks import run
Expand Down Expand Up @@ -51,7 +52,7 @@ runtime.

```text
call {call_id, callable, arguments}
call:result {call_id, value} # value is pickle.dumps(result)
call:result {call_id, value} # value is pack(result)
call:error {call_id, error}
cancel {call_id}
```
Expand Down Expand Up @@ -127,4 +128,4 @@ Client mapping:
- Per-call timeouts; callers use `asyncio.wait_for(...)`.
- Retries; calls are at-most-once.
- Auth/TLS; providers own that layer.
- Annotation-driven validation on the wire (args/return are pickle today).
- Annotation-driven validation on the wire (args are pickle today).
23 changes: 19 additions & 4 deletions agentix/runtime/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,23 @@ def _decode_payload(raw: Any) -> dict[str, Any]:
return raw


def _unpickle_value(raw: Any) -> Any:
return pickle.loads(raw) if raw is not None else None
def _decode_result_value(raw: Any) -> Any:
if raw is None:
return None
if isinstance(raw, memoryview):
raw = raw.tobytes()
elif isinstance(raw, bytearray):
raw = bytes(raw)
if not isinstance(raw, bytes):
raise RuntimeError(
"sandbox return value must be msgpack bytes; "
f"got {type(raw).__name__}. "
"This security boundary prevents host-side pickle deserialization."
)
try:
return unpack(raw)
except Exception as exc:
raise RuntimeError(f"failed to decode sandbox return payload: {type(exc).__name__}: {exc}") from exc


class RuntimeClient:
Expand Down Expand Up @@ -271,7 +286,7 @@ async def _try_http_fast_path(
if not isinstance(reply, dict):
raise RuntimeError("invalid /call reply payload")
if reply.get("ok") is True:
return "result", _unpickle_value(reply.get("value"))
return "result", _decode_result_value(reply.get("value"))
if reply.get("ok") is False:
err = RemoteError.model_validate(reply.get("error") or {})
return "error", err
Expand Down Expand Up @@ -325,7 +340,7 @@ async def remote(
kind, data = await q.get()
if kind == "result":
terminated = True
return cast(R, _unpickle_value(data.get("value")))
return cast(R, _decode_result_value(data.get("value")))
if kind == "error":
err = RemoteError.model_validate(data["error"])
terminated = True
Expand Down
10 changes: 5 additions & 5 deletions agentix/runtime/server/worker/invoker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""Worker-side callable execution.

The worker unpickles the callable + args/kwargs, calls it, and pickles
the result back. No shape detection, no TypeAdapter validation — pickle
preserves Python object identity end to end.
The worker unpickles the callable + args/kwargs, calls it, and
msgpack-encodes the result back.

A coroutine function is awaited directly on the worker's event loop; a
plain (sync) function runs in a thread via ``asyncio.to_thread`` so a
Expand Down Expand Up @@ -31,6 +30,7 @@
from typing import Any

from agentix.runtime.shared.callables import display_name_for
from agentix.runtime.shared.codec import pack
from agentix.runtime.shared.models import RemoteError, RemoteRequest, RemoteResponse
from agentix.utils import context as _context
from agentix.utils.trace._bridge import DISPATCH_CALL_ID
Expand Down Expand Up @@ -86,13 +86,13 @@ async def call(self, fn: Any, request: RemoteRequest) -> RemoteResponse:
),
)
try:
payload = pickle.dumps(result)
payload = pack(result)
except Exception as exc:
return RemoteResponse(
ok=False,
error=RemoteError(
type="ResultEncodeError",
message=f"failed to pickle return value: {exc}",
message=f"failed to encode return value: {exc}",
),
)
return RemoteResponse(ok=True, value=payload)
Expand Down
4 changes: 2 additions & 2 deletions agentix/runtime/shared/framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
─── worker → runtime ─────────────────────────────────────
ready {} — sent once after worker startup
boot_error {error} — sent once if startup fails
result {call_id, value} — call succeeded (value is pickle bytes)
result {call_id, value} — call succeeded (value is msgpack bytes)
error {call_id, error} — call failed
sio_open {namespace} — open a side-channel namespace
sio_emit {namespace, event, data} — emit side-channel data
Expand All @@ -32,7 +32,7 @@

`callable` is an import-path `RemoteCallable` string
(`module::qualname`); `arguments` is pickle.dumps((args, kwargs)); the
worker pickles the return value back into `value`.
worker msgpack-encodes the return value back into `value`.
"""

from __future__ import annotations
Expand Down
6 changes: 3 additions & 3 deletions agentix/runtime/shared/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
and the worker subprocess. Both client and server import from here.

Wire encoding: callable identity travels as an import path
(`module::qualname`). Args, kwargs, and return values travel as stdlib
pickle blobs so arbitrary Python values can cross the boundary.
(`module::qualname`). Args/kwargs travel as stdlib pickle blobs, while
return values travel as msgpack blobs via `agentix.runtime.shared.codec`.
"""

from __future__ import annotations
Expand Down Expand Up @@ -57,7 +57,7 @@ class RemoteError(BaseModel):


class RemoteResponse(BaseModel):
"""Internal worker response. `value` is a pickle blob on success."""
"""Internal worker response. `value` is a msgpack blob on success."""

ok: bool
value: bytes | None = None
Expand Down
8 changes: 4 additions & 4 deletions docs/reference/architecture.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ arguments = pickle.dumps((args, kwargs))
```

The runtime worker imports the module, resolves the function, unpickles
the arguments, and invokes it. Return values travel back as pickle
the arguments, and invokes it. Return values travel back as msgpack
blobs inside `call:result`.

## Call Flow
Expand All @@ -102,8 +102,8 @@ blobs inside `call:result`.
4. Client emits `call` on Socket.IO `/rpc`.
5. Runtime server forwards the request to the worker subprocess.
6. Worker imports `app`, resolves `run`, unpickles args, and calls it.
7. Worker pickles the return value.
8. Server emits `call:result`; client unpickles the value and returns it.
7. Worker msgpack-encodes the return value.
8. Server emits `call:result`; client msgpack-decodes the value and returns it.
```

## Transports
Expand Down Expand Up @@ -158,7 +158,7 @@ For each call, the worker:
1. resolves the `RemoteCallable` import path
2. unpickles `(args, kwargs)`
3. calls the callable (awaiting when the return value is awaitable)
4. pickles the return value
4. msgpack-encodes the return value

The worker also hosts the sandbox-side `agentix.sio` bridge. Plugin
namespaces register inside the worker and forward events through
Expand Down
6 changes: 3 additions & 3 deletions tests/runtime/client/test_main_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ async def test_remote_call_to_importable_module():
from tests._user_app_target import add, greet

try:
import pickle
from agentix.runtime.shared.codec import unpack

resp = await mp.call(request_for(greet, kwargs={"name": "world"}))
assert resp.ok, resp.error
assert pickle.loads(resp.value) == "hello world"
assert unpack(resp.value) == "hello world"

# Second function on the same module should reuse the same worker.
resp2 = await mp.call(request_for(add, kwargs={"a": 3, "b": 4}))
assert resp2.ok, resp2.error
assert pickle.loads(resp2.value) == 7
assert unpack(resp2.value) == 7

finally:
await mp.shutdown()
Expand Down
21 changes: 21 additions & 0 deletions tests/runtime/client/test_robustness.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from __future__ import annotations

import os
import pickle
import socket

import pytest
Expand All @@ -18,10 +20,17 @@
RuntimeClient,
RuntimeUnreachable,
WorkerExited,
_decode_result_value,
)
from agentix.runtime.shared.codec import pack
from tests._worker_target import count_exec_and_sleep, self_sigkill


class _SandboxPickleExploit:
def __reduce__(self):
return (os.putenv, ("AGENTIX_UNSAFE_PICKLE_TRIGGERED", "1"))


@pytest.mark.asyncio
async def test_call_timeout_when_call_exceeds_deadline(live_server):
base_url = await live_server()
Expand Down Expand Up @@ -80,3 +89,15 @@ async def test_fail_pending_drains_queues_with_fatal_error():
assert data is err
finally:
await client._client.aclose()


def test_decode_result_value_accepts_msgpack_value() -> None:
assert _decode_result_value(pack({"ok": True})) == {"ok": True}


def test_decode_result_value_rejects_pickle_exploit(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("AGENTIX_UNSAFE_PICKLE_TRIGGERED", raising=False)
payload = pickle.dumps(_SandboxPickleExploit())
with pytest.raises(RuntimeError, match="failed to decode sandbox return payload"):
_decode_result_value(payload)
assert os.environ.get("AGENTIX_UNSAFE_PICKLE_TRIGGERED") is None
17 changes: 9 additions & 8 deletions tests/runtime/server/worker/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pytest

from agentix.runtime.server.worker import RuntimeWorkerClient
from agentix.runtime.shared.codec import unpack
from agentix.runtime.shared.models import RemoteRequest
from tests import _worker_target as target
from tests._rpc_helpers import request_for
Expand All @@ -34,8 +35,8 @@ async def test_subprocess_worker_round_trip():
try:
resp = await mp.call(request_for(target.echo, kwargs={"msg": "hi"}))
assert resp.ok, resp.error
result = pickle.loads(resp.value)
assert result.msg == "echo:hi"
result = unpack(resp.value)
assert result["msg"] == "echo:hi"
finally:
await mp.shutdown()

Expand All @@ -53,7 +54,7 @@ async def test_subprocess_worker_handles_many_concurrent_calls():
responses = await asyncio.gather(
*(mp.call(request_for(target.echo, kwargs={"msg": str(i)})) for i in range(25))
)
msgs = sorted(pickle.loads(r.value).msg for r in responses)
msgs = sorted(unpack(r.value)["msg"] for r in responses)
assert msgs == sorted(f"echo:{i}" for i in range(25))
finally:
await mp.shutdown()
Expand Down Expand Up @@ -93,8 +94,8 @@ async def test_subprocess_worker_ignores_cwd_agentix_package(
try:
resp = await mp.call(request_for(target.echo, kwargs={"msg": "shadow"}))
assert resp.ok, resp.error
result = pickle.loads(resp.value)
assert result.msg == "echo:shadow"
result = unpack(resp.value)
assert result["msg"] == "echo:shadow"
finally:
await mp.shutdown()

Expand All @@ -110,13 +111,13 @@ async def test_subprocess_worker_child_reading_stdin_does_not_steal_frames():
try:
r1 = await mp.call(request_for(target.spawn_stdin_reading_child))
assert r1.ok, r1.error
assert pickle.loads(r1.value) == 0
assert unpack(r1.value) == 0

# If the child had stolen frame bytes, the pipe would be desynced
# and this call would hang or return garbage.
r2 = await mp.call(request_for(target.echo, kwargs={"msg": "after"}))
assert r2.ok, r2.error
assert pickle.loads(r2.value).msg == "echo:after"
assert unpack(r2.value)["msg"] == "echo:after"
finally:
await mp.shutdown()

Expand Down Expand Up @@ -179,7 +180,7 @@ async def test_subprocess_worker_respawns_after_death():

r2 = await mp.call(request_for(target.echo, kwargs={"msg": "two"}))
assert r2.ok, r2.error
assert pickle.loads(r2.value).msg == "echo:two"
assert unpack(r2.value)["msg"] == "echo:two"
assert mp._worker is not worker1 # a fresh worker was spawned

# The dead worker must be torn down on respawn, not leaked: its drain
Expand Down
5 changes: 3 additions & 2 deletions tests/runtime/shared/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from agentix.provider.base import SandboxConfig, SandboxResource
from agentix.runtime.shared.callables import RemoteCallable
from agentix.runtime.shared.codec import pack, unpack
from agentix.runtime.shared.models import RemoteError, RemoteRequest, RemoteResponse

BUNDLE_REF = "/cache/agentix/bundles/sha256-deadbeef"
Expand All @@ -33,9 +34,9 @@ def test_remote_callable_rejects_non_callable():


def test_remote_response_ok_shape():
resp = RemoteResponse(ok=True, value=pickle.dumps({"x": 1}))
resp = RemoteResponse(ok=True, value=pack({"x": 1}))
assert resp.error is None
assert pickle.loads(resp.value) == {"x": 1}
assert unpack(resp.value) == {"x": 1}


def test_remote_response_error_shape():
Expand Down
14 changes: 5 additions & 9 deletions tests/runtime/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ async def _on_result(data):
await sio.disconnect()

assert payload["call_id"] == "call-ok"
import pickle

result = pickle.loads(payload["value"])
assert result.msg == "echo:hi"
result = unpack(payload["value"])
assert result["msg"] == "echo:hi"


async def test_socketio_bad_callable_returns_error(use_inprocess_worker, live_server):
Expand Down Expand Up @@ -97,7 +95,7 @@ async def test_client_remote_round_trip(use_inprocess_worker, live_server):
base_url = await live_server()
async with RuntimeClient(base_url) as c:
result = await c.remote(target.echo, msg="hello")
assert result.msg == "echo:hello"
assert result["msg"] == "echo:hello"


async def test_client_remote_http_fast_path_falls_back_to_sio(use_inprocess_worker, live_server):
Expand Down Expand Up @@ -157,8 +155,7 @@ async def _on_result(data):
await sio.disconnect()

assert payload["call_id"] == call_id
import pickle as _pickle
assert _pickle.loads(payload["value"]) == 1, "fn must have run exactly once"
assert unpack(payload["value"]) == 1, "fn must have run exactly once"


async def test_runtime_replays_unacked_result_after_reconnect(use_inprocess_worker, live_server):
Expand Down Expand Up @@ -210,8 +207,7 @@ async def _on_result(data):
await sio_b.disconnect()

assert payload["call_id"] == call_id
import pickle as _pickle
assert _pickle.loads(payload["value"]) == 1, "fn must run exactly once"
assert unpack(payload["value"]) == 1, "fn must run exactly once"


async def test_client_remote_http_fallback_does_not_double_execute(use_inprocess_worker, live_server):
Expand Down