ol.llx.ai.impl.client.event-stream

platforms: clj, cljs

Shared stream runtime orchestration for all hosts.

Why

Host runtimes should differ only in transport concerns (how bytes arrive, how cancellation reaches the underlying request). Event decoding, lifecycle emission, finalize behavior, and terminal error handling must stay identical.

What

This namespace owns the shared stream loop:
- opens a provider stream with retry policy
- emits canonical :start / delta / :done events
- converts runtime failures into canonical terminal error events
- coordinates cancellation across producer/source and consumer/output channel
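
The lifecycle described above can be illustrated with a small, dependency-free sketch. It is not this namespace's implementation: `sketch-stream-events`, the `:type` key, and the `:delta` / `:error` event shapes are hypothetical names chosen for illustration, and the sketch is written for the clj host (it catches `Exception`).

```clojure
;; Hypothetical sketch: builds the event sequence for a seq of payloads.
;; `decode` stands in for an adapter decode function (payload -> event map)
;; and is allowed to throw.
(defn sketch-stream-events
  [decode payloads]
  (loop [events    [{:type :start}]          ; always begin with a start event
         remaining (seq payloads)]
    (if-let [[payload & more] remaining]
      (let [step (try
                   {:event (decode payload)}
                   (catch Exception ex       ; clj host only
                     {:error ex}))]
        (if-let [ex (:error step)]
          ;; A failure ends the stream with one terminal error event;
          ;; no :done event follows it.
          (conj events {:type :error :message (ex-message ex)})
          (recur (conj events (:event step)) more)))
      ;; All payloads decoded: close the stream normally.
      (conj events {:type :done}))))

(sketch-stream-events (fn [p] {:type :delta :text p}) ["he" "llo"])
;; => [{:type :start} {:type :delta, :text "he"} {:type :delta, :text "llo"} {:type :done}]

(sketch-stream-events
 (fn [p]
   (if (= p "bad")
     (throw (ex-info "decode failed" {}))
     {:type :delta :text p}))
 ["ok" "bad" "never"])
;; => [{:type :start} {:type :delta, :text "ok"} {:type :error, :message "decode failed"}]
```

In the second call the payload after the failure is never decoded, which mirrors the idea that an error event is terminal.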

How

Hosts inject two hooks:
- :open-stream! to acquire the transport stream response
- :start-source! to push provider payloads onto payload-ch

The shared loop consumes payload/control channels with Promesa CSP, applies adapter decode/finalize functions, and emits canonical events to out.
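
As a rough illustration of the hook-injection idea, the sketch below runs one shared loop against two hosts that differ only in their hooks. It makes several assumptions: the hook argument lists (`open-stream!` taking no arguments, `start-source!` taking the response and a push function) are invented for this example, an atom-backed vector stands in for payload-ch instead of Promesa CSP channels, and `sketch-run-stream`, `collect-events`, and both host maps are hypothetical names.

```clojure
(require '[clojure.string :as str])

;; Hypothetical, synchronous stand-in for the shared loop.
;; The real loop is asynchronous and channel-based; this one is not.
(defn sketch-run-stream
  [{:keys [open-stream! start-source! decode emit!]}]
  (let [payload-buf (atom [])                ; stand-in for payload-ch
        response    (open-stream!)]          ; host-specific: acquire transport
    ;; host-specific: push provider payloads into the buffer
    (start-source! response (fn [payload] (swap! payload-buf conj payload)))
    ;; shared: identical event emission for every host
    (emit! {:type :start})
    (doseq [payload @payload-buf]
      (emit! (decode payload)))
    (emit! {:type :done})))

;; Host A (hypothetical): the response body is already a vector of payloads.
(def in-memory-host
  {:open-stream!  (fn [] {:body ["a" "b"]})
   :start-source! (fn [response push!] (run! push! (:body response)))})

;; Host B (hypothetical): the response body is newline-delimited text.
(def line-host
  {:open-stream!  (fn [] {:body "a\nb"})
   :start-source! (fn [response push!]
                    (run! push! (str/split-lines (:body response))))})

;; Runs the shared loop for a host and returns the emitted events.
(defn collect-events
  [host]
  (let [out (atom [])]
    (sketch-run-stream (assoc host
                              :decode (fn [p] {:type :delta :text p})
                              :emit!  (fn [event] (swap! out conj event))))
    @out))

(= (collect-events in-memory-host) (collect-events line-host))
;; => true
```

Both hosts produce the same event vector because everything after the two hooks is shared code, which is the property the design aims for.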

await!

clj

platforms: clj

(await! x)
(await! x duration default-on-timeout)

cljs

platforms: cljs

(await! x)
(await! x _duration default-on-timeout)

open-stream-with-retries*

clj

platforms: clj

(open-stream-with-retries* {:keys [adapter env model request request-opts]})

cljs

platforms: cljs

(open-stream-with-retries* {:keys [adapter env model request request-opts]})

emit-events*

clj

platforms: clj

(emit-events* {:keys [emit! cancel-fn events]})

cljs

platforms: cljs

(emit-events* {:keys [emit! cancel-fn events]})

emit-start*

clj

platforms: clj

(emit-start* {:keys [emit! cancel-fn env model]})

cljs

platforms: cljs

(emit-start* {:keys [emit! cancel-fn env model]})

decode-payload-step

clj

platforms: clj

(decode-payload-step {:keys [adapter env model state item-index payload]})

cljs

platforms: cljs

(decode-payload-step {:keys [adapter env model state item-index payload]})

process-payload-step*

clj

platforms: clj

(process-payload-step* {:keys [adapter env model state* emit! cancel-fn]} item-index payload)

cljs

platforms: cljs

(process-payload-step* {:keys [adapter env model state* emit! cancel-fn]} item-index payload)

finalize-step

clj

platforms: clj

(finalize-step {:keys [adapter env state]})

cljs

platforms: cljs

(finalize-step {:keys [adapter env state]})

finalize-stream*

clj

platforms: clj

(finalize-stream* {:keys [adapter env model state* emit! cancel-fn]})

cljs

platforms: cljs

(finalize-stream* {:keys [adapter env model state* emit! cancel-fn]})

emit-terminal-error*

clj

platforms: clj

(emit-terminal-error* {:keys [adapter env model state* emit!]} stream-ex)

cljs

platforms: cljs

(emit-terminal-error* {:keys [adapter env model state* emit!]} stream-ex)

payload-msg

clj

platforms: clj

(payload-msg payload)

cljs

platforms: cljs

(payload-msg payload)

error-msg

clj

platforms: clj

(error-msg ex)

cljs

platforms: cljs

(error-msg ex)

run-stream!

clj

platforms: clj

(run-stream! {:keys [adapter env model request out state* cancel! start-source! open-stream!] :as args})

cljs

platforms: cljs

(run-stream! {:keys [adapter env model request out state* cancel! start-source! open-stream!] :as args})