Last active
October 17, 2025 15:16
-
-
Save canton7/c4cd3abf0d0869a11535eeebfdb45c76 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Generic; | |
| using System.Threading; | |
/// <summary>
/// Callback used to tell a subscriber that an observable property has a new value.
/// </summary>
/// <typeparam name="T">Type of the value carried by the notification.</typeparam>
/// <param name="oldValue">The value reported by the publisher as the previous one.</param>
/// <param name="newValue">The value reported by the publisher as the current one.</param>
public delegate void ObservablePropertySubscriber<in T>(T oldValue, T newValue);
/// <summary>
/// Read-only view of a value whose changes can be observed.
/// </summary>
/// <typeparam name="T">Type of the observed value.</typeparam>
public interface IObservableProperty<out T>
{
    /// <summary>
    /// Gets the value the property currently holds.
    /// </summary>
    T Value { get; }

    /// <summary>
    /// Registers a callback to be told about changes to this property.
    /// </summary>
    /// <param name="subscriber">Callback receiving the old and new values.</param>
    /// <param name="token">Token tied to the lifetime of the subscription.</param>
    /// <remarks>
    /// <paramref name="subscriber"/> is invoked on a captured <see cref="SynchronizationContext"/>.
    /// </remarks>
    void Subscribe(ObservablePropertySubscriber<T> subscriber, CancellationToken token);

    /// <summary>
    /// Registers a callback to be told about changes to this property.
    /// </summary>
    /// <param name="subscriber">Callback receiving the old and new values.</param>
    /// <param name="token">Token tied to the lifetime of the subscription.</param>
    /// <remarks>
    /// <paramref name="subscriber"/> is NOT invoked on a captured <see cref="SynchronizationContext"/>.
    /// </remarks>
    void SubscribeSynchronously(ObservablePropertySubscriber<T> subscriber, CancellationToken token);

    /// <summary>
    /// Registers a callback to be told about changes to this property, and calls it
    /// immediately with the current value.
    /// </summary>
    /// <param name="subscriber">Callback receiving the old and new values.</param>
    /// <param name="token">Token tied to the lifetime of the subscription.</param>
    /// <remarks>
    /// <paramref name="subscriber"/> is invoked on a captured <see cref="SynchronizationContext"/>.
    /// </remarks>
    void SubscribeAndNotify(ObservablePropertySubscriber<T> subscriber, CancellationToken token);
}
/// <summary>
/// Holds a value and notifies subscribers, with the old and new values, whenever it changes.
/// </summary>
/// <typeparam name="T">Type of the stored value.</typeparam>
public class ObservableProperty<T> : PublisherBase<ObservablePropertySubscriber<T>>, IObservableProperty<T>
{
    // Guards every read and write of _value.
    private readonly object _valueLock = new();
    private readonly IEqualityComparer<T> _comparer;
    private readonly T _initialValue;
    private T _value;

    /// <summary>
    /// Gets the current value of the observable property.
    /// </summary>
    public T Value
    {
        get
        {
            lock (_valueLock)
            {
                return _value;
            }
        }
    }

    /// <summary>
    /// Creates a property holding <paramref name="initialValue"/>.
    /// </summary>
    /// <param name="initialValue">Value the property starts with.</param>
    /// <param name="comparer">
    /// Comparer used to decide whether a new value differs from the current one;
    /// <see cref="EqualityComparer{T}.Default"/> is used when this is null.
    /// </param>
    public ObservableProperty(T initialValue, IEqualityComparer<T>? comparer = null)
    {
        _comparer = comparer ?? EqualityComparer<T>.Default;
        _initialValue = initialValue;
        _value = initialValue;
    }

    /// <summary>
    /// Subscribes to changes and immediately calls <paramref name="subscriber"/> with the current value.
    /// </summary>
    /// <param name="subscriber">Callback receiving the old and new values.</param>
    /// <param name="token">Token whose cancellation removes the subscription.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is null.</exception>
    /// <remarks>
    /// Later notifications are posted to the <see cref="SynchronizationContext"/> captured here.
    /// The immediate notification is made directly on the calling thread and reports the
    /// property's initial value as the old value.
    /// </remarks>
    public void SubscribeAndNotify(ObservablePropertySubscriber<T> subscriber, CancellationToken token)
    {
        // Validate before registering: a null subscriber would otherwise be added to the
        // subscriber list and only fail on the immediate call below, staying registered.
        ArgumentNullException.ThrowIfNull(subscriber);

        AddSubscriber(subscriber, SynchronizationContext.Current ?? new SynchronizationContext(), token);
        if (token.IsCancellationRequested)
        {
            return;
        }

        subscriber(_initialValue, Value);
    }

    /// <summary>
    /// Stores <paramref name="newValue"/> and notifies subscribers, unless it equals the current value.
    /// </summary>
    /// <param name="newValue">Value to store.</param>
    public void Publish(T newValue)
    {
        T oldValue;
        lock (_valueLock)
        {
            oldValue = _value;
            if (_comparer.Equals(oldValue, newValue))
            {
                return;
            }

            _value = newValue;
        }

        // Subscribers are notified outside the lock.
        NotifyChanged(oldValue, newValue);
    }

    /// <summary>
    /// Computes a new value from the current one and notifies subscribers if it differs.
    /// </summary>
    /// <param name="updater">Function mapping the current value to the new value.</param>
    /// <exception cref="ArgumentNullException"><paramref name="updater"/> is null.</exception>
    /// <remarks>
    /// <paramref name="updater"/> runs while the value lock is held, so the caller must keep it
    /// short and limited in scope.
    /// </remarks>
    public void UpdateValue(Func<T, T> updater)
    {
        ArgumentNullException.ThrowIfNull(updater);

        T oldValue;
        T newValue;
        lock (_valueLock)
        {
            oldValue = _value;
            // We're calling upwards within a lock! The caller must be careful to limit the scope of this
            newValue = updater(oldValue);
            if (_comparer.Equals(oldValue, newValue))
            {
                return;
            }

            _value = newValue;
        }

        NotifyChanged(oldValue, newValue);
    }

    // Hands the old/new pair to every subscriber through the base publisher.
    private void NotifyChanged(T oldValue, T newValue) =>
        Publish(subscriber => subscriber.Invoke(oldValue, newValue));
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| using System; | |
| using System.Collections.Immutable; | |
| using System.Threading; | |
/// <summary>
/// Source of notifications carrying a value of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">
/// Type of the published value. Covariant: the type parameter only appears as the argument of
/// subscriber callbacks.
/// </typeparam>
public interface IPublisher<out T>
{
    /// <summary>
    /// Subscribe to values published by this publisher
    /// </summary>
    /// <param name="subscriber">Callback receiving each published value.</param>
    /// <param name="token">Token tied to the lifetime of the subscription.</param>
    /// <remarks>
    /// <paramref name="subscriber"/> will be called on a captured <see cref="SynchronizationContext"/>
    /// </remarks>
    void Subscribe(Action<T> subscriber, CancellationToken token);

    /// <summary>
    /// Subscribe to values published by this publisher
    /// </summary>
    /// <param name="subscriber">Callback receiving each published value.</param>
    /// <param name="token">Token tied to the lifetime of the subscription.</param>
    /// <remarks>
    /// <paramref name="subscriber"/> will NOT be called on a captured <see cref="SynchronizationContext"/>
    /// </remarks>
    void SubscribeSynchronously(Action<T> subscriber, CancellationToken token);
}
/// <summary>
/// Publisher whose subscribers are <see cref="Action{T}"/> callbacks receiving each published value.
/// </summary>
/// <typeparam name="T">Type of the published value.</typeparam>
public class Publisher<T> : PublisherBase<Action<T>>, IPublisher<T>
{
    /// <summary>
    /// Passes <paramref name="value"/> to every current subscriber.
    /// </summary>
    /// <param name="value">Value handed to each subscriber callback.</param>
    internal void Publish(T value)
    {
        Publish(subscriber => subscriber.Invoke(value));
    }
}
/// <summary>
/// Base class that keeps a list of subscriber delegates and dispatches notifications to them.
/// </summary>
/// <typeparam name="TDelegate">Delegate type of the subscribers.</typeparam>
public abstract class PublisherBase<TDelegate> where TDelegate : Delegate
{
    // Immutable list that is replaced atomically on every add/remove, so Publish can
    // enumerate whichever instance it read without taking a lock.
    private ImmutableList<Subscriber> _subscribers = [];

