Series: AgentZero Lite — Part 7. The previous post (Part 6 — Step 1 Embedding an Open vLLM, Step 2 Akka Streams) laid out the roadmap. This one reports back: Step 2 is in the trunk, opt-in behind a feature flag, with eleven TestKit cases passing and the legacy batch path still primary until live testing validates the stream path.
TL;DR for the people who scroll
- The voice subsystem (microphone → STT → reactor → LLM → TTS → speaker) is now built around two Akka.NET `RunnableGraph`s owned by a single `VoiceStreamActor` at `/user/stage/voice`.
- The INPUT graph turns 50 ms PCM frames into transcripts; the OUTPUT graph turns LLM token streams into progressive TTS playback. Both share a `KillSwitch` so the actor can tear them down atomically: `BargeIn`, `CancelInflight`, `EndTurn`, and `DeviceLost` are all Tell-only messages that end up in the same teardown path.
- STT and TTS parallelism come from `SmallestMailboxPool` routers, not fused `SelectAsync(parallelism: N)` constants. It's an operations knob, not a code constant.
- Akka.NET 1.5 doesn't expose `StatefulSelectMany` on `FlowOperations` (it's on `SourceOperations` only). A custom `GraphStage<FlowShape<...>>` is the canonical pattern for stateful flows in this codebase, and TestKit's `SourceProbe` / `SinkProbe` hook into it cleanly.
- Opt-in via `VoiceSettings.UseStreamPipeline` (default `false`). The legacy `OnVoiceUtteranceEnded → SendCurrentInput` batch path stays primary until live testing on real devices.
Implementation summary: 27 files, +2,062 LoC, eleven stream TestKit cases passing in 337 ms, full Debug build clean. Git commit `0a87d1b` on `main`.

Why streams instead of "another async helper"
Part 6 made the architectural argument; here's the operational one. The old voice path was a sequence of awaited calls:
UtteranceEnded event → byte[] PCM → await STT → set txtInput.Text → SendCurrentInput → reactor finishes → await TTS → Play(byte[])
It worked, but every link in that chain fails for its own reasons and in its own way:
| Link | Failure mode the chain didn't model |
|---|---|
| Microphone | OS-level buffer overflow if downstream stalls |
| STT | Transient 5xx from the cloud provider, mid-utterance |
| LLM | SSE token feed dropping mid-sentence |
| TTS | Per-chunk failure (a single sentence rendered poorly) |
| Speaker | Device disappeared (Bluetooth headset walked out of range) |
| User | Talked over the AI mid-sentence (barge-in) |
The batch chain reacted to all of these the same way: throw, log, drop the turn. A streaming pipeline lets each stage carry its own backpressure, retry, and cancellation policy without the others changing — the same shape Akka actors already gave us for terminal management, just now per-element instead of per-mailbox.
Topology — two graphs, one actor
```mermaid
flowchart TD
    Actor["<b>VoiceStreamActor</b><br/>/user/stage/voice<br/>• owns SharedKillSwitch<br/>• Tell-only protocol<br/>• holds materialized handles"]

    subgraph INPUT ["INPUT graph (mic → reactor)"]
        direction TB
        I1["Source.Queue<MicFrame><br/>(64, OverflowStrategy.DropHead)"]
        I2["KillSwitches.Single"]
        I3["<b>VoiceSegmenterStage</b><br/>VAD + 1 s pre-roll + utterance FSM"]
        I4[".Async()"]
        I5["SelectAsync(p, sttPool.Ask)"]
        I6["Sink.ActorRefWithAck(Self)"]
        I1 --> I2 --> I3 --> I4 --> I5 --> I6
    end

    subgraph OUTPUT ["OUTPUT graph (LLM tokens → speaker)"]
        direction TB
        O1["Source.Queue<string><br/>(64, OverflowStrategy.Backpressure)"]
        O2["KillSwitches.Single"]
        O3["<b>SentenceChunkerStage</b><br/>.?!\\n + min/max chunk len"]
        O4["TtsTextCleaner.StripMarkdown"]
        O5["SelectAsync(p, ttsPool.Ask)"]
        O6["Sink.ForEach(playback.Enqueue)"]
        O1 --> O2 --> O3 --> O4 --> O5 --> O6
    end

    Actor -- "owns + materializes" --> I1
    Actor -- "owns + materializes" --> O1

    classDef actor fill:#1f2937,stroke:#0ea5e9,color:#e5e7eb,stroke-width:2px
    classDef stage fill:#0b3d2e,stroke:#10b981,color:#d1fae5
    class Actor actor
    class I3,O3 stage
```
The crucial property is that the actor is the only thing the rest of the app talks to. WPF code never touches a graph or a kill switch. NAudio's capture callback Tells the actor with each `MicFrame`. The reactor's eventual reply Tells the actor with `SpeakText` (the OUTPUT graph then chunks per sentence and synthesizes in parallel with playback). The Cancel button Tells `CancelInflight`. Every cross-thread interaction is either a `Tell` or a `Source.Queue.OfferAsync`: both are fire-and-forget from the caller's side, and each is governed by its own flow-control rules.

P0 — the boring but load-bearing part
Akka 1.5.40 → 1.5.67 across all four projects that depend on it (ZeroCommon, AgentZeroWpf, ZeroCommon.Tests, AgentTest), plus Akka.Streams and Akka.Streams.TestKit 1.5.67. The 1.5.58 release fixed a .NET 10 CLR shutdown-hook regression that AgentZero Lite already had a workaround for in `ActorSystemManager.ShutdownAsync`; moving past 1.5.58 lets that workaround stay correct. `dotnet test` on the headless ZeroCommon.Tests suite came back 88 / 88 after the bump. No actor-topology regressions, no shutdown hang, no hidden behavior change.

P1 — INPUT graph: where Akka.NET diverges from the JVM docs
The first surprise was that `StatefulSelectMany` is not available on `Flow` in Akka.NET 1.5. It's defined on `SourceOperations` only, so `Flow.Create<T>().StatefulSelectMany(...)` doesn't compile. The build error is unhelpful (it points at the `Source` overload as the "best match" and complains that the receiver type doesn't fit), and the public docs don't flag this asymmetry.

The right answer in this codebase is a custom `GraphStage<FlowShape<TIn, TOut>>`. It's actually cleaner: you own `OnPush` / `OnPull`, so backpressure is explicit (pull when you have nothing to emit, push when an utterance has just completed), and Akka.Streams.TestKit's `SourceProbe` / `SinkProbe` hook into it without any extra ceremony.

Here's the load-bearing part of `VoiceSegmenterStage`, with VAD, utterance FSM, and pre-roll all fused into one stage:

```csharp
public override void OnPush()
{
    var frame = Grab(_stage.In);

    // Pre-roll ring (always, even outside an utterance, so when VAD
    // trips the prior PreRollSeconds of audio is available).
    _preRoll.Enqueue(frame.Pcm16k);
    _preRollBytes += frame.Pcm16k.Length;
    while (_preRollBytes > _maxPreRollBytes && _preRoll.Count > 0)
        _preRollBytes -= _preRoll.Dequeue().Length;

    var above = frame.Rms >= _stage._config.VadThreshold;
    if (above)
    {
        // Loud frame: any silence counted so far was just a pause.
        _utteranceSilenceFrames = 0;
        if (!_inUtterance)
        {
            // Utterance starts: seed the buffer with the pre-roll
            // (which already contains the current frame).
            _inUtterance = true;
            _buffer = new List<byte>(...);
            foreach (var chunk in _preRoll) _buffer.AddRange(chunk);
            _startedAt = DateTimeOffset.UtcNow;
        }
        else
        {
            _buffer!.AddRange(frame.Pcm16k);
        }
        Pull(_stage.In);
        return;
    }

    if (_inUtterance)
    {
        // Quiet frame inside an utterance: keep it, and close the utterance
        // once the hangover window has elapsed.
        _buffer!.AddRange(frame.Pcm16k);
        if (++_utteranceSilenceFrames >= _stage._config.UtteranceHangoverFrames)
        {
            _inUtterance = false;
            var pcm = _buffer.ToArray();
            _buffer = null;
            // 32,000 bytes per second of audio (16 kHz, 2-byte mono samples).
            // No Pull here: wait for downstream demand before consuming more frames.
            Push(_stage.Out, new PcmSegment(pcm, pcm.Length / 32_000.0, _startedAt));
            return;
        }
        Pull(_stage.In);
        return;
    }

    // Quiet frame outside an utterance: nothing to emit, ask for the next frame.
    Pull(_stage.In);
}
```
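The `pcm.Length / 32_000.0` duration in the `Push` call implies 16 kHz, 16-bit mono PCM (16,000 samples × 2 bytes per second); the `Pcm16k` name points the same way, though the snippet never states the format outright. Assuming that format, the sizes the segmenter juggles work out as below. `BytesFor` and `SecondsFor` are hypothetical helpers for the arithmetic, not part of the codebase.

```csharp
using System;

// Assumption: 16 kHz, 16-bit (2-byte) mono PCM, which is what the segmenter's
// pcm.Length / 32_000.0 duration formula implies.
const int SampleRate = 16_000;
const int BytesPerSample = 2;
const int BytesPerSecond = SampleRate * BytesPerSample; // 32,000

// Hypothetical helpers: convert between audio duration and PCM byte counts.
int BytesFor(double seconds) => (int)Math.Round(seconds * BytesPerSecond);
double SecondsFor(int byteCount) => byteCount / (double)BytesPerSecond;

var frameBytes = BytesFor(0.050);   // one 50 ms mic frame
var preRollBytes = BytesFor(1.0);   // the 1 s pre-roll window
Console.WriteLine($"frame = {frameBytes} B, pre-roll = {preRollBytes} B ({preRollBytes / frameBytes} frames)");
// frame = 1600 B, pre-roll = 32000 B (20 frames)
Console.WriteLine($"a 48,000-byte segment lasts {SecondsFor(48_000)} s");
```

Under that assumption, the 1 s pre-roll ring holds about 20 mic frames at any moment, which is what gets prepended when VAD trips.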
The STT call is not a fused `SelectAsync(parallelism: N, async pcm => stt.Transcribe(...))`. It goes through a `SmallestMailboxPool` of `SttWorkerActor` instances, and the `SelectAsync` stage just `Ask`s the pool:

```csharp
.SelectAsync(parallelism, async (PcmSegment seg) =>
{
    // The pool's router hands this to the worker with the fewest queued messages;
    // the Ask faults this task if no reply arrives within 120 s.
    var reply = await sttPool.Ask<TranscribeReply>(
        new TranscribeRequest(seg, language),
        TimeSpan.FromSeconds(120));
    return new VoiceTranscriptReady(reply.Transcript, reply.DurationSeconds);
})
```
Why the indirection? Two reasons:
- Operations knob. `VoiceSettings.StreamSttParallelism` controls the pool size. Bumping it later doesn't require recompiling.
- One STT engine per worker. Each worker actor lazily constructs its `ISpeechToText` and pays the cold-start cost (Whisper.net "small" loads ~487 MB) on its first segment. The router naturally spreads load across the warmed workers.
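`SelectAsync` keeps at most `parallelism` Asks outstanding and still emits transcripts in upstream order, even when a later segment finishes first. A plain-C# sketch of that contract with no Akka dependency: `OrderedSelectAsync` is a hypothetical helper that approximates the operator's ordering and in-flight bound, not its streaming emission or supervision behavior, and the simulated worker delays are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper approximating SelectAsync: at most `parallelism` calls
// outstanding, results collected in input order regardless of finish order.
static async Task<List<TOut>> OrderedSelectAsync<TIn, TOut>(
    IEnumerable<TIn> items, int parallelism, Func<TIn, Task<TOut>> call)
{
    var window = new Queue<Task<TOut>>();
    var results = new List<TOut>();
    foreach (var item in items)
    {
        // Window full: wait for the OLDEST call, so output order is preserved
        // even if a newer call has already finished.
        if (window.Count == parallelism) results.Add(await window.Dequeue());
        window.Enqueue(call(item));
    }
    while (window.Count > 0) results.Add(await window.Dequeue());
    return results;
}

// Simulated STT worker: later segments finish sooner than earlier ones.
var sync = new object();
int inFlight = 0, peakInFlight = 0;
var transcripts = await OrderedSelectAsync(new[] { 1, 2, 3, 4, 5 }, 2, async seg =>
{
    lock (sync) { inFlight++; peakInFlight = Math.Max(peakInFlight, inFlight); }
    await Task.Delay(60 - seg * 10);
    lock (sync) { inFlight--; }
    return $"segment {seg}";
});

Console.WriteLine(string.Join(", ", transcripts)); // segment 1, segment 2, segment 3, segment 4, segment 5
Console.WriteLine($"peak in flight: {peakInFlight}");
```

Ordered emission has a cost: one slow segment holds back the transcripts queued behind it. For a conversation that is usually the right trade, since the reactor should see utterances in the order the user spoke them.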
The terminal sink is `Sink.ActorRefWithAck`, not `Sink.ActorRef`. The latter has no backpressure; flooding the actor's mailbox with mic frames was the failure mode I almost wrote into the codebase before checking. With the ack variant, the actor drives the demand: it Tells `VoiceFrameAck` after each transcript, and the sink waits for that ack before pushing the next one.

P2 — OUTPUT graph: progressive TTS without a custom playback engine
The OUTPUT graph mirrors the INPUT graph in shape but runs in the opposite direction. The token-stream entry is a `Source.Queue<string>` with `OverflowStrategy.Backpressure` (text is small and slow tokens are fine to wait on; audio frames are not). A small `Task` pump drains an `IAsyncEnumerable<string>` into the queue:

```csharp
_ = Task.Run(async () =>
{
    try
    {
        await foreach (var token in cmd.TokenStream)
        {
            // With Backpressure, OfferAsync completes only once the token is enqueued.
            var result = await queue.OfferAsync(token);
            if (result is QueueOfferResult.QueueClosed) return;   // graph already torn down
            if (result is QueueOfferResult.Failure f) { _log.Error(f.Cause, "..."); return; }
        }
        queue.Complete();   // token stream ended: let the chunker flush its tail
    }
    catch (OperationCanceledException)
    {
        try { queue.Complete(); } catch { }
    }
    catch (Exception ex)
    {
        _log.Error(ex, "...");
        try { queue.Fail(ex); } catch { }
    }
});
```
TODO: use `PipeTo` here to keep the actor non-blocking.
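The chunking rule described in the next paragraph can be tried out without any `GraphStage` plumbing. This is a minimal sketch under stated assumptions, not the stage's code: `ChunkSentences`, its `minChars` / `maxChars` parameters, and the choice to look for a terminator only at or past `minChars` are all illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical sentence chunker in the spirit of SentenceChunkerStage.
static List<string> ChunkSentences(IEnumerable<string> tokens, int minChars, int maxChars)
{
    var terminators = new[] { '.', '?', '!' };
    var chunks = new List<string>();
    var buffer = new StringBuilder();

    // Move the first `length` characters of the buffer out as one chunk.
    void Emit(int length)
    {
        var chunk = buffer.ToString(0, length).Trim();
        if (chunk.Length > 0) chunks.Add(chunk);
        buffer.Remove(0, length);
    }

    foreach (var token in tokens)
    {
        buffer.Append(token);
        while (true)
        {
            var text = buffer.ToString();
            // Sentence end, but only once the chunk would be at least minChars long.
            var cut = text.Length >= minChars
                ? text.IndexOfAny(terminators, Math.Max(0, minChars - 1))
                : -1;
            if (cut >= 0) { Emit(cut + 1); continue; }

            // Runaway sentence: split at the last space under maxChars (hard cut if none).
            if (text.Length > maxChars)
            {
                var space = text.LastIndexOf(' ', maxChars - 1);
                Emit(space > 0 ? space : maxChars);
                continue;
            }
            break; // wait for more tokens
        }
    }

    if (buffer.Length > 0) Emit(buffer.Length); // upstream finished: flush the tail
    return chunks;
}

var llmTokens = new[] { "Hello", " there.", " This is", " a longer", " sentence!", " And a tail" };
var result = ChunkSentences(llmTokens, minChars: 8, maxChars: 40);
foreach (var line in result) Console.WriteLine(line);
// Hello there.
// This is a longer sentence!
// And a tail
```

A terminator-only rule like this will also split on decimals such as "3.5"; the minimum-length check only softens that, so a real chunker may need more context than this sketch uses.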
`SentenceChunkerStage` accumulates tokens in a `StringBuilder`, scans for sentence terminators (`.`, `?`, `!`) once the buffer crosses a minimum length, and falls back to "split at the last whitespace under `MaxChunkChars`" if a runaway sentence shows up. On `OnUpstreamFinish` it flushes whatever is still buffered as the last chunk.

The TTS path is an exact mirror of the STT path: `SelectAsync(parallelism: 2, ttsPool.Ask)` against a `SmallestMailboxPool` of `TtsWorkerActor`s. With two workers, the next sentence is being synthesized while the current one is playing, and that's where the progressive feel comes from.

The actual playback is the deliberately boring part.
`NAudioPlaybackQueue` (in AgentZeroWpf) implements an `IAudioPlaybackQueue` interface declared in ZeroCommon. Each `Enqueue(byte[], format)` call adds a clip. When the current clip's `WaveOutEvent.PlaybackStopped` fires, the queue starts the next one. It's sequential, not gapless: there is a perceptible millisecond-level seam between clips. `BufferedWaveProvider`-based seamless playback is on the deferred list, not in this commit.

P3 — actor-mediated control plane
This is the part that's harder to do well in a pure stream graph. Akka.Streams has a `Decider` (per-stage Resume / Restart / Stop) and `RestartFlow.WithBackoff`. Both work, and both are slightly off-target for a voice pipeline:

- `Decider` handles "this stage threw" but doesn't speak the language of "the user just talked over the AI."
- `RestartFlow.WithBackoff` restarts the entire flow on failure, which is heavy when the failure is a single transient 5xx and the rest of the stage state (loaded STT model, open socket) is still good.
The decision was to push the control plane up into the actor and use the streams for what they're best at. Concretely:
| Concern | Lives in | Mechanism |
|---|---|---|
| Per-element retry on transient HTTP/IO/timeout | `SttWorkerActor` / `TtsWorkerActor` | `TransientRetry.WithBackoffAsync`: 3 attempts, exponential backoff; retries only `HttpRequestException`, `TaskCanceledException`, `IOException`, and `TimeoutException`. `OperationCanceledException` and `ArgumentException` propagate immediately. |
| Atomic teardown of OUTPUT graph | `VoiceStreamActor.CancelOutputGraph` | `KillSwitches.Single.Shutdown()` • `Source.Queue.Complete()` • `playback.Stop()` |
| Barge-in detection | `VoiceStreamActor.OnMicFrame` (inline) | While `_outputActive`, count consecutive loud frames; at 4 frames → `Self.Tell(new BargeIn())` |
| `_outputActive` state | `VoiceStreamActor` | `_playback.PlaybackStarted` → `true`; `PlaybackStopped` → `false` |
| User-driven cancel | `Self.Tell(new CancelInflight())` | UI button → same `CancelOutputGraph` path |
| Device-lost recovery | `Self.Tell(new DeviceLost(reason))` | Tears down both INPUT and OUTPUT, lets the UI re-prompt for a device pick |
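The per-element retry row can be sketched in plain C#. The post doesn't show `TransientRetry.WithBackoffAsync`'s signature, so `RetryTransientAsync` below is a hypothetical stand-in: three attempts, doubling delays (the 200 ms default base is an assumption), and a filter that retries only the exception types listed in the table. One subtlety worth encoding: `TaskCanceledException` derives from `OperationCanceledException`, and `HttpClient` reports its own timeouts as `TaskCanceledException`, so the filter checks the caller's token to tell a timeout apart from a real cancellation.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the workers' retry helper: up to maxAttempts tries,
// with the delay doubling after each transient failure.
static async Task<T> RetryTransientAsync<T>(
    Func<CancellationToken, Task<T>> operation, CancellationToken ct,
    int maxAttempts = 3, int baseDelayMs = 200)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            return await operation(ct);
        }
        catch (Exception ex) when (attempt < maxAttempts && IsTransient(ex, ct))
        {
            // Exponential backoff: baseDelayMs, then 2x, 4x, ...
            await Task.Delay(baseDelayMs * (1 << (attempt - 1)), ct);
        }
    }
}

// Only the exception types from the table are retried; everything else,
// including OperationCanceledException and ArgumentException, propagates.
static bool IsTransient(Exception ex, CancellationToken ct) => ex switch
{
    // A TaskCanceledException is transient (e.g. an HttpClient timeout)
    // only if the caller did not ask for cancellation.
    TaskCanceledException when ct.IsCancellationRequested => false,
    HttpRequestException or TaskCanceledException or IOException or TimeoutException => true,
    _ => false,
};

// Simulated TTS call: two transient 5xx failures, then success.
var attempts = 0;
var audio = await RetryTransientAsync(async _ =>
{
    attempts++;
    await Task.Yield();
    if (attempts < 3) throw new HttpRequestException("503 from TTS provider");
    return "audio-bytes";
}, CancellationToken.None, baseDelayMs: 10);
Console.WriteLine($"{audio} after {attempts} attempts"); // audio-bytes after 3 attempts
```

Keeping this inside the worker means a flaky request costs a few hundred milliseconds of backoff, while the loaded model and the rest of the graph stay untouched.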
The `BargeIn` detector is currently inline in `OnMicFrame`, counting frames against `_vadThreshold` while playback is active. A child `BargeInDetectorActor` is on the deferred list (it lets the policy grow, with voice-class detection or named-call recognition, without bloating the parent), but the inline version is enough to validate the pattern.

The barge-in dynamic, end-to-end:
```mermaid
sequenceDiagram
    autonumber
    participant Mic as Mic capture
    participant Actor as VoiceStreamActor
    participant Out as OUTPUT graph
    participant Spk as NAudioPlaybackQueue

    Out->>Spk: enqueue TTS chunk
    Spk-->>Actor: PlaybackStarted
    Note over Actor: _outputActive = true<br/>_consecutiveLoudFrames = 0
    Mic->>Actor: MicFrame (loud, +1)
    Mic->>Actor: MicFrame (loud, +1)
    Mic->>Actor: MicFrame (loud, +1)
    Mic->>Actor: MicFrame (loud, +1) — threshold hit
    Actor->>Actor: Self.Tell(BargeIn)
    Actor->>Out: KillSwitch.Shutdown()
    Actor->>Out: Source.Queue<string>.Complete()
    Actor->>Spk: Stop() (clears queue)
    Spk-->>Actor: PlaybackStopped
    Note over Actor: _outputActive = false<br/>INPUT graph keeps running —<br/>user's interrupting utterance<br/>flows through normally
```
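The counting rule in the diagram fits in a few lines. A minimal sketch, assuming mic frames arrive as `(rms, outputActive)` pairs: `FindBargeIn` is a hypothetical helper, not the actor's `OnMicFrame`, and resetting the count whenever playback is inactive approximates the `_consecutiveLoudFrames = 0` reset on `PlaybackStarted`. At 50 ms per frame, the 4-frame threshold means roughly 200 ms of sustained speech before the AI is cut off.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: returns the index of the frame that triggers a barge-in,
// or null. Counting only happens while the AI is speaking; a quiet frame, or
// playback being inactive, resets the count.
static int? FindBargeIn(IReadOnlyList<(double Rms, bool OutputActive)> frames,
                        double vadThreshold, int framesToTrigger = 4)
{
    var consecutiveLoud = 0;
    for (var i = 0; i < frames.Count; i++)
    {
        var (rms, outputActive) = frames[i];
        if (!outputActive) { consecutiveLoud = 0; continue; }
        consecutiveLoud = rms >= vadThreshold ? consecutiveLoud + 1 : 0;
        if (consecutiveLoud >= framesToTrigger) return i;
    }
    return null;
}

// 50 ms frames while TTS is playing (RMS values are made up for illustration).
var micFrames = new List<(double Rms, bool OutputActive)>
{
    (0.30, true), (0.30, true), (0.01, true),               // two loud frames, then quiet: reset
    (0.30, true), (0.30, true), (0.30, true), (0.30, true), // four loud frames in a row
};
Console.WriteLine(FindBargeIn(micFrames, vadThreshold: 0.05)); // 6
```

Requiring consecutive frames is what keeps a single cough or a chair creak from killing the AI's sentence.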
Five non-obvious things I'd tell anyone wiring this on Akka.NET
- `StatefulSelectMany` is not on `FlowOperations`. It's `SourceOperations`-only in 1.5. Stateful flows are custom `GraphStage<FlowShape<...>>` implementations. Don't waste an hour searching for the operator.
- `Source.ActorRef` does not support `OverflowStrategy.Backpressure`. For real-time devices, use `Source.Queue<T>(N, OverflowStrategy.DropHead)`: the audio thread cannot slow down, so dropping the oldest frame on overflow preserves recency. Trying to backpressure the soundcard is the wrong direction.
- `Sink.ActorRef` has no backpressure. Use `Sink.ActorRefWithAck(actor, init, ack, complete, fail)`. The actor must `Sender.Tell(ack)` on the init message and after every element; those acks are what signal demand to the sink. Easy to miss until your actor mailbox is buried under mic frames.
- The materializer's actors share the default dispatcher. A stream stage that pokes the WPF UI directly (say, via a synchronous `Dispatcher.Invoke`) will block a thread in that pool. Marshal onto the WPF `Dispatcher` only at the terminal sink, never mid-graph.
- Per-call retry beats `RestartFlow.WithBackoff` for voice. A transient 5xx on TTS shouldn't tear down a loaded model and a warm socket. A 3-attempt exponential retry inside the worker actor is cheaper and fits the user's perceived latency budget (the 5xx and its recovery both happen well within one utterance).
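The `DropHead` behavior from the second item is easy to see in isolation. This sketch uses .NET's own `System.Threading.Channels` rather than Akka.Streams, so it is an analogy, not the Akka implementation: a bounded channel in `BoundedChannelFullMode.DropOldest` mode never makes the writer wait, and on overflow it discards the oldest buffered item, which is the recency-preserving trade-off a soundcard callback needs. The capacity of 3 and the integer "frames" are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Analogy only: a bounded channel in DropOldest mode behaves like
// Source.Queue(N, OverflowStrategy.DropHead). The writer (think: the audio
// callback) never waits; the oldest buffered item is discarded on overflow.
var micQueue = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest,
    SingleReader = true,
});

// Five frames arrive while nothing is reading (a stalled consumer).
for (var frame = 1; frame <= 5; frame++)
    micQueue.Writer.TryWrite(frame); // always succeeds in DropOldest mode

var kept = new List<int>();
while (micQueue.Reader.TryRead(out var item)) kept.Add(item);
Console.WriteLine(string.Join(", ", kept)); // 3, 4, 5
```

The consumer resumes with the three most recent frames, which is the right failure mode for live audio: stale sound is worth less than current sound.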
What's still on the table
The list of "deliberately deferred" items is a feature, not an apology: every one of them is documented in `harness/logs/tamer/` so the next session knows what was punted and why.

- Reactor `IAsyncEnumerable<string>` token stream. Right now `HandleReactorResult` fires `SpeakText(r.FinalMessage)` after the reactor finishes; the OUTPUT graph still chunks per sentence and overlaps TTS with playback, but the LLM has already ended its turn. Wiring the reactor's per-token feed directly into the graph is the next step, and it's where the seconds-of-latency win is.
- `BargeInDetector` as a child actor. Currently inline. The policy (4 consecutive loud frames at threshold) is good enough for a single user on a single device. Voice-class detection or named-call recognition belongs in its own actor.
- `BufferedWaveProvider`-based gapless playback. Sequential `WaveOutEvent` playback keeps clips in the right order but leaves an audible seam between TTS chunks.
- Settings UI. The `UseStreamPipeline` and `StreamSttParallelism` knobs are JSON-only today.
Step 1 from Part 6 (embedding an open vLLM end-to-end) hasn't moved this week. Step 2 went first because it was the load-bearing piece for everything else: a streaming voice path with a clean control plane is the substrate the local vLLM eventually plugs into. ElevenLabs stays plugged in throughout, exactly as Part 6 promised.
Closing
The intent of Part 6 was to be honest about the road. The intent of this post is to be honest about the build: eleven TestKit cases pass, and the stream path sits behind an opt-in flag that defaults to off because live mics on real devices haven't run end-to-end yet. There's no marketing line here. The thing works, the next thing is the reactor token stream, and the open vLLM is still waiting at the end of the road.
Notes go up next on what the live test actually shows.
- psmon
TECH LINKS
- 𝕏 @webnori