定时任务中的并发并行问题

December 27, 2024 · 3 minutes

场景如下:系统中存在一个每分钟触发一次的定时任务,任务会调用第三方平台的 API 来获取系统中不同设备的实时数据。API 返回的是一段时间内的数据:每次请求时,以系统中现有最新记录的时间作为起始时间,以当前时间作为结束时间。我们需要在定时任务中对设备数据进行整理并入库。入库时需要与现有数据进行对比:如果最新一条记录与下一条待插入的数据各项参数均没有发生变化,就只更新这条记录的更新时间。为了优化入库效率,目前采用并行的方式来处理任务;同时由于触发间隔较短,前后两次任务还可能发生重叠,从而产生并发问题。在之前的测试中就出现过偶发的并行问题,下面思考一下后续的解决方案。

问题分析

  • 每分钟触发的定时任务可能会与上一次任务产生重叠
  • 需要并行处理多个设备的数据以提升效率
  • 数据入库需要保证一致性和准确性
  • 合理处理API调用限制和响应时间

解决方案

  1. 并发控制问题
  • 使用分布式锁(分布式环境下)确保只有一个实例在运行
  • 使用Semaphore控制并行度,避免过度并发
  • 采用乐观锁进行数据版本控制
  2. 性能问题
  • 批量处理设备数据,减少对数据库的操作次数
  • 使用异步方法处理I/O操作
  • 考虑增加重试机制处理临时性故障
  3. 数据一致性问题
  • 采用事务确保数据完整性
  • 增加幂等性处理,避免重复处理

代码示例

/// <summary>
/// Fetches readings for a batch of devices from the device data API and
/// persists them, inserting a new row only when the payload differs from the
/// most recent row of the same device and otherwise just refreshing
/// <c>UpdatedAt</c>.
/// </summary>
public class DeviceDataProcessor
{
    private readonly SemaphoreSlim _semaphore;
    private readonly ILogger<DeviceDataProcessor> _logger;
    private readonly IDeviceDataService _deviceDataService;
    private readonly ApplicationDbContext _dbContext;
    private readonly IDistributedLockProvider _lockProvider;
    private readonly DeviceDataConfiguration _configuration;
    private readonly IDistributedCache _cache; // Stores the per-window "already processed" markers.

    /// <summary>
    /// Creates the processor and sizes the batch semaphore from
    /// <c>configuration.MaxConcurrency</c>.
    /// </summary>
    public DeviceDataProcessor(
        ILogger<DeviceDataProcessor> logger,
        IDeviceDataService deviceDataService,
        ApplicationDbContext dbContext,
        IDistributedLockProvider lockProvider,
        DeviceDataConfiguration configuration,
        IDistributedCache cache)
    {
        _logger = logger;
        _deviceDataService = deviceDataService;
        _dbContext = dbContext;
        _lockProvider = lockProvider;
        _configuration = configuration;
        _cache = cache;
        _semaphore = new SemaphoreSlim(
            configuration.MaxConcurrency,
            configuration.MaxConcurrency); // Caps how many batches this instance processes at once.
    }

    /// <summary>
    /// Processes one batch of devices: determines the request window of each
    /// device, fetches the readings from the API in parallel, and stores them
    /// in a single database transaction.
    /// </summary>
    /// <param name="deviceBatch">Devices to process; enumerated exactly once.</param>
    /// <remarks>
    /// One semaphore permit is held for the whole batch. All database access is
    /// sequential because the same <c>_dbContext</c> instance is used
    /// throughout; only the API calls run concurrently.
    /// </remarks>
    private async Task ProcessDeviceBatchWithIdempotency(IEnumerable<Device> deviceBatch)
    {
        await _semaphore.WaitAsync();
        try
        {
            // Phase 1 (sequential): work out the request window per device.
            // These queries must not run concurrently on the shared DbContext.
            var pending = new List<(Device Device, DateTime FromTime, DateTime ToTime, string IdempotencyKey)>();
            foreach (var device in deviceBatch)
            {
                var lastProcessed = await _dbContext.DeviceData
                    .Where(d => d.DeviceId == device.Id)
                    .OrderByDescending(d => d.Timestamp)
                    .Select(d => d.Timestamp)
                    .FirstOrDefaultAsync();

                // Capture the end of the window once so the idempotency key and
                // the API request always describe the same range.
                var toTime = DateTime.UtcNow;
                var fromTime = lastProcessed != default ? lastProcessed : toTime.AddDays(-1);

                // NOTE(review): the key embeds the end time at second resolution,
                // so a later run only hits the marker when it requests exactly
                // the same window — confirm this is the intended granularity.
                var idempotencyKey = $"device_{device.Id}_{fromTime:yyyyMMddHHmmss}_{toTime:yyyyMMddHHmmss}";

                if (await _cache.GetAsync(idempotencyKey) != null)
                {
                    _logger.LogInformation(
                        "Data for device {DeviceId} in this time range already processed",
                        device.Id);
                    continue;
                }

                pending.Add((device, fromTime, toTime, idempotencyKey));
            }

            if (pending.Count == 0)
            {
                return;
            }

            // Phase 2 (parallel): the API calls do not touch the DbContext.
            var fetched = await Task.WhenAll(pending.Select(async window =>
            {
                var data = await _deviceDataService.GetDeviceDataFromApi(
                    window.Device.Id, window.FromTime, window.ToTime);
                return (Window: window, Data: data);
            }));

            // Phase 3 (sequential): persist everything in one transaction.
            await using var transaction = await _dbContext.Database.BeginTransactionAsync();
            try
            {
                foreach (var (window, dataList) in fetched)
                {
                    var deviceId = window.Device.Id;

                    // Load the comparison baseline once per device; it is then
                    // advanced in memory as rows are added, because rows added in
                    // this batch are not visible to queries until SaveChanges.
                    var latestRecord = await _dbContext.DeviceData
                        .Where(d => d.DeviceId == deviceId)
                        .OrderByDescending(d => d.Timestamp)
                        .FirstOrDefaultAsync();

                    foreach (var data in dataList)
                    {
                        latestRecord = await UpsertReading(deviceId, data, latestRecord);
                    }
                }

                await _dbContext.SaveChangesAsync();
                await transaction.CommitAsync();
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }

            // Mark the windows as processed only after the data is committed, so
            // a failed transaction does not leave a marker behind.
            var markerOptions = new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
            };
            foreach (var (window, _) in fetched)
            {
                await _cache.SetAsync(window.IdempotencyKey, BitConverter.GetBytes(1), markerOptions);
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Applies one reading: refreshes <c>UpdatedAt</c> when the same fingerprint
    /// is already stored or the payload equals the latest record, otherwise
    /// adds a new row.
    /// </summary>
    /// <param name="deviceId">Id of the device the reading belongs to.</param>
    /// <param name="data">Reading returned by the API.</param>
    /// <param name="latestRecord">Current latest record of the device, or null when there is none.</param>
    /// <returns>The record to use as "latest" for the next reading of the same device.</returns>
    private async Task<DeviceData> UpsertReading(string deviceId, DeviceData data, DeviceData latestRecord)
    {
        var dataFingerprint = CalculateDataFingerprint(data);

        var existingWithFingerprint = await _dbContext.DeviceData
            .Where(d => d.DeviceId == deviceId && d.DataFingerprint == dataFingerprint)
            .FirstOrDefaultAsync();

        if (existingWithFingerprint != null)
        {
            // Exactly this reading is already stored; only refresh its timestamp.
            TouchUpdatedAt(existingWithFingerprint);
            return latestRecord;
        }

        if (latestRecord != null && !IsDataChanged(latestRecord.Data, data.Data))
        {
            // Payload unchanged since the latest record; only refresh its timestamp.
            TouchUpdatedAt(latestRecord);
            return latestRecord;
        }

        // First reading for the device, or the payload changed.
        var inserted = await InsertNewRecord(deviceId, data, dataFingerprint);
        return latestRecord == null || inserted.Timestamp >= latestRecord.Timestamp
            ? inserted
            : latestRecord;
    }

    /// <summary>
    /// Sets <c>UpdatedAt</c> to the current UTC time and marks the record for update.
    /// </summary>
    private void TouchUpdatedAt(DeviceData record)
    {
        record.UpdatedAt = DateTime.UtcNow;

        // A record added in this batch is still in the Added state; calling
        // Update on it would turn the pending INSERT into an UPDATE.
        if (_dbContext.Entry(record).State != EntityState.Added)
        {
            _dbContext.Update(record);
        }
    }

    /// <summary>
    /// Adds a new <c>DeviceData</c> row for the reading to the context
    /// (not saved until <c>SaveChangesAsync</c>) and returns it.
    /// </summary>
    private async Task<DeviceData> InsertNewRecord(string deviceId, DeviceData data, string dataFingerprint)
    {
        var newRecord = new DeviceData
        {
            DeviceId = deviceId,
            Data = data.Data,
            Timestamp = data.Timestamp,
            DataFingerprint = dataFingerprint,
            CreatedAt = DateTime.UtcNow
        };
        await _dbContext.DeviceData.AddAsync(newRecord);
        return newRecord;
    }

    /// <summary>
    /// Reports whether two payloads differ, comparing their JSON serialization
    /// so that the comparison is by value regardless of the payload type.
    /// </summary>
    private static bool IsDataChanged<T>(T previous, T current)
    {
        return !string.Equals(
            JsonSerializer.Serialize(previous),
            JsonSerializer.Serialize(current),
            StringComparison.Ordinal);
    }

    /// <summary>
    /// Returns the Base64-encoded SHA-256 hash of the JSON serialization of the
    /// reading's <c>DeviceId</c>, <c>Data</c> and <c>Timestamp</c>.
    /// </summary>
    private static string CalculateDataFingerprint(DeviceData data)
    {
        var contentBytes = Encoding.UTF8.GetBytes(
            JsonSerializer.Serialize(new
            {
                data.DeviceId,
                data.Data,
                data.Timestamp
            })
        );
        using var sha256 = SHA256.Create();
        var hashBytes = sha256.ComputeHash(contentBytes);
        return Convert.ToBase64String(hashBytes);
    }
}
/// <summary>
/// HTTP client for the device data API. Requests are retried up to three
/// times with exponential back-off (2, 4, 8 seconds) when an
/// <see cref="HttpRequestException"/> is thrown.
/// </summary>
public class DeviceDataService : IDeviceDataService
{
    // WaitAndRetryAsync produces an asynchronous policy; IAsyncPolicy is the
    // interface that exposes ExecuteAsync for it.
    private readonly IAsyncPolicy _retryPolicy;
    private readonly IHttpClientFactory _httpClientFactory;

    /// <summary>
    /// Creates the service and builds the retry policy.
    /// </summary>
    /// <param name="httpClientFactory">Factory used to obtain an <see cref="HttpClient"/> per request.</param>
    public DeviceDataService(IHttpClientFactory httpClientFactory)
    {
        _httpClientFactory = httpClientFactory;
        _retryPolicy = Policy
            .Handle<HttpRequestException>()
            .WaitAndRetryAsync(3, retryAttempt =>
                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
    }

    /// <summary>
    /// Requests <c>/api/devices/{deviceId}/data</c> and deserializes the JSON
    /// body into a <c>DeviceData</c>.
    /// </summary>
    /// <param name="deviceId">Device identifier; escaped before being placed in the URL path.</param>
    /// <returns>The deserialized payload.</returns>
    /// <exception cref="ArgumentException"><paramref name="deviceId"/> is null or empty.</exception>
    /// <exception cref="HttpRequestException">The request still fails after all retries.</exception>
    /// <remarks>
    /// NOTE(review): the URL is relative, so the client returned by the factory
    /// must have a BaseAddress configured — confirm in the HttpClient registration.
    /// NOTE(review): DeviceDataProcessor calls a three-argument form
    /// (deviceId, from, to) and enumerates the result, which this method does
    /// not provide — confirm against IDeviceDataService.
    /// </remarks>
    public async Task<DeviceData> GetDeviceDataFromApi(string deviceId)
    {
        if (string.IsNullOrEmpty(deviceId))
        {
            throw new ArgumentException("Device id must not be null or empty.", nameof(deviceId));
        }

        var requestUri = $"/api/devices/{Uri.EscapeDataString(deviceId)}/data";

        return await _retryPolicy.ExecuteAsync(async () =>
        {
            // Clients created by IHttpClientFactory do not need to be disposed.
            var client = _httpClientFactory.CreateClient();
            using var response = await client.GetAsync(requestUri);

            // Turn non-success status codes into HttpRequestException so the
            // retry policy actually applies to failed responses.
            response.EnsureSuccessStatusCode();

            return await response.Content.ReadFromJsonAsync<DeviceData>();
        });
    }

    /// <summary>
    /// Copies <c>Version</c> and <c>Data</c> from <paramref name="newData"/> onto
    /// <paramref name="existing"/> and sets its <c>UpdatedAt</c> to the current UTC time.
    /// </summary>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public void UpdateDeviceData(DeviceData existing, DeviceData newData)
    {
        if (existing == null)
        {
            throw new ArgumentNullException(nameof(existing));
        }

        if (newData == null)
        {
            throw new ArgumentNullException(nameof(newData));
        }

        existing.Version = newData.Version;
        existing.Data = newData.Data;
        existing.UpdatedAt = DateTime.UtcNow;
    }
}