我有下面的课。

class Test{
    public HashSet<string> Data = new HashSet<string>();
}

我需要从不同的线程改变字段“数据”,所以我想对我目前的线程安全实现的一些意见。

class Test{
    public HashSet<string> Data = new HashSet<string>();

    public void Add(string Val){
            lock(Data) Data.Add(Val);
    }

    public void Remove(string Val){
            lock(Data) Data.Remove(Val);
    }
}

是否有更好的解决方案,直接到字段并保护它免受多线程的并发访问?


当前回答

我发现既不能简单地锁定System.Collections.Generic的添加和删除方法。HashSet,或者包装框架的ConcurrentDictionary在需要良好性能的“高吞吐量”场景中是足够的。

通过使用这个简单的想法就可以获得相当好的性能:

public class ExampleHashSet<T>
{
    const int ConcurrencyLevel = 124;
    const int Lower31BitMask = 0x7FFFFFFF;
    
    HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
    IEqualityComparer<T> comparer;

    public ExampleHashSet()
    {
        comparer = EqualityComparer<T>.Default;

        for(int i = 0; i < ConcurrencyLevel; i++)
            sets[i] = new HashSet<T>();
    }

    public bool Add(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Add(item);
        }
    }

    public bool Remove(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Remove(item);
        }
    }

    // further methods ...
}

系统的HashSet是包装的,但与其他答案不同的是,我们对多个答案持有锁 hashset。不同的线程能够在不同的hashset上“工作”,从而降低了总体等待时间。

这个想法可以在HashSet本身中被概括和直接实现(持有桶上的锁,而不是锁) 成套)。这里可以找到一个例子。

其他回答

使ISet<T>并发的棘手之处在于集合方法(并、交、差)本质上是迭代的。至少,您必须遍历操作中涉及的一个集合的所有n个成员,同时锁定两个集合。

当您必须在迭代期间锁定整个集合时,您将失去ConcurrentDictionary<T,byte>的优势。如果没有锁定,这些操作就不是线程安全的。

考虑到ConcurrentDictionary<T,byte>所增加的开销,使用权重较轻的HashSet<T>并将所有内容都锁起来可能更明智。

如果您不需要设置操作,则使用ConcurrentDictionary<T,字节>,并且在添加键时仅使用default(byte)作为值。

您的实现是正确的。不幸的是,. net框架没有提供内置并发哈希集类型。然而,有一些变通办法。

ConcurrentDictionary最后(推荐)

第一个是在命名空间System.Collections.Concurrent中使用类ConcurrentDictionary<TKey, TValue>。在这种情况下,值是没有意义的,所以我们可以使用一个简单的字节(内存中的1个字节)。

private ConcurrentDictionary<string, byte> _data;

这是推荐的选项,因为该类型是线程安全的,并且提供了与HashSet<T>相同的优点,除非key和value是不同的对象。

资料来源:社会化MSDN

Self-implementation

最后,正如您所做的,您可以实现自己的数据类型,使用锁或其他。net提供的线程安全的方法。这里有一个很好的例子:如何在。net中实现ConcurrentHashSet

这个解决方案的唯一缺点是类型HashSet<T>不能正式并发访问,即使是读取操作。

我引用了链接帖子的代码(最初由Ben Mosher编写)。

using System;
using System.Collections.Generic;
using System.Threading;

namespace BlahBlah.Utilities
{
    public class ConcurrentHashSet<T> : IDisposable
    {
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        private readonly HashSet<T> _hashSet = new HashSet<T>();

