🌊

Step 2 Landed — Akka.NET Streams Now Carry the Whole Voice Path in AgentZero Lite

Series: AgentZero Lite — Part 7. The previous post (Part 6 — Step 1 Embedding an Open vLLM, Step 2 Akka Streams) laid out the roadmap. This one reports back: Step 2 is in the trunk, opt-in behind a feature flag, with eleven TestKit cases passing and the legacy batch path still primary until live testing validates the stream path.

TL;DR for the people who scroll

  • The voice subsystem (microphone → STT → reactor → LLM → TTS → speaker) is now built around two Akka.NET RunnableGraphs owned by a single VoiceStreamActor at /user/stage/voice.
  • INPUT graph turns 50 ms PCM frames into transcripts. OUTPUT graph turns LLM token streams into progressive TTS playback. Both share a KillSwitch so the actor can tear them down atomically — BargeIn, CancelInflight, EndTurn, DeviceLost are all Tell-only messages that end up in the same teardown path.
  • STT and TTS parallelism are SmallestMailboxPool routers, not fused SelectAsync(parallelism: N) constants. Operations knob, not a code constant.
  • Akka.NET 1.5 doesn't expose StatefulSelectMany on FlowOperations (it's on SourceOperations only). Custom GraphStage<FlowShape<...>> is the canonical pattern for stateful flows in this codebase, and TestKit's SourceProbe / SinkProbe hook into it cleanly.
  • Opt-in via VoiceSettings.UseStreamPipeline (default false). The legacy OnVoiceUtteranceEnded → SendCurrentInput batch path stays primary until live testing on real devices.
Implementation summary: 27 files, +2,062 LoC, eleven stream TestKit cases passing in 337 ms, full Debug build clean. Git commit 0a87d1b on main.

Why streams instead of "another async helper"

Part 6 made the architectural argument; here's the operational one. The old voice path was a sequence of awaited calls:
UtteranceEnded event → byte[] PCM → await STT → set txtInput.Text → SendCurrentInput → reactor finishes → await TTS → Play(byte[])
It worked, but each link in that chain fails for its own reason, in its own way:
| Link | Failure mode the chain didn't model |
|---|---|
| Microphone | OS-level buffer overflow if downstream stalls |
| STT | Transient 5xx from the cloud provider, mid-utterance |
| LLM | SSE token feed dropping mid-sentence |
| TTS | Per-chunk failure (a single sentence rendered poorly) |
| Speaker | Device disappeared (Bluetooth headset walked out of range) |
| User | Talked over the AI mid-sentence (barge-in) |
The batch chain reacted to all of these the same way: throw, log, drop the turn. A streaming pipeline lets each stage carry its own backpressure, retry, and cancellation policy without the others changing — the same shape Akka actors already gave us for terminal management, just now per-element instead of per-mailbox.

Topology — two graphs, one actor

```mermaid
flowchart TD
    Actor["<b>VoiceStreamActor</b><br/>/user/stage/voice<br/>• owns SharedKillSwitch<br/>• Tell-only protocol<br/>• holds materialized handles"]

    subgraph INPUT ["INPUT graph (mic → reactor)"]
        direction TB
        I1["Source.Queue&lt;MicFrame&gt;<br/>(64, OverflowStrategy.DropHead)"]
        I2["KillSwitches.Single"]
        I3["<b>VoiceSegmenterStage</b><br/>VAD + 1 s pre-roll + utterance FSM"]
        I4[".Async()"]
        I5["SelectAsync(p, sttPool.Ask)"]
        I6["Sink.ActorRefWithAck(Self)"]
        I1 --> I2 --> I3 --> I4 --> I5 --> I6
    end

    subgraph OUTPUT ["OUTPUT graph (LLM tokens → speaker)"]
        direction TB
        O1["Source.Queue&lt;string&gt;<br/>(64, OverflowStrategy.Backpressure)"]
        O2["KillSwitches.Single"]
        O3["<b>SentenceChunkerStage</b><br/>.?!\\n + min/max chunk len"]
        O4["TtsTextCleaner.StripMarkdown"]
        O5["SelectAsync(p, ttsPool.Ask)"]
        O6["Sink.ForEach(playback.Enqueue)"]
        O1 --> O2 --> O3 --> O4 --> O5 --> O6
    end

    Actor -- "owns + materializes" --> I1
    Actor -- "owns + materializes" --> O1

    classDef actor fill:#1f2937,stroke:#0ea5e9,color:#e5e7eb,stroke-width:2px
    classDef stage fill:#0b3d2e,stroke:#10b981,color:#d1fae5
    class Actor actor
    class I3,O3 stage
```
The crucial property is that the actor is the only thing the rest of the app talks to. WPF code never touches a graph or a kill switch. NAudio's capture callback Tells the actor with each MicFrame. The reactor's eventual reply Tells the actor with SpeakText (the OUTPUT graph then chunks per sentence and synthesizes in parallel with playback). The Cancel button Tells CancelInflight. Every cross-thread interaction is either a Tell or a Source.Queue.OfferAsync. Neither blocks the caller, and each handles overload by its own rule: the actor's mailbox for a Tell, the queue's OverflowStrategy for an offer.
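To make "one entry point, one teardown path" concrete, here is a plain C# model with no Akka dependency. The message names (BargeIn, CancelInflight, EndTurn, DeviceLost) come from the post; the VoiceControlPlane class, the GraphScope enum, and the choice that EndTurn only tears down OUTPUT are illustrative assumptions, not the project's code.

```csharp
using System;
using System.Collections.Generic;

// Message names are from the post; everything else is a hypothetical model, not the project's code.
public sealed record BargeIn;
public sealed record CancelInflight;
public sealed record EndTurn;
public sealed record DeviceLost(string Reason);

[Flags]
public enum GraphScope { None = 0, Output = 1, Input = 2, Both = Output | Input }

public sealed class VoiceControlPlane
{
    // Every teardown is recorded so a test can inspect what happened.
    public List<(string Trigger, GraphScope Scope)> Teardowns { get; } = new List<(string, GraphScope)>();

    // The only public entry point: callers "Tell" a message and move on.
    public void Tell(object message)
    {
        switch (message)
        {
            case BargeIn _:        Teardown("BargeIn", GraphScope.Output); break;
            case CancelInflight _: Teardown("CancelInflight", GraphScope.Output); break;
            case EndTurn _:        Teardown("EndTurn", GraphScope.Output); break; // assumption
            case DeviceLost lost:  Teardown($"DeviceLost({lost.Reason})", GraphScope.Both); break;
            default: break;        // anything else is ignored in this sketch
        }
    }

    // The single shared teardown path all control messages funnel into.
    private void Teardown(string trigger, GraphScope scope) => Teardowns.Add((trigger, scope));
}
```

In the real actor, Teardown would shut down the kill switch, complete the queue, and stop playback; the sketch only shows that every control message lands in the same method.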

P0 — the boring but load-bearing part

Akka went from 1.5.40 to 1.5.67 in all four projects that depend on it (ZeroCommon, AgentZeroWpf, ZeroCommon.Tests, AgentTest), plus Akka.Streams and Akka.Streams.TestKit at 1.5.67. The 1.5.58 release fixed a .NET 10 CLR shutdown-hook regression that AgentZero Lite already worked around in ActorSystemManager.ShutdownAsync; moving past 1.5.58 keeps that workaround correct.
dotnet test on the headless ZeroCommon.Tests suite came back 88 / 88 after the bump. No actor-topology regressions, no shutdown-hang, no hidden behavior change.

P1 — INPUT graph: where Akka.NET diverges from the JVM docs

The first surprise was that StatefulSelectMany is not available on Flow in Akka.NET 1.5. It's defined on SourceOperations only — Flow.Create<T>().StatefulSelectMany(...) doesn't compile. The build error is unhelpful (it points at the Source overload as the "best match" and complains the receiver type doesn't fit), and the public docs don't flag this asymmetry.
The right answer in this codebase is a custom GraphStage<FlowShape<TIn, TOut>>. It's actually cleaner — you own OnPush / OnPull so backpressure is explicit (pull when you have nothing to emit, push when an utterance just completed), and Akka.Streams.TestKit's SourceProbe / SinkProbe hook into it without any extra ceremony.
Here's the load-bearing part of VoiceSegmenterStage — VAD + utterance FSM + pre-roll, all fused into one stage:
```csharp
public override void OnPush()
{
    var frame = Grab(_stage.In);

    // Pre-roll ring (always, even outside an utterance, so when VAD
    // trips the prior PreRollSeconds of audio is available).
    _preRoll.Enqueue(frame.Pcm16k);
    _preRollBytes += frame.Pcm16k.Length;
    while (_preRollBytes > _maxPreRollBytes && _preRoll.Count > 0)
        _preRollBytes -= _preRoll.Dequeue().Length;

    var above = frame.Rms >= _stage._config.VadThreshold;
    if (above)
    {
        _utteranceSilenceFrames = 0;
        if (!_inUtterance)
        {
            // Utterance starts: seed the buffer with the pre-roll,
            // which already contains the current frame.
            _inUtterance = true;
            _buffer = new List<byte>(...);
            foreach (var chunk in _preRoll) _buffer.AddRange(chunk);
            _startedAt = DateTimeOffset.UtcNow;
        }
        else
        {
            _buffer!.AddRange(frame.Pcm16k);
        }
        Pull(_stage.In);
        return;
    }

    if (_inUtterance)
    {
        // Quiet frame inside an utterance: keep it and count toward the hangover.
        _buffer!.AddRange(frame.Pcm16k);
        if (++_utteranceSilenceFrames >= _stage._config.UtteranceHangoverFrames)
        {
            _inUtterance = false;
            var pcm = _buffer.ToArray();
            _buffer = null;
            // No Pull here: the OnPull handler (not shown) pulls again once downstream is ready.
            Push(_stage.Out, new PcmSegment(pcm, pcm.Length / 32_000.0, _startedAt));
            return;
        }
        Pull(_stage.In);
        return;
    }

    Pull(_stage.In);
}
```
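The 32_000.0 divisor in the Push line implies 32,000 bytes per second, which matches 16 kHz, 16-bit mono PCM. A small sketch of that arithmetic, assuming 50 ms frames and the 1 s pre-roll from the diagram (PcmMath is a hypothetical helper, not part of the codebase):

```csharp
using System;

// Hypothetical helper spelling out the PCM arithmetic behind the 32_000.0 divisor.
public static class PcmMath
{
    public const int SampleRate = 16_000;   // Hz (the "16k" in Pcm16k)
    public const int BytesPerSample = 2;    // 16-bit samples, mono
    public const int BytesPerSecond = SampleRate * BytesPerSample; // 32,000

    // Bytes in one capture frame of the given length.
    public static int FrameBytes(int frameMs) => BytesPerSecond * frameMs / 1000;

    // Segment duration, same formula as pcm.Length / 32_000.0 in OnPush.
    public static double DurationSeconds(int pcmBytes) => pcmBytes / (double)BytesPerSecond;

    // Capacity of the pre-roll ring for a given number of seconds.
    public static int PreRollBytes(double seconds) => (int)(BytesPerSecond * seconds);
}
```

So a 1 s pre-roll ring holds 20 frames of 1,600 bytes, and the hangover adds UtteranceHangoverFrames times 50 ms of trailing silence to each segment.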
The STT call is not a fused SelectAsync(parallelism: N, async pcm => stt.Transcribe(...)). It goes through a SmallestMailboxPool of SttWorkerActor instances, and the SelectAsync stage just Asks the pool:
```csharp
.SelectAsync(parallelism, async (PcmSegment seg) =>
{
    var reply = await sttPool.Ask<TranscribeReply>(
        new TranscribeRequest(seg, language),
        TimeSpan.FromSeconds(120));
    return new VoiceTranscriptReady(reply.Transcript, reply.DurationSeconds);
})
```
Why the indirection? Two reasons:
  1. Operations knob. VoiceSettings.StreamSttParallelism controls the pool size. Bumping it later doesn't require recompiling.
  2. One STT engine per worker. Each worker actor lazily constructs its ISpeechToText and pays the cold-start cost (Whisper.net "small" loads ~487 MB) on its first segment. The router naturally spreads load across the warmed workers.
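The routing idea can be modeled without Akka. This is a deliberately simplified, hypothetical stand-in for SmallestMailboxPool: Akka's router also considers things like whether a routee is busy, while this model only counts pending jobs. SttEngineStub stands in for ISpeechToText, and Lazy<T> models the cold start paid on a worker's first segment.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for ISpeechToText; constructing it is the "cold start".
public sealed class SttEngineStub { }

public sealed class PooledWorker
{
    // Lazy: the engine is built on the first job, not when the pool is created.
    private readonly Lazy<SttEngineStub> _engine = new Lazy<SttEngineStub>(() => new SttEngineStub());

    public int Pending { get; private set; }
    public bool Warm => _engine.IsValueCreated;

    public void Accept() { Pending++; _ = _engine.Value; }
    public void Complete() { if (Pending > 0) Pending--; }
}

// Simplified model of smallest-mailbox routing: pick the worker with the fewest pending jobs.
public sealed class SmallestPendingRouter
{
    public IReadOnlyList<PooledWorker> Workers { get; }

    // Pool size is plain data, the way StreamSttParallelism is a setting rather than a constant.
    public SmallestPendingRouter(int size) =>
        Workers = Enumerable.Range(0, size).Select(_ => new PooledWorker()).ToList();

    public PooledWorker Route()
    {
        // OrderBy is stable, so ties go to the lowest index.
        var target = Workers.OrderBy(w => w.Pending).First();
        target.Accept();
        return target;
    }
}
```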
The terminal sink is Sink.ActorRefWithAck, not Sink.ActorRef. The latter has no backpressure: flooding the actor's mailbox faster than it can drain was the failure mode I almost wrote into the codebase before checking. With the Ack variant the actor drives demand: it replies VoiceFrameAck to the sink after the init message and after each transcript, and the sink waits for that ack before pushing the next element.
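The ack handshake is easy to model with a semaphore. This is a hypothetical, BCL-only illustration of the protocol (at most one un-acked element in flight, nothing sent until the init message is acked), not how Akka implements the sink; the AckDrivenSink name and its init/complete parameters are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical model of the ack handshake; not Akka's implementation.
public sealed class AckDrivenSink
{
    private readonly SemaphoreSlim _acks = new SemaphoreSlim(0);

    // Consumer side: reply once to the init message and once per element handled.
    public void Ack() => _acks.Release();

    // Producer side: never has more than one un-acked message outstanding.
    public async Task RunAsync(IEnumerable<object> upstream, Action<object> tell,
                               object init, object complete)
    {
        tell(init);
        await _acks.WaitAsync();          // no elements until the init is acked
        foreach (var element in upstream)
        {
            tell(element);
            await _acks.WaitAsync();      // demand comes only from the consumer's ack
        }
        tell(complete);
    }
}
```

A consumer that never acks stalls the producer right after the init message, which is exactly the property Sink.ActorRef lacks.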

P2 — OUTPUT graph: progressive TTS without a custom playback engine

The OUTPUT graph mirrors the INPUT graph in shape but runs in the opposite direction. The token-stream entry is a Source.Queue<string> with OverflowStrategy.Backpressure (text is small, slow tokens are fine to wait on; audio frames are not). A small Task pump drains an IAsyncEnumerable<string> into the queue:
```csharp
_ = Task.Run(async () =>
{
    try
    {
        await foreach (var token in cmd.TokenStream)
        {
            var result = await queue.OfferAsync(token);
            if (result is QueueOfferResult.QueueClosed) return;
            if (result is QueueOfferResult.Failure f)
            {
                _log.Error(f.Cause, "...");
                return;
            }
        }
        queue.Complete();
    }
    catch (OperationCanceledException)
    {
        try { queue.Complete(); } catch { }
    }
    catch (Exception ex)
    {
        _log.Error(ex, "...");
        try { queue.Fail(ex); } catch { }
    }
});
```
TODO: switch this to PipeTo so it stays non-blocking inside the actor.
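Outside Akka, the same pump shape maps onto System.Threading.Channels: a bounded channel with BoundedChannelFullMode.Wait suspends the writer when full, which is the closest BCL analogue to OverflowStrategy.Backpressure. A sketch under that assumption (TokenPump is hypothetical, not the project's code):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical BCL analogue of the OUTPUT pump; not the project's code.
public static class TokenPump
{
    // FullMode.Wait: WriteAsync suspends while the channel is full (backpressure).
    public static Channel<string> CreateQueue(int capacity) =>
        Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true,
        });

    public static async Task PumpAsync(IAsyncEnumerable<string> tokens, ChannelWriter<string> writer)
    {
        try
        {
            await foreach (var token in tokens)
                await writer.WriteAsync(token);  // waits here when the reader falls behind
            writer.TryComplete();                // normal end of the token stream
        }
        catch (OperationCanceledException)
        {
            writer.TryComplete();                // cancellation ends the stream cleanly
        }
        catch (Exception ex)
        {
            writer.TryComplete(ex);              // readers observe the failure
        }
    }
}
```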
 
SentenceChunkerStage accumulates tokens in a StringBuilder, scans for sentence terminators (., ?, !) once the buffer crosses a min length, and falls back to "split at last whitespace under MaxChunkChars" if a runaway sentence shows up. On OnUpstreamFinish it flushes whatever is still buffered as the last chunk.
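A BCL-only sketch of those chunking rules, handy for unit-testing the policy apart from the GraphStage. Assumptions: a chunk must reach minChars (terminator included) before a terminator counts, the terminator set is the .?!\n shown in the topology diagram, "whitespace" is simplified to a space, and a runaway sentence with no space under maxChars is hard-split. SentenceChunker is hypothetical, not the project's SentenceChunkerStage.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical, BCL-only model of the chunking policy; not the project's SentenceChunkerStage.
public sealed class SentenceChunker
{
    private static readonly char[] Terminators = { '.', '?', '!', '\n' };
    private readonly StringBuilder _buffer = new StringBuilder();
    private readonly int _minChars;
    private readonly int _maxChars;

    public SentenceChunker(int minChars, int maxChars)
    {
        if (minChars < 1 || maxChars < minChars)
            throw new ArgumentException("require 1 <= minChars <= maxChars");
        _minChars = minChars;
        _maxChars = maxChars;
    }

    // Feed one token; returns any chunks it completed (possibly none).
    public List<string> Push(string token)
    {
        _buffer.Append(token);
        var chunks = new List<string>();
        while (TryCut(out var chunk))
            if (chunk.Length > 0) chunks.Add(chunk);
        return chunks;
    }

    // Upstream finished: whatever is still buffered becomes the last chunk.
    public string? Flush()
    {
        var rest = _buffer.ToString().Trim();
        _buffer.Clear();
        return rest.Length > 0 ? rest : null;
    }

    private bool TryCut(out string chunk)
    {
        chunk = "";
        var text = _buffer.ToString();
        int cut = -1;

        // First terminator that yields a chunk of at least _minChars and at most _maxChars.
        int limit = Math.Min(text.Length, _maxChars);
        for (int i = _minChars - 1; i < limit && cut < 0; i++)
            if (Array.IndexOf(Terminators, text[i]) >= 0) cut = i + 1;

        // Runaway sentence: split at the last space under _maxChars, or hard-split if none.
        if (cut < 0 && text.Length > _maxChars)
        {
            int space = text.LastIndexOf(' ', _maxChars - 1);
            cut = space > 0 ? space : _maxChars;
        }

        if (cut < 0) return false;   // not enough text yet: wait for more tokens
        chunk = text.Substring(0, cut).Trim();
        _buffer.Remove(0, cut);
        return true;
    }
}
```

A naive terminator scan like this will also split decimals such as "3.5" once the buffer is past minChars.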
The TTS path is an exact mirror of the STT path: SelectAsync(parallelism: 2, ttsPool.Ask) against a SmallestMailboxPool of TtsWorkerActors. With two workers, the next sentence is being synthesized while the current one is playing; that's where the progressive feel comes from.
The actual playback is the deliberately-boring part. NAudioPlaybackQueue (in AgentZeroWpf) implements an IAudioPlaybackQueue interface declared in ZeroCommon. Each Enqueue(byte[], format) call adds a clip. When the current clip's WaveOutEvent.PlaybackStopped fires, the queue drains the next. It's sequential, not gap-less — there is a perceptible millisecond-level seam between clips. BufferedWaveProvider-based seamless playback is on the deferred list, not in this commit.
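The drain-on-stop logic is small enough to model with a fake player. IClipPlayer and SequentialPlaybackQueue below are hypothetical stand-ins (WaveOutEvent plays the player role in the post), and the sketch assumes all calls and events arrive on one thread; a real device player may raise its stop event elsewhere, so a production queue would need a lock or a dispatcher hop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction over a device player; not NAudio's API.
public interface IClipPlayer
{
    event Action? Stopped;   // raised when the current clip finishes or is stopped
    void Play(byte[] clip);
    void Stop();
}

// Sequential, not gap-less: the next clip starts only after Stopped fires.
public sealed class SequentialPlaybackQueue
{
    private readonly IClipPlayer _player;
    private readonly Queue<byte[]> _pending = new Queue<byte[]>();
    private bool _playing;

    public SequentialPlaybackQueue(IClipPlayer player)
    {
        _player = player;
        _player.Stopped += OnStopped;
    }

    public void Enqueue(byte[] clip)
    {
        _pending.Enqueue(clip);
        if (!_playing) PlayNext();
    }

    // Barge-in or cancel: drop everything queued, then stop the current clip.
    public void Stop()
    {
        _pending.Clear();
        _player.Stop();
    }

    private void OnStopped()
    {
        _playing = false;
        PlayNext();           // the audible seam between clips lives here
    }

    private void PlayNext()
    {
        if (_pending.Count == 0) return;
        _playing = true;
        _player.Play(_pending.Dequeue());
    }
}
```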

P3 — actor-mediated control plane

This is the part that's harder to do well in a pure stream graph. Akka.Streams has a Decider (per-stage Resume / Restart / Stop) and RestartFlow.WithBackoff, both of which work, both of which are slightly off-target for a voice pipeline:
  • Decider handles "this stage threw" but doesn't speak the language of "the user just talked over the AI."
  • RestartFlow.WithBackoff restarts the entire flow on failure — heavy when the failure is a single transient 5xx and the rest of the stage state (loaded STT model, open socket) is still good.
The decision was to push the control plane up into the actor and use the streams for what they're best at. Concretely:
| Concern | Lives in | Mechanism |
|---|---|---|
| Per-element retry on transient HTTP/IO/timeout | Stt/TtsWorkerActor | TransientRetry.WithBackoffAsync: 3 attempts, exponential; retries only HttpRequestException / TaskCanceledException / IOException / TimeoutException. OperationCanceledException and ArgumentException propagate immediately. |
| Atomic teardown of OUTPUT graph | VoiceStreamActor.CancelOutputGraph | KillSwitches.Single.Shutdown() → Source.Queue.Complete() → playback.Stop() |
| Barge-in detection | VoiceStreamActor.OnMicFrame (inline) | While _outputActive, count consecutive loud frames; at 4 frames → Self.Tell(new BargeIn()) |
| _outputActive state | VoiceStreamActor | _playback.PlaybackStarted → true; PlaybackStopped → false |
| User-driven cancel | Self.Tell(new CancelInflight()) | UI button → same CancelOutputGraph path |
| Device-lost recovery | Self.Tell(new DeviceLost(reason)) | Tears down both INPUT and OUTPUT, lets UI re-prompt for device pick |
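The retry row can be sketched in plain C#. This is a hypothetical re-creation from the table, not the project's TransientRetry: the 200 ms base delay and the doubling schedule are assumptions. TaskCanceledException derives from OperationCanceledException, so it has to be matched first, and a cancellation caused by the caller's own token should not be retried.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical re-creation of the retry row above; not the project's TransientRetry.
public static class TransientRetrySketch
{
    public static bool IsTransient(Exception ex) => ex switch
    {
        TaskCanceledException _ => true,       // e.g. an HttpClient timeout; must precede the OCE arm
        OperationCanceledException _ => false, // real cancellation propagates immediately
        HttpRequestException _ => true,
        IOException _ => true,
        TimeoutException _ => true,
        _ => false,                            // ArgumentException and everything else
    };

    public static async Task<T> WithBackoffAsync<T>(
        Func<CancellationToken, Task<T>> action,
        int maxAttempts = 3,
        TimeSpan? baseDelay = null,
        CancellationToken ct = default)
    {
        var delay = baseDelay ?? TimeSpan.FromMilliseconds(200);   // assumed default
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await action(ct);
            }
            catch (Exception ex) when (attempt < maxAttempts && IsTransient(ex) && !ct.IsCancellationRequested)
            {
                // Exponential backoff: delay, 2x delay, 4x delay, ...
                await Task.Delay(TimeSpan.FromTicks(delay.Ticks << (attempt - 1)), ct);
            }
        }
    }
}
```

Because the retry lives inside the worker, a transient failure costs a few hundred milliseconds and leaves the loaded model and the rest of the graph untouched.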
The BargeIn detector is currently inline in OnMicFrame — counting frames against _vadThreshold while playback is active. A child BargeInDetectorActor is on the deferred list (it lets the policy grow — voice-class detection, named-call recognition — without bloating the parent), but the inline version is enough to validate the pattern.
The barge-in dynamic, end-to-end:
```mermaid
sequenceDiagram
    autonumber
    participant Mic as Mic capture
    participant Actor as VoiceStreamActor
    participant Out as OUTPUT graph
    participant Spk as NAudioPlaybackQueue
    Out->>Spk: enqueue TTS chunk
    Spk-->>Actor: PlaybackStarted
    Note over Actor: _outputActive = true<br/>_consecutiveLoudFrames = 0
    Mic->>Actor: MicFrame (loud, +1)
    Mic->>Actor: MicFrame (loud, +1)
    Mic->>Actor: MicFrame (loud, +1)
    Mic->>Actor: MicFrame (loud, +1), threshold hit
    Actor->>Actor: Self.Tell(BargeIn)
    Actor->>Out: KillSwitch.Shutdown()
    Actor->>Out: Source.Queue&lt;string&gt;.Complete()
    Actor->>Spk: Stop() (clears queue)
    Spk-->>Actor: PlaybackStopped
    Note over Actor: _outputActive = false<br/>INPUT graph keeps running,<br/>user's interrupting utterance<br/>flows through normally
```
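The inline detector reduces to a counter. A hypothetical sketch (BargeInCounter is not the project's class): quiet frames reset the count, and, as an assumption, the counter disarms itself after tripping so it cannot fire twice before PlaybackStopped arrives. At 50 ms per frame, four frames is 200 ms of sustained speech.

```csharp
using System;

// Hypothetical, single-threaded model of the inline barge-in rule; not the project's code.
public sealed class BargeInCounter
{
    private readonly double _vadThreshold;
    private readonly int _framesToTrip;
    private int _consecutiveLoud;
    private bool _outputActive;

    public BargeInCounter(double vadThreshold, int framesToTrip = 4)
    {
        _vadThreshold = vadThreshold;
        _framesToTrip = framesToTrip;
    }

    public void OnPlaybackStarted() { _outputActive = true; _consecutiveLoud = 0; }
    public void OnPlaybackStopped() { _outputActive = false; _consecutiveLoud = 0; }

    // Returns true once per trip; the actor would then Self.Tell(new BargeIn()).
    public bool OnMicFrame(double rms)
    {
        if (!_outputActive) return false;                              // only while the AI is speaking
        if (rms < _vadThreshold) { _consecutiveLoud = 0; return false; } // quiet frame breaks the run
        if (++_consecutiveLoud < _framesToTrip) return false;
        _consecutiveLoud = 0;
        _outputActive = false;   // assumption: disarm until playback starts again
        return true;
    }
}
```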

Five non-obvious things I'd tell anyone wiring this on Akka.NET

  1. StatefulSelectMany is not on FlowOperations. It's SourceOperations only in 1.5. Stateful flows are custom GraphStage<FlowShape<>>. Don't waste an hour searching for the operator.
  2. Source.ActorRef does not support OverflowStrategy.Backpressure. For real-time devices use Source.Queue<T>(N, OverflowStrategy.DropHead): the audio thread cannot slow down, so dropping the oldest frame on overflow preserves recency. Trying to backpressure the soundcard is the wrong direction.
  3. Sink.ActorRef has no backpressure. Use Sink.ActorRefWithAck(actor, init, ack, complete, fail). The actor must Sender.Tell(ack) on the init message and after every element; those acks drive the demand. Easy to miss until your actor's mailbox is buried under mic frames.
  4. The materializer's actors share the default dispatcher. A stream stage that calls synchronously into the WPF UI (for example via Dispatcher.Invoke) ties up a thread from that pool. Marshal to the WPF Dispatcher only at the terminal sink, never mid-graph.
  5. Per-call retry beats RestartFlow.WithBackoff for voice. A transient 5xx on TTS shouldn't tear down a loaded model and a warm socket. A 3-attempt exponential retry inside the worker actor is cheaper and matches the user's perceived latency budget (the 5xx and its recovery both fit well inside one utterance).
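The drop-head advice has a direct BCL analogue worth knowing when testing outside Akka: a bounded Channel with BoundedChannelFullMode.DropOldest never blocks the writer and evicts the oldest item when full, which matches what OverflowStrategy.DropHead does to a Source.Queue buffer. A small sketch (MicBuffer is a hypothetical helper):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical helper; models DropHead semantics with the BCL, not with Akka.
public static class MicBuffer
{
    // Bounded buffer that never blocks the producer: when full, the oldest frame is discarded.
    public static Channel<T> CreateDropOldest<T>(int capacity) =>
        Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
        });

    // Drains whatever is currently buffered without waiting.
    public static List<T> DrainAvailable<T>(ChannelReader<T> reader)
    {
        var items = new List<T>();
        while (reader.TryRead(out var item)) items.Add(item);
        return items;
    }
}
```

Writing frames 1 through 5 into a capacity-3 buffer leaves 3, 4, 5: the capture callback always succeeds, and the consumer always sees the most recent audio.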

What's still on the table

The list of "deliberately deferred" items is a feature, not an apology — every one of them is documented in harness/logs/tamer/ so the next session knows what was punted and why.
  • Reactor IAsyncEnumerable<string> token stream. Right now HandleReactorResult fires SpeakText(r.FinalMessage) after the reactor finishes; the OUTPUT graph still chunks per sentence and overlaps TTS with playback, but the LLM has already ended its turn. Wiring the reactor's per-token feed directly into the graph is the next step — and the place where the seconds-of-latency win is.
  • BargeInDetector as a child actor. Currently inline. The policy (4 consecutive loud frames at threshold) is good enough for a single user, single device. Voice-class detection or named-call recognition belongs in its own actor.
  • BufferedWaveProvider-based gap-less playback. Sequential WaveOutEvent playback keeps the clips in order but leaves an audible seam between TTS chunks.
  • Settings UI. The UseStreamPipeline and StreamSttParallelism knobs are JSON-only today.
The Step 1 from Part 6 — embedding an open vLLM end-to-end — hasn't moved this week. Step 2 went first because it was the load-bearing piece for everything else: a streaming voice path with a clean control plane is the substrate the local vLLM eventually plugs into. ElevenLabs stays plugged in throughout, exactly as Part 6 promised.

Closing

The intent of Part 6 was to be honest about the road. The intent of this post is to be honest about the build: eleven TestKit cases pass, and the stream path ships behind a feature flag that defaults to off because live mics on real devices haven't run end-to-end yet. There's no marketing line here. The thing works, the next thing is the reactor token stream, and the open vLLM is still waiting at the end of the road.
Notes go up next on what the live test actually shows.
  • psmon

TECH LINKS