Skip to content

Observables

Virtufin.Data provides IObservable<T> adapters for three reactive data sources: NATS pub/sub, WebSocket frames, and Valkey keyspaces. All three implement auto-reconnect with exponential backoff.

The IObservable<T> contract

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception ex);
    void OnCompleted();
}

The adapters in Virtufin.Data wrap an external stream and present it as an IObservable<T>. Subscribers use Subscribe(IObserver<T>) to receive values; unsubscribing disposes the underlying subscription.

NATS observable

using Virtufin.Data;

var nats = new NatsConnection("nats://localhost:4222");
var observable = new NatsObservable<string>(nats, "orders.>");

using var sub = observable.Subscribe(new ConsoleObserver<string>());

class ConsoleObserver<T> : IObserver<T>
{
    public void OnNext(T value) => Console.WriteLine(value);
    public void OnError(Exception ex) => Console.Error.WriteLine(ex);
    public void OnCompleted() => Console.WriteLine("done");
}

The NatsObservable: - Subscribes to subjects matching the pattern - Re-subscribes on reconnect (the pattern, not the exact subject) - Buffers the last N values for late subscribers (configurable)

WebSocket observable

var ws = new ClientWebSocket();
await ws.ConnectAsync(new Uri("wss://api.binance.com/ws/btcusdt@trade"), CancellationToken.None);
var observable = new WebSocketObservable<TradeEvent>(ws);

using var sub = observable.Subscribe(observer);

The WebSocketObservable: - Reads frames from the WebSocket - Parses each frame (configurable: JSON, FlatBuffers, raw) - Reconnects on disconnect with exponential backoff - Surfaces parse errors as OnError

Valkey observable

var mux = await ConnectionMultiplexer.ConnectAsync("localhost:6379");
var db = mux.GetDatabase();
var observable = new ValkeyObservable<DecimalAmount>(db, "btc-price");

using var sub = observable.Subscribe(observer);

The ValkeyObservable: - Uses Valkey keyspace notifications (__keyspace@0__:) to watch for changes - Re-subscribes on PSUB reconnect - Decodes values (configurable: FlatBuffers, JSON, raw bytes)

Why IObservable<T>?

The .NET reactive extensions (System.Reactive) are the idiomatic way to express asynchronous streams. Every consumer of streaming data in the platform already uses IObservable<T> — the adapters integrate without glue code.

For more complex stream combinators (filter, merge, throttle), use System.Reactive:

var fastTrades = observable
    .Where(t => t.Symbol == "BTCUSDT")
    .Throttle(TimeSpan.FromMilliseconds(100))
    .Select(t => t.Price);

Performance

  • NATS: native protocol, single connection per subject pattern, no serialization overhead for string/byte[] payloads.
  • WebSocket: one connection per WebSocketObservable, frame-by-frame parsing.
  • Valkey: uses connection multiplexing (one ConnectionMultiplexer can serve many ValkeyObservable instances).

For very high-throughput paths, use the byte[] overload of each observable and parse downstream — that avoids intermediate allocations.

See also