        #region Implementation of ICollection<T> ...ish
        public bool Add(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Add(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public void Clear()
        {
            _lock.EnterWriteLock();
            try
            {
                _hashSet.Clear();
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public bool Contains(T item)
        {
            _lock.EnterReadLock();
            try
            {
                return _hashSet.Contains(item);
            }
            finally
            {
                if (_lock.IsReadLockHeld) _lock.ExitReadLock();
            }
        }

        public bool Remove(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Remove(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public int Count
        {
            get
            {
                _lock.EnterReadLock();
                try
                {
                    return _hashSet.Count;
                }
                finally
                {
                    if (_lock.IsReadLockHeld) _lock.ExitReadLock();
                }
            }
        }
        #endregion

        #region Dispose
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
                if (_lock != null)
                    _lock.Dispose();
        }
        ~ConcurrentHashSet()
        {
            Dispose(false);
        }
        #endregion
    }
}

EDIT:将入口锁方法移到try块之外,因为它们可以抛出异常并执行finally块中包含的指令。

ConcurrentBag(不建议)

不建议使用ConcurrentBag<T>,因为这种类型只允许以线程安全的方式插入给定的元素和删除随机元素。这个类是为促进生产者-消费者场景而设计的,这不是OP的目标(更多解释在这里)。

其他操作(例如,扩展方法提供的操作)不支持并发使用。MSDN文档警告:“ConcurrentBag的所有公共和受保护成员都是线程安全的,可以从多个线程并发使用。然而,通过ConcurrentBag实现的接口之一访问的成员,包括扩展方法,不保证是线程安全的,可能需要由调用者同步。”

我基于ConcurrentDictionary创建了一个实际的ConcurrentHashSet,而不是包装ConcurrentDictionary或锁定HashSet。

这个实现支持每个项目的基本操作,而不需要HashSet的set操作,因为它们在并发场景中没有意义:

var concurrentHashSet = new ConcurrentHashSet<string>(
    new[]
    {
        "hamster",
        "HAMster",
        "bar",
    },
    StringComparer.OrdinalIgnoreCase);

concurrentHashSet.TryRemove("foo");

if (concurrentHashSet.Contains("BAR"))
{
    Console.WriteLine(concurrentHashSet.Count);
}

输出:2

你可以从NuGet这里得到它,在GitHub这里看到源代码。

我更喜欢完整的解决方案,所以我这样做了:请注意,我的计数以一种不同的方式实现,因为我不明白为什么应该禁止在尝试计数它的值时读取哈希集。

@Zen,谢谢你让它开始。

[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly HashSet<T> _hashSet = new HashSet<T>();

    public ConcurrentHashSet()
    {
    }

    public ConcurrentHashSet(IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(comparer);
    }

    public ConcurrentHashSet(IEnumerable<T> collection)
    {
        _hashSet = new HashSet<T>(collection);
    }

    public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(collection, comparer);
    }

    protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
    {
        _hashSet = new HashSet<T>();

        // not sure about this one really...
        var iSerializable = _hashSet as ISerializable;
        iSerializable.GetObjectData(info, context);
    }

    #region Dispose

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
            if (_lock != null)
                _lock.Dispose();
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _hashSet.GetEnumerator();
    }

    ~ConcurrentHashSet()
    {
        Dispose(false);
    }

    public void OnDeserialization(object sender)
    {
        _hashSet.OnDeserialization(sender);
    }

    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        _hashSet.GetObjectData(info, context);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    #endregion

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Add(item);
        }
        finally
        {
            if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void UnionWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.UnionWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void IntersectWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.IntersectWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void ExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.ExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void SymmetricExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.SymmetricExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Overlaps(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Overlaps(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool SetEquals(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.SetEquals(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    bool ISet<T>.Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Add(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Clear();
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Contains(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Contains(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.CopyTo(array, arrayIndex);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Remove(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Remove(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public int Count
    {
        get
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Count;
            }
            finally
            {
                if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }

        }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }
}

我发现既不能简单地锁定System.Collections.Generic的添加和删除方法。HashSet,或者包装框架的ConcurrentDictionary在需要良好性能的“高吞吐量”场景中是足够的。

通过使用这个简单的想法就可以获得相当好的性能:

public class ExampleHashSet<T>
{
    const int ConcurrencyLevel = 124;
    const int Lower31BitMask = 0x7FFFFFFF;
    
    HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
    IEqualityComparer<T> comparer;

    public ExampleHashSet()
    {
        comparer = EqualityComparer<T>.Default;

        for(int i = 0; i < ConcurrencyLevel; i++)
            sets[i] = new HashSet<T>();
    }

    public bool Add(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Add(item);
        }
    }

    public bool Remove(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Remove(item);
        }
    }

    // further methods ...
}

系统的HashSet是包装的,但与其他答案不同的是,我们对多个答案持有锁 hashset。不同的线程能够在不同的hashset上“工作”,从而降低了总体等待时间。

这个想法可以在HashSet本身中被概括和直接实现(持有桶上的锁,而不是锁) 成套)。这里可以找到一个例子。