diff --git a/agentix/runtime/PROTOCOL.md b/agentix/runtime/PROTOCOL.md index 784d4b7..9b6a37a 100644 --- a/agentix/runtime/PROTOCOL.md +++ b/agentix/runtime/PROTOCOL.md @@ -22,7 +22,8 @@ instances are intentionally outside the callable boundary. Put remote code behind an importable top-level function instead. Args and kwargs travel separately as `arguments = pickle.dumps((args, kwargs))`. -Return values travel as `value = pickle.dumps(result)`. +Return values travel as `value = pack(result)` using +`agentix.runtime.shared.codec`. ```python from my_project.tasks import run @@ -51,7 +52,7 @@ runtime. ```text call {call_id, callable, arguments} -call:result {call_id, value} # value is pickle.dumps(result) +call:result {call_id, value} # value is pack(result) call:error {call_id, error} cancel {call_id} ``` @@ -127,4 +128,4 @@ Client mapping: - Per-call timeouts; callers use `asyncio.wait_for(...)`. - Retries; calls are at-most-once. - Auth/TLS; providers own that layer. -- Annotation-driven validation on the wire (args/return are pickle today). +- Annotation-driven validation on the wire (args are pickle today). diff --git a/agentix/runtime/client/client.py b/agentix/runtime/client/client.py index 51178e8..d17cdfd 100644 --- a/agentix/runtime/client/client.py +++ b/agentix/runtime/client/client.py @@ -108,8 +108,23 @@ def _decode_payload(raw: Any) -> dict[str, Any]: return raw -def _unpickle_value(raw: Any) -> Any: - return pickle.loads(raw) if raw is not None else None +def _decode_result_value(raw: Any) -> Any: + if raw is None: + return None + if isinstance(raw, memoryview): + raw = raw.tobytes() + elif isinstance(raw, bytearray): + raw = bytes(raw) + if not isinstance(raw, bytes): + raise RuntimeError( + "sandbox return value must be msgpack bytes; " + f"got {type(raw).__name__}. " + "This security boundary prevents host-side pickle deserialization." + ) + try: + return unpack(raw) + except Exception as exc: + raise RuntimeError(f"failed to decode sandbox return payload: {type(exc).__name__}: {exc}") from exc class RuntimeClient: @@ -271,7 +286,7 @@ async def _try_http_fast_path( if not isinstance(reply, dict): raise RuntimeError("invalid /call reply payload") if reply.get("ok") is True: - return "result", _unpickle_value(reply.get("value")) + return "result", _decode_result_value(reply.get("value")) if reply.get("ok") is False: err = RemoteError.model_validate(reply.get("error") or {}) return "error", err @@ -325,7 +340,7 @@ async def remote( kind, data = await q.get() if kind == "result": terminated = True - return cast(R, _unpickle_value(data.get("value"))) + return cast(R, _decode_result_value(data.get("value"))) if kind == "error": err = RemoteError.model_validate(data["error"]) terminated = True diff --git a/agentix/runtime/server/worker/invoker.py b/agentix/runtime/server/worker/invoker.py index c08a461..5cbe9e2 100644 --- a/agentix/runtime/server/worker/invoker.py +++ b/agentix/runtime/server/worker/invoker.py @@ -1,8 +1,7 @@ """Worker-side callable execution. -The worker unpickles the callable + args/kwargs, calls it, and pickles -the result back. No shape detection, no TypeAdapter validation — pickle -preserves Python object identity end to end. +The worker unpickles the callable + args/kwargs, calls it, and +msgpack-encodes the result back. A coroutine function is awaited directly on the worker's event loop; a plain (sync) function runs in a thread via ``asyncio.to_thread`` so a @@ -31,6 +30,7 @@ from typing import Any from agentix.runtime.shared.callables import display_name_for +from agentix.runtime.shared.codec import pack from agentix.runtime.shared.models import RemoteError, RemoteRequest, RemoteResponse from agentix.utils import context as _context from agentix.utils.trace._bridge import DISPATCH_CALL_ID @@ -86,13 +86,13 @@ async def call(self, fn: Any, request: RemoteRequest) -> RemoteResponse: ), ) try: - payload = pickle.dumps(result) + payload = pack(result) except Exception as exc: return RemoteResponse( ok=False, error=RemoteError( type="ResultEncodeError", - message=f"failed to pickle return value: {exc}", + message=f"failed to encode return value: {exc}", ), ) return RemoteResponse(ok=True, value=payload) diff --git a/agentix/runtime/shared/framing.py b/agentix/runtime/shared/framing.py index c9820c2..80d3ab4 100644 --- a/agentix/runtime/shared/framing.py +++ b/agentix/runtime/shared/framing.py @@ -19,7 +19,7 @@ ─── worker → runtime ───────────────────────────────────── ready {} — sent once after worker startup boot_error {error} — sent once if startup fails - result {call_id, value} — call succeeded (value is pickle bytes) + result {call_id, value} — call succeeded (value is msgpack bytes) error {call_id, error} — call failed sio_open {namespace} — open a side-channel namespace sio_emit {namespace, event, data} — emit side-channel data @@ -32,7 +32,7 @@ `callable` is an import-path `RemoteCallable` string (`module::qualname`); `arguments` is pickle.dumps((args, kwargs)); the -worker pickles the return value back into `value`. +worker msgpack-encodes the return value back into `value`. """ from __future__ import annotations diff --git a/agentix/runtime/shared/models.py b/agentix/runtime/shared/models.py index 454ab2e..eac7c62 100644 --- a/agentix/runtime/shared/models.py +++ b/agentix/runtime/shared/models.py @@ -5,8 +5,8 @@ and the worker subprocess. Both client and server import from here. Wire encoding: callable identity travels as an import path -(`module::qualname`). Args, kwargs, and return values travel as stdlib -pickle blobs so arbitrary Python values can cross the boundary. +(`module::qualname`). Args/kwargs travel as stdlib pickle blobs, while +return values travel as msgpack blobs via `agentix.runtime.shared.codec`. """ from __future__ import annotations @@ -57,7 +57,7 @@ class RemoteError(BaseModel): class RemoteResponse(BaseModel): - """Internal worker response. `value` is a pickle blob on success.""" + """Internal worker response. `value` is a msgpack blob on success.""" ok: bool value: bytes | None = None diff --git a/docs/reference/architecture.mdx b/docs/reference/architecture.mdx index 3b562a8..24d1716 100644 --- a/docs/reference/architecture.mdx +++ b/docs/reference/architecture.mdx @@ -90,7 +90,7 @@ arguments = pickle.dumps((args, kwargs)) ``` The runtime worker imports the module, resolves the function, unpickles -the arguments, and invokes it. Return values travel back as pickle +the arguments, and invokes it. Return values travel back as msgpack blobs inside `call:result`. ## Call Flow @@ -102,8 +102,8 @@ blobs inside `call:result`. 4. Client emits `call` on Socket.IO `/rpc`. 5. Runtime server forwards the request to the worker subprocess. 6. Worker imports `app`, resolves `run`, unpickles args, and calls it. -7. Worker pickles the return value. -8. Server emits `call:result`; client unpickles the value and returns it. +7. Worker msgpack-encodes the return value. +8. Server emits `call:result`; client msgpack-decodes the value and returns it. ``` ## Transports @@ -158,7 +158,7 @@ For each call, the worker: 1. resolves the `RemoteCallable` import path 2. unpickles `(args, kwargs)` 3. calls the callable (awaiting when the return value is awaitable) -4. pickles the return value +4. msgpack-encodes the return value The worker also hosts the sandbox-side `agentix.sio` bridge. Plugin namespaces register inside the worker and forward events through diff --git a/tests/runtime/client/test_main_module.py b/tests/runtime/client/test_main_module.py index 7594047..5a2fe4c 100644 --- a/tests/runtime/client/test_main_module.py +++ b/tests/runtime/client/test_main_module.py @@ -17,16 +17,16 @@ async def test_remote_call_to_importable_module(): from tests._user_app_target import add, greet try: - import pickle + from agentix.runtime.shared.codec import unpack resp = await mp.call(request_for(greet, kwargs={"name": "world"})) assert resp.ok, resp.error - assert pickle.loads(resp.value) == "hello world" + assert unpack(resp.value) == "hello world" # Second function on the same module should reuse the same worker. resp2 = await mp.call(request_for(add, kwargs={"a": 3, "b": 4})) assert resp2.ok, resp2.error - assert pickle.loads(resp2.value) == 7 + assert unpack(resp2.value) == 7 finally: await mp.shutdown() diff --git a/tests/runtime/client/test_robustness.py b/tests/runtime/client/test_robustness.py index b505642..60d66ed 100644 --- a/tests/runtime/client/test_robustness.py +++ b/tests/runtime/client/test_robustness.py @@ -8,6 +8,8 @@ from __future__ import annotations +import os +import pickle import socket import pytest @@ -18,10 +20,17 @@ RuntimeClient, RuntimeUnreachable, WorkerExited, + _decode_result_value, ) +from agentix.runtime.shared.codec import pack from tests._worker_target import count_exec_and_sleep, self_sigkill +class _SandboxPickleExploit: + def __reduce__(self): + return (os.putenv, ("AGENTIX_UNSAFE_PICKLE_TRIGGERED", "1")) + + @pytest.mark.asyncio async def test_call_timeout_when_call_exceeds_deadline(live_server): base_url = await live_server() @@ -80,3 +89,15 @@ async def test_fail_pending_drains_queues_with_fatal_error(): assert data is err finally: await client._client.aclose() + + +def test_decode_result_value_accepts_msgpack_value() -> None: + assert _decode_result_value(pack({"ok": True})) == {"ok": True} + + +def test_decode_result_value_rejects_pickle_exploit(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("AGENTIX_UNSAFE_PICKLE_TRIGGERED", raising=False) + payload = pickle.dumps(_SandboxPickleExploit()) + with pytest.raises(RuntimeError, match="failed to decode sandbox return payload"): + _decode_result_value(payload) + assert os.environ.get("AGENTIX_UNSAFE_PICKLE_TRIGGERED") is None diff --git a/tests/runtime/server/worker/test_subprocess.py b/tests/runtime/server/worker/test_subprocess.py index 643a80f..b799fdb 100644 --- a/tests/runtime/server/worker/test_subprocess.py +++ b/tests/runtime/server/worker/test_subprocess.py @@ -17,6 +17,7 @@ import pytest from agentix.runtime.server.worker import RuntimeWorkerClient +from agentix.runtime.shared.codec import unpack from agentix.runtime.shared.models import RemoteRequest from tests import _worker_target as target from tests._rpc_helpers import request_for @@ -34,8 +35,8 @@ async def test_subprocess_worker_round_trip(): try: resp = await mp.call(request_for(target.echo, kwargs={"msg": "hi"})) assert resp.ok, resp.error - result = pickle.loads(resp.value) - assert result.msg == "echo:hi" + result = unpack(resp.value) + assert result["msg"] == "echo:hi" finally: await mp.shutdown() @@ -53,7 +54,7 @@ async def test_subprocess_worker_handles_many_concurrent_calls(): responses = await asyncio.gather( *(mp.call(request_for(target.echo, kwargs={"msg": str(i)})) for i in range(25)) ) - msgs = sorted(pickle.loads(r.value).msg for r in responses) + msgs = sorted(unpack(r.value)["msg"] for r in responses) assert msgs == sorted(f"echo:{i}" for i in range(25)) finally: await mp.shutdown() @@ -93,8 +94,8 @@ async def test_subprocess_worker_ignores_cwd_agentix_package( try: resp = await mp.call(request_for(target.echo, kwargs={"msg": "shadow"})) assert resp.ok, resp.error - result = pickle.loads(resp.value) - assert result.msg == "echo:shadow" + result = unpack(resp.value) + assert result["msg"] == "echo:shadow" finally: await mp.shutdown() @@ -110,13 +111,13 @@ async def test_subprocess_worker_child_reading_stdin_does_not_steal_frames(): try: r1 = await mp.call(request_for(target.spawn_stdin_reading_child)) assert r1.ok, r1.error - assert pickle.loads(r1.value) == 0 + assert unpack(r1.value) == 0 # If the child had stolen frame bytes, the pipe would be desynced # and this call would hang or return garbage. r2 = await mp.call(request_for(target.echo, kwargs={"msg": "after"})) assert r2.ok, r2.error - assert pickle.loads(r2.value).msg == "echo:after" + assert unpack(r2.value)["msg"] == "echo:after" finally: await mp.shutdown() @@ -179,7 +180,7 @@ async def test_subprocess_worker_respawns_after_death(): r2 = await mp.call(request_for(target.echo, kwargs={"msg": "two"})) assert r2.ok, r2.error - assert pickle.loads(r2.value).msg == "echo:two" + assert unpack(r2.value)["msg"] == "echo:two" assert mp._worker is not worker1 # a fresh worker was spawned # The dead worker must be torn down on respawn, not leaked: its drain diff --git a/tests/runtime/shared/test_models.py b/tests/runtime/shared/test_models.py index ce196bb..31a2275 100644 --- a/tests/runtime/shared/test_models.py +++ b/tests/runtime/shared/test_models.py @@ -8,6 +8,7 @@ from agentix.provider.base import SandboxConfig, SandboxResource from agentix.runtime.shared.callables import RemoteCallable +from agentix.runtime.shared.codec import pack, unpack from agentix.runtime.shared.models import RemoteError, RemoteRequest, RemoteResponse BUNDLE_REF = "/cache/agentix/bundles/sha256-deadbeef" @@ -33,9 +34,9 @@ def test_remote_callable_rejects_non_callable(): def test_remote_response_ok_shape(): - resp = RemoteResponse(ok=True, value=pickle.dumps({"x": 1})) + resp = RemoteResponse(ok=True, value=pack({"x": 1})) assert resp.error is None - assert pickle.loads(resp.value) == {"x": 1} + assert unpack(resp.value) == {"x": 1} def test_remote_response_error_shape(): diff --git a/tests/runtime/test_protocol.py b/tests/runtime/test_protocol.py index c099f9e..301d060 100644 --- a/tests/runtime/test_protocol.py +++ b/tests/runtime/test_protocol.py @@ -55,10 +55,8 @@ async def _on_result(data): await sio.disconnect() assert payload["call_id"] == "call-ok" - import pickle - - result = pickle.loads(payload["value"]) - assert result.msg == "echo:hi" + result = unpack(payload["value"]) + assert result["msg"] == "echo:hi" async def test_socketio_bad_callable_returns_error(use_inprocess_worker, live_server): @@ -97,7 +95,7 @@ async def test_client_remote_round_trip(use_inprocess_worker, live_server): base_url = await live_server() async with RuntimeClient(base_url) as c: result = await c.remote(target.echo, msg="hello") - assert result.msg == "echo:hello" + assert result["msg"] == "echo:hello" async def test_client_remote_http_fast_path_falls_back_to_sio(use_inprocess_worker, live_server): @@ -157,8 +155,7 @@ async def _on_result(data): await sio.disconnect() assert payload["call_id"] == call_id - import pickle as _pickle - assert _pickle.loads(payload["value"]) == 1, "fn must have run exactly once" + assert unpack(payload["value"]) == 1, "fn must have run exactly once" async def test_runtime_replays_unacked_result_after_reconnect(use_inprocess_worker, live_server): @@ -210,8 +207,7 @@ async def _on_result(data): await sio_b.disconnect() assert payload["call_id"] == call_id - import pickle as _pickle - assert _pickle.loads(payload["value"]) == 1, "fn must run exactly once" + assert unpack(payload["value"]) == 1, "fn must run exactly once" async def test_client_remote_http_fallback_does_not_double_execute(use_inprocess_worker, live_server):