---
title: "Reactive Queries"
description: "Automatically re-execute queries when underlying data changes with observable patterns."
canonical: "https://roomsharp.dev/docs/v0.5.4/reactive-queries"
source: "src/content/v0.5.4/reactive-queries.mdx"
---

# Reactive Queries & Change Tracking

RoomSharp includes a reactive query system that automatically re-executes queries when underlying data changes. The system is table-ID based, transaction-aware, coalesced, and uses bounded per-subscriber delivery.

## Enabling Change Tracking

```csharp
using RoomSharp.Extensions;
using RoomSharp.Invalidation;
using RoomSharp.Reactive;

var db = RoomDatabase.Builder()
    .UseSqlite("app.db")
    .EnableChangeTracking(o =>
    {
        o.DispatchInterval = TimeSpan.FromMilliseconds(50);
        o.DeliveryMode = DeliveryMode.LatestOnly;
    })
    .Build();
```

Once enabled, `Insert`, `Update`, and `Delete` operations automatically notify registered observers. The source generator emits the necessary invalidation calls in the generated DAO implementations.

---

## Observable Queries

The `Observe` extension method creates a query that automatically refreshes when watched tables change.

### Basic Usage

```csharp
using RoomSharp.Reactive;

// Get table ID (computed once at startup)
var todosTableId = db.GetTableIdOrThrow("todos");

// Create an observable query watching the table
await using var query = db.Observe(
    async ct => await db.TodoDao.GetAllAsync(),
    todosTableId);

// Subscribe to receive updates
using var subscription = query.Subscribe(
    onNext: todos => Console.WriteLine($"Count: {todos.Count}"),
    onError: ex => Console.WriteLine($"Error: {ex.Message}")
);

// The subscription receives:
// 1. Initial query result immediately after subscribing
// 2. Updated results whenever Insert/Update/Delete affects the table
```

### Query Factory Overloads

```csharp
// Using table names (string overload - simpler)
await using var query = db.Observe(
    async ct => await db.TodoDao.GetAllAsync(),
    "todos");

// Watch multiple tables (string overload)
await using var query = db.Observe(
    async ct => await db.GetTodosWithCategories(),
    "todos", "categories");

// Using table IDs (int overload - slightly faster)
var todosId = db.GetTableIdOrThrow("todos");
await using var query = db.Observe(async ct => await db.TodoDao.GetAllAsync(), todosId);

// Static lambda overload (avoids closure allocation)
// The first type argument is the query result type; List<Todo> is assumed here.
await using var query = db.Observe<List<Todo>, AppDatabase>(
    static async (ct, db) => await db.TodoDao.GetAllAsync(),
    "todos");
```

---

## Getting Initial Value

Use `GetValueAsync()` to await the first query result without subscribing:

```csharp
await using var query = db.Observe(
    async ct => await db.TodoDao.GetAllAsync(),
    "todos");

// Wait for initial data (replaces Task.Delay workarounds)
var initialTodos = await query.GetValueAsync();
Console.WriteLine($"Initial count: {initialTodos.Count}");

// Then subscribe for updates
using var sub = query.Subscribe(
    onNext: todos => Console.WriteLine($"Updated: {todos.Count}"),
    onError: ex => Console.WriteLine($"Error: {ex.Message}"));
```

---

## Query Operators

`IObservableQuery<T>` supports fluent operators for stream processing:

```csharp
var query = db.Observe(async ct => await db.TodoDao.GetAllAsync(), "todos")
    .DistinctUntilChanged()                      // Filter consecutive duplicates
    .Debounce(TimeSpan.FromMilliseconds(300))    // Wait for quiet period
    .Where(list => list.Count > 0);              // Filter results

// Other operators:
// .Throttle(TimeSpan)                - Limit emission rate
// .DistinctUntilChanged(keySelector) - Compare by key
// .Select(selector)                  - Transform results
```

### Debounce vs Throttle

```csharp
// Debounce: delays refresh until a quiet period
await using var debounced = db.Observe(ct => ..., todos)
    .Debounce(TimeSpan.FromMilliseconds(100))
    .DistinctUntilChanged();

// Throttle: limits refresh rate
await using var throttled = db.Observe(ct => ..., todos)
    .Throttle(TimeSpan.FromMilliseconds(200))
    .Where(list => list.Count > 0);
```

---

## IObservableQuery Interface

```csharp
public interface IObservableQuery<T> : IAsyncDisposable
{
    T? CurrentValue { get; }
    bool IsStale { get; }

    IDisposable Subscribe(Action<T> onNext, Action<Exception> onError);
    ValueTask<T> GetValueAsync(CancellationToken ct = default);
    ValueTask RefreshAsync(CancellationToken ct = default);
    IAsyncEnumerable<T> ToAsyncEnumerable(CancellationToken ct = default);
}
```

| Member | Description |
|--------|-------------|
| `WatchedTables` | Tables this query monitors for changes |
| `IsStale` | Indicates pending refresh after a table change |
| `Subscribe(onNext, onError)` | Register callbacks for query results; returns disposable subscription |
| `GetValueAsync(ct)` | Manually fetch current value without subscribing |
| `Invalidate()` | Force re-execution of the query |

---

## DataSourceChanged (Lightweight Event)

For simple notification scenarios where you don't need reactive queries, RoomSharp provides a zero-allocation callback mechanism:

```csharp
// Register a lightweight callback for data changes
db.OnDataSourceChanged(info =>
{
    Console.WriteLine($"Table {info.TableId}: {info.Operation} ({info.AffectedRows} rows)");
});

// Remove callback when no longer needed
db.OnDataSourceChanged(null);
```

### DataChangeInfo Structure

```csharp
public readonly struct DataChangeInfo
{
    public readonly int TableId;                    // Table identifier
    public readonly DataChangeOperation Operation;  // Insert, Update, Delete, BulkInsert
    public readonly int AffectedRows;               // Number of affected rows
}

public enum DataChangeOperation : byte
{
    Insert = 0,
    Update = 1,
    Delete = 2,
    BulkInsert = 3
}
```

### Performance Characteristics

| Feature | Description |
|---------|-------------|
| **Zero-allocation** | Uses `struct` instead of `class` for event args |
| **Zero-cost when unused** | Inline null check with early return |
| **Immediate notification** | Called synchronously after each operation |
| **No dependencies** | Works without `EnableChangeTracking()` |

### Use Cases

- **Cache invalidation**: Clear in-memory caches when data changes
- **UI notifications**: Show toast messages or update badges
- **Audit logging**: Record all database operations
- **Sync triggers**: Start background sync when local data changes

### Comparison with Reactive Queries

| Feature | `OnDataSourceChanged` | `Observe()` + `Subscribe()` |
|---------|----------------------|----------------------------|
| Setup | Simple callback | Requires `EnableChangeTracking()` |
| Data access | Change metadata only (`TableId`, `Operation`, `AffectedRows`) | Re-executes query, returns data |
| Batching | Per-operation | Coalesced notifications |
| Allocation | Zero (struct) | Minimal (pooled) |
| Best for | Simple notifications | Data-bound UI updates |

---

## Manual Notifications

For raw SQL or external modifications:

```csharp
// Single table
db.NotifyTableChanged("todos");

// Multiple tables
db.NotifyTablesChanged("todos", "categories");
```

Use manual notifications when executing raw SQL via `QueryExtensions` or when external processes modify the database, since neither path goes through the generated DAO code that normally emits invalidations.

---

## Change Tracker Access

Direct access to the change tracker is available for advanced scenarios:

```csharp
var tracker = db.ChangeTracker;

// Check if a table is tracked
bool isTracked = tracker.IsTracked("todos");

// Get current version (increments on each change)
long version = tracker.GetVersion("todos");

// List all tracked tables
var tables = tracker.TrackedTables;
```

---

## Flush Invalidations

`RoomDatabase.FlushInvalidations()` forces a dispatcher tick, which is useful for deterministic tests:

```csharp
// In tests, force immediate processing
db.FlushInvalidations();
```

---

## Architecture Notes

- Refreshes are coalesced automatically to prevent duplicate query executions
- In `Serialized` mode, background refreshes are scheduled through `BackgroundQueryScheduler`
- In `Parallel` mode, refreshes execute immediately with separate database sessions
- Notifications are deferred until after transaction commit
- Observers are held via weak references to allow garbage collection if not disposed

---

## Rx.NET Integration

For advanced reactive programming with operators like `Throttle`, `DistinctUntilChanged`, and `ObserveOn`, install the companion package:

```bash
dotnet add package RoomSharp.Reactive
```

This provides `ToObservable()` to convert `IObservableQuery<T>` to `IObservable<T>` and additional extensions for `TableChangeTracker`.

---

## QueryCache

`RoomSharp.Reactive` includes a single-flight TTL cache for reactive query results. Single-flight means that concurrent requests for the same key share one in-flight query, and TTL (time to live) bounds how long a cached result is reused:

```csharp
using RoomSharp.Reactive;

var cache = new QueryCache(db);

// List<Todo> is assumed as the cached result type.
var key = db.BuildCacheKey<List<Todo>>("select * from todos", parameters: null);

var cached = db.ObserveCached(
    cache,
    key,
    ct => new ValueTask<List<Todo>>(db.TodoDao.GetAllAsync()),
    new QueryCacheEntryOptions
    {
        Ttl = TimeSpan.FromSeconds(10),
        TableIds = new[] { db.GetTableIdOrThrow("todos") }
    },
    options: null,
    db.GetTableIdOrThrow("todos"));
```

Use this for read-heavy reactive views where a short TTL is acceptable and duplicate refreshes should be collapsed.

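To make the "single-flight TTL" idea concrete, here is a minimal, self-contained sketch of such a cache. It is not RoomSharp's `QueryCache`: the `SingleFlightTtlCache` type, its `GetOrLoadAsync` and `Invalidate` methods, and the loader in the usage lines are all hypothetical names chosen for illustration, and the real implementation may differ in every detail.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Usage: two overlapping requests for the same key trigger a single load.
var cache = new SingleFlightTtlCache<string, int>(TimeSpan.FromSeconds(10));
int calls = 0;
Func<Task<int>> loader = async () =>
{
    Interlocked.Increment(ref calls);
    await Task.Delay(50); // stand-in for a database query
    return 42;
};

var first = cache.GetOrLoadAsync("todos", loader);
var second = cache.GetOrLoadAsync("todos", loader);
var results = await Task.WhenAll(first, second);
Console.WriteLine($"{results[0]} {results[1]} loads={calls}"); // prints "42 42 loads=1"

// Hypothetical illustration only; this is not RoomSharp's QueryCache.
public sealed class SingleFlightTtlCache<TKey, TValue> where TKey : notnull
{
    private sealed class Entry
    {
        public Entry(Lazy<Task<TValue>> load, DateTime expiresUtc)
        {
            Load = load;
            ExpiresUtc = expiresUtc;
        }

        public Lazy<Task<TValue>> Load { get; }
        public DateTime ExpiresUtc { get; }
    }

    private readonly ConcurrentDictionary<TKey, Entry> _entries = new();
    private readonly TimeSpan _ttl;

    public SingleFlightTtlCache(TimeSpan ttl) => _ttl = ttl;

    public Task<TValue> GetOrLoadAsync(TKey key, Func<Task<TValue>> loader)
    {
        var now = DateTime.UtcNow;
        while (true)
        {
            if (_entries.TryGetValue(key, out var existing))
            {
                // Fresh entry: every caller shares the same (possibly in-flight) task.
                if (existing.ExpiresUtc > now) return existing.Load.Value;

                // Expired entry: only one caller wins the swap; losers retry and reuse it.
                var replacement = NewEntry(loader, now);
                if (_entries.TryUpdate(key, replacement, existing)) return replacement.Load.Value;
            }
            else
            {
                var created = NewEntry(loader, now);
                if (_entries.TryAdd(key, created)) return created.Load.Value;
            }
        }
    }

    // Drop an entry early, for example when a watched table changes.
    public void Invalidate(TKey key) => _entries.TryRemove(key, out _);

    private Entry NewEntry(Func<Task<TValue>> loader, DateTime now) =>
        new(new Lazy<Task<TValue>>(loader, LazyThreadSafetyMode.ExecutionAndPublication), now + _ttl);
}
```

In this sketch the TTL clock starts when the entry is created rather than when the load completes, and a faulted load stays cached until it expires or is invalidated. A production cache would likely treat both cases more carefully.
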

---

## Computed Views

`ComputedView` derives a new reactive value from an existing observable query:

```csharp
// List<Todo> is assumed as the query result type.
var todosQuery = db.ObserveReactive(
    ct => new ValueTask<List<Todo>>(db.TodoDao.GetAllAsync()),
    db.GetTableIdOrThrow("todos"));

var stats = ComputedView.Combine(
    todosQuery,
    todos => new TodoStats(todos.Count),
    new ComputedViewOptions { DebounceInterval = TimeSpan.FromMilliseconds(200) });

using var sub = stats.Subscribe(
    value => UpdateStats(value),
    error => Log(error));
```

---

## UI Collection Binding

For WPF/WinUI-style collections:

```csharp
using RoomSharp.Reactive;
using System.Collections.ObjectModel;

// Todo is assumed as the element type.
var collection = new ObservableCollection<Todo>();

using var sub = query
    .AsObservable()
    .BindToObservableCollection(
        collection,
        SynchronizationContext.Current,
        keySelector: t => t.Id,
        mergeExisting: (existing, incoming) =>
        {
            existing.Title = incoming.Title;
            existing.IsDone = incoming.IsDone;
        });
```

For WinForms, install the WinForms companion package:

```bash
dotnet add package RoomSharp.Reactive.WinForms
```

```csharp
using RoomSharp.Reactive.WinForms;

using var sub = query
    .AsObservable()
    .BindToDataGridView(
        dataGridView1,
        keySelector: t => t.Id);
```
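
The `keySelector` and `mergeExisting` parameters in the collection binding above suggest a keyed merge: existing items are updated in place rather than replaced, so UI selection and item identity survive a refresh. The sketch below shows one way such a merge could work. It is an assumption-laden illustration, not RoomSharp's implementation: `KeyedMerge.Apply` and the `Todo` class are hypothetical, keys are assumed to be unique, and UI-thread marshaling (the `SynchronizationContext` argument) is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;

var collection = new ObservableCollection<Todo>
{
    new() { Id = 1, Title = "Write docs" },
    new() { Id = 2, Title = "Ship release" },
};
var kept = collection[0];

// A fresh query result: item 1 changed, item 2 was deleted, item 3 is new.
var snapshot = new List<Todo>
{
    new() { Id = 1, Title = "Write docs", IsDone = true },
    new() { Id = 3, Title = "Plan next sprint" },
};

KeyedMerge.Apply(collection, snapshot, t => t.Id, (existing, incoming) =>
{
    existing.Title = incoming.Title;
    existing.IsDone = incoming.IsDone;
});

Console.WriteLine(string.Join(", ", collection.Select(t => $"{t.Id}:{t.IsDone}"))); // prints "1:True, 3:False"
Console.WriteLine(ReferenceEquals(kept, collection[0]));                            // prints "True"

// Hypothetical entity used only for this sketch.
public sealed class Todo
{
    public int Id { get; set; }
    public string Title { get; set; } = "";
    public bool IsDone { get; set; }
}

// Hypothetical helper; not part of RoomSharp.
public static class KeyedMerge
{
    public static void Apply<T, TKey>(
        ObservableCollection<T> target,
        IReadOnlyList<T> incoming,
        Func<T, TKey> keySelector,
        Action<T, T> mergeExisting) where TKey : notnull
    {
        // 1. Remove items whose key is absent from the new snapshot.
        var incomingKeys = new HashSet<TKey>(incoming.Select(keySelector));
        for (int i = target.Count - 1; i >= 0; i--)
        {
            if (!incomingKeys.Contains(keySelector(target[i]))) target.RemoveAt(i);
        }

        // 2. Walk the snapshot in order: update survivors in place, insert newcomers.
        var byKey = target.ToDictionary(keySelector);
        for (int i = 0; i < incoming.Count; i++)
        {
            var item = incoming[i];
            if (byKey.TryGetValue(keySelector(item), out var existing))
            {
                mergeExisting(existing, item);
                int current = target.IndexOf(existing);
                if (current != i) target.Move(current, i);
            }
            else
            {
                target.Insert(i, item);
            }
        }
    }
}
```

Because surviving items keep their object identity, a bound list control raises only the remove, move, and insert notifications that are actually needed instead of resetting the whole list.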