Observables
Virtufin.Data provides IObservable<T> adapters for three reactive data sources: NATS pub/sub, WebSocket frames, and Valkey keyspaces. All three implement auto-reconnect with exponential backoff.
The IObservable<T> contract
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception ex);
void OnCompleted();
}
The adapters in Virtufin.Data wrap an external stream and present it as an IObservable<T>. Subscribers use Subscribe(IObserver<T>) to receive values; unsubscribing disposes the underlying subscription.
NATS observable
using Virtufin.Data;
var nats = new NatsConnection("nats://localhost:4222");
var observable = new NatsObservable<string>(nats, "orders.>");
using var sub = observable.Subscribe(new ConsoleObserver<string>());
class ConsoleObserver<T> : IObserver<T>
{
public void OnNext(T value) => Console.WriteLine(value);
public void OnError(Exception ex) => Console.Error.WriteLine(ex);
public void OnCompleted() => Console.WriteLine("done");
}
The NatsObservable:
- Subscribes to subjects matching the pattern
- Re-subscribes on reconnect (the pattern, not the exact subject)
- Buffers the last N values for late subscribers (configurable)
WebSocket observable
var ws = new ClientWebSocket();
await ws.ConnectAsync(new Uri("wss://api.binance.com/ws/btcusdt@trade"), CancellationToken.None);
var observable = new WebSocketObservable<TradeEvent>(ws);
using var sub = observable.Subscribe(observer);
The WebSocketObservable:
- Reads frames from the WebSocket
- Parses each frame (configurable: JSON, FlatBuffers, raw)
- Reconnects on disconnect with exponential backoff
- Surfaces parse errors as OnError
Valkey observable
var mux = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var db = mux.GetDatabase();
var observable = new ValkeyObservable<DecimalAmount>(db, "btc-price");
using var sub = observable.Subscribe(observer);
The ValkeyObservable:
- Uses Valkey keyspace notifications (__keyspace@0__:) to watch for changes
- Re-subscribes on PSUB reconnect
- Decodes values (configurable: FlatBuffers, JSON, raw bytes)
Why IObservable<T>?
The .NET reactive extensions (System.Reactive) are the idiomatic way to express asynchronous streams. Every consumer of streaming data in the platform already uses IObservable<T> — the adapters integrate without glue code.
For more complex stream combinators (filter, merge, throttle), use System.Reactive:
var fastTrades = observable
.Where(t => t.Symbol == "BTCUSDT")
.Throttle(TimeSpan.FromMilliseconds(100))
.Select(t => t.Price);
Performance
- NATS: native protocol, single connection per subject pattern, no serialization overhead for
string/byte[]payloads. - WebSocket: one connection per
WebSocketObservable, frame-by-frame parsing. - Valkey: uses connection multiplexing (one
ConnectionMultiplexercan serve manyValkeyObservableinstances).
For very high-throughput paths, use the byte[] overload of each observable and parse downstream — that avoids intermediate allocations.
See also
- Architecture — the reactive layer's place in the project
- Variant Type — the payload type for all streams
- API Reference — full public API of the reactive types