Skip to content

Instantly share code, notes, and snippets.

@canton7
Last active October 17, 2025 15:16
Show Gist options
  • Select an option

  • Save canton7/c4cd3abf0d0869a11535eeebfdb45c76 to your computer and use it in GitHub Desktop.

Select an option

Save canton7/c4cd3abf0d0869a11535eeebfdb45c76 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Threading;
public delegate void ObservablePropertySubscriber<in T>(T oldValue, T newValue);
public interface IObservableProperty<out T>
{
/// <summary>
/// The current value of the observable property
/// </summary>
T Value { get; }
/// <summary>
/// Subscribe to changes from this property
/// </summary>
/// <remarks>
/// <paramref name="subscriber"/> will be called on a captured <see cref="SynchronizationContext"/>
/// </remarks>
void Subscribe(ObservablePropertySubscriber<T> subscriber, CancellationToken token);
/// <summary>
/// Subscribe to changes from this property
/// </summary>
/// <remarks>
/// <paramref name="subscriber"/> will NOT be called on a captured <see cref="SynchronizationContext"/>
/// </remarks>
void SubscribeSynchronously(ObservablePropertySubscriber<T> subscriber, CancellationToken token);
/// <summary>
/// Subscribe to changes from this property. <paramref name="subscriber"/> will be called immediately with the current value
/// </summary>
/// <remarks>
/// <paramref name="subscriber"/> will be called on a captured <see cref="SynchronizationContext"/>
/// </remarks>
void SubscribeAndNotify(ObservablePropertySubscriber<T> subscriber, CancellationToken token);
}
public class ObservableProperty<T> : PublisherBase<ObservablePropertySubscriber<T>>, IObservableProperty<T>
{
private readonly object _valueLock = new();
private readonly IEqualityComparer<T> _comparer;
private readonly T _initialValue;
private T _value;
public T Value
{
get
{
lock (_valueLock)
{
return _value;
}
}
}
public ObservableProperty(T initialValue, IEqualityComparer<T>? comparer = null)
{
_comparer = comparer ?? EqualityComparer<T>.Default;
_initialValue = initialValue;
_value = initialValue;
}
public void SubscribeAndNotify(ObservablePropertySubscriber<T> subscriber, CancellationToken token)
{
AddSubscriber(subscriber, SynchronizationContext.Current ?? new SynchronizationContext(), token);
if (!token.IsCancellationRequested)
{
T value;
lock (_valueLock)
{
value = _value;
}
subscriber(_initialValue, value);
}
}
public void Publish(T newValue)
{
T oldValue;
lock (_valueLock)
{
oldValue = _value;
if (_comparer.Equals(oldValue, newValue))
{
return;
}
_value = newValue;
}
Publish(x => x.Invoke(oldValue, newValue));
}
public void UpdateValue(Func<T, T> updater)
{
// We're calling upwards within a lock! The caller must be careful to limit the scope of this
T oldValue;
T newValue;
lock (_valueLock)
{
oldValue = _value;
newValue = updater(_value);
if (_comparer.Equals(oldValue, newValue))
{
return;
}
_value = newValue;
}
Publish(x => x.Invoke(oldValue, newValue));
}
private class Subscriber
{
public ObservablePropertySubscriber<T> Delegate { get; init; } = null!;
public SynchronizationContext SynchronizationContext { get; init; } = null!;
}
}
using System;
using System.Collections.Immutable;
using System.Threading;
public interface IPublisher<T>
{
/// <summary>
/// Subscribe to changes from this property
/// </summary>
/// <remarks>
/// <paramref name="subscriber"/> will be called on a captured <see cref="SynchronizationContext"/>
/// </remarks>
void Subscribe(Action<T> subscriber, CancellationToken token);
/// <summary>
/// Subscribe to changes from this property
/// </summary>
/// <remarks>
/// <paramref name="subscriber"/> will NOT be called on a captured <see cref="SynchronizationContext"/>
/// </remarks>
void SubscribeSynchronously(Action<T> subscriber, CancellationToken token);
}
public class Publisher<T> : PublisherBase<Action<T>>, IPublisher<T>
{
internal void Publish(T value) => Publish(x => x(value));
}
public abstract class PublisherBase<TDelegate> where TDelegate : Delegate
{
private ImmutableList<Subscriber> _subscribers = [];
public void Subscribe(TDelegate subscriber, CancellationToken token) =>
AddSubscriber(subscriber, SynchronizationContext.Current ?? new SynchronizationContext(), token);
public void SubscribeSynchronously(TDelegate subscriber, CancellationToken token) =>
AddSubscriber(subscriber, null, token);
protected void AddSubscriber(TDelegate subscriber, SynchronizationContext? synchronizationContext, CancellationToken token)
{
ImmutableList<Subscriber> newSubscribers;
var newSubscriber = new Subscriber()
{
Delegate = subscriber,
SynchronizationContext = synchronizationContext,
};
do
{
newSubscribers = _subscribers;
}
while (Interlocked.CompareExchange(ref _subscribers, newSubscribers.Add(newSubscriber), newSubscribers) != newSubscribers);
token.Register(() => RemoveSubscriber(newSubscriber));
// If it was already cancelled, or was cancelled while we were subscribing, unsub them
if (token.IsCancellationRequested)
{
RemoveSubscriber(newSubscriber);
}
}
private void RemoveSubscriber(Subscriber subscriber)
{
ImmutableList<Subscriber> newSubscribers;
do
{
newSubscribers = _subscribers;
}
while (Interlocked.CompareExchange(ref _subscribers, newSubscribers.Remove(subscriber), newSubscribers) != newSubscribers);
}
protected void Publish(Action<TDelegate> invoker)
{
foreach (var subscriber in _subscribers)
{
if (subscriber.SynchronizationContext != null)
{
subscriber.SynchronizationContext.Post(x => invoker((TDelegate)x!), subscriber.Delegate);
}
else
{
invoker(subscriber.Delegate);
}
}
}
private class Subscriber
{
public TDelegate Delegate { get; init; } = null!;
public SynchronizationContext? SynchronizationContext { get; init; }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment