定时任务并行拉取设备数据并入库时的并发与幂等问题
每分钟触发的定时任务并行调用第三方 API 拉取设备数据并去重入库，本文分析其中偶发的并发问题及后续解决思路。
December 27, 2024 · 3 minutes
场景如下：系统中有一个每分钟触发一次的定时任务，它调用第三方平台的 API 获取系统中各设备的实时数据。API 返回的是一段时间内的数据：每次请求以系统中该设备最新一条记录的时间作为起始时间参数，以当前时间作为结束时间参数。定时任务需要对这些数据进行整理并入库；入库时要与现有数据进行对比，如果最新一条记录与下一条待插入数据的各项参数均未发生变化，则只更新该记录的更新时间。为了提高入库效率，目前采用并行方式处理任务；又由于触发间隔较短，相邻两次任务可能重叠执行，从而产生并发问题——在之前的测试中已偶发出现过此类问题。下面思考后续的解决方案。
/// <summary>
/// Fetches readings for a batch of devices from the third-party API and persists them.
/// A reading whose data is unchanged from the device's most recent record only refreshes
/// that record's <c>UpdatedAt</c> instead of inserting a new row.
/// </summary>
/// <remarks>
/// NOTE(review): every call admitted by <c>_semaphore</c> shares the same <c>_dbContext</c>.
/// EF Core does not support concurrent operations on one DbContext instance, so
/// <c>MaxConcurrency</c> &gt; 1 can still collide across batches. Consider
/// IDbContextFactory&lt;ApplicationDbContext&gt; with one context per batch — TODO.
/// NOTE(review): <c>_lockProvider</c> is injected but never used. A per-device distributed
/// lock around fetch + persist would stop two overlapping scheduler runs from processing
/// the same device at the same time — TODO.
/// </remarks>
public class DeviceDataProcessor
{
    // Limits how many batches are processed at the same time.
    private readonly SemaphoreSlim _semaphore;
    private readonly ILogger<DeviceDataProcessor> _logger;
    private readonly IDeviceDataService _deviceDataService;
    private readonly ApplicationDbContext _dbContext;
    private readonly IDistributedLockProvider _lockProvider;
    private readonly DeviceDataConfiguration _configuration;
    // Holds idempotency markers recording which device/time ranges were already fetched.
    private readonly IDistributedCache _cache;

    public DeviceDataProcessor(
        ILogger<DeviceDataProcessor> logger,
        IDeviceDataService deviceDataService,
        ApplicationDbContext dbContext,
        IDistributedLockProvider lockProvider,
        DeviceDataConfiguration configuration,
        IDistributedCache cache)
    {
        _logger = logger;
        _deviceDataService = deviceDataService;
        _dbContext = dbContext;
        _lockProvider = lockProvider;
        _configuration = configuration;
        _cache = cache;
        // Limits concurrency: at most MaxConcurrency batches run at once.
        _semaphore = new SemaphoreSlim(
            configuration.MaxConcurrency,
            configuration.MaxConcurrency);
    }

