---
title: "Reactive Queries"
description: "Automatically re-execute queries when underlying data changes with observable patterns."
canonical: "https://roomsharp.dev/docs/v0.5.4/reactive-queries"
source: "src/content/v0.5.4/reactive-queries.mdx"
---
# Reactive Queries & Change Tracking
RoomSharp includes a reactive query system that automatically re-executes queries when underlying data changes. The system is table-id based, transaction-aware, coalesced, and uses bounded per-subscriber delivery.
For production guidance on cache bounds, UI binding errors, WinForms lifetime, and refresh policy, see [Reactive Production Patterns](/docs/v0.5.4/reactive-production-patterns).
## Enabling Change Tracking
```csharp
using RoomSharp.Extensions;
using RoomSharp.Invalidation;
using RoomSharp.Reactive;
var db = RoomDatabase.Builder()
.UseSqlite("app.db")
.EnableChangeTracking(o =>
{
o.DispatchInterval = TimeSpan.FromMilliseconds(50);
o.DeliveryMode = DeliveryMode.LatestOnly;
})
.Build();
```
Once enabled, `Insert`, `Update`, and `Delete` operations automatically notify registered observers. The source generator emits the necessary invalidation calls in the generated DAO implementations.
---
## Observable Queries
The `Observe` extension method creates a query that automatically refreshes when watched tables change.
### Basic Usage
```csharp
using RoomSharp.Reactive;
// Get table ID (computed once at startup)
// Prefer the typed form when the watched table belongs to an entity.
var todosTableId = db.GetTableIdOrThrow();
// String lookup is also available when you only have the table name:
// var todosTableId = db.GetTableIdOrThrow("todos");
// Create an observable query watching the table
await using var query = db.Observe(
async ct => await db.TodoDao.GetAllAsync(),
todosTableId);
// Subscribe to receive updates
using var subscription = query.Subscribe(
onNext: todos => Console.WriteLine($"Count: {todos.Count}"),
onError: ex => Console.WriteLine($"Error: {ex.Message}")
);
// The subscription receives:
// 1. Initial query result immediately after subscribing
// 2. Updated results whenever Insert/Update/Delete affects the table
```
### Query Factory Overloads
```csharp
// Using table names (string overload - simpler)
await using var query = db.Observe(
async ct => await db.TodoDao.GetAllAsync(),
"todos");
// Watch multiple tables (string overload)
await using var query = db.Observe(
async ct => await db.GetTodosWithCategories(),
"todos", "categories");
// Using table IDs (int overload - slightly faster)
var todosId = db.GetTableIdOrThrow();
await using var query = db.Observe(async ct => await db.TodoDao.GetAllAsync(), todosId);
// Static lambda overload (avoids closure allocation)
await using var query = db.Observe, AppDatabase>(
static async (ct, db) => await db.TodoDao.GetAllAsync(),
"todos");
```
---
## Getting Initial Value
Use `GetValueAsync()` to await the first query result without subscribing:
```csharp
await using var query = db.Observe(
async ct => await db.TodoDao.GetAllAsync(),
"todos");
// Wait for initial data (replaces Task.Delay workarounds)
var initialTodos = await query.GetValueAsync();
Console.WriteLine($"Initial count: {initialTodos.Count}");
// Then subscribe for updates
using var sub = query.Subscribe(
onNext: todos => Console.WriteLine($"Updated: {todos.Count}"),
onError: ex => Console.WriteLine($"Error: {ex.Message}"));
```
---
## Query Operators
`IObservableQuery` supports fluent operators for stream processing:
```csharp
var query = db.Observe(async ct => await db.TodoDao.GetAllAsync(), "todos")
.DistinctUntilChanged() // Filter consecutive duplicates
.Debounce(TimeSpan.FromMilliseconds(300)) // Wait for quiet period
.Where(list => list.Count > 0); // Filter results
// Other operators:
// .Throttle(TimeSpan) - Limit emission rate
// .DistinctUntilChanged(keySelector) - Compare by key
// .Select(selector) - Transform results
```
### Debounce vs Throttle
```csharp
// Debounce: delays refresh until a quiet period
await using var debounced = db.Observe(ct => ..., todos)
.Debounce(TimeSpan.FromMilliseconds(100))
.DistinctUntilChanged();
// Throttle: limits refresh rate
await using var throttled = db.Observe(ct => ..., todos)
.Throttle(TimeSpan.FromMilliseconds(200))
.Where(list => list.Count > 0);
```
---
## IObservableQuery Interface
```csharp
public interface IObservableQuery : IAsyncDisposable
{
T? CurrentValue { get; }
bool IsStale { get; }
IDisposable Subscribe(Action onNext, Action onError);
ValueTask GetValueAsync(CancellationToken ct = default);
ValueTask RefreshAsync(CancellationToken ct = default);
IAsyncEnumerable ToAsyncEnumerable(CancellationToken ct = default);
}
```
| Member | Description |
|--------|-------------|
| `WatchedTables` | Tables this query monitors for changes |
| `IsStale` | Indicates pending refresh after a table change |
| `Subscribe(onNext, onError)` | Register callbacks for query results; returns disposable subscription |
| `GetValueAsync(ct)` | Manually fetch current value without subscribing |
| `Invalidate()` | Force re-execution of the query |
---
## DataSourceChanged (Lightweight Event)
For simple notification scenarios where you don't need reactive queries, RoomSharp provides a zero-allocation callback mechanism:
```csharp
// Register a lightweight callback for data changes
db.OnDataSourceChanged(info =>
{
Console.WriteLine($"Table {info.TableId}: {info.Operation} ({info.AffectedRows} rows)");
});
// Remove callback when no longer needed
db.OnDataSourceChanged(null);
```
### DataChangeInfo Structure
```csharp
public readonly struct DataChangeInfo
{
public readonly int TableId; // Table identifier
public readonly DataChangeOperation Operation; // Insert, Update, Delete, BulkInsert
public readonly int AffectedRows; // Number of affected rows
}
public enum DataChangeOperation : byte
{
Insert = 0,
Update = 1,
Delete = 2,
BulkInsert = 3
}
```
### Performance Characteristics
| Feature | Description |
|---------|-------------|
| **Zero-allocation** | Uses `struct` instead of `class` for event args |
| **Zero-cost when unused** | Inline null check with early return |
| **Immediate notification** | Called synchronously after each operation |
| **No dependencies** | Works without `EnableChangeTracking()` |
### Use Cases
- **Cache invalidation**: Clear in-memory caches when data changes
- **UI notifications**: Show toast messages or update badges
- **Audit logging**: Record all database operations
- **Sync triggers**: Start background sync when local data changes
### Comparison with Reactive Queries
| Feature | `OnDataSourceChanged` | `Observe()` + `Subscribe()` |
|---------|----------------------|----------------------------|
| Setup | Simple callback | Requires `EnableChangeTracking()` |
| Data access | TableId only | Re-executes query, returns data |
| Batching | Per-operation | Coalesced notifications |
| Allocation | Zero (struct) | Minimal (pooled) |
| Best for | Simple notifications | Data-bound UI updates |
---
## Manual Notifications
For raw SQL or external modifications:
```csharp
// Single table
db.NotifyTableChanged("todos");
// Multiple tables
db.NotifyTablesChanged("todos", "categories");
```
Use manual notifications when executing raw SQL via QueryExtensions or when external processes modify the database.
---
## Change Tracker Access
Direct access to the change tracker is available for advanced scenarios:
```csharp
var tracker = db.ChangeTracker;
// Check if a table is tracked
bool isTracked = tracker.IsTracked("todos");
// Get current version (increments on each change)
long version = tracker.GetVersion("todos");
// List all tracked tables
var tables = tracker.TrackedTables;
```
---
## Flush Invalidations
`RoomDatabase.FlushInvalidations()` forces a dispatcher tick (useful for deterministic tests).
```csharp
// In tests, force immediate processing
db.FlushInvalidations();
```
---
## Architecture Notes
- Queries are coalesced automatically to prevent duplicate refreshes
- In `Serialized` mode, background refreshes are scheduled through `BackgroundQueryScheduler`
- In `Parallel` mode, refreshes execute immediately with separate database sessions
- Notifications are deferred until after transaction commit
- Observers are held via weak references to allow garbage collection if not disposed
---
## Rx.NET Integration
For advanced reactive programming with operators like `Throttle`, `DistinctUntilChanged`, and `ObserveOn`, install the companion package:
```bash
dotnet add package RoomSharp.Reactive
```
This provides `ToObservable()` to convert `IObservableQuery` to `IObservable` and additional extensions for `TableChangeTracker`.
---
## QueryCache
`RoomSharp.Reactive` includes a single-flight TTL cache for reactive query results:
```csharp
using RoomSharp.Reactive;
var cache = new QueryCache(db, new QueryCacheOptions
{
DefaultTtl = TimeSpan.FromSeconds(30),
MaxEntries = 1_000,
FactoryCancellationMode = CacheFactoryCancellationMode.NeverCancelInFlight
});
var todosTable = db.GetTableIdOrThrow();
var key = db.BuildCacheKey>("select * from todos", parameters: null);
var cached = db.ObserveCached(
cache,
key,
ct => new ValueTask>(db.TodoDao.GetAllAsync()),
new QueryCacheEntryOptions
{
Ttl = TimeSpan.FromSeconds(10),
TableIds = new[] { todosTable }
},
options: null,
todosTable);
```
Use this for read-heavy reactive views where a short TTL is acceptable and duplicate refreshes should be collapsed. `NeverCancelInFlight` keeps a shared factory alive when one caller cancels, which is the safest default for UI screens. Use `IQueryCacheMetrics2` when you need eviction reasons, active entry counts, or factory timing.
---
## Computed Views
`ComputedView` derives a new reactive value from an existing observable query:
```csharp
var todosQuery = db.ObserveReactive(
ct => new ValueTask>(db.TodoDao.GetAllAsync()),
db.GetTableIdOrThrow());
var stats = ComputedView.Combine(
todosQuery,
todos => new TodoStats(todos.Count),
new ComputedViewOptions
{
DebounceInterval = TimeSpan.FromMilliseconds(200)
});
using var sub = stats.Subscribe(
value => UpdateStats(value),
error => Log(error));
```
---
## UI Collection Binding
For WPF/WinUI-style collections:
```csharp
using RoomSharp.Reactive;
using System.Collections.ObjectModel;
var collection = new ReactiveObservableCollection();
var ui = SynchronizationContext.Current!;
using var sub = query
.AsObservable()
.BindToObservableCollection(
collection,
new ReactiveCollectionBindingOptions
{
Dispatch = action => ui.Post(_ => action(), null),
KeySelector = t => t.Id,
UpdateMode = ReactiveCollectionUpdateMode.Reset,
MergeExisting = (existing, incoming) =>
{
existing.Title = incoming.Title;
existing.IsDone = incoming.IsDone;
},
OnError = error => Log(error)
});
```
`ReactiveObservableCollection` can replace or merge a full result set with one reset notification. For small lists, normal `ObservableCollection` is still fine.
For WinForms, install the WinForms companion package:
```bash
dotnet add package RoomSharp.Reactive.WinForms
```
```csharp
using RoomSharp.Reactive;
using RoomSharp.Reactive.WinForms;
using var sub = query
.AsObservable()
.BindToDataGridView(
dataGridView1,
new WinFormsBindingOptions
{
KeySelector = t => t.Id,
UpdateMode = ReactiveCollectionUpdateMode.MergeByKey,
PreserveCurrentRow = true,
PreserveScrollPosition = true,
OnError = error => Log(error)
});
```