---
title: "Reactive Queries"
description: "Automatically re-execute queries when underlying data changes with observable patterns."
canonical: "https://roomsharp.dev/docs/v0.5.4/reactive-queries"
source: "src/content/v0.5.4/reactive-queries.mdx"
---

# Reactive Queries & Change Tracking

RoomSharp includes a reactive query system that automatically re-executes queries when underlying data changes. The system keys invalidations by table ID, is transaction-aware, coalesces notifications, and uses bounded per-subscriber delivery.

For production guidance on cache bounds, UI binding errors, WinForms lifetime, and refresh policy, see [Reactive Production Patterns](/docs/v0.5.4/reactive-production-patterns).

## Enabling Change Tracking

```csharp
using RoomSharp.Extensions;
using RoomSharp.Invalidation;
using RoomSharp.Reactive;

var db = RoomDatabase.Builder<AppDatabase>()
    .UseSqlite("app.db")
    .EnableChangeTracking(o =>
    {
        o.DispatchInterval = TimeSpan.FromMilliseconds(50);
        o.DeliveryMode = DeliveryMode.LatestOnly;
    })
    .Build();
```

Once enabled, `Insert`, `Update`, and `Delete` operations automatically notify registered observers. The source generator emits the necessary invalidation calls in the generated DAO implementations.

---

## Observable Queries

The `Observe` extension method creates a query that automatically refreshes when watched tables change.

### Basic Usage

```csharp
using RoomSharp.Reactive;

// Get table ID (computed once at startup)
// Prefer the typed form when the watched table belongs to an entity.
// `Todo` here stands for the entity class mapped to the "todos" table.
var todosTableId = db.GetTableIdOrThrow<Todo>();

// String lookup is also available when you only have the table name:
// var todosTableId = db.GetTableIdOrThrow("todos");

// Create an observable query watching the table
await using var query = db.Observe(
    async ct => await db.TodoDao.GetAllAsync(),
    todosTableId);

// Subscribe to receive updates
using var subscription = query.Subscribe(
    onNext: todos => Console.WriteLine($"Count: {todos.Count}"),
    onError: ex => Console.WriteLine($"Error: {ex.Message}")
);

// The subscription receives:
// 1. Initial query result immediately after subscribing
// 2. Updated results whenever Insert/Update/Delete affects the table
```

### Query Factory Overloads

```csharp
// Using table names (string overload - simpler)
await using var byName = db.Observe(
    async ct => await db.TodoDao.GetAllAsync(),
    "todos");

// Watch multiple tables (string overload)
await using var joined = db.Observe(
    async ct => await db.GetTodosWithCategories(),
    "todos", "categories");

// Using table IDs (int overload - slightly faster)
var todosId = db.GetTableIdOrThrow<Todo>();
await using var byId = db.Observe(async ct => await db.TodoDao.GetAllAsync(), todosId);

// Static lambda overload (avoids closure allocation)
await using var viaStatic = db.Observe<List<Todo>, AppDatabase>(
    static async (ct, db) => await db.TodoDao.GetAllAsync(),
    "todos");
```

---

## Getting Initial Value

Use `GetValueAsync()` to await the first query result without subscribing:

```csharp
await using var query = db.Observe(
    async ct => await db.TodoDao.GetAllAsync(),
    "todos");

// Wait for initial data (replaces Task.Delay workarounds)
var initialTodos = await query.GetValueAsync();
Console.WriteLine($"Initial count: {initialTodos.Count}");

// Then subscribe for updates
using var sub = query.Subscribe(
    onNext: todos => Console.WriteLine($"Updated: {todos.Count}"),
    onError: ex => Console.WriteLine($"Error: {ex.Message}"));
```

---

## Query Operators

`IObservableQuery<T>` supports fluent operators for stream processing:

```csharp
var query = db.Observe(async ct => await db.TodoDao.GetAllAsync(), "todos")
    .DistinctUntilChanged()                    // Filter consecutive duplicates
    .Debounce(TimeSpan.FromMilliseconds(300))  // Wait for quiet period
    .Where(list => list.Count > 0);            // Filter results

// Other operators:
// .Throttle(TimeSpan)                 - Limit emission rate
// .DistinctUntilChanged(keySelector)  - Compare by key
// .Select(selector)                   - Transform results
```

### Debounce vs Throttle

```csharp
// Debounce: delays refresh until a quiet period
await using var debounced = db.Observe(ct => ..., todos)
    .Debounce(TimeSpan.FromMilliseconds(100))
    .DistinctUntilChanged();

// Throttle: limits refresh rate
await using var throttled = db.Observe(ct => ..., todos)
    .Throttle(TimeSpan.FromMilliseconds(200))
    .Where(list => list.Count > 0);
```

---

## IObservableQuery Interface

```csharp
public interface IObservableQuery<T> : IAsyncDisposable
{
    T? CurrentValue { get; }
    bool IsStale { get; }

    IDisposable Subscribe(Action<T> onNext, Action<Exception> onError);
    ValueTask<T> GetValueAsync(CancellationToken ct = default);
    ValueTask RefreshAsync(CancellationToken ct = default);
    IAsyncEnumerable<T> ToAsyncEnumerable(CancellationToken ct = default);
}
```

| Member | Description |
|--------|-------------|
| `WatchedTables` | Tables this query monitors for changes |
| `IsStale` | Indicates pending refresh after a table change |
| `Subscribe(onNext, onError)` | Register callbacks for query results; returns disposable subscription |
| `GetValueAsync(ct)` | Manually fetch current value without subscribing |
| `Invalidate()` | Force re-execution of the query |

---

## DataSourceChanged (Lightweight Event)

For simple notification scenarios where you don't need reactive queries, RoomSharp provides a zero-allocation callback mechanism:

```csharp
// Register a lightweight callback for data changes
db.OnDataSourceChanged(info =>
{
    Console.WriteLine($"Table {info.TableId}: {info.Operation} ({info.AffectedRows} rows)");
});

// Remove callback when no longer needed
db.OnDataSourceChanged(null);
```

### DataChangeInfo Structure

```csharp
public readonly struct DataChangeInfo
{
    public readonly int TableId;                    // Table identifier
    public readonly DataChangeOperation Operation;  // Insert, Update, Delete, BulkInsert
    public readonly int AffectedRows;               // Number of affected rows
}

public enum DataChangeOperation : byte
{
    Insert = 0,
    Update = 1,
    Delete = 2,
    BulkInsert = 3
}
```

### Performance Characteristics

| Feature | Description |
|---------|-------------|
| **Zero-allocation** | Uses `struct` instead of `class` for event args |
| **Zero-cost when unused** | Inline null check with early return |
| **Immediate notification** | Called synchronously after each operation |
| **No dependencies** | Works without `EnableChangeTracking()` |

### Use Cases

- **Cache invalidation**: Clear in-memory caches when data changes
- **UI notifications**: Show toast messages or update badges
- **Audit logging**: Record all database operations
- **Sync triggers**: Start background sync when local data changes

### Comparison with Reactive Queries

| Feature | `OnDataSourceChanged` | `Observe()` + `Subscribe()` |
|---------|----------------------|----------------------------|
| Setup | Simple callback | Requires `EnableChangeTracking()` |
| Data access | Change metadata only (table ID, operation, row count) | Re-executes query, returns data |
| Batching | Per-operation | Coalesced notifications |
| Allocation | Zero (struct) | Minimal (pooled) |
| Best for | Simple notifications | Data-bound UI updates |

---

## Manual Notifications

For raw SQL or external modifications:

```csharp
// Single table
db.NotifyTableChanged("todos");

// Multiple tables
db.NotifyTablesChanged("todos", "categories");
```

Use manual notifications when executing raw SQL via `QueryExtensions` or when external processes modify the database.
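
One way to apply manual notifications is to pair each raw SQL write with its notification in a single helper, so callers cannot forget the second step. The sketch below is a minimal illustration under stated assumptions: `ExecuteAndNotifyAsync` is a hypothetical helper, not part of RoomSharp, and both the raw SQL call and the notification are passed in as delegates so the example runs without the library. In an application, `notifyTablesChanged` would forward to `db.NotifyTablesChanged`, assuming that method accepts an array of table names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not a RoomSharp API): runs a raw SQL command and
// notifies the touched tables only when at least one row was affected.
static async Task<int> ExecuteAndNotifyAsync(
    Func<Task<int>> executeRawSql,        // stand-in for your raw SQL call
    Action<string[]> notifyTablesChanged, // stand-in for db.NotifyTablesChanged
    params string[] touchedTables)
{
    int affectedRows = await executeRawSql();
    if (affectedRows > 0)
        notifyTablesChanged(touchedTables);
    return affectedRows;
}

// Demo with fake delegates: record which tables would have been notified.
var notified = new List<string>();

int changed = await ExecuteAndNotifyAsync(
    () => Task.FromResult(3),             // pretend the statement changed 3 rows
    tables => notified.AddRange(tables),
    "todos", "categories");

int unchanged = await ExecuteAndNotifyAsync(
    () => Task.FromResult(0),             // no rows changed: no notification
    tables => notified.AddRange(tables),
    "todos");

Console.WriteLine($"{changed} and {unchanged} rows, notified: {string.Join(",", notified)}");
```

Skipping the notification when no rows changed avoids waking observers for a no-op write; whether that is appropriate depends on your refresh policy.
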

---

## Change Tracker Access

Direct access to the change tracker is available for advanced scenarios:

```csharp
var tracker = db.ChangeTracker;

// Check if a table is tracked
bool isTracked = tracker.IsTracked("todos");

// Get current version (increments on each change)
long version = tracker.GetVersion("todos");

// List all tracked tables
var tables = tracker.TrackedTables;
```

---

## Flush Invalidations

`RoomDatabase.FlushInvalidations()` forces a dispatcher tick (useful for deterministic tests).

```csharp
// In tests, force immediate processing
db.FlushInvalidations();
```

---

## Architecture Notes

- Queries are coalesced automatically to prevent duplicate refreshes
- In `Serialized` mode, background refreshes are scheduled through `BackgroundQueryScheduler`
- In `Parallel` mode, refreshes execute immediately with separate database sessions
- Notifications are deferred until after transaction commit
- Observers are held via weak references to allow garbage collection if not disposed

---

## Rx.NET Integration

For advanced reactive programming with operators like `Throttle`, `DistinctUntilChanged`, and `ObserveOn`, install the companion package:

```bash
dotnet add package RoomSharp.Reactive
```

This provides `ToObservable()` to convert `IObservableQuery<T>` to `IObservable<T>` and additional extensions for `TableChangeTracker`.
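
As a rough illustration of what the Rx.NET side can look like, the sketch below applies standard `System.Reactive` operators to a stream of result lists. It makes several assumptions: a `Subject` stands in for the observable that `ToObservable()` would return, the element type is simplified to `string`, and the stream is assumed to emit the full result list on each refresh. Only `Select`, `DistinctUntilChanged`, and `Subscribe` from Rx.NET are used, and the example needs the `System.Reactive` package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Stand-in for the stream returned by query.ToObservable() (assumption:
// each emission carries the complete, refreshed result list).
var results = new Subject<IReadOnlyList<string>>();
var rendered = new List<int>();

using var sub = results
    .Select(list => list.Count)   // project each result set to its size
    .DistinctUntilChanged()       // skip refreshes that leave the count unchanged
    .Subscribe(
        count => rendered.Add(count),
        ex => Console.Error.WriteLine(ex.Message));

results.OnNext(new[] { "a" });        // count 1: emitted
results.OnNext(new[] { "b" });        // count still 1: suppressed
results.OnNext(new[] { "a", "b" });   // count 2: emitted

Console.WriteLine(string.Join(",", rendered)); // prints 1,2
```

When adding time-based operators, note that Rx.NET's own `Throttle` emits a value only after the specified quiet period has elapsed, which corresponds to what many libraries call debounce; `Sample` is the closest Rx.NET operator for fixed-rate emission.
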

---

## QueryCache

`RoomSharp.Reactive` includes a single-flight TTL cache for reactive query results:

```csharp
using RoomSharp.Reactive;

var cache = new QueryCache(db, new QueryCacheOptions
{
    DefaultTtl = TimeSpan.FromSeconds(30),
    MaxEntries = 1_000,
    FactoryCancellationMode = CacheFactoryCancellationMode.NeverCancelInFlight
});

var todosTable = db.GetTableIdOrThrow<Todo>();
var key = db.BuildCacheKey<List<Todo>>("select * from todos", parameters: null);

var cached = db.ObserveCached(
    cache,
    key,
    ct => new ValueTask<List<Todo>>(db.TodoDao.GetAllAsync()),
    new QueryCacheEntryOptions
    {
        Ttl = TimeSpan.FromSeconds(10),
        TableIds = new[] { todosTable }
    },
    options: null,
    todosTable);
```

Use this for read-heavy reactive views where a short TTL is acceptable and duplicate refreshes should be collapsed. `NeverCancelInFlight` keeps a shared factory alive when one caller cancels, which is the safest default for UI screens. Use `IQueryCacheMetrics2` when you need eviction reasons, active entry counts, or factory timing.

---

## Computed Views

`ComputedView` derives a new reactive value from an existing observable query:

```csharp
var todosQuery = db.ObserveReactive(
    ct => new ValueTask<List<Todo>>(db.TodoDao.GetAllAsync()),
    db.GetTableIdOrThrow<Todo>());

var stats = ComputedView.Combine(
    todosQuery,
    todos => new TodoStats(todos.Count),
    new ComputedViewOptions { DebounceInterval = TimeSpan.FromMilliseconds(200) });

using var sub = stats.Subscribe(
    value => UpdateStats(value),
    error => Log(error));
```

---

## UI Collection Binding

For WPF/WinUI-style collections:

```csharp
using RoomSharp.Reactive;
using System.Collections.ObjectModel;

var collection = new ReactiveObservableCollection<Todo>();
var ui = SynchronizationContext.Current!;

using var sub = query
    .AsObservable()
    .BindToObservableCollection(
        collection,
        new ReactiveCollectionBindingOptions
        {
            Dispatch = action => ui.Post(_ => action(), null),
            KeySelector = t => t.Id,
            UpdateMode = ReactiveCollectionUpdateMode.Reset,
            MergeExisting = (existing, incoming) =>
            {
                existing.Title = incoming.Title;
                existing.IsDone = incoming.IsDone;
            },
            OnError = error => Log(error)
        });
```

`ReactiveObservableCollection<T>` can replace or merge a full result set with one reset notification. For small lists, a normal `ObservableCollection<T>` is still fine.

For WinForms, install the WinForms companion package:

```bash
dotnet add package RoomSharp.Reactive.WinForms
```

```csharp
using RoomSharp.Reactive;
using RoomSharp.Reactive.WinForms;

using var sub = query
    .AsObservable()
    .BindToDataGridView(
        dataGridView1,
        new WinFormsBindingOptions
        {
            KeySelector = t => t.Id,
            UpdateMode = ReactiveCollectionUpdateMode.MergeByKey,
            PreserveCurrentRow = true,
            PreserveScrollPosition = true,
            OnError = error => Log(error)
        });
```