Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save papyr/b5c9b3c241096a3cab7cfbf9b28a2a7c to your computer and use it in GitHub Desktop.

Select an option

Save papyr/b5c9b3c241096a3cab7cfbf9b28a2a7c to your computer and use it in GitHub Desktop.
Observable sql data
public static class SqlObservations
{
public static IObservable<IEnumerable<IDataRecord>> MonitorChanges(string connectionString, string sql)
{
/*
1. read data from database and return results
2. monitor changes
3. when there is a change, repeat the whole process
*/
return Observable.Create<IEnumerable<IDataRecord>>(observer =>
{
var disposables = new CompositeDisposable();
using (var connection = new SqlConnection(ConfigurationManager.ConnectionStrings[connectionString].ConnectionString))
{
connection.Open();
using (var command = new SqlCommand(sql, connection))
{
command.Notification = null;
var dependency = new SqlDependency(command);
//when there is an update in the database, invalidate the entire query
var nextChange = Observable.FromEventPattern<OnChangeEventHandler, SqlNotificationEventArgs>(
handler => dependency.OnChange += handler,
handler => dependency.OnChange -= handler)
.Take(1)
.Subscribe(_ => observer.OnCompleted());
disposables.Add(nextChange);
if (connection.State == ConnectionState.Closed)
connection.Open();
using (var reader = command.ExecuteReader())
{
//Fire next observable
observer.OnNext(reader.Cast<IDataRecord>());
}
}
}
return disposables;
})
//When the sequence completes, reload everything again
.Repeat();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment