Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save AlexF4Dev/3790d109e60cebee047db7d9ed982ffd to your computer and use it in GitHub Desktop.
Save AlexF4Dev/3790d109e60cebee047db7d9ed982ffd to your computer and use it in GitHub Desktop.

Revisions

  1. @davidfowl davidfowl revised this gist Feb 13, 2021. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion ConcurrentDictionaryExtensions.cs
    Original file line number Diff line number Diff line change
    @@ -77,8 +77,9 @@ public static async Task<TValue> GetOrAddAsync<TKey, TValue>(
    // Make sure all waiters see the exception
    tcs.SetException(ex);

    // TODO: We remove the entry if the factory failed so it's not a permanent failure
    // We remove the entry if the factory failed so it's not a permanent failure
    // and future gets can retry (this could be a pluggable policy)
    dictionary.TryRemove(key, out _);
    throw;
    }
    }
  2. @davidfowl davidfowl revised this gist Feb 5, 2021. No changes.
  3. @davidfowl davidfowl revised this gist Feb 5, 2021. 1 changed file with 1 addition and 30 deletions.
    31 changes: 1 addition & 30 deletions ConcurrentDictionaryExtensions.cs
    Original file line number Diff line number Diff line change
    @@ -1,36 +1,7 @@
    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Net.Http;
    using System.Threading.Tasks;

    //var fileCache = new ConcurrentDictionary<string, FileStream>();

    //Parallel.For(0, 5, i =>
    //{
    // var f = fileCache.GetOrAdd("Blah.txt", File.OpenRead);
    // f.Dispose();
    //});

    //File.OpenWrite("Blah.txt");

    var responseCache = new ConcurrentDictionary<string, Task<string>>();

    var client = new HttpClient();

    var tasks = new Task[5];
    for (int i = 0; i < tasks.Length; i++)
    {
    tasks[i] = Task.Run(async () =>
    {
    await responseCache.GetOrAdd("http://www.bing.com", client.GetStringAsync);
    });
    }

    await Task.WhenAll(tasks);


    namespace System.Collections.Generic
    namespace System.Collections.Concurrent
    {
    public static class ConcurrentDictionaryExtensions
    {
  4. @davidfowl davidfowl revised this gist Feb 5, 2021. 1 changed file with 47 additions and 5 deletions.
    52 changes: 47 additions & 5 deletions ConcurrentDictionaryExtensions.cs
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,35 @@
    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Net.Http;
    using System.Threading.Tasks;

    //var fileCache = new ConcurrentDictionary<string, FileStream>();

    //Parallel.For(0, 5, i =>
    //{
    // var f = fileCache.GetOrAdd("Blah.txt", File.OpenRead);
    // f.Dispose();
    //});

    //File.OpenWrite("Blah.txt");

    var responseCache = new ConcurrentDictionary<string, Task<string>>();

    var client = new HttpClient();

    var tasks = new Task[5];
    for (int i = 0; i < tasks.Length; i++)
    {
    tasks[i] = Task.Run(async () =>
    {
    await responseCache.GetOrAdd("http://www.bing.com", client.GetStringAsync);
    });
    }

    await Task.WhenAll(tasks);


    namespace System.Collections.Generic
    {
    public static class ConcurrentDictionaryExtensions
    @@ -63,11 +95,21 @@ public static async Task<TValue> GetOrAddAsync<TKey, TValue>(
    var tcs = new TaskCompletionSource<TValue>(TaskCreationOptions.RunContinuationsAsynchronously);
    if (dictionary.TryAdd(key, tcs.Task))
    {
    // TODO: Remove the entry if the factory failed so it's not a permanent failure
    // and future gets can retry (this could be a pluggable policy)
    var value = await valueFactory(key);
    tcs.TrySetResult(value);
    return await tcs.Task;
    try
    {
    var value = await valueFactory(key);
    tcs.TrySetResult(value);
    return await tcs.Task;
    }
    catch (Exception ex)
    {
    // Make sure all waiters see the exception
    tcs.SetException(ex);

    // TODO: We remove the entry if the factory failed so it's not a permanent failure
    // and future gets can retry (this could be a pluggable policy)
    throw;
    }
    }
    }
    }
  5. @davidfowl davidfowl revised this gist Feb 5, 2021. 1 changed file with 5 additions and 13 deletions.
    18 changes: 5 additions & 13 deletions ConcurrentDictionaryExtensions.cs
    Original file line number Diff line number Diff line change
    @@ -63,19 +63,11 @@ public static async Task<TValue> GetOrAddAsync<TKey, TValue>(
    var tcs = new TaskCompletionSource<TValue>(TaskCreationOptions.RunContinuationsAsynchronously);
    if (dictionary.TryAdd(key, tcs.Task))
    {
    try
    {
    var value = await valueFactory(key);
    tcs.TrySetResult(value);
    return await tcs.Task;
    }
    catch
    {
    // We remove the entry if the factory failed so it's not a permanent failure
    // and future gets can retry (this could be a pluggable policy)
    dictionary.TryRemove(key, out _);
    throw;
    }
    // TODO: Remove the entry if the factory failed so it's not a permanent failure
    // and future gets can retry (this could be a pluggable policy)
    var value = await valueFactory(key);
    tcs.TrySetResult(value);
    return await tcs.Task;
    }
    }
    }
  6. @davidfowl davidfowl created this gist Feb 5, 2021.
    83 changes: 83 additions & 0 deletions ConcurrentDictionaryExtensions.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,83 @@
    namespace System.Collections.Generic
    {
    public static class ConcurrentDictionaryExtensions
    {
    /// <summary>
    /// Provides an alternative to <see cref="ConcurrentDictionary{TKey, TValue}.GetOrAdd(TKey, Func{TKey, TValue})"/> that disposes values that implement <see cref="IDisposable"/>.
    /// </summary>
    /// <typeparam name="TKey"></typeparam>
    /// <typeparam name="TValue"></typeparam>
    /// <param name="dictionary"></param>
    /// <param name="key"></param>
    /// <param name="valueFactory"></param>
    /// <returns></returns>
    public static TValue GetOrAddWithDispose<TKey, TValue>(
    this ConcurrentDictionary<TKey, TValue> dictionary,
    TKey key,
    Func<TKey, TValue> valueFactory) where TValue : IDisposable
    {
    while (true)
    {
    if (dictionary.TryGetValue(key, out var value))
    {
    // Try to get the value
    return value;
    }

    /// Try to add the value
    value = valueFactory(key);
    if (dictionary.TryAdd(key, value))
    {
    // Won the race, so return the instance
    return value;
    }

    // Lost the race, dispose the created object
    value.Dispose();
    }
    }


    /// <summary>
    /// Provides an alternative to <see cref="ConcurrentDictionary{TKey, TValue}.GetOrAdd(TKey, Func{TKey, TValue})"/> specifically for asynchronous values. The factory method will only run once.
    /// </summary>
    /// <typeparam name="TKey"></typeparam>
    /// <typeparam name="TValue"></typeparam>
    /// <param name="dictionary"></param>
    /// <param name="key"></param>
    /// <param name="valueFactory"></param>
    /// <returns></returns>
    public static async Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> dictionary,
    TKey key,
    Func<TKey, Task<TValue>> valueFactory)
    {
    while (true)
    {
    if (dictionary.TryGetValue(key, out var task))
    {
    return await task;
    }

    // This is the task that we'll return to all waiters. We'll complete it when the factory is complete
    var tcs = new TaskCompletionSource<TValue>(TaskCreationOptions.RunContinuationsAsynchronously);
    if (dictionary.TryAdd(key, tcs.Task))
    {
    try
    {
    var value = await valueFactory(key);
    tcs.TrySetResult(value);
    return await tcs.Task;
    }
    catch
    {
    // We remove the entry if the factory failed so it's not a permanent failure
    // and future gets can retry (this could be a pluggable policy)
    dictionary.TryRemove(key, out _);
    throw;
    }
    }
    }
    }
    }
    }