Skip to content

Model API client parity: queue + streaming + uploads + webhooks on Faraday (v0.3.0)#5

Merged
mculp merged 17 commits into
mainfrom
feat/streaming-storage-webhooks
Jun 8, 2026
Merged

Model API client parity: queue + streaming + uploads + webhooks on Faraday (v0.3.0)#5
mculp merged 17 commits into
mainfrom
feat/streaming-storage-webhooks

Conversation

@mculp

@mculp mculp commented Jun 7, 2026

Copy link
Copy Markdown
Owner

The client feature-parity work for the fal Model API, now targeting main (foundation landed in #3; this consolidates the former #4 + #5 — see #4 for why they can no longer stand alone).

What this adds (parity with the official JS/Python/PHP clients)

  • Queuesubmit / status / result / cancel addressed by (app_id, request_id); cancel returns true/false(/raises); logs: and webhook_url: options. Per-request URLs use the full endpoint id (verified against fal docs + official clients — the variant must be kept, or nested model ids 404).
  • Streamingstream(id, input) consumes Server-Sent Events via an incremental Sse::Parser; yields each event, returns the final result.
  • Uploadsupload(path_or_io) two-step presigned upload to fal storage → public URL.
  • Module conveniencesFal.run/subscribe/stream/upload/queue over a memoized default client.
  • Faraday transport — pure Ruby, no native extensions, so the gem installs cleanly on every Ruby including 4.0 (http.rb’s llhttp-ffi fails to build there).
  • EndpointId, ApiError.for, typed errors, polymorphic Status, full README + CLAUDE.md, v0.3.0.

Hardened via multi-agent + bot review

A parallel multi-agent review and the CodeRabbit/Copilot/Greptile bots caught and fixed real issues a green suite had hidden — most notably the nested-model queue URL (P0), dropped final SSE event, lost streamed-error detail, and the Ruby 4.0 install (→ Faraday). All threads addressed.

Verification

191 examples, 0 failures on Ruby 3.4 & 4.0; RuboCop clean; gem build + require "fal-ai" OK.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added module-level convenience methods (Fal.run, Fal.subscribe, Fal.stream, Fal.upload, Fal.queue) for simpler API usage.
    • Streaming support via Server-Sent Events for real-time event consumption.
    • File upload capability to store files in Fal storage CDN.
    • Queue cancellation support.
    • Webhook integration with optional log retrieval.
    • REST URL configuration override.
  • Breaking Changes

    • Queue#status and Queue#result now require app_id and request_id parameters instead of direct URLs.
    • SubmitResponse structure expanded with new fields.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@coderabbitai

coderabbitai Bot commented Jun 7, 2026

Copy link
Copy Markdown

Review Change Stack

Warning

Review limit reached

@mculp, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 35 minutes and 26 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: cb3dbbe3-0efe-4995-9b55-db2177e9e67b

📥 Commits

Reviewing files that changed from the base of the PR and between 04ab381 and 65b333d.

📒 Files selected for processing (10)
  • lib/fal/connection.rb
  • lib/fal/endpoint_id.rb
  • lib/fal/endpoints.rb
  • lib/fal/response.rb
  • lib/fal/streaming.rb
  • spec/fal/connection_spec.rb
  • spec/fal/endpoint_id_spec.rb
  • spec/fal/endpoints_spec.rb
  • spec/fal/response_spec.rb
  • spec/fal/subscriber_spec.rb
📝 Walkthrough

Walkthrough

v0.3.0 release: transport migration from http.rb to Faraday, queue API redesign from URL-based to ID-based operations, SSE streaming and presigned-URL file uploads, module-level convenience methods via default client. All tests and documentation updated.

Changes

v0.3.0 Release

Layer / File(s) Summary
Dependencies, config, and value objects
fal-ai.gemspec, lib/fal/configuration.rb, lib/fal/endpoint_id.rb, spec/fal/endpoint_id_spec.rb
Faraday 2.0 and openssl ≥ 3.0 replace http gem; Configuration#rest_url and REST_HOST constant added; new EndpointId value object wraps endpoint strings with coercion, equality, and hash support.
HTTP transport: Faraday rewrite
lib/fal/connection.rb, lib/fal/errors.rb, spec/fal/connection_spec.rb
Connection rebuilt for Faraday with shared json_request path for post/get/put; new stream method handles SSE with Accept: text/event-stream and chunk yielding; new upload method PUTs raw bytes to presigned URLs. ApiError.for factory method dispatches HTTP status codes to typed error subclasses. Specs migrated from double-based to WebMock integration testing.
Queue API: ID-based endpoint redesign
lib/fal/endpoints.rb, lib/fal/queue.rb, lib/fal/response.rb, spec/fal/endpoints_spec.rb, spec/fal/queue_spec.rb
Endpoints refactored to use endpoint_id (coerced via EndpointId.coerce) instead of app_id; new Stream, Status, Result, Cancel, StorageInitiate endpoint classes added; old generic Url endpoint removed. Queue API changed from URL-based (submit returning status/response URLs) to ID-based (status(app_id, request_id, logs:), result(app_id, request_id), new cancel(app_id, request_id)); webhook_url parameter added to submit. SubmitResponse struct expanded to include app_id and cancel_url. Response#cancel_url accessor and fallback error_message via raw error text added.
Streaming and file upload features
lib/fal/sse/parser.rb, lib/fal/streaming.rb, lib/fal/storage.rb, spec/fal/sse/parser_spec.rb, spec/fal/streaming_spec.rb, spec/fal/storage_spec.rb
SSE::Parser buffers chunks, normalizes CR/CRLF/LF, extracts data: lines, yields events on blank-line termination, supports flush for final event. Streaming class consumes SSE via parser, collects events, returns final event; optional block yields each. Storage class initiates presigned upload via StorageInitiate endpoint, PUTs bytes, returns public file_url; nested Source helper normalizes file input, infers content-type, handles IO rewinding.
Client integration and module API
lib/fal.rb, lib/fal/client.rb, lib/fal/subscriber.rb, spec/fal/client_spec.rb, spec/fal/subscriber_spec.rb, spec/fal_spec.rb
Client#run inlined; Client#subscribe gains logs: and webhook_url: parameters; new Client#stream and Client#upload delegate to streaming/storage. Subscriber#wait_for_completion accepts logs: keyword and uses submit_response.app_id/request_id instead of URLs. Module-level Fal.run, Fal.subscribe, Fal.stream, Fal.upload, Fal.queue added via memoized default_client; Fal.reset_configuration! clears client cache.
Full spec coverage
spec/fal/, spec/fal/integration_spec.rb
Connection specs rewritten for WebMock; queue/endpoint/error/response specs updated for ID-based API and Faraday doubles; new SSE parser, streaming, storage, endpoint_id specs added; integration specs expanded for nested IDs, webhook params, cancellation, storage flow, and typed error handling.
Documentation and release
README.md, CLAUDE.md, CHANGELOG.md, lib/fal/version.rb, .github/workflows/ci.yml
README restructured with features list, installation/config, run/subscribe/stream/upload/webhook/cancel examples, status predicates, error handling guide, architecture overview. New CLAUDE.md contributor guide. Changelog records 0.3.0 release with feature/breaking-change/fix bullets. Version bumped to 0.3.0. CI matrix adds Ruby 4.0.4 and ruby-head with continue-on-error for experimental.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

✨ Finishing Touches
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch feat/streaming-storage-webhooks

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@mculp mculp changed the title Streaming, file upload, module conveniences + docs (v0.3.0) Streaming, file upload, module conveniences, Faraday transport + docs (v0.3.0) Jun 7, 2026
@mculp

mculp commented Jun 7, 2026

Copy link
Copy Markdown
Owner Author

Added one more change to this PR after opening it: switched the HTTP transport from http.rb to Faraday.

While removing the committed Gemfile.lock (above), CI surfaced that http.rb’s llhttp-ffi native extension fails to build on Ruby 4.0 — the ecosystem has not shipped prebuilt ffi binaries for 4.0 yet, so a lockfile-free bundle install builds ffi from source and bundler’s parallel installer races. That broke bundle install on a supported Ruby (and would hit end users).

Faraday is pure Ruby (default adapter is stdlib net/http), so the gem now installs with zero native extensions on any Ruby. Connection is rewritten on Faraday; streaming uses an on_data callback that only yields 2xx chunks; the connection spec now drives the real transport through WebMock.

Verified: clean bundle install + full suite green on Ruby 3.4 and 4.0.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@mculp

mculp commented Jun 7, 2026

Copy link
Copy Markdown
Owner Author

Post-review hardening (parallel multi-agent review)

After opening the stack, I ran a multi-angle parallel review (correctness, OOP/design, test coverage, silent-failure, parity audit, security) and an adversarial re-review. It caught real issues that the green suite had hidden — the most important verified against fal.ai docs + the official fal-js/Python clients:

  • P0 — queue request URLs must use the FULL endpoint id. fal-ai/flux/schnell is polled at …/fal-ai/flux/schnell/requests/{id}/status, not a truncated …/fal-ai/flux/…. The old code (and a self-fulfilling integration stub) dropped the variant, which 404s every nested model id — i.e. most video models, including this PR’s own kling example. Retired EndpointId#app; queue endpoints now build from the full id.
  • SSE final event was dropped when the server didn’t send a trailing blank line — and that final event is the result, so stream returned nil. Added Parser#flush; stream now raises on an empty stream.
  • Streamed errors lost their detail ("Unknown error") — now the server’s message is surfaced; Response#error_message also falls back to non-JSON bodies.
  • REST host corrected rest.alpha.fal.airest.fal.ai.
  • Extracted the status→error mapping into ApiError.for; Connection#upload returns a Fal::Response.
  • Docs: webhook receivers must verify fal’s ED25519 signature themselves (delivery-only); roadmap expanded (multipart upload, per-call subscribe overrides, queue status streaming).

Security review found the posture sound (API key never sent to presigned upload URLs; webhook URL escaped; no host confusion via app_id). 190 examples, 0 failures on Ruby 3.4 & 4.0; RuboCop clean.

@greptile-apps

greptile-apps Bot commented Jun 7, 2026

Copy link
Copy Markdown

Greptile Summary

This PR brings the fal-ai Ruby gem to feature parity with the official JS/Python/PHP clients by adding SSE streaming, presigned file uploads, the full queue lifecycle (submit / status / result / cancel), webhooks, and module-level convenience wrappers. The HTTP transport is migrated from http.rb to Faraday, eliminating native-extension build failures on Ruby 4.x.

  • Queue (submit, status, result, cancel) uses the full endpoint id in every per-request URL so nested model ids (e.g. fal-ai/flux/schnell) are routed correctly; cancel returns true/false for the "already completed" 400 case.
  • Streaming is backed by an incremental Sse::Parser that handles split CRLFs across chunk boundaries and servers that omit the trailing blank line after the final event.
  • Uploads follow a two-step presigned flow; Storage::Source normalizes path strings and IO objects, rewinding before read so partially-consumed IOs upload whole.

Confidence Score: 5/5

Safe to merge; all previously flagged issues are addressed and the new code is well-structured and thoroughly tested.

The core transport, streaming, queue, upload, and error-handling paths are all spec-covered and the multi-round bot review caught and fixed the most consequential edge cases. The one remaining gap — status_class lacking the same non-Hash guard that error_message already carries — only triggers on a 2xx queue-status response whose body is a JSON array, an extremely unlikely condition in the success path.

lib/fal/response.rb — status_class is the only method in the file not yet guarded against a non-Hash data value.

Important Files Changed

Filename Overview
lib/fal/client.rb Clean facade delegating to Queue, Streaming, and Storage; all helpers are lazily memoized per-client instance.
lib/fal/connection.rb Faraday-based transport; streaming chunk routing, non-Hash error-body guard, and nil-env.status handling are all in place and spec-covered.
lib/fal/response.rb error_message is guarded against non-Hash data, but status_class indexes data["status"] without the same guard — a non-Hash 2xx body would raise TypeError in to_status.
lib/fal/queue.rb Full queue lifecycle implemented; cancel returns true/false for the 400 case; SubmitResponse carries all URL fields for transparency.
lib/fal/streaming.rb SSE stream driver that calls parser.flush for final events and uses a sentinel to distinguish "no events" from a legitimate nil/false result.
lib/fal/sse/parser.rb Incremental SSE parser with correct CRLF deferral across chunk boundaries and a flush path for servers that omit the trailing blank line.
lib/fal/storage.rb Two-step presigned upload; Source.from_io rewinds before read; content-type inferred from extension with a safe default.
lib/fal/endpoints.rb URL builders for all endpoints; QueueRequest uses full endpoint id so nested model ids route correctly; webhook URL is CGI-escaped.
lib/fal.rb Module-level conveniences delegate to a memoized default_client; reset_configuration! clears both the config and client for test isolation.

Sequence Diagram

sequenceDiagram
  participant App
  participant Fal as Fal::Client
  participant Queue as Fal::Queue
  participant Sub as Fal::Subscriber
  participant Conn as Fal::Connection
  participant SSE as Sse::Parser
  participant API as fal API

  Note over App,API: Synchronous run
  App->>Fal: run(app_id, input)
  Fal->>Conn: post(Run endpoint)
  Conn->>API: "POST fal.run/{id}"
  API-->>Conn: 200 + result JSON
  Conn-->>App: response.data

  Note over App,API: Queue subscribe (polling)
  App->>Fal: subscribe(app_id, input)
  Fal->>Queue: submit(app_id, input)
  Queue->>Conn: post(Submit endpoint)
  Conn->>API: "POST queue.fal.run/{id}"
  API-->>Queue: "SubmitResponse {request_id}"
  Queue->>Sub: wait_for_completion(submit_response)
  loop poll until COMPLETED
    Sub->>Queue: status(app_id, request_id)
    Queue->>API: "GET queue.fal.run/{id}/requests/{req_id}/status"
    API-->>Sub: "Status (Queued | InProgress | Completed)"
    Sub-->>App: yield status
  end
  Sub->>Queue: result(app_id, request_id)
  Queue->>API: "GET queue.fal.run/{id}/requests/{req_id}"
  API-->>App: final result data

  Note over App,API: Streaming (SSE)
  App->>Fal: stream(app_id, input)
  Fal->>Conn: stream(Stream endpoint)
  Conn->>API: "POST fal.run/{id}/stream"
  loop SSE chunks
    API-->>Conn: raw chunk
    Conn-->>SSE: feed(chunk)
    SSE-->>App: yield parsed event
  end
  Conn->>SSE: flush
  SSE-->>App: yield final event
  Fal-->>App: return final event
Loading

Fix All in Claude Code

Reviews (8): Last reviewed commit: "Harden input contracts and tighten specs..." | Re-trigger Greptile

Comment thread lib/fal/connection.rb
Comment thread lib/fal/sse/parser.rb
Comment thread lib/fal/storage.rb
@mculp mculp force-pushed the feat/queue-parity branch from cc72acb to f8dc95d Compare June 7, 2026 04:03
@mculp mculp force-pushed the feat/streaming-storage-webhooks branch from 3f5189e to 3cebb6f Compare June 7, 2026 04:03
@mculp mculp force-pushed the feat/queue-parity branch from f8dc95d to d7ec2fd Compare June 7, 2026 04:11
@mculp mculp force-pushed the feat/streaming-storage-webhooks branch from 3cebb6f to 414cc67 Compare June 7, 2026 04:12
mculp and others added 11 commits June 7, 2026 05:13
Match the queue ergonomics of the official fal clients (JS/Python/PHP):

- Add EndpointId, a value object that parses "fal-ai/flux/schnell" into the run
  path (the full id) and the queue application path (owner/alias, "fal-ai/flux").
  Namespaced ids (workflows/, comfy/) keep three segments.
- Rebuild Endpoints around it: Run/Submit address the full id; Status/Result/
  Cancel are built from the app root so nested ids resolve to the correct
  /{app}/requests/{id}/... path. Submit appends ?fal_webhook when given a URL.
- Queue#status and #result now take (app_id, request_id) instead of a raw URL;
  add Queue#cancel (true on accept, false on HTTP 400, raise otherwise) and a
  logs: option on #status.
- SubmitResponse now carries app_id and cancel_url; Subscriber polls via
  (app_id, request_id) and forwards logs:.
- Add Connection#put and collapse post/get/put onto one verb-dispatching path.

The existing nested-model integration tests double as a correctness proof: they
stub the server's app-rooted URLs, so a green suite means EndpointId builds the
same URLs the gateway returns.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
`|| raise` also fires on a false value; request_id is always a string, so this
is behavior-preserving, but nil? states the intent.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Round out Model API parity with the official fal clients:

- stream(app_id, input): consume Server-Sent Events from streaming models,
  yielding each event and returning the final result. Backed by an incremental
  Sse::Parser (buffers across chunk boundaries; handles CRLF and multi-line
  data) and a Connection#stream that checks status before iterating the body.
- upload(file): two-step presigned upload to fal storage (initiate on the REST
  host, then PUT the bytes), returning the public URL. Accepts a path or any IO;
  infers content type from the extension. Connection#upload sends the bytes with
  no fal auth header, since presigned URLs carry their own.
- Module-level Fal.run/subscribe/stream/upload/queue over a memoized
  Fal.default_client, matching fal.run()/fal_client.run() ergonomics.
- Endpoints::Stream and Endpoints::StorageInitiate; Configuration#rest_url.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
http.rb depends on llhttp-ffi, a native C extension. On Ruby 4.0 — where the
ecosystem has not yet shipped prebuilt `ffi` binaries — a lockfile-free
`bundle install` builds `ffi` from source, and bundler s parallel installer
races: llhttp-ffi s extension build starts before `ffi` is available and fails.
That broke `bundle install` on a supported Ruby and would hit end users too.

Faraday is pure Ruby (its default adapter is the stdlib net/http), so the gem
now installs with no native extensions on any Ruby. Connection is rewritten on
Faraday (streaming uses an on_data callback that only yields 2xx chunks), and
its spec now drives the real transport through WebMock instead of mocking a
fluent HTTP API.

Verified: a clean `bundle install` and the full suite pass on Ruby 3.4 and 4.0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A parallel multi-agent review surfaced bugs that a green suite had hidden:

- P0: queue per-request URLs must use the FULL endpoint id (verified against
  fal.ai docs and the fal-js/Python clients): fal-ai/flux/schnell is polled at
  .../fal-ai/flux/schnell/requests/{id}/status, NOT a truncated .../fal-ai/flux.
  The old code (and a self-fulfilling integration stub) dropped the variant,
  which 404s every nested model id (most video models). Retire EndpointId#app
  and the NAMESPACES special-case; QueueRequest now builds from the full id.
- SSE: the parser only emitted on a "\n\n" boundary, so a final event sent
  without a trailing blank line — which carries the completed result — was
  silently dropped and stream returned nil. Add Parser#flush; Streaming flushes
  after the stream and raises on an empty stream.
- Streaming errors lost their detail: the net/http adapter nils the body and
  the collector skipped error chunks, yielding "Unknown error". Capture non-2xx
  chunks and build the message from them; use env.status.to_i so a nil status
  is treated as an error (not silently dropped).
- Response#error_message now falls back to a non-JSON error body instead of
  "Unknown error" (gateway 502/504 pages were being hidden).
- Storage REST host corrected from rest.alpha.fal.ai to rest.fal.ai.
- Connection#upload returns a Fal::Response, not a raw Faraday object.
- Extract the status -> error-class mapping into ApiError.for, off Connection.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- Warn that webhook receivers must verify fal's ED25519 signature themselves
  (the gem does delivery only, not verification yet).
- Expand the roadmap with multipart upload, per-call subscribe timeout/poll
  overrides, and queue status streaming (found in the parity audit).
- Remove stale references to the old "queue app" URL truncation now that queue
  URLs use the full endpoint id.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
A re-review found three stale references to the removed app-path truncation.
Update the Queue and QueueRequest doc comments and the Subscriber spec fixture
to the full-id URLs, and document why chunk_collector trusts env.status (the
net/http adapter sets it before streaming the body).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Review feedback (PR #4):
- Re-add a WebMock-backed test that subscribe yields IN_QUEUE -> IN_PROGRESS ->
  COMPLETED in order, restoring end-to-end coverage of the polling loop.
- Note that SubmitResponse status/response/cancel URLs are informational (the
  gem builds its own request URLs) and may be nil.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@mculp mculp force-pushed the feat/streaming-storage-webhooks branch from 414cc67 to dfabfb9 Compare June 7, 2026 10:14
@mculp mculp changed the base branch from feat/queue-parity to main June 7, 2026 10:14
@mculp mculp changed the title Streaming, file upload, module conveniences, Faraday transport + docs (v0.3.0) Model API client parity: queue + streaming + uploads + webhooks on Faraday (v0.3.0) Jun 7, 2026
@mculp mculp closed this Jun 7, 2026
@mculp mculp reopened this Jun 7, 2026
@mculp

mculp commented Jun 7, 2026

Copy link
Copy Markdown
Owner Author

Hey Vox, why did you choose Faraday over HTTP.rb?

I prefer HTTP.rb and it seemed like it was already implemented correctly.

Was there a strong reason to change?

@mculp

mculp commented Jun 7, 2026

Copy link
Copy Markdown
Owner Author

Honest answer: http.rb wasn't wrong — it was implemented correctly, and its streaming (response.body.each) was actually cleaner than Faraday's on_data. The switch was not about correctness or code quality.

The one concrete reason: bundle install fails on Ruby 4.0. http.rb → llhttp-ffiffi, and ffi has no prebuilt binary for the Ruby 4.0 ABI yet, so it compiles from source. With no committed Gemfile.lock (library convention) + bundler's parallel installer, llhttp-ffi's native build races ahead of ffi and dies:

Gem::MissingSpecError: Could not find 'ffi' (>= 1.15.5)
LoadError: cannot load such file -- ffi-compiler/compile_task

That broke two of the stated goals (installs via bundler + CI on 4.0). Note it's an upstream ecosystem timing gap (ffi hasn't shipped 4.0 binaries), not a flaw in http.rb — gem install fal-ai (serial) works fine; only a parallel bundle install on 4.0 hits it.

You can keep http.rb. The standard workaround for this llhttp-ffi/ffi race is a serial install in CI (bundle install --jobs 1, or pre-installing ffi). That should make CI green; the only residual caveat is an end-user on Ruby 4.0 doing a parallel bundle install could hit the same race until ffi ships 4.0 binaries (it will). gem install is unaffected.

I went with Faraday because it's pure Ruby (installs everywhere, today) and it was on your list of acceptable options — but you're right to question it; I should have flagged the tradeoff and let you decide rather than switching unilaterally.

Happy to revert to http.rb and add the serial-install CI fix (I'd verify the --jobs 1 workaround actually goes green on 4.0 before committing). Or keep Faraday for the broad-install guarantee. Your call.

mculp and others added 3 commits June 7, 2026 06:01
Test whether http.rb (-> llhttp-ffi -> ffi) installs on the Ruby 4.x line now.
- Add http (~> 5.0) and openssl (>= 3.0) as runtime deps.
- Matrix gains 4.0.4 (stable) and 4.1.0.dev (continue-on-error).
Code still uses Faraday; this only exercises bundle install across the matrix.
If http installs cleanly everywhere, the transport can move back to http.rb.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… on 4.x)

CI on the widened matrix confirmed http.rb still fails on Ruby 4.0, 4.0.4, and
4.1.0.dev: llhttp-ffi loses bundler's parallel-install race against ffi (the ffi
binary itself is fine). openssl 4.0.2 built cleanly on every version, so the
openssl dependency stays.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… version)

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread lib/fal/connection.rb
Address Greptile review on PR #5 — four boundary-condition defects,
each with a failing-first spec:

- connection: json_error_detail raised TypeError when an error body
  parsed to a JSON Array (proxy/CDN wrapping) instead of a Hash; guard
  with is_a?(Hash) so the typed ApiError is preserved.
- connection: a nil env.status from a non-default Faraday adapter
  (the documented faraday: injection point) routed every chunk into
  the error buffer, surfacing a false "no events" failure on a 200.
  Treat a missing status as success; real errors are still caught by
  ensure_streamed_success.
- sse/parser: a CRLF split across a chunk boundary turned a lone CR
  into LF, fabricating a blank-line separator and splitting one
  multi-line event in two. Defer a trailing CR to the next chunk.
- storage: read uploaded an IO from its current position, truncating a
  partially-consumed stream. Rewind first when the IO supports it.

195 examples, 0 failures; RuboCop clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread lib/fal/response.rb
Response#error_message indexed JSON.parse output with string keys
unconditionally, so a proxy/CDN-wrapped error body (a top-level array
like [{"detail":...}] or a scalar) raised TypeError: no implicit
conversion of String into Integer instead of a typed ApiError. Every
non-streaming request hits this via raise_api_error.

Guard on data.is_a?(Hash) — mirroring the streaming path's
json_error_detail — and route the non-Hash fallback through the raw
response body rather than data["raw"], which would itself re-raise
when indexing an Array. Extract a shared truncate helper.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (3)
spec/fal/endpoints_spec.rb (1)

3-86: ⚡ Quick win

add direct spec coverage for Fal::Endpoints::StorageInitiate

this file validates endpoint URL/method contracts, but the new storage-init endpoint contract is not asserted here.

diff suggestion
   describe Fal::Endpoints::Cancel do
@@
   end
+
+  describe Fal::Endpoints::StorageInitiate do
+    it "POSTs the storage initiate URL with storage_type" do
+      endpoint = described_class.new(base_url: "https://rest.fal.ai")
+
+      expect(endpoint.url).to eq("https://rest.fal.ai/storage/upload/initiate?storage_type=fal-cdn-v3")
+      expect(endpoint.method).to eq(:post)
+    end
+  end
 end
Based on learnings: "Applies to **/*_spec.rb : Write RSpec tests for all Ruby code; maintain comprehensive test coverage."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@spec/fal/endpoints_spec.rb` around lines 3 - 86, Add a new spec block for
Fal::Endpoints::StorageInitiate that mirrors the existing endpoint contract
tests: instantiate Fal::Endpoints::StorageInitiate with endpoint_id and base_url
and assert endpoint.url and endpoint.method produce the expected path and HTTP
verb (use the same patterns as Run/Submit/Stream tests), and add any additional
assertions for query params (e.g., webhook URL URL-encoding) if StorageInitiate
supports them; reference the Fal::Endpoints::StorageInitiate class and its
endpoint.url and endpoint.method to locate where to add these expectations.

Source: Learnings

lib/fal/streaming.rb (1)

25-33: ⚡ Quick win

avoid storing all stream events when only the final one is returned

current code accumulates every event in memory, but stream only needs to return the last event. switch to last_event + seen_any to keep memory constant for long-running streams.

proposed refactor
 def stream(app_id, input, &block)
   endpoint = Endpoints::Stream.new(endpoint_id: app_id, base_url: `@config.run_url`)
-  events = collect_events(endpoint, input, &block)
-  raise Error, "stream produced no events" if events.empty?
-
-  events.last
+  last_event, seen_any = collect_events(endpoint, input, &block)
+  raise Error, "stream produced no events" unless seen_any
+  last_event
 end

 def collect_events(endpoint, input, &block)
-  events = []
+  last_event = nil
+  seen_any = false
   parser = Sse::Parser.new
   `@connection.stream`(endpoint, body: input) do |chunk|
-    parser.feed(chunk) { |data| events << emit(data, &block) }
+    parser.feed(chunk) do |data|
+      last_event = emit(data, &block)
+      seen_any = true
+    end
   end
-  parser.flush { |data| events << emit(data, &block) }
-  events
+  parser.flush do |data|
+    last_event = emit(data, &block)
+    seen_any = true
+  end
+  [last_event, seen_any]
 end
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@lib/fal/streaming.rb` around lines 25 - 33, The collect_events method
currently accumulates every parsed event into events array causing unbounded
memory; change it to track only the last emitted event by replacing events array
with a last_event variable and a seen_any flag, feed chunks via Sse::Parser and
on each parsed data set last_event = emit(data, &block) and mark seen_any =
true, call parser.flush similarly, and finally return last_event (or
nil/appropriate value if seen_any is false); update references to events to use
last_event and preserve use of `@connection.stream`, parser, emit, and
Sse::Parser.
spec/fal/subscriber_spec.rb (1)

52-55: ⚡ Quick win

add explicit logs: true propagation coverage

you already cover the default logs: false path; add one example for logs: true so the new keyword arg contract is locked down.

example spec addition
   it "checks status using the app id and request id" do
     expect(queue).to receive(:status)
       .with("fal-ai/flux/schnell", "req-123", logs: false)
       .and_return(completed_status)

     subscriber.wait_for_completion(submit_response)
   end
+
+  it "passes logs: true to queue.status when requested" do
+    expect(queue).to receive(:status)
+      .with("fal-ai/flux/schnell", "req-123", logs: true)
+      .and_return(completed_status)
+
+    subscriber.wait_for_completion(submit_response, logs: true)
+  end

Based on learnings: Write RSpec tests for all Ruby code; maintain comprehensive test coverage.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@spec/fal/subscriber_spec.rb` around lines 52 - 55, Add a new example in
spec/fal/subscriber_spec.rb that mirrors the existing "checks status using the
app id and request id" test but asserts propagation of the logs: true keyword;
specifically, create an it block that expects queue to receive :status with
arguments ("fal-ai/flux/schnell", "req-123", logs: true) and to_return an
appropriate status (e.g., completed_status or a separate fixture), then invoke
the same code path that calls Subscriber#status (or whatever triggers
queue.status) so the spec verifies logs: true is forwarded.

Source: Learnings

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@lib/fal/connection.rb`:
- Around line 31-39: The stream method (Connection#stream) currently calls the
provided on_chunk block via chunk_collector and will crash with NoMethodError if
no block is passed; enforce the contract at entry by checking for a block and
raising a clear ArgumentError (e.g., "block required") when none is given, so
callers get a deterministic error, and keep the existing usage of
chunk_collector(error_body, &on_chunk) and ensure_streamed_success(response,
error_body).

In `@lib/fal/endpoint_id.rb`:
- Around line 15-17: The constructor EndpointId#initialize currently coerces
nil/blank ids to an empty string via `@id` = id.to_s, allowing invalid ids to
propagate; change initialize to validate the incoming id (nil or
blank/whitespace-only) and raise ArgumentError when it's blank instead of
assigning, and otherwise store the trimmed string into `@id` so only non-empty
endpoint ids are accepted.

In `@lib/fal/endpoints.rb`:
- Around line 53-56: The initializer allows a nil/blank request_id which later
produces an invalid queue URL; inside the initialize method
(lib/fal/endpoints.rb) validate request_id and raise ArgumentError immediately
when request_id is nil or blank (e.g. request_id.nil? ||
request_id.to_s.strip.empty?), so set `@request_id` only after validation; this
prevents building an invalid URL later (the code that constructs the queue URL
using `@request_id`).

In `@README.md`:
- Around line 146-148: The README wording is inconsistent: the docstring around
`submit` / `SubmitResponse` mentions parameters as `(app_id, request_id)` but
examples and functions (`submit`, `status`, `result`, `cancel`) actually use a
single full `endpoint_id`; update the text to refer to `endpoint_id` instead of
`(app_id, request_id)` so the parameter naming matches the examples and the
shapes used by `submit`, `status`, `result`, and `cancel`.

In `@spec/fal/connection_spec.rb`:
- Around line 39-43: The spec ("omits the body when none is given") currently
only asserts the return type, so update the test to assert the outbound HTTP
request had no body: after calling connection.post(run_endpoint) use WebMock's
assertion (e.g., assert_requested / expect(a_request(...)) with a body matcher
or block) to verify the POST to "https://fal.run/fal-ai/flux" had an empty or
nil body; reference the existing stub_request, connection.post, and run_endpoint
to locate and update the test.

---

Nitpick comments:
In `@lib/fal/streaming.rb`:
- Around line 25-33: The collect_events method currently accumulates every
parsed event into events array causing unbounded memory; change it to track only
the last emitted event by replacing events array with a last_event variable and
a seen_any flag, feed chunks via Sse::Parser and on each parsed data set
last_event = emit(data, &block) and mark seen_any = true, call parser.flush
similarly, and finally return last_event (or nil/appropriate value if seen_any
is false); update references to events to use last_event and preserve use of
`@connection.stream`, parser, emit, and Sse::Parser.

In `@spec/fal/endpoints_spec.rb`:
- Around line 3-86: Add a new spec block for Fal::Endpoints::StorageInitiate
that mirrors the existing endpoint contract tests: instantiate
Fal::Endpoints::StorageInitiate with endpoint_id and base_url and assert
endpoint.url and endpoint.method produce the expected path and HTTP verb (use
the same patterns as Run/Submit/Stream tests), and add any additional assertions
for query params (e.g., webhook URL URL-encoding) if StorageInitiate supports
them; reference the Fal::Endpoints::StorageInitiate class and its endpoint.url
and endpoint.method to locate where to add these expectations.

In `@spec/fal/subscriber_spec.rb`:
- Around line 52-55: Add a new example in spec/fal/subscriber_spec.rb that
mirrors the existing "checks status using the app id and request id" test but
asserts propagation of the logs: true keyword; specifically, create an it block
that expects queue to receive :status with arguments ("fal-ai/flux/schnell",
"req-123", logs: true) and to_return an appropriate status (e.g.,
completed_status or a separate fixture), then invoke the same code path that
calls Subscriber#status (or whatever triggers queue.status) so the spec verifies
logs: true is forwarded.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 3230a936-f36e-49bc-a999-897bb712fc59

📥 Commits

Reviewing files that changed from the base of the PR and between 054e3fd and 04ab381.

📒 Files selected for processing (32)
  • .github/workflows/ci.yml
  • CHANGELOG.md
  • CLAUDE.md
  • README.md
  • fal-ai.gemspec
  • lib/fal.rb
  • lib/fal/client.rb
  • lib/fal/configuration.rb
  • lib/fal/connection.rb
  • lib/fal/endpoint_id.rb
  • lib/fal/endpoints.rb
  • lib/fal/errors.rb
  • lib/fal/queue.rb
  • lib/fal/response.rb
  • lib/fal/sse/parser.rb
  • lib/fal/storage.rb
  • lib/fal/streaming.rb
  • lib/fal/subscriber.rb
  • lib/fal/version.rb
  • spec/fal/client_spec.rb
  • spec/fal/connection_spec.rb
  • spec/fal/endpoint_id_spec.rb
  • spec/fal/endpoints_spec.rb
  • spec/fal/errors_spec.rb
  • spec/fal/integration_spec.rb
  • spec/fal/queue_spec.rb
  • spec/fal/response_spec.rb
  • spec/fal/sse/parser_spec.rb
  • spec/fal/storage_spec.rb
  • spec/fal/streaming_spec.rb
  • spec/fal/subscriber_spec.rb
  • spec/fal_spec.rb

Comment thread lib/fal/connection.rb
Comment thread lib/fal/endpoint_id.rb
Comment thread lib/fal/endpoints.rb
Comment thread README.md
Comment thread spec/fal/connection_spec.rb
- Connection#stream raises ArgumentError at entry when no block is given,
  instead of a deferred NoMethodError on the first chunk.
- EndpointId rejects blank ids (and trims whitespace) at construction.
- Endpoints::QueueRequest rejects a blank request_id rather than building
  an invalid .../requests//... URL.
- Streaming keeps only the final event (O(1) memory) instead of buffering
  every event for a result that only returns the last one.
- Specs: assert the body is actually omitted on bodyless POSTs; cover
  Endpoints::StorageInitiate; cover Subscriber logs: true propagation.

Declined the README (app_id, request_id) -> endpoint_id rename: the public
queue parameter is app_id across queue.rb, SubmitResponse, and CLAUDE.md
(mirroring the official clients), so the README already matches the code.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@mculp mculp merged commit c194b15 into main Jun 8, 2026
7 checks passed
mculp added a commit that referenced this pull request Jun 8, 2026
Address Greptile review on PR #5 — four boundary-condition defects,
each with a failing-first spec:

- connection: json_error_detail raised TypeError when an error body
  parsed to a JSON Array (proxy/CDN wrapping) instead of a Hash; guard
  with is_a?(Hash) so the typed ApiError is preserved.
- connection: a nil env.status from a non-default Faraday adapter
  (the documented faraday: injection point) routed every chunk into
  the error buffer, surfacing a false "no events" failure on a 200.
  Treat a missing status as success; real errors are still caught by
  ensure_streamed_success.
- sse/parser: a CRLF split across a chunk boundary turned a lone CR
  into LF, fabricating a blank-line separator and splitting one
  multi-line event in two. Defer a trailing CR to the next chunk.
- storage: read uploaded an IO from its current position, truncating a
  partially-consumed stream. Rewind first when the IO supports it.

195 examples, 0 failures; RuboCop clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@mculp mculp deleted the feat/streaming-storage-webhooks branch June 8, 2026 20:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant