Title here
Summary here
July 14, 20254 minutes
在 .NET 开发中,我们经常遇到需要处理多个任务的场景。foreach、Task.WhenAll 和 Parallel.ForEach 是三种常见的处理方式,但它们的适用场景和性能特征截然不同。通过深入分析这三种方法,我们可以更好地理解并发(Concurrency)和并行(Parallelism)的本质区别,以及如何在实际开发中做出正确的选择。
// 串行执行示例
public async Task ProcessItemsSequentially(IEnumerable<string> urls)
{
var results = new List<string>();
foreach (var url in urls)
{
var result = await httpClient.GetStringAsync(url);
results.Add(result);
}
return results;
}执行特征:
适用场景:
// 异步并发执行示例
public async Task<IEnumerable<string>> ProcessItemsConcurrently(IEnumerable<string> urls)
{
var tasks = urls.Select(url => httpClient.GetStringAsync(url));
var results = await Task.WhenAll(tasks);
return results;
}执行特征:
技术细节:
// 多线程并行执行示例
public IEnumerable<ProcessedData> ProcessItemsInParallel(IEnumerable<RawData> data)
{
var results = new ConcurrentBag<ProcessedData>();
Parallel.ForEach(data, item =>
{
var processed = PerformCpuIntensiveOperation(item);
results.Add(processed);
});
return results;
}执行特征:
技术细节:
定义: 系统能够处理多个同时进行的任务,但不一定要求这些任务在同一时刻执行。
核心特征:
技术实现:
// 并发处理多个 I/O 操作
public async Task<CombinedResult> FetchDataConcurrently()
{
// 所有请求逻辑上同时发起
var userTask = userService.GetUserAsync(userId);
var ordersTask = orderService.GetOrdersAsync(userId);
var preferencesTask = preferenceService.GetPreferencesAsync(userId);
// 等待所有操作完成
await Task.WhenAll(userTask, ordersTask, preferencesTask);
return new CombinedResult
{
User = userTask.Result,
Orders = ordersTask.Result,
Preferences = preferencesTask.Result
};
}定义: 同一时刻真正有多个任务在不同的处理器核心上执行。
核心特征:
技术实现:
// 并行处理大量数据
public double[] CalculateSquareRoots(double[] numbers)
{
var results = new double[numbers.Length];
// 工作被分配到多个 CPU 核心同时处理
Parallel.For(0, numbers.Length, i =>
{
results[i] = Math.Sqrt(numbers[i]);
});
return results;
}| 特性 | 并发 | 并行 |
|---|---|---|
| 问题域 | I/O 等待、资源协调 | CPU 计算能力不足 |
| 资源利用 | 时间片轮转,避免空闲等待 | 空间分割,多核同时工作 |
| 硬件要求 | 单核即可 | 多核必需 |
| 主要瓶颈 | I/O 延迟、网络延迟 | CPU 计算能力 |
| 实现关键 | 异步编程模型 | 多线程编程模型 |
| 典型场景 | Web API 调用、数据库查询 | 图像处理、数学计算 |
在选择优化策略之前,需要准确识别系统的性能瓶颈:
// 性能分析示例
public class PerformanceAnalyzer
{
public async Task<PerformanceMetrics> AnalyzeBottleneck()
{
var stopwatch = Stopwatch.StartNew();
var cpuCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");
// 监控 CPU 使用率
var initialCpuUsage = cpuCounter.NextValue();
// 执行待分析的操作
await SomeOperation();
var finalCpuUsage = cpuCounter.NextValue();
var elapsedTime = stopwatch.Elapsed;
return new PerformanceMetrics
{
CpuUsage = finalCpuUsage - initialCpuUsage,
ExecutionTime = elapsedTime,
BottleneckType = finalCpuUsage > 80 ? BottleneckType.CPU : BottleneckType.IO
};
}
}任务类型判断
├── I/O 密集型(网络、文件、数据库)
│ ├── 任务间无依赖 → Task.WhenAll(并发)
│ └── 任务间有依赖 → foreach(串行)
├── CPU 密集型(计算、算法、数据处理)
│ ├── 可并行化 → Parallel.ForEach(并行)
│ └── 不可并行化 → foreach(串行)
└── 混合型
└── 分阶段处理:先并发获取数据,再并行处理// 问题:需要从多个服务获取商品详情数据
public class ProductPageService
{
// 错误做法:串行获取(耗时 = 各服务响应时间之和)
public async Task<ProductPageData> GetProductDataSequential(int productId)
{
var product = await productService.GetProductAsync(productId); // 200ms
var inventory = await inventoryService.GetInventoryAsync(productId); // 150ms
var reviews = await reviewService.GetReviewsAsync(productId); // 300ms
var recommendations = await recommendationService.GetRecommendationsAsync(productId); // 250ms
return new ProductPageData(product, inventory, reviews, recommendations);
// 总耗时:900ms
}
// 正确做法:并发获取(耗时 = 最慢服务的响应时间)
public async Task<ProductPageData> GetProductDataConcurrent(int productId)
{
var productTask = productService.GetProductAsync(productId);
var inventoryTask = inventoryService.GetInventoryAsync(productId);
var reviewsTask = reviewService.GetReviewsAsync(productId);
var recommendationsTask = recommendationService.GetRecommendationsAsync(productId);
await Task.WhenAll(productTask, inventoryTask, reviewsTask, recommendationsTask);
return new ProductPageData(
productTask.Result,
inventoryTask.Result,
reviewsTask.Result,
recommendationsTask.Result
);
// 总耗时:300ms(最慢的服务响应时间)
}
}// 问题:需要对大量图片进行缩略图生成
public class ImageProcessingService
{
// 错误做法:使用并发处理 CPU 密集型任务
public async Task ProcessImagesWrong(string[] imagePaths)
{
var tasks = imagePaths.Select(path => Task.Run(() => ProcessSingleImage(path)));
await Task.WhenAll(tasks);
// 问题:创建过多线程,上下文切换开销大,效率反而降低
}
// 正确做法:使用并行处理
public void ProcessImagesCorrect(string[] imagePaths)
{
Parallel.ForEach(imagePaths, new ParallelOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount
}, imagePath =>
{
ProcessSingleImage(imagePath);
});
// 优势:自动负载均衡,最优线程数,充分利用 CPU 资源
}
private void ProcessSingleImage(string imagePath)
{
// CPU 密集型图像处理操作
using var image = Image.Load(imagePath);
var thumbnail = image.Clone(x => x.Resize(150, 150));
thumbnail.Save(GetThumbnailPath(imagePath));
}
}// 场景:从多个数据源获取数据后进行复杂计算
public class DataAnalysisService
{
public async Task<AnalysisResult> PerformAnalysis(AnalysisRequest request)
{
// 第一阶段:并发获取数据(I/O 密集型)
var dataSourceTasks = request.DataSources.Select(source =>
dataService.FetchDataAsync(source));
var allData = await Task.WhenAll(dataSourceTasks);
// 第二阶段:并行计算分析(CPU 密集型)
var analysisResults = new ConcurrentBag<PartialResult>();
Parallel.ForEach(allData, dataset =>
{
var result = PerformComplexCalculation(dataset);
analysisResults.Add(result);
});
// 第三阶段:聚合结果(串行,需要保证数据一致性)
return AggregateResults(analysisResults);
}
}// 限制同时执行的任务数量,避免资源耗尽
public class ThrottledConcurrentProcessor
{
private readonly SemaphoreSlim semaphore;
public ThrottledConcurrentProcessor(int maxConcurrency)
{
semaphore = new SemaphoreSlim(maxConcurrency);
}
public async Task<IEnumerable<T>> ProcessWithThrottling<T>(
IEnumerable<string> urls,
Func<string, Task<T>> processor)
{
var tasks = urls.Select(async url =>
{
await semaphore.WaitAsync();
try
{
return await processor(url);
}
finally
{
semaphore.Release();
}
});
return await Task.WhenAll(tasks);
}
}public class RobustParallelProcessor
{
public async Task<ProcessingResult> ProcessWithErrorHandling<T>(
IEnumerable<T> items,
Func<T, Task<ProcessedItem>> processor)
{
var successful = new ConcurrentBag<ProcessedItem>();
var failures = new ConcurrentBag<FailedItem>();
var tasks = items.Select(async item =>
{
try
{
var result = await processor(item);
successful.Add(result);
}
catch (Exception ex)
{
failures.Add(new FailedItem { Item = item, Error = ex });
}
});
await Task.WhenAll(tasks);
return new ProcessingResult
{
Successful = successful.ToList(),
Failures = failures.ToList()
};
}
}public class CancellableProcessor
{
public async Task ProcessWithCancellation<T>(
IEnumerable<T> items,
Func<T, CancellationToken, Task> processor,
CancellationToken cancellationToken = default)
{
var parallelOptions = new ParallelOptions
{
CancellationToken = cancellationToken,
MaxDegreeOfParallelism = Environment.ProcessorCount
};
await Task.Run(() =>
{
Parallel.ForEach(items, parallelOptions, item =>
{
cancellationToken.ThrowIfCancellationRequested();
processor(item, cancellationToken).Wait(cancellationToken);
});
}, cancellationToken);
}
}通过深入分析 foreach、Task.WhenAll 和 Parallel.ForEach 三种方式,我们可以总结出以下关键理解:
Task.WhenAll + async/awaitParallel.ForEach 或 Parallel.Forforeach 串行执行掌握这些概念和技术,能够帮助我们在实际开发中做出正确的性能优化决策,构建高效、稳定的应用系统。