.NET 中并发与并行编程的深度理解

July 14, 20254 minutes

.NET 中并发与并行编程的深度理解

前言

在 .NET 开发中,我们经常遇到需要处理多个任务的场景。foreachTask.WhenAllParallel.ForEach 是三种常见的处理方式,但它们的适用场景和性能特征截然不同。通过深入分析这三种方法,我们可以更好地理解并发(Concurrency)和并行(Parallelism)的本质区别,以及如何在实际开发中做出正确的选择。

三种执行方式的技术分析

foreach - 串行执行

// 串行执行示例
public async Task ProcessItemsSequentially(IEnumerable<string> urls)
{
    var results = new List<string>();
    foreach (var url in urls)
    {
        var result = await httpClient.GetStringAsync(url);
        results.Add(result);
    }
    return results;
}

执行特征:

  • 严格按顺序执行,一次只处理一个任务
  • 必须等待当前任务完全完成才能开始下一个
  • 线程利用率低,但资源消耗最小
  • 执行结果的顺序与输入顺序完全一致

适用场景:

  • 任务之间存在依赖关系
  • 需要严格控制执行顺序
  • 资源受限的环境
  • 调试和问题排查阶段

Task.WhenAll - 异步并发执行

// 异步并发执行示例
public async Task<IEnumerable<string>> ProcessItemsConcurrently(IEnumerable<string> urls)
{
    var tasks = urls.Select(url => httpClient.GetStringAsync(url));
    var results = await Task.WhenAll(tasks);
    return results;
}

执行特征:

  • 所有异步操作同时启动
  • 利用异步机制,线程在等待 I/O 操作时可以处理其他任务
  • 主要消耗内存资源(每个 Task 对象约 96 字节)
  • 线程使用效率极高,适合 I/O 密集型操作

技术细节:

  • 任务调度器管理任务执行
  • 利用 I/O 完成端口(IOCP)机制
  • 线程在等待期间被释放到线程池供其他操作使用

Parallel.ForEach - 多线程并行执行

// 多线程并行执行示例
public IEnumerable<ProcessedData> ProcessItemsInParallel(IEnumerable<RawData> data)
{
    var results = new ConcurrentBag<ProcessedData>();

    Parallel.ForEach(data, item =>
    {
        var processed = PerformCpuIntensiveOperation(item);
        results.Add(processed);
    });

    return results;
}

执行特征:

  • 自动将工作分配到多个线程
  • 充分利用多核 CPU 资源
  • 线程数量通常等于或略多于 CPU 核心数
  • 适合 CPU 密集型计算任务

技术细节:

  • 内置负载均衡和工作窃取算法
  • 支持取消操作和异常聚合
  • 可以配置并行度和分区策略

并发与并行的本质区别

并发(Concurrency)

定义: 系统能够处理多个同时进行的任务,但不一定要求这些任务在同一时刻执行。

核心特征:

  • 关注任务的调度和协调
  • 主要解决等待时间浪费的问题
  • 可以在单核 CPU 上实现
  • 重点在于逻辑上的同时性

技术实现:

// 并发处理多个 I/O 操作
public async Task<CombinedResult> FetchDataConcurrently()
{
    // 所有请求逻辑上同时发起
    var userTask = userService.GetUserAsync(userId);
    var ordersTask = orderService.GetOrdersAsync(userId);
    var preferencesTask = preferenceService.GetPreferencesAsync(userId);

    // 等待所有操作完成
    await Task.WhenAll(userTask, ordersTask, preferencesTask);

    return new CombinedResult
    {
        User = userTask.Result,
        Orders = ordersTask.Result,
        Preferences = preferencesTask.Result
    };
}

并行(Parallelism)

定义: 同一时刻真正有多个任务在不同的处理器核心上执行。

核心特征:

  • 关注任务的同时执行
  • 主要解决计算性能不足的问题
  • 必须要多核 CPU 支持
  • 重点在于物理上的同时性

技术实现:

// 并行处理大量数据
public double[] CalculateSquareRoots(double[] numbers)
{
    var results = new double[numbers.Length];

    // 工作被分配到多个 CPU 核心同时处理
    Parallel.For(0, numbers.Length, i =>
    {
        results[i] = Math.Sqrt(numbers[i]);
    });

    return results;
}

关键区别对比

特性并发并行
问题域I/O 等待、资源协调CPU 计算能力不足
资源利用时间片轮转,避免空闲等待空间分割,多核同时工作
硬件要求单核即可多核必需
主要瓶颈I/O 延迟、网络延迟CPU 计算能力
实现关键异步编程模型多线程编程模型
典型场景Web API 调用、数据库查询图像处理、数学计算

性能优化决策框架

问题诊断

在选择优化策略之前,需要准确识别系统的性能瓶颈:

// 性能分析示例
public class PerformanceAnalyzer
{
    public async Task<PerformanceMetrics> AnalyzeBottleneck()
    {
        var stopwatch = Stopwatch.StartNew();
        var cpuCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");

        // 监控 CPU 使用率
        var initialCpuUsage = cpuCounter.NextValue();

        // 执行待分析的操作
        await SomeOperation();

        var finalCpuUsage = cpuCounter.NextValue();
        var elapsedTime = stopwatch.Elapsed;

        return new PerformanceMetrics
        {
            CpuUsage = finalCpuUsage - initialCpuUsage,
            ExecutionTime = elapsedTime,
            BottleneckType = finalCpuUsage > 80 ? BottleneckType.CPU : BottleneckType.IO
        };
    }
}

选择决策树

任务类型判断
├── I/O 密集型(网络、文件、数据库)
│   ├── 任务间无依赖 → Task.WhenAll(并发)
│   └── 任务间有依赖 → foreach(串行)
├── CPU 密集型(计算、算法、数据处理)
│   ├── 可并行化 → Parallel.ForEach(并行)
│   └── 不可并行化 → foreach(串行)
└── 混合型
    └── 分阶段处理:先并发获取数据,再并行处理

实际应用案例

案例1:电商系统商品页面数据聚合

// 问题:需要从多个服务获取商品详情数据
public class ProductPageService
{
    // 错误做法:串行获取(耗时 = 各服务响应时间之和)
    public async Task<ProductPageData> GetProductDataSequential(int productId)
    {
        var product = await productService.GetProductAsync(productId);      // 200ms
        var inventory = await inventoryService.GetInventoryAsync(productId); // 150ms
        var reviews = await reviewService.GetReviewsAsync(productId);       // 300ms
        var recommendations = await recommendationService.GetRecommendationsAsync(productId); // 250ms

        return new ProductPageData(product, inventory, reviews, recommendations);
        // 总耗时:900ms
    }

    // 正确做法:并发获取(耗时 = 最慢服务的响应时间)
    public async Task<ProductPageData> GetProductDataConcurrent(int productId)
    {
        var productTask = productService.GetProductAsync(productId);
        var inventoryTask = inventoryService.GetInventoryAsync(productId);
        var reviewsTask = reviewService.GetReviewsAsync(productId);
        var recommendationsTask = recommendationService.GetRecommendationsAsync(productId);

        await Task.WhenAll(productTask, inventoryTask, reviewsTask, recommendationsTask);

        return new ProductPageData(
            productTask.Result,
            inventoryTask.Result,
            reviewsTask.Result,
            recommendationsTask.Result
        );
        // 总耗时:300ms(最慢的服务响应时间)
    }
}

案例2:图像批量处理系统

// 问题:需要对大量图片进行缩略图生成
public class ImageProcessingService
{
    // 错误做法:使用并发处理 CPU 密集型任务
    public async Task ProcessImagesWrong(string[] imagePaths)
    {
        var tasks = imagePaths.Select(path => Task.Run(() => ProcessSingleImage(path)));
        await Task.WhenAll(tasks);
        // 问题:创建过多线程,上下文切换开销大,效率反而降低
    }

    // 正确做法:使用并行处理
    public void ProcessImagesCorrect(string[] imagePaths)
    {
        Parallel.ForEach(imagePaths, new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        }, imagePath =>
        {
            ProcessSingleImage(imagePath);
        });
        // 优势:自动负载均衡,最优线程数,充分利用 CPU 资源
    }

    private void ProcessSingleImage(string imagePath)
    {
        // CPU 密集型图像处理操作
        using var image = Image.Load(imagePath);
        var thumbnail = image.Clone(x => x.Resize(150, 150));
        thumbnail.Save(GetThumbnailPath(imagePath));
    }
}

案例3:混合场景处理

// 场景:从多个数据源获取数据后进行复杂计算
public class DataAnalysisService
{
    public async Task<AnalysisResult> PerformAnalysis(AnalysisRequest request)
    {
        // 第一阶段:并发获取数据(I/O 密集型)
        var dataSourceTasks = request.DataSources.Select(source =>
            dataService.FetchDataAsync(source));
        var allData = await Task.WhenAll(dataSourceTasks);

        // 第二阶段:并行计算分析(CPU 密集型)
        var analysisResults = new ConcurrentBag<PartialResult>();

        Parallel.ForEach(allData, dataset =>
        {
            var result = PerformComplexCalculation(dataset);
            analysisResults.Add(result);
        });

        // 第三阶段:聚合结果(串行,需要保证数据一致性)
        return AggregateResults(analysisResults);
    }
}

高级优化策略

1. 控制并发度

// 限制同时执行的任务数量,避免资源耗尽
public class ThrottledConcurrentProcessor
{
    private readonly SemaphoreSlim semaphore;

    public ThrottledConcurrentProcessor(int maxConcurrency)
    {
        semaphore = new SemaphoreSlim(maxConcurrency);
    }

    public async Task<IEnumerable<T>> ProcessWithThrottling<T>(
        IEnumerable<string> urls,
        Func<string, Task<T>> processor)
    {
        var tasks = urls.Select(async url =>
        {
            await semaphore.WaitAsync();
            try
            {
                return await processor(url);
            }
            finally
            {
                semaphore.Release();
            }
        });

        return await Task.WhenAll(tasks);
    }
}

2. 异常处理策略

public class RobustParallelProcessor
{
    public async Task<ProcessingResult> ProcessWithErrorHandling<T>(
        IEnumerable<T> items,
        Func<T, Task<ProcessedItem>> processor)
    {
        var successful = new ConcurrentBag<ProcessedItem>();
        var failures = new ConcurrentBag<FailedItem>();

        var tasks = items.Select(async item =>
        {
            try
            {
                var result = await processor(item);
                successful.Add(result);
            }
            catch (Exception ex)
            {
                failures.Add(new FailedItem { Item = item, Error = ex });
            }
        });

        await Task.WhenAll(tasks);

        return new ProcessingResult
        {
            Successful = successful.ToList(),
            Failures = failures.ToList()
        };
    }
}

3. 取消机制

public class CancellableProcessor
{
    public async Task ProcessWithCancellation<T>(
        IEnumerable<T> items,
        Func<T, CancellationToken, Task> processor,
        CancellationToken cancellationToken = default)
    {
        var parallelOptions = new ParallelOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        await Task.Run(() =>
        {
            Parallel.ForEach(items, parallelOptions, item =>
            {
                cancellationToken.ThrowIfCancellationRequested();
                processor(item, cancellationToken).Wait(cancellationToken);
            });
        }, cancellationToken);
    }
}

总结

通过深入分析 foreachTask.WhenAllParallel.ForEach 三种方式,我们可以总结出以下关键理解:

核心原则

  1. 并发优化等待时间:当系统瓶颈在于 I/O 等待时,使用异步并发编程
  2. 并行优化计算时间:当系统瓶颈在于 CPU 计算时,使用多线程并行编程
  3. 串行保证执行顺序:当任务间有依赖关系或需要严格控制时,使用串行执行

决策指导

  • I/O 密集型任务Task.WhenAll + async/await
  • CPU 密集型任务Parallel.ForEachParallel.For
  • 混合型任务 → 分阶段处理,I/O 阶段用并发,计算阶段用并行
  • 有序依赖任务foreach 串行执行

最佳实践

  1. 先分析瓶颈,再选择策略
  2. 合理控制并发度,避免资源耗尽
  3. 完善异常处理和取消机制
  4. 根据实际负载调整策略

掌握这些概念和技术,能够帮助我们在实际开发中做出正确的性能优化决策,构建高效、稳定的应用系统。