    /// <summary>
    /// Fetches new readings for every device in <paramref name="deviceBatch"/> and stores them
    /// in a single transaction.
    /// </summary>
    /// <param name="deviceBatch">Devices to process; enumerated once.</param>
    /// <remarks>
    /// Each device's range runs from its latest stored timestamp (or one day ago when it has
    /// none) to a single "now" captured for the whole batch. Ranges whose idempotency marker
    /// already exists in the cache are skipped.
    /// </remarks>
    private async Task ProcessDeviceBatchWithIdempotency(IEnumerable<Device> deviceBatch)
    {
        await _semaphore.WaitAsync();
        try
        {
            var devices = deviceBatch.ToList();
            if (devices.Count == 0)
            {
                return;
            }

            // One "now" for the batch, so the idempotency key describes exactly the range requested.
            var toTime = DateTime.UtcNow;

            // DbContext is not thread-safe: read every device's latest timestamp in one query here
            // instead of querying from inside the parallel tasks below.
            var deviceIds = devices.Select(d => d.Id).Distinct().ToList();
            var lastTimestamps = await _dbContext.DeviceData
                .Where(d => deviceIds.Contains(d.DeviceId))
                .GroupBy(d => d.DeviceId)
                .Select(g => new { DeviceId = g.Key, LastTimestamp = g.Max(d => d.Timestamp) })
                .ToDictionaryAsync(x => x.DeviceId, x => x.LastTimestamp);

            var markerOptions = new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(1)
            };

            // Only cache and HTTP I/O run concurrently; none of it touches _dbContext.
            var fetched = await Task.WhenAll(devices.Select(device =>
            {
                var fromTime = lastTimestamps.TryGetValue(device.Id, out var last) && last != default
                    ? last
                    : toTime.AddDays(-1);
                return FetchIfNotProcessedAsync(device, fromTime, toTime, markerOptions);
            }));

            if (fetched.All(f => f.Readings is null))
            {
                return;
            }

            await using var transaction = await _dbContext.Database.BeginTransactionAsync();
            try
            {
                // Sequential: all DbContext work happens on this single flow.
                foreach (var (device, readings) in fetched)
                {
                    if (readings is null)
                    {
                        // Already processed, or the API returned nothing.
                        continue;
                    }

                    await PersistDeviceReadingsAsync(device.Id, readings);
                }

                await _dbContext.SaveChangesAsync();
                await transaction.CommitAsync();
            }
            catch
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Fetches one device's readings for [<paramref name="fromTime"/>, <paramref name="toTime"/>]
    /// unless the range's idempotency marker is already in the cache.
    /// The marker is written after a successful fetch.
    /// </summary>
    /// <returns>The device and its readings, or the device and <c>null</c> when skipped.</returns>
    /// <remarks>
    /// NOTE(review): the cache check and write are not atomic, so two callers can both pass
    /// the check. The key embeds <paramref name="toTime"/> to the second, so it only matches
    /// runs started in the same second.
    /// </remarks>
    private async Task<(Device Device, IEnumerable<DeviceData>? Readings)> FetchIfNotProcessedAsync(
        Device device,
        DateTime fromTime,
        DateTime toTime,
        DistributedCacheEntryOptions markerOptions)
    {
        var idempotencyKey = $"device_{device.Id}_{fromTime:yyyyMMddHHmmss}_{toTime:yyyyMMddHHmmss}";
        if (await _cache.GetAsync(idempotencyKey) != null)
        {
            _logger.LogInformation("Data for device {DeviceId} in this time range already processed", device.Id);
            return (device, null);
        }

        // NOTE(review): DeviceDataService below only implements GetDeviceDataFromApi(string);
        // IDeviceDataService is assumed to declare this (id, from, to) overload returning a
        // sequence of DeviceData — confirm.
        var readings = await _deviceDataService.GetDeviceDataFromApi(device.Id, fromTime, toTime);
        await _cache.SetAsync(idempotencyKey, BitConverter.GetBytes(1), markerOptions);
        return (device, readings);
    }

    /// <summary>
    /// Stages one device's readings in the change tracker; the caller saves and commits.
    /// </summary>
    /// <param name="deviceId">Device the readings belong to.</param>
    /// <param name="readings">Readings returned by the API.</param>
    /// <remarks>
    /// For each reading, in timestamp order:
    /// <list type="bullet">
    /// <item>If a stored record has the same fingerprint, only its <c>UpdatedAt</c> is refreshed.</item>
    /// <item>Otherwise, if its data equals the device's most recent record, that record is refreshed.</item>
    /// <item>Otherwise, a new record is inserted.</item>
    /// </list>
    /// </remarks>
    private async Task PersistDeviceReadingsAsync(string deviceId, IEnumerable<DeviceData> readings)
    {
        // Queried once. It is then replaced by each record inserted below, because records
        // added in this run are not visible to database queries until SaveChanges runs.
        var latestRecord = await _dbContext.DeviceData
            .Where(d => d.DeviceId == deviceId)
            .OrderByDescending(d => d.Timestamp)
            .FirstOrDefaultAsync();

        // Chronological order, so every reading is compared with its immediate predecessor.
        foreach (var data in readings.OrderBy(r => r.Timestamp))
        {
            var dataFingerprint = CalculateDataFingerprint(data);

            var existingWithFingerprint = await _dbContext.DeviceData
                .Where(d => d.DeviceId == deviceId && d.DataFingerprint == dataFingerprint)
                .FirstOrDefaultAsync();
            if (existingWithFingerprint != null)
            {
                // Identical reading already stored: only refresh its UpdatedAt.
                TouchRecord(existingWithFingerprint);
                continue;
            }

            if (latestRecord != null && !IsDataChanged(latestRecord.Data, data.Data))
            {
                // Data unchanged since the latest record: refresh it instead of inserting.
                TouchRecord(latestRecord);
            }
            else
            {
                // First record for the device, or the data changed.
                latestRecord = await InsertNewRecord(deviceId, data, dataFingerprint);
            }
        }
    }

    /// <summary>Sets <c>UpdatedAt</c> to now on <paramref name="record"/>.</summary>
    /// <remarks>
    /// Records loaded from the database are passed to <c>Update()</c> as before. Records added in
    /// this run are skipped: they are already tracked for insertion, and <c>Update()</c> may alter
    /// that tracking state.
    /// </remarks>
    private void TouchRecord(DeviceData record)
    {
        record.UpdatedAt = DateTime.UtcNow;
        if (_dbContext.Entry(record).State != EntityState.Added)
        {
            _dbContext.Update(record);
        }
    }

    /// <summary>Stages a new record for <paramref name="deviceId"/> built from <paramref name="data"/>.</summary>
    /// <returns>The staged (not yet saved) record.</returns>
    private async Task<DeviceData> InsertNewRecord(string deviceId, DeviceData data, string dataFingerprint)
    {
        var newRecord = new DeviceData
        {
            DeviceId = deviceId,
            Data = data.Data,
            Timestamp = data.Timestamp,
            DataFingerprint = dataFingerprint,
            CreatedAt = DateTime.UtcNow
        };
        await _dbContext.DeviceData.AddAsync(newRecord);
        return newRecord;
    }

    /// <summary>
    /// Base64-encoded SHA-256 of the JSON serialization of the reading's DeviceId, Data and Timestamp.
    /// </summary>
    private static string CalculateDataFingerprint(DeviceData data)
    {
        var contentBytes = Encoding.UTF8.GetBytes(
            JsonSerializer.Serialize(new
            {
                data.DeviceId,
                data.Data,
                data.Timestamp
            }));
        using var sha256 = SHA256.Create();
        var hashBytes = sha256.ComputeHash(contentBytes);
        return Convert.ToBase64String(hashBytes);
    }
}

/// <summary>Reads device data from the third-party HTTP API, with retries.</summary>
/// <remarks>
/// NOTE(review): DeviceDataProcessor calls GetDeviceDataFromApi(deviceId, from, to) and iterates
/// the result, but this class only implements the single-argument overload returning one
/// DeviceData. IDeviceDataService and this implementation need to agree — TODO.
/// </remarks>
public class DeviceDataService : IDeviceDataService
{
    // WaitAndRetryAsync builds an async policy; IAsyncPolicy is its async interface
    // (the previous field type, RetryPolicy, is the synchronous variant in current Polly).
    private readonly IAsyncPolicy _retryPolicy;
    private readonly IHttpClientFactory _httpClientFactory;

    public DeviceDataService(IHttpClientFactory httpClientFactory)
    {
        _httpClientFactory = httpClientFactory;
        // Up to 3 retries on HttpRequestException, waiting 2s, 4s and 8s.
        _retryPolicy = Policy
            .Handle<HttpRequestException>()
            .WaitAndRetryAsync(3, retryAttempt =>
                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
    }

    /// <summary>GETs <c>/api/devices/{deviceId}/data</c> and deserializes the JSON body.</summary>
    /// <exception cref="HttpRequestException">
    /// The request failed, or kept returning a non-success status code, after all retries.
    /// </exception>
    public async Task<DeviceData> GetDeviceDataFromApi(string deviceId)
    {
        return await _retryPolicy.ExecuteAsync(async () =>
        {
            // NOTE(review): relative URI — assumes the default client has a BaseAddress configured.
            using var client = _httpClientFactory.CreateClient();
            using var response = await client.GetAsync($"/api/devices/{deviceId}/data");
            // Turn non-2xx into HttpRequestException so the retry policy handles it
            // instead of an error body being deserialized.
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadFromJsonAsync<DeviceData>();
        });
    }

    /// <summary>
    /// Copies Version and Data from <paramref name="newData"/> onto <paramref name="existing"/>
    /// and sets its UpdatedAt to now.
    /// </summary>
    public void UpdateDeviceData(DeviceData existing, DeviceData newData)
    {
        existing.Version = newData.Version;
        existing.Data = newData.Data;
        existing.UpdatedAt = DateTime.UtcNow;
    }
}