November 30, 2023
By: Kevin

c#并行/并发编程基础知识和最佳实践

threaded

我们生活在一个多线程(multi-threaded)的世界, 在提高应用程序性能和响应性方面, 是个躲不开的话题.

本文总结了从基础知识和一些最佳实践, 并且全程提供代码样例和文档引用, 所有代码可以在emacs的org-mode或vscode中直接以脚本方式执行.

并发(concurrent)和并行(parallel): 理解差异

在深入语言细节之前, 首先澄清以下这两个词的含义.

并发(concurrent)是指在一段时间内执行多个任务, 但不一定同时执行.

在并发系统中, 任务可能会在重叠的时间段内开始, 运行和完成.

即使在单核处理器上, 通过上下文切换(处理器快速在任务之间切换)也可以实现并发, 给人一种它们同时运行的错觉.

并行(parallel)是指同时执行多个任务, 通常通过利用多核处理器或分布式系统实现. 并行是并发的一个子集, 但并不是所有并发系统都是并行的.

C#中的并发: 基础知识

线程

线程是并发编程中的基本执行单元. 在C#中, System.Threading.Thread类提供了线程的能力. -- 官方文档

问题是这部分代码在2002年c#1.0发布的时候就存在了.

使用Threading类来直接操作线程是不推荐的做法. 这部分代码有沉重的历史包袱的复杂的使用约束.

要控制复杂逻辑需要和锁机制一起用, 非常容易因为调用顺序, 资源占用等因素导致不可预测的结果(比如思索).

对于身高250以下的人非常不友好.

好比你看电影里有人可以拿双节棍打架, 但是这和你自己拿着双节棍去打架是完全两回事, 大多数人, 远如拿根棍子顺手.

  1. 创建线程简单.

    要创建一个新线程, 可以实例化Thread类, 传递一个代表要在新线程上执行的方法的委托:

    using System.Threading;
    
    Thread newThread = new Thread(DoWork);
    newThread.Start();
    static void DoWork()
    {
        Console.WriteLine("Hello from the new thread!");
    }
    
    Hello from the new thread!
    

    容易的部分就此结束, 接下来就要管理状态, 同步数据, 管控资源, 哪一步都可以绊倒十次.

  2. 线程状态

> 线程可以处于几种状态, 包括未启动(Unstarted), 运行(Running), WaitSleepJoin, 暂停, 停止和中止. 可以使用ThreadState属性检查线程的状态 -- 官方文档

**知道有这些状态就足够了.**

``` csharp
using System.Threading;

Thread newThread = new Thread(DoWork);
Console.WriteLine($"Thread state: {newThread.ThreadState}");
newThread.Start();
Console.WriteLine($"Thread state: {newThread.ThreadState}");

static void DoWork()
{
    Console.WriteLine("Hello from the new thread!");
}
```

``` example
Thread state: Unstarted
Thread state: Running
Hello from the new thread!
```
  1. 线程优先级(priority)

    可以使用Priority属性设置线程的优先级. 优先级较高的线程会在优先级较低的线程之前被调度 -- 官方文档

    实际上, 和Lock一起用, 简直就给自己DIY上吊绳.

    而且当长时间运行或资源密集型线程使用高优先级时, 可能导致其他线程饥饿.

    碰都别碰.

任务(Task)

C#中的任务是并发编程的更高级抽象, 建立在线程之上. 这个是居家必备

System.Threading.Tasks类代表可以并发运行并返回结果的单个操作.

任务比线程更轻量级和灵活, 并与异步编程的async和await关键字紧密集成.

  1. 创建任务

    使用Task.RunTask.Factory.StartNew方法创建新任务. 以下是一个例子:

    using System;
    using System.Threading.Tasks;
    
    Task newTask = Task.Run(() => DoWork());
    newTask.Wait();
    
    static void DoWork()
    {
        Console.WriteLine("Hello from the new task!");
    }
    
    Hello from the new task!
    
  2. 任务取消

    CancellationTokenSourceCancellationToken类提供了一种协作方式来请求和处理任务的取消. 通过使用这些类, 可以在必要时优雅地取消任务并清理资源.

    可以使用CancellationTokenSource来取消任务, 并将CancellationToken传递给任务:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    
    Task newTask = Task.Run(() => DoWork(token));
    cts.CancelAfter(TimeSpan.FromSeconds(1));
    try
    {
        newTask.Wait();
    }
    catch (AggregateException ae)
    {
        Console.WriteLine("Task was canceled.", ae);
    }
    
    
    static void DoWork(CancellationToken token)
    {
        for (int i = 0; i < 5; i++)
        {
            token.ThrowIfCancellationRequested();
            Console.WriteLine("Working...");
            Thread.Sleep(500);
        }
    }
    
    Working...
    Working...
    Task was canceled.
    
  3. 连续任务

    任务支持连续任务, 可以将任务链接在一起. 可以使用ContinueWith方法指定一个新任务, 该任务在前一个任务完成时启动:

    using System;
    using System.Threading.Tasks;
    
    Task<int> initialTask = Task.Run(() => DoWork());
    Task continuationTask = initialTask.ContinueWith(prevTask => Console.WriteLine($"Result: {prevTask.Result}"));
    
    continuationTask.Wait();
    
    static int DoWork () => 42;
    
    Result: 42
    

C#中的并行: 基础知识

C#提供了System.Threading.Tasks.Parallel类, 用于并行执行任务, 简化了创建和管理并行循环和操作的过程.

Parallel.For和Parallel.ForEach

Parallel.ForParallel.ForEach是允许在C#中执行并行循环的方法.

基本用法

以下是使用 Parallel.For 执行并行循环的示例:

using System.Threading;

Parallel.For(0, 10, i => Console.WriteLine($"Hello from iteration {i} on thread {Thread.CurrentThread.ManagedThreadId}"));
Hello from iteration 0 on thread 1
Hello from iteration 9 on thread 1
Hello from iteration 1 on thread 9
Hello from iteration 2 on thread 5
Hello from iteration 3 on thread 10
Hello from iteration 4 on thread 7
Hello from iteration 5 on thread 12
Hello from iteration 6 on thread 8
Hello from iteration 7 on thread 14
Hello from iteration 8 on thread 13

以下是使用Parallel.ForEach的示例:

using System.Threading;

List<string> items = new List<string> { "apple", "banana", "cherry" };
Parallel.ForEach(items, item => Console.WriteLine($"Processing {item} on thread {Thread.CurrentThread.ManagedThreadId}"));
Processing apple on thread 1
Processing banana on thread 8
Processing cherry on thread 5
  1. 取消和异常处理

    要在并行循环中处理取消和异常, 可以使用 ParallelLoopState 对象和 ParallelOptions 类:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    CancellationTokenSource cts = new CancellationTokenSource();
    ParallelOptions options = new ParallelOptions
    {
        CancellationToken = cts.Token,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    
    try
    {
        Parallel.For(0, 10, options, (i, loopState) =>
        {
            if (i == 5)
              cts.Cancel();
    
            Console.WriteLine($"Hello from iteration {i} on thread {Thread.CurrentThread.ManagedThreadId}");
        });
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Operation was canceled.");
    }
    
    Hello from iteration 2 on thread 7
    Hello from iteration 1 on thread 8
    Hello from iteration 0 on thread 1
    Hello from iteration 3 on thread 9
    Hello from iteration 4 on thread 5
    Hello from iteration 5 on thread 12
    Operation was canceled.
    

Parallel.Invoke

Parallel.Invoke允许并行执行多个动作.

基本用法

以下是使用Parallel.Invoke的示例:

using System.Threading;
using System.Threading.Tasks;


Parallel.Invoke(
    () => DoWork("Task 1"),
    () => DoWork("Task 2"),
    () => DoWork("Task 3")
);


static void DoWork(string taskName)
{
    Console.WriteLine($"Starting {taskName} on thread {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(1000);
    Console.WriteLine($"Finished {taskName}");
}
Starting Task 1 on thread 1
Starting Task 2 on thread 7
Starting Task 3 on thread 5
Finished Task 3
Finished Task 1
Finished Task 2
  1. 取消和异常处理

    要使用 Parallel.Invoke 处理取消和异常, 使用 ParallelOptions 类:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    
    CancellationTokenSource cts = new CancellationTokenSource();
    ParallelOptions options = new ParallelOptions
    {
        CancellationToken = cts.Token,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    
    try
    {
        Parallel.Invoke(options,
            () => DoWork("Task 1", cts.Token),
            () => DoWork("Task 2", cts.Token),
            () => DoWork("Task 3", cts.Token)
        );
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Operation was canceled.");
    }
    
    
    static void DoWork(string taskName, CancellationToken token)
    {
        Console.WriteLine($"Starting {taskName} on thread {Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(1000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Finished {taskName}");
    }
    
    Starting Task 3 on thread 5
    Starting Task 2 on thread 9
    Starting Task 1 on thread 1
    Finished Task 3
    Finished Task 1
    Finished Task 2
    

使用 Async 和 Await

C# 中的 asyncawait关键字提供了一种方便的方法来编写可读且可维护的异步代码.

异步编程对于创建响应式应用程序至关重要, 因为它允许执行潜在耗时的操作, 例如I/O 或网络请求, 而不会阻塞主线程.

在这一部分中, 我们将深入研究在 C# 中使用 async 和 await.

Async

一个异步方法是在其声明中包含 async 关键字并通常返回 TaskTask<TResult> 的方法.

async 关键字表示该方法包含一个或多个 await 表达式, 并且可以异步执行.

这里有一个异步方法的简单示例:

using System;
using System.Threading.Tasks;

Console.WriteLine("Starting...");
await Task.Delay(1000);
Console.WriteLine("Finished!");
Starting...
Finished!

在此示例中, 被忽略的 Main 方法被标记为 async 并返回一个 Task . await Task.Delay(1000) 表达式导致该方法暂停执行一秒钟, 然后继续.

Await

await 关键字用于暂停异步方法的执行, 直到给定的任务完成. 当遇到 await表达式时, 当前方法的执行被挂起, 控制权返回给调用方法.

一旦等待的任务完成, 异步方法的执行从暂停的地方继续.

这是一个使用自定义异步方法的 await 的示例:

using System;
using System.Threading.Tasks;

Console.WriteLine("Starting...");
await DoSomethingAsync();
Console.WriteLine("Finished!");

static async Task DoSomethingAsync()
{
    await Task.Delay(1000);
    Console.WriteLine("Did something!");
}
Starting...
Did something!
Finished!

在这个示例中, DoSomethingAsync 方法被 await 调用, 导致 Main方法暂停执行, 直到 DoSomethingAsync 完成.

从异步方法返回值

异步方法可以通过使用 Task<TResult> 返回类型来返回值. 当需要从异步方法返回一个值时, 只需使用 Task.FromResult 方法来创建一个带有指定结果的已完成任务.

这是从异步方法返回值的一个示例:

using System;
using System.Threading.Tasks;

int result = await CalculateAsync(2, 3);
Console.WriteLine($"Result: {result}");

static async Task<int> CalculateAsync(int a, int b)
{
    await Task.Delay(1000); // 模拟耗时操作
    return a + b;
}
Result: 5

在此示例中, CalculateAsync 方法返回一个 Task<int> . Main 方法中的 await 关键字从已完成的任务中检索结果.

异步方法中的异常处理

在使用异步方法时, 可以使用标准的 try-catch 块来处理异常. 当在异步方法内部抛出异常时, 它被捕获并存储在返回的任务中.

当任务被等待时, 异常被重新抛出, 允许在调用方法中捕获并处理它.

这是一个处理异步方法中的异常的示例:

using System;
using System.Threading.Tasks;

try
{
    await DoSomethingAsync();
}
catch (Exception ex)
{
    Console.WriteLine($"Caught exception: {ex.Message}");
}

static async Task DoSomethingAsync()
{
    await Task.Delay(1000); // 模拟耗时操作
    throw new InvalidOperationException("Something went wrong");
}
Caught exception: Something went wrong

使用 Async 和 Await 与 LINQ

还可以将 asyncawait 与LINQ(语言集成查询)结合使用, 以对集合执行异步操作.

要做到这一点, 要使用 System.Linq.Async , 它提供了用于处理 Task<IEnumerable<T>>IAsyncEnumerable<T> 的扩展方法.

这是一个使用 asyncawait 与 LINQ 的示例:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

List<Task<int>> tasks = new List<Task<int>>
{
    Task.FromResult(1),
    Task.FromResult(2),
    Task.FromResult(3)
};

// 等待所有任务完成
 IEnumerable<int>  results = await Task.WhenAll(tasks);

// 使用 LINQ 计算总和
int sum = results.Sum();

Console.WriteLine($"Sum: {sum}");
Sum: 6

在此示例中, 使用 WhenAll 扩展方法来等待列表中所有任务的完成, 然后使用 Sum 方法计算结果的总和.

异步和 Await 的最佳实践

要充分利用 asyncawait , 请遵循以下最佳实践:

  • 对所有可能需要大量时间完成的 I/O 绑定或 CPU 绑定操作使用 asyncawait 关键字. 这将防止阻塞主线程, 取保应用程序保持响应.
  • 当在库代码中等待任务时, 始终使用 ConfigureAwait(false) 方法. 这将通过不捕获当前同步上下文来帮助防止死锁.
  • 使用 Task.Run 将 CPU 绑定工作卸载到线程池, 特别是当工作计算密集并可能阻塞主线程时.
  • 避免在异步方法中使用 Task.WaitTask.Result , 因为它们可能导致死锁或阻塞主线程. 相反, 使用 await 异步等待任务完成.
  • 始终在异步方法中使用 try-catch 块处理异常, 并意识到异常将被存储在返回的任务中, 并在等待任务时重新抛出.

C# 中的同步

当处理并发和并行代码时, 通常需要同步机制来防止数据竞争和其他与共享资源相关的问题. C# 提供了几种同步原语.

Lock

lock 语句是一种简单的同步机制, 确保一次只有一个线程可以执行特定的代码块:

基本用法

这是使用 lock 的一个示例:

using System;
using System.Threading;
using System.Threading.Tasks;

private static readonly object _locker = new object();
private static int _counter = 0;

Parallel.For(0, 10, i => IncrementCounter());

static void IncrementCounter()
{
    lock (_locker)
    {
        _counter++;
        Console.WriteLine($"Counter value: {_counter}");
    }
}
Counter value: 1
Counter value: 2
Counter value: 3
Counter value: 4
Counter value: 5
Counter value: 6
Counter value: 7
Counter value: 8
Counter value: 9
Counter value: 10
  1. 常见问题

    虽然 lock 易于使用, 但如果不小心使用, 可能会导致死锁和争用等问题.

    要小心不要在公共类型或实例上加锁, 因为当多个线程在同一对象上加锁时可能导致死锁.

    此外, 长时间持有锁可能会导致争用并限制并行性.

Monitor

System.Threading.Monitor 类提供了比 lock 更高级的同步特性.

基本用法

这是使用 Monitor 的一个示例:

using System;
using System.Threading;
using System.Threading.Tasks;

private static readonly object _locker = new object();
private static int _counter = 0;

Parallel.For(0, 10, i => IncrementCounter());

static void IncrementCounter()
{
    bool lockTaken = false;
    try
    {
        Monitor.Enter(_locker, ref lockTaken);
        _counter++;
        Console.WriteLine($"Counter value: {_counter}");
    }
    finally
    {
        if (lockTaken)
        {
          Monitor.Exit(_locker);
        }
    }
}
Counter value: 1
Counter value: 2
Counter value: 3
Counter value: 4
Counter value: 5
Counter value: 6
Counter value: 7
Counter value: 8
Counter value: 9
Counter value: 10

Monitor 同时提供了 WaitPulse 方法来处理线程之间的信号传递.


using System;
using System.Threading;

private static readonly object _locker = new object();
private static bool _ready = false;

Thread producer = new Thread(Producer);
Thread consumer = new Thread(Consumer);

producer.Start();
consumer.Start();

producer.Join();
consumer.Join();

static void Producer()
{
    lock (_locker)
    {
        Thread.Sleep(1000);
        _ready = true;
        Monitor.Pulse(_locker);
    }
}

static void Consumer()
{
    lock (_locker)
    {
        while (!_ready)
        {
            Monitor.Wait(_locker);
        }

        Console.WriteLine("Ready!");
    }
}
Ready!

互斥锁和信号量

C# 还提供了 System.Threading.MutexSystem.Threading.Semaphore 类, 用于更高级的同步场景. 互斥锁类似于锁或 Monitor, 但可以跨多个进程工作. 信号量允许多个线程同时访问共享资源, 但数量上限受到限制.

  1. SemaphoreSlim

    SemaphoreSlim 是一个轻量级的同步原语, 用于限制同时访问共享资源的线程数量. 在需要控制应用程序中有限资源访问时非常有用.

    以下是使用 SemaphoreSlim 限制并行任务数量的示例:

    SemaphoreSlim semaphore = new SemaphoreSlim(3); // 允许最多 3 个并发任务
    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 10; i++)
    {
        tasks.Add(Task.Run(async () =>
        {
            await semaphore.WaitAsync();
            try
            {
                // 此处放置限制并行度的代码.
                Console.WriteLine($"Task {Task.CurrentId} is running");
                await Task.Delay(1000);
            }
            finally
            {
                semaphore.Release();
            }
        }));
    }
    Task.WaitAll(tasks.ToArray());
    
  2. Mutex

    互斥锁( Mutex , 即" 互斥排除" )是 C# 中的一个同步原语, 用于确保一次只有一个线程可以访问共享资源或代码的关键部分.

    它类似于 lock 语句和 Monitor 类, 但提供了一些额外的功能, 例如能够跨多个进程工作.

    以下是在 C# 中使用互斥锁的示例:

    using System;
    using System.Threading;
    
    private static Mutex _mutex = new Mutex();
    private static int _counter = 0;
    
    
    Thread thread1 = new Thread(IncrementCounter);
    Thread thread2 = new Thread(IncrementCounter);
    
    thread1.Start();
    thread2.Start();
    
    thread1.Join();
    thread2.Join();
    
    static void IncrementCounter()
    {
        _mutex.WaitOne();
        try
        {
            _counter++;
            Console.WriteLine($"Counter value: {_counter}");
        }
        finally
        {
            _mutex.ReleaseMutex();
        }
    }
    
    Counter value: 1
    Counter value: 2
    
  3. 命名互斥锁(Named Mutex)

    互斥锁的一个关键特性是能够创建命名实例, 这些实例可以在多个进程之间共享.

    当需要在同一系统上运行的不同应用程序之间同步对共享资源的访问时, 这一点非常有用.

    以下是使用命名互斥锁的示例:

    using System;
    using System.Threading;
    
    private static Mutex _mutex = new Mutex(false, "Global\\MyNamedMutex");
    private static int _counter = 0;
    
    Thread thread1 = new Thread(IncrementCounter);
    Thread thread2 = new Thread(IncrementCounter);
    
    thread1.Start();
    thread2.Start();
    
    thread1.Join();
    thread2.Join();
    
    static void IncrementCounter()
    {
        _mutex.WaitOne();
        try
        {
            _counter++;
            Console.WriteLine($"Counter value: {_counter}");
        }
        finally
        {
            _mutex.ReleaseMutex();
        }
    }
    
    Counter value: 1
    Counter value: 2
    

线程安全容器

在多线程环境中使用集合时, 确保使用的数据结构是线程安全的至关重要.

线程安全集合被设计为允许多个线程访问和修改它们, 而不会引起数据竞争或其他同步问题.

在 C# 中, System.Collections.Concurrent 命名空间提供了几种可以在并发和并行应用程序中使用的线程安全集合.

ConcurrentDictionary

ConcurrentDictionary<TKey, TValue> 是一个线程安全的, 基于哈希的字典, 允许并发地添加, 移除和更新键值对.

它设计用于高性能, 低竞争场景, 并且在需要线程安全的关联数据结构时非常有用.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

ConcurrentDictionary<int, string> dictionary = new ConcurrentDictionary<int, string>();

Parallel.For(0, 100, i =>
{
    dictionary.TryAdd(i, $"Value {i}");
});

if (dictionary.TryGetValue(42, out string value))
{
    Console.WriteLine($"Key 42 has the value: {value}");
}
Key 42 has the value: Value 42

ConcurrentBag

ConcurrentBag<T> 是一个简单的, 无序的集合, 允许快速插入和移除元素. 它适用于同一个线程频繁添加和移除元素的场景.

当需要一个简单, 线程安全且最小竞争的集合时, ConcurrentBag<T> 是一个很好的选择.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

ConcurrentBag<int> bag = new ConcurrentBag<int>();

Parallel.For(0, 100, i =>
{
    bag.Add(i);
});

Console.WriteLine($"Bag contains {bag.Count} items.");
Bag contains 100 items.

ConcurrentQueue

ConcurrentQueue<T> 是一个线程安全的, 先进先出(FIFO)集合. 它支持并发的入队和出队操作, 适用于需要线程安全队列的场景, 例如线程间通信或作为简单的工作队列.

示例用法:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

ConcurrentQueue<int> queue = new ConcurrentQueue<int>();

Parallel.For(0, 100, i =>
{
    queue.Enqueue(i);
});

while (queue.TryDequeue(out int item))
{
    Console.WriteLine($"Dequeued: {item}");
}
Dequeued: 24
Dequeued: 25
Dequeued: 26
Dequeued: 27
Dequeued: 28
Dequeued: 29
Dequeued: 30
Dequeued: 31
Dequeued: 32
Dequeued: 33
Dequeued: 34
Dequeued: 35
Dequeued: 37
Dequeued: 38
Dequeued: 39
Dequeued: 40
Dequeued: 41
Dequeued: 42
Dequeued: 43
Dequeued: 44
Dequeued: 45
Dequeued: 46
Dequeued: 47
Dequeued: 49
Dequeued: 50
Dequeued: 51
Dequeued: 52
Dequeued: 53
Dequeued: 54
Dequeued: 55
Dequeued: 56
Dequeued: 57
Dequeued: 58
Dequeued: 59
Dequeued: 61
Dequeued: 62
Dequeued: 63
Dequeued: 64
Dequeued: 65
Dequeued: 66
Dequeued: 67
Dequeued: 68
Dequeued: 69
Dequeued: 70
Dequeued: 71
Dequeued: 73
Dequeued: 74
Dequeued: 75
Dequeued: 76
Dequeued: 77
Dequeued: 78
Dequeued: 79
Dequeued: 80
Dequeued: 81
Dequeued: 82
Dequeued: 83
Dequeued: 85
Dequeued: 86
Dequeued: 87
Dequeued: 88
Dequeued: 89
Dequeued: 90
Dequeued: 91
Dequeued: 92
Dequeued: 93
Dequeued: 94
Dequeued: 95
Dequeued: 97
Dequeued: 98
Dequeued: 99
Dequeued: 1
Dequeued: 2
Dequeued: 3
Dequeued: 4
Dequeued: 5
Dequeued: 6
Dequeued: 7
Dequeued: 8
Dequeued: 9
Dequeued: 10
Dequeued: 11
Dequeued: 13
Dequeued: 14
Dequeued: 15
Dequeued: 16
Dequeued: 17
Dequeued: 18
Dequeued: 19
Dequeued: 20
Dequeued: 21
Dequeued: 22
Dequeued: 23
Dequeued: 0
Dequeued: 36
Dequeued: 12
Dequeued: 48
Dequeued: 60
Dequeued: 72
Dequeued: 84
Dequeued: 96

ConcurrentStack

ConcurrentStack<T> 是一个线程安全的, 后进先出(LIFO)集合.

ConcurrentQueue<T> 类似, 它支持并发的 push 和 pop 操作. ConcurrentStack<T> 在需要线程安全堆栈的场景中非常有用,

例如实现深度优先搜索算法或维护可重复使用资源的池.

示例用法:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

ConcurrentStack<int> stack = new ConcurrentStack<int>();

Parallel.For(0, 100, i =>
{
    stack.Push(i);
});

while (stack.TryPop(out int item))
{
    Console.WriteLine($"Popped: {item}");
}
Popped: 24
Popped: 36
Popped: 12
Popped: 99
Popped: 98
Popped: 97
Popped: 96
Popped: 95
Popped: 94
Popped: 93
Popped: 92
Popped: 91
Popped: 90
Popped: 89
Popped: 88
Popped: 87
Popped: 86
Popped: 85
Popped: 84
Popped: 83
Popped: 82
Popped: 81
Popped: 80
Popped: 79
Popped: 78
Popped: 77
Popped: 76
Popped: 75
Popped: 74
Popped: 73
Popped: 72
Popped: 71
Popped: 70
Popped: 69
Popped: 68
Popped: 67
Popped: 66
Popped: 65
Popped: 64
Popped: 63
Popped: 62
Popped: 61
Popped: 60
Popped: 59
Popped: 58
Popped: 57
Popped: 56
Popped: 55
Popped: 54
Popped: 53
Popped: 52
Popped: 51
Popped: 50
Popped: 49
Popped: 48
Popped: 47
Popped: 46
Popped: 45
Popped: 44
Popped: 43
Popped: 42
Popped: 41
Popped: 40
Popped: 39
Popped: 38
Popped: 37
Popped: 35
Popped: 34
Popped: 33
Popped: 32
Popped: 31
Popped: 30
Popped: 29
Popped: 28
Popped: 27
Popped: 26
Popped: 25
Popped: 23
Popped: 22
Popped: 21
Popped: 20
Popped: 19
Popped: 18
Popped: 17
Popped: 16
Popped: 15
Popped: 14
Popped: 13
Popped: 11
Popped: 10
Popped: 9
Popped: 8
Popped: 7
Popped: 6
Popped: 5
Popped: 4
Popped: 3
Popped: 2
Popped: 1
Popped: 0

BlockingCollection

BlockingCollection<T> 是一个对并发集合(如 ConcurrentQueue<T> , ConcurrentStack<T>ConcurrentBag<T>)的高级封装, 提供了阻塞和边界能力.

它适用于实现生产者-消费者场景, 其中希望限制集合中的元素数量或在没有可用元素时阻塞消费者.

使用 ConcurrentQueue<T> 的示例:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

BlockingCollection<int> blockingQueue = new BlockingCollection<int>(new ConcurrentQueue<int>(), 10);
// 生产者
Task producer = Task.Run(() =>
{
    for (int i = 0; i < 100; i++)
    {
        blockingQueue.Add(i);
        Console.WriteLine($"Produced: {i}");
    }

    blockingQueue.CompleteAdding();
});
// 消费者
Task consumer = Task.Run(() =>
{
    while (!blockingQueue.IsCompleted)
    {
        if (blockingQueue.TryTake(out int item, Timeout.Infinite))
        {
            Console.WriteLine($"Consumed: {item}");
        }
    }
});

Task.WaitAll(producer, consumer);
Produced: 0
Produced: 1
Produced: 2
Produced: 3
Produced: 4
Produced: 5
Produced: 6
Produced: 7
Produced: 8
Produced: 9
Consumed: 0
Produced: 10
Consumed: 1
Produced: 11
Consumed: 2
Produced: 12
Consumed: 3
Produced: 13
Consumed: 4
Produced: 14
Consumed: 5
Produced: 15
Consumed: 6
Produced: 16
Consumed: 7
Produced: 17
Consumed: 8
Produced: 18
Consumed: 9
Produced: 19
Consumed: 10
Produced: 20
Consumed: 11
Produced: 21
Consumed: 12
Produced: 22
Consumed: 13
Produced: 23
Consumed: 14
Produced: 24
Consumed: 15
Produced: 25
Consumed: 16
Produced: 26
Consumed: 17
Produced: 27
Consumed: 18
Produced: 28
Consumed: 19
Produced: 29
Consumed: 20
Produced: 30
Consumed: 21
Produced: 31
Consumed: 22
Produced: 32
Consumed: 23
Produced: 33
Consumed: 24
Produced: 34
Consumed: 25
Produced: 35
Consumed: 26
Produced: 36
Consumed: 27
Produced: 37
Consumed: 28
Produced: 38
Consumed: 29
Produced: 39
Consumed: 30
Produced: 40
Consumed: 31
Produced: 41
Consumed: 32
Produced: 42
Consumed: 33
Produced: 43
Consumed: 34
Produced: 44
Consumed: 35
Produced: 45
Consumed: 36
Produced: 46
Consumed: 37
Produced: 47
Consumed: 38
Produced: 48
Consumed: 39
Produced: 49
Consumed: 40
Produced: 50
Consumed: 41
Produced: 51
Consumed: 42
Produced: 52
Consumed: 43
Produced: 53
Consumed: 44
Produced: 54
Consumed: 45
Produced: 55
Consumed: 46
Produced: 56
Consumed: 47
Produced: 57
Consumed: 48
Produced: 58
Consumed: 49
Produced: 59
Consumed: 50
Produced: 60
Consumed: 51
Produced: 61
Consumed: 52
Produced: 62
Consumed: 53
Produced: 63
Consumed: 54
Produced: 64
Consumed: 55
Produced: 65
Consumed: 56
Produced: 66
Consumed: 57
Produced: 67
Consumed: 58
Produced: 68
Consumed: 59
Produced: 69
Consumed: 60
Produced: 70
Consumed: 61
Produced: 71
Consumed: 62
Produced: 72
Consumed: 63
Produced: 73
Consumed: 64
Produced: 74
Consumed: 65
Produced: 75
Consumed: 66
Produced: 76
Consumed: 67
Produced: 77
Consumed: 68
Produced: 78
Consumed: 69
Produced: 79
Produced: 80
Consumed: 70
Consumed: 71
Produced: 81
Consumed: 72
Produced: 82
Consumed: 73
Produced: 83
Consumed: 74
Produced: 84
Consumed: 75
Produced: 85
Consumed: 76
Produced: 86
Consumed: 77
Produced: 87
Consumed: 78
Produced: 88
Consumed: 79
Produced: 89
Consumed: 80
Produced: 90
Consumed: 81
Produced: 91
Consumed: 82
Produced: 92
Consumed: 83
Produced: 93
Consumed: 84
Produced: 94
Consumed: 85
Produced: 95
Consumed: 86
Produced: 96
Consumed: 87
Produced: 97
Consumed: 88
Produced: 98
Consumed: 89
Produced: 99
Consumed: 90
Consumed: 91
Consumed: 92
Consumed: 93
Consumed: 94
Consumed: 95
Consumed: 96
Consumed: 97
Consumed: 98
Consumed: 99

在这个例子中, BlockingCollection<T> 包装了一个 ConcurrentQueue<T>, 并且设置了最多容纳10个元素的界限. 生产者向集合中添加元素, 消费者从中取出元素.

当没有可用元素时, 消费者将会阻塞; 当集合满了时, 生产者将会阻塞.

正确使用线程安全集合

虽然线程安全集合提供了内置的同步机制, 并且设计用于在多线程环境中正确工作, 但正确使用它们以避免潜在问题是至关重要的:

  1. 根据具体用例选择适当的线程安全集合. 例如, 对于先进先出(FIFO)场景使用 ConcurrentQueue<T>, 对于后进先出(LIFO)场景使用 ConcurrentStack<T>.
  2. 当使用 ConcurrentDictionary<TKey, TValue> 时, 请注意一些操作如 CountIsEmpty 可能不是原子的, 并可能返回近似值.
  3. 当使用 BlockingCollection<T> 时, 确保在生产者完成添加项目后调用 CompleteAdding, 以便消费者知道何时停止等待新项目.
  4. 始终测试和分析并发代码, 以确保它正确且高效地工作. 使用 Visual Studio 的并发可视化器或其他调试和分析工具来识别潜在问题或瓶颈.

结论

通过理解并发和并行之间的差异, 并学习使用语言提供的各种工具和技术, 可以创建高性能, 响应快的应用程序.

本指南介绍了 C# 中并发和并行的基础知识, 包括线程, 任务, 并行循环, 同步等. 有了这个基础, 可以继续探索更高级的主题和 C# 中并发与并行编程的最佳实践.

常见问题解答

  1. 并发与并行之间的区别是什么?

    并发是指在一段时间内执行多个任务, 但不一定同时进行. 在并发系统中, 任务可能在重叠的时间段内开始, 运行和完成. 并行是指同时执行多个任务, 通常通过利用多个处理器核心或分布式系统来实现. 并行是并发的一个子集, 但并不是所有并发系统都是并行的.

  2. 我应该在 C# 中何时使用线程而不是任务?

    任务是构建在线程之上的更高级别的并发编程抽象. 它们比线程更轻量级和灵活, 并且与异步编程的 async 和 await 关键字集成. 在大多数情况下, 应该在 C# 中使用任务而不是线程进行并发编程.

  3. C# 中的 lock 语句有什么用途?

    lock 语句是一种简单的同步机制, 确保一次只有一个线程可以执行特定的代码块. 这对于保护共享资源免受数据竞争和其他并发访问相关问题非常有用.

  4. 并行循环是什么, 我应该何时使用它?

    并行循环是一种循环, 它的迭代可以并发地执行, 潜在地并行. 并行循环可以通过利用多个处理器核心来提高计算密集型操作的性能. 当有大量独立的, 计算密集型的操作, 可以跨多个核心分布时, 应该考虑使用并行循环.

  5. 在 C# 中进行并发和并行编程的一些最佳实践是什么?

    一些并发和并行编程的最佳实践包括:

    • 对于大多数并发操作使用任务Task而不是线程Thread.
    • 使用 asyncawait 关键字进行异步编程.
    • 根据特定需求和要求选择适当的同步机制, 如 lock, Monitor, MutexSemaphore.
    • 小心锁定公共类型或实例, 因为它可能导致多个线程在同一对象上锁定时发生死锁.
    • 尽量减少持有锁的时间, 以减少争用并提高并行性.
    • 使用并行循环时, 确保迭代是独立的, 并且可以并发执行, 而不会导致数据竞争或其他问题.
    • 考虑使用取消令牌来允许任务和并行循环的协作取消.
    • 测试和分析并发和并行代码, 以确保它按预期执行, 并识别潜在的瓶颈, 争用或其他问题.
  6. 我如何避免并发代码中的死锁?

    为了避免并发代码中的死锁, 请遵循以下准则:

    • 始终以相同的顺序获取锁. 如果多个线程以一致的顺序获取锁, 那么死锁的可能性会降低.
    • 不再需要时尽快释放锁. 长时间持有锁会增加发生死锁的风险.
    • 使用细粒度的锁定来最小化争用, 并减少死锁的机会. 避免锁定公共类型或实例.
    • 考虑使用非阻塞的同步机制, 如 SemaphoreSlim, ReaderWriterLockSlim 或 CancellationToken.
    • 使用诸如 Visual Studio 的并发可视化器等工具监控和测试代码, 以寻找潜在的死锁.
  7. Parallel.ForParallel.ForEach 有什么区别?

    Parallel.ForParallel.ForEach 都是允许执行并行循环的方法. 两者之间的主要区别在于它们操作的集合类型.

    `Parallel.For` 在整数范围上操作, 而 Parallel.ForEach 则在项目集合(如 List 或 Array)上操作. 根据数据结构和循环要求选择这两种方法.x

  8. 在 C# 中使用并行编程的一些常见用例是什么?

    C# 中的并行编程适用于多种场景, 包括:

    • 通过将工作分布在多个处理器核心上来提高计算密集型操作的性能.
    • 通过将耗时操作卸载到后台任务来实现响应性用户界面.
    • 执行可并行化的大规模数据处理, 分析或转换任务.
    • 实现机器学习, 科学计算或模拟的并行算法.
  9. 并行编程的一些挑战是什么?

    并行编程引入了几个挑战, 例如:

    • 确保共享资源的正确同步, 以防止数据竞争, 死锁和其他问题.
    • 管理并行代码的复杂性, 包括任务, 线程, 同步和错误处理.
    • 处理并行性的性能开销, 如线程创建, 上下文切换和线程间通信.
    • 寻找并行性和争用之间的最佳平衡, 以最大化性能和资源利用.
  10. 可以在 C# 中将 async 和 await 与并行编程一起使用吗?

    是的, 可以在 C# 中将 async 和 await 与并行编程一起使用. async 和 await 关键字用于异步编程, 这是一种并发形式. 它们与任务和 Task 类很好地集成, 使得将异步编程和并行编程

Tags: c#