    /// <summary>
    /// Subscribes <paramref name="subscriber"/>; it is invoked through the current
    /// <see cref="SynchronizationContext"/>, or a new default one when there is none.
    /// </summary>
    /// <param name="subscriber">Delegate to notify.</param>
    /// <param name="token">Token whose cancellation removes the subscription.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is null.</exception>
    public void Subscribe(TDelegate subscriber, CancellationToken token) =>
        AddSubscriber(subscriber, SynchronizationContext.Current ?? new SynchronizationContext(), token);

    /// <summary>
    /// Subscribes <paramref name="subscriber"/>; it is invoked directly by the publishing call,
    /// without any <see cref="SynchronizationContext"/>.
    /// </summary>
    /// <param name="subscriber">Delegate to notify.</param>
    /// <param name="token">Token whose cancellation removes the subscription.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is null.</exception>
    public void SubscribeSynchronously(TDelegate subscriber, CancellationToken token) =>
        AddSubscriber(subscriber, null, token);

    /// <summary>
    /// Adds a subscriber and arranges for it to be removed when <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="subscriber">Delegate to notify.</param>
    /// <param name="synchronizationContext">
    /// Context to post notifications to, or null to invoke the delegate directly.
    /// </param>
    /// <param name="token">Token whose cancellation removes the subscription.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is null.</exception>
    protected void AddSubscriber(TDelegate subscriber, SynchronizationContext? synchronizationContext, CancellationToken token)
    {
        // Reject null up front; otherwise it would be stored and only fail when publishing.
        ArgumentNullException.ThrowIfNull(subscriber);

        var entry = new Subscriber(subscriber, synchronizationContext);
        ImmutableInterlocked.Update(ref _subscribers, static (list, item) => list.Add(item), entry);

        token.Register(() => RemoveSubscriber(entry));

        // If it was already cancelled, or was cancelled while we were subscribing, unsub them.
        // Removing an entry that is no longer in the list leaves the list unchanged.
        if (token.IsCancellationRequested)
        {
            RemoveSubscriber(entry);
        }
    }

    // Atomically swaps in a list without the given entry.
    private void RemoveSubscriber(Subscriber subscriber) =>
        ImmutableInterlocked.Update(ref _subscribers, static (list, item) => list.Remove(item), subscriber);

    /// <summary>
    /// Calls <paramref name="invoker"/> once per current subscriber, passing the subscriber's delegate.
    /// </summary>
    /// <param name="invoker">Action that invokes a subscriber delegate with the notification arguments.</param>
    /// <exception cref="ArgumentNullException"><paramref name="invoker"/> is null.</exception>
    /// <remarks>
    /// Subscribers registered with a <see cref="SynchronizationContext"/> are invoked via
    /// <see cref="SynchronizationContext.Post"/>; the others are invoked directly in this call.
    /// </remarks>
    protected void Publish(Action<TDelegate> invoker)
    {
        ArgumentNullException.ThrowIfNull(invoker);

        foreach (var subscriber in _subscribers)
        {
            if (subscriber.Context is { } context)
            {
                context.Post(state => invoker((TDelegate)state!), subscriber.Handler);
            }
            else
            {
                invoker(subscriber.Handler);
            }
        }
    }

    // One registration: the delegate plus the context it should be invoked on (null = invoke directly).
    // Deliberately a class so list removal matches this exact registration by reference.
    private sealed class Subscriber(TDelegate handler, SynchronizationContext? context)
    {
        public TDelegate Handler { get; } = handler;

        public SynchronizationContext? Context { get; } = context;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment