In web applications, you may encounter situations where you need to execute long-running tasks, such as processing large files, sending emails, or calling external APIs. If these tasks are executed synchronously within an HTTP request, it could lead to a poor user experience and even timeouts.
In this post, we’ll discuss how to address this problem by implementing a background task queue in ASP.NET Core. This approach enables the execution of long-running tasks without blocking the main request processing pipeline.
Let’s say we have an API endpoint that receives a video file and needs to process it. The processing includes uploading the video to a third-party storage service, updating video metadata, and adding the video to an archive. This process can take a considerable amount of time and should not be executed synchronously within the HTTP request.
Here’s the initial implementation of the endpoint:
    public class MyController : ControllerBase
    {
        private readonly IVideoProcessingService _videoProcessingService;

        public MyController(IVideoProcessingService videoProcessingService)
        {
            _videoProcessingService = videoProcessingService;
        }

        [HttpPost("upload-video")]
        public async Task<IActionResult> UploadVideo(UploadVideoRequest request)
        {
            await _videoProcessingService.UploadAsync(request);
            await _videoProcessingService.UpdateMetadataAsync(request);
            await _videoProcessingService.AddToArchiveAsync(request);
            return Ok();
        }
    }
The above implementation suffers from the problem described earlier: the request does not complete until all three awaited operations have finished, which could lead to a poor user experience and even timeouts.
To solve this problem, we can implement a background task queue using a combination of the IBackgroundTaskQueue, BackgroundTaskQueue, and BackgroundTaskService types. This way, the long-running tasks are executed in the background without blocking the main request processing pipeline.
First, we define the IBackgroundTaskQueue interface:
    public interface IBackgroundTaskQueue
    {
        void QueueBackgroundWorkItem(Func<CancellationToken, ValueTask> workItem);

        Task<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
    }
Next, we implement the BackgroundTaskQueue class:
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private readonly ConcurrentQueue<Func<CancellationToken, ValueTask>> _workItems = new();

        // Counts queued items; DequeueAsync waits on it while the queue is empty
        private readonly SemaphoreSlim _signal = new(0);

        public void QueueBackgroundWorkItem(Func<CancellationToken, ValueTask> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }

            _workItems.Enqueue(workItem);
            _signal.Release();
        }

        public async Task<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);
            _workItems.TryDequeue(out var workItem);
            return workItem!;
        }
    }
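The same idea can also be expressed with System.Threading.Channels, which combines the queue and the signalling in one type. The following is a minimal sketch rather than part of the implementation above: the class name ChannelBackgroundTaskQueue, the capacity parameter, and the TryQueueBackgroundWorkItem method are assumptions made for illustration. A bounded channel has the extra property that it refuses new items once it is full, which gives the caller a chance to reject the request instead of letting the queue grow without limit.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical alternative to BackgroundTaskQueue, built on a bounded channel.
public class ChannelBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _channel;

    public ChannelBackgroundTaskQueue(int capacity)
    {
        // In Wait mode, TryWrite returns false while the channel is full.
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    // Returns false (instead of blocking) when the queue is at capacity.
    public bool TryQueueBackgroundWorkItem(Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        return _channel.Writer.TryWrite(workItem);
    }

    // Completes when an item is available, or throws if the token is cancelled.
    public ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
    {
        return _channel.Reader.ReadAsync(cancellationToken);
    }
}
```

With this variant, a controller could return an error status when TryQueueBackgroundWorkItem returns false, so that overload becomes visible to the caller.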
We also need to implement the BackgroundTaskService class:
    using Microsoft.Extensions.Hosting;

    public class BackgroundTaskService : BackgroundService
    {
        private readonly IBackgroundTaskQueue _taskQueue;

        public BackgroundTaskService(IBackgroundTaskQueue taskQueue)
        {
            _taskQueue = taskQueue;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Waits until a work item is queued or the host is shutting down
                var workItem = await _taskQueue.DequeueAsync(stoppingToken);

                try
                {
                    if (workItem != null)
                    {
                        await workItem(stoppingToken);
                    }
                }
                catch (Exception)
                {
                    // Log the exception if necessary
                }
            }
        }
    }
Now, we can update the API endpoint to use the background task queue:
    public class MyController : ControllerBase
    {
        private readonly IBackgroundTaskQueue _backgroundTaskQueue;
        private readonly IVideoProcessingService _videoProcessingService;

        public MyController(
            IBackgroundTaskQueue backgroundTaskQueue,
            IVideoProcessingService videoProcessingService)
        {
            _backgroundTaskQueue = backgroundTaskQueue;
            _videoProcessingService = videoProcessingService;
        }

        [HttpPost("upload-video")]
        public IActionResult UploadVideo(UploadVideoRequest request)
        {
            // The work item runs later on the background service, not in this request
            _backgroundTaskQueue.QueueBackgroundWorkItem(async token =>
            {
                await _videoProcessingService.UploadAsync(request, token);
                await _videoProcessingService.UpdateMetadataAsync(request, token);
                await _videoProcessingService.AddToArchiveAsync(request, token);
            });

            return Ok();
        }
    }
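By using the background task queue, we’ve moved the long-running tasks out of the main request processing pipeline. The user receives a response promptly, while the tasks are executed in the background. Because the work item runs after the request has completed, any service it captures (here, _videoProcessingService) must be safe to use outside the request scope, for example by being registered as a singleton.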
Finally, we need to register the background task queue and service in Startup.cs:
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
        services.AddHostedService<BackgroundTaskService>();

        // Other service registrations...
    }
Now, when the UploadVideo endpoint is called, the long-running tasks will be executed in the background, improving the user experience and avoiding potential timeouts.
To add task progress and monitoring, we can introduce an ITaskProgressService interface and its implementation to keep track of task progress. This will allow us to provide real-time updates on the progress of the long-running tasks.
First, we define the ITaskProgressService interface:
    public interface ITaskProgressService
    {
        void RegisterTask(Guid taskId);

        void UpdateProgress(Guid taskId, int progress);

        int GetProgress(Guid taskId);
    }
Next, we implement the TaskProgressService class:
    public class TaskProgressService : ITaskProgressService
    {
        private readonly ConcurrentDictionary<Guid, int> _taskProgress;

        public TaskProgressService()
        {
            _taskProgress = new ConcurrentDictionary<Guid, int>();
        }

        public void RegisterTask(Guid taskId)
        {
            _taskProgress.TryAdd(taskId, 0);
        }

        public void UpdateProgress(Guid taskId, int progress)
        {
            _taskProgress.AddOrUpdate(taskId, progress, (id, oldValue) => progress);
        }

        // Returns -1 when the task ID is unknown
        public int GetProgress(Guid taskId)
        {
            return _taskProgress.TryGetValue(taskId, out int progress) ? progress : -1;
        }
    }
Now, we can update the controller so that the queued work item reports progress after each step:
    public class MyController : ControllerBase
    {
        private readonly IBackgroundTaskQueue _backgroundTaskQueue;
        private readonly ITaskProgressService _taskProgressService;
        private readonly IVideoProcessingService _videoProcessingService;

        public MyController(
            IBackgroundTaskQueue backgroundTaskQueue,
            ITaskProgressService taskProgressService,
            IVideoProcessingService videoProcessingService)
        {
            _backgroundTaskQueue = backgroundTaskQueue;
            _taskProgressService = taskProgressService;
            _videoProcessingService = videoProcessingService;
        }

        [HttpPost("upload-video")]
        public IActionResult UploadVideo(UploadVideoRequest request)
        {
            Guid taskId = Guid.NewGuid();
            _taskProgressService.RegisterTask(taskId);

            _backgroundTaskQueue.QueueBackgroundWorkItem(async token =>
            {
                await _videoProcessingService.UploadAsync(request, token);
                _taskProgressService.UpdateProgress(taskId, 33);

                await _videoProcessingService.UpdateMetadataAsync(request, token);
                _taskProgressService.UpdateProgress(taskId, 66);

                await _videoProcessingService.AddToArchiveAsync(request, token);
                _taskProgressService.UpdateProgress(taskId, 100);
            });

            // The caller uses TaskId to poll for progress
            return Ok(new { TaskId = taskId });
        }
    }
We can add a new API endpoint to get the progress of a task by its ID:
    [HttpGet("task-progress/{taskId}")]
    public IActionResult GetTaskProgress(Guid taskId)
    {
        int progress = _taskProgressService.GetProgress(taskId);

        if (progress == -1)
        {
            return NotFound();
        }

        return Ok(new { Progress = progress });
    }
Finally, we need to register the TaskProgressService in Startup.cs:
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<ITaskProgressService, TaskProgressService>();

        // Other service registrations...
    }
With this implementation, the user can now monitor the progress of their tasks using the GetTaskProgress endpoint. This provides a way to keep the user informed about the status of their long-running tasks in the background.
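A percentage alone cannot tell the user that a task has failed, and entries that are never removed accumulate in memory. One possible extension, sketched here under our own assumptions, is to store a state next to the percentage and to allow finished entries to be removed. The names TaskState, TaskSnapshot, TaskStatusService, MarkFailed, and Remove are hypothetical and do not appear in the code above; the sketch assumes .NET 6 or later for the record struct.

```csharp
using System;
using System.Collections.Concurrent;

public enum TaskState { Pending, Running, Completed, Failed }

// Hypothetical snapshot of a task: its state plus the last reported percentage.
public readonly record struct TaskSnapshot(TaskState State, int Progress);

// Hypothetical variant of TaskProgressService that also tracks failure.
public class TaskStatusService
{
    private readonly ConcurrentDictionary<Guid, TaskSnapshot> _tasks = new();

    public void RegisterTask(Guid taskId) =>
        _tasks.TryAdd(taskId, new TaskSnapshot(TaskState.Pending, 0));

    public void UpdateProgress(Guid taskId, int progress)
    {
        int clamped = Math.Clamp(progress, 0, 100);
        var state = clamped == 100 ? TaskState.Completed : TaskState.Running;
        _tasks[taskId] = new TaskSnapshot(state, clamped);
    }

    // Keeps the last reported percentage so callers can see where the task stopped.
    public void MarkFailed(Guid taskId) =>
        _tasks.AddOrUpdate(
            taskId,
            _ => new TaskSnapshot(TaskState.Failed, 0),
            (_, old) => new TaskSnapshot(TaskState.Failed, old.Progress));

    public bool TryGetStatus(Guid taskId, out TaskSnapshot snapshot) =>
        _tasks.TryGetValue(taskId, out snapshot);

    // Removes an entry so the dictionary does not grow without bound.
    public bool Remove(Guid taskId) => _tasks.TryRemove(taskId, out _);
}
```

In this sketch the work item would call MarkFailed from a catch block, and the application would decide when a finished entry is old enough to be removed.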
To handle errors and retry failed tasks, we can introduce a retry mechanism in the BackgroundTaskService. This will allow us to automatically retry tasks that have encountered an error, reducing the likelihood of incomplete tasks.
First, let’s define a simple retry policy that allows us to specify the maximum number of retries and the delay between attempts:
    public class RetryPolicy
    {
        public int MaxRetries { get; set; } = 3;

        public TimeSpan DelayBetweenRetries { get; set; } = TimeSpan.FromSeconds(5);
    }
Next, we can update the BackgroundTaskService to include error handling and retry logic:
    public class BackgroundTaskService : BackgroundService
    {
        private readonly IBackgroundTaskQueue _taskQueue;
        private readonly RetryPolicy _retryPolicy;

        public BackgroundTaskService(IBackgroundTaskQueue taskQueue, RetryPolicy retryPolicy)
        {
            _taskQueue = taskQueue;
            _retryPolicy = retryPolicy;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var workItem = await _taskQueue.DequeueAsync(stoppingToken);

                if (workItem != null)
                {
                    // MaxRetries is the total number of attempts, including the first
                    int retryCount = 0;

                    while (retryCount < _retryPolicy.MaxRetries)
                    {
                        try
                        {
                            await workItem(stoppingToken);
                            break;
                        }
                        catch (Exception)
                        {
                            // Log the exception if necessary
                            retryCount++;

                            if (retryCount < _retryPolicy.MaxRetries)
                            {
                                await Task.Delay(_retryPolicy.DelayBetweenRetries, stoppingToken);
                            }
                        }
                    }
                }
            }
        }
    }
Finally, we need to register the RetryPolicy in Startup.cs:
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<RetryPolicy>(new RetryPolicy
        {
            MaxRetries = 3,
            DelayBetweenRetries = TimeSpan.FromSeconds(5)
        });

        // Other service registrations...
    }
With this implementation, the BackgroundTaskService will now retry failed tasks according to the configured retry policy. This helps ensure that tasks have a higher chance of being completed successfully, even in the face of transient errors or other issues.
You can further enhance this implementation by adding more sophisticated error handling, such as different retry strategies, error logging, or notification systems to alert you when tasks fail repeatedly.
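As one example of a different retry strategy, the fixed delay can be replaced with exponential backoff, where the wait doubles after each failed attempt up to a cap. The helper below is a sketch under our own assumptions: RetryHelper, GetBackoffDelay, ExecuteWithBackoffAsync, and the maxDelay parameter are hypothetical names that are not part of the RetryPolicy class above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper illustrating exponential backoff between attempts.
public static class RetryHelper
{
    // Delay before retry number retryNumber (1-based):
    // baseDelay * 2^(retryNumber - 1), capped at maxDelay.
    public static TimeSpan GetBackoffDelay(int retryNumber, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (retryNumber < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(retryNumber));
        }

        double factor = Math.Pow(2, retryNumber - 1);
        double milliseconds = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(milliseconds);
    }

    // Runs the work item up to maxAttempts times; returns true if an attempt succeeds.
    public static async Task<bool> ExecuteWithBackoffAsync(
        Func<CancellationToken, ValueTask> workItem,
        int maxAttempts,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        CancellationToken token)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await workItem(token);
                return true;
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                throw; // shutting down: do not retry
            }
            catch (Exception)
            {
                // Log the exception if necessary
                if (attempt == maxAttempts)
                {
                    return false;
                }

                await Task.Delay(GetBackoffDelay(attempt, baseDelay, maxDelay), token);
            }
        }

        return false;
    }
}
```

With a base delay of 5 seconds and a cap of 60 seconds, the waits in this sketch are 5, 10, 20, 40, and then 60 seconds. Adding a small random jitter to each delay is a common refinement when many tasks fail at the same time.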
As the number of long-running tasks increases, it may become necessary to scale the background task processing to handle the additional workload. One approach is to run multiple instances of the BackgroundTaskService and distribute tasks among them.
First, let’s define a worker configuration class that allows us to specify the number of worker instances:
    public class WorkerConfiguration
    {
        public int WorkerCount { get; set; } = 1;
    }
Next, update the BackgroundTaskService registration in Startup.cs to create multiple instances of the service based on the worker configuration. Calling AddHostedService<BackgroundTaskService>() in a loop would register the service only once, so each worker is added as its own IHostedService registration:
    public void ConfigureServices(IServiceCollection services)
    {
        var workerConfiguration = new WorkerConfiguration
        {
            // Use the number of processor cores as worker count
            WorkerCount = Environment.ProcessorCount
        };
        services.AddSingleton(workerConfiguration);

        for (int i = 0; i < workerConfiguration.WorkerCount; i++)
        {
            // One registration per worker; the host starts every IHostedService it finds
            services.AddSingleton<IHostedService>(sp => new BackgroundTaskService(
                sp.GetRequiredService<IBackgroundTaskQueue>(),
                sp.GetRequiredService<RetryPolicy>()));
        }

        // Other service registrations...
    }
By using multiple instances of the BackgroundTaskService, we distribute the workload among them, allowing for better utilization of system resources and improved overall throughput.
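An alternative to several service instances is a single service that runs several consumer loops over one shared queue. The sketch below shows only that core idea with a channel as the queue; WorkerPool, RunAsync, and WorkerLoopAsync are hypothetical names, and in a real application the RunAsync call would live inside a BackgroundService's ExecuteAsync method.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical worker pool: several loops consuming one shared channel.
public static class WorkerPool
{
    // Starts workerCount loops and completes once the channel
    // has been marked complete and fully drained.
    public static Task RunAsync(
        ChannelReader<Func<CancellationToken, ValueTask>> reader,
        int workerCount,
        CancellationToken token)
    {
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() => WorkerLoopAsync(reader, token)));

        return Task.WhenAll(workers);
    }

    private static async Task WorkerLoopAsync(
        ChannelReader<Func<CancellationToken, ValueTask>> reader,
        CancellationToken token)
    {
        // Each item is delivered to exactly one of the competing loops.
        await foreach (var workItem in reader.ReadAllAsync(token))
        {
            try
            {
                await workItem(token);
            }
            catch (Exception) when (!token.IsCancellationRequested)
            {
                // Log the failure; one failing item must not stop the worker.
            }
        }
    }
}
```

Because every loop reads from the same channel, idle workers pick up the next item automatically and no explicit assignment step is needed.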
To further optimize task distribution among worker instances, you can implement various task distribution strategies. For example, you could use a round-robin approach, where each task is assigned to the next available worker. Alternatively, you could assign tasks based on the current workload of each worker or even implement more advanced algorithms that consider factors such as task priority or estimated completion time.
To ensure that the system scales effectively, it’s essential to monitor the background task processing performance. By collecting metrics such as task completion times, resource usage, and queue lengths, you can gain insights into the system’s behavior and identify potential bottlenecks or areas for improvement.
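As a starting point for such metrics, a few thread-safe counters are often enough. The class below is an illustrative sketch, not a full monitoring solution: QueueMetrics and all of its members are hypothetical names, and the values are only held in memory.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory counters for the background queue.
public class QueueMetrics
{
    private long _queued;
    private long _completed;
    private long _failed;
    private long _totalElapsedMs;

    public long Completed => Interlocked.Read(ref _completed);

    public long Failed => Interlocked.Read(ref _failed);

    // Items queued but not yet finished (waiting or currently running).
    public long Pending => Interlocked.Read(ref _queued) - Completed - Failed;

    // Mean wall-clock time per finished item, in milliseconds.
    public double AverageDurationMs
    {
        get
        {
            long finished = Completed + Failed;
            return finished == 0 ? 0 : (double)Interlocked.Read(ref _totalElapsedMs) / finished;
        }
    }

    // Call when an item is added to the queue.
    public void OnQueued() => Interlocked.Increment(ref _queued);

    // Runs the work item and records its outcome and duration.
    public async ValueTask MeasureAsync(Func<CancellationToken, ValueTask> workItem, CancellationToken token)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await workItem(token);
            Interlocked.Increment(ref _completed);
        }
        catch
        {
            Interlocked.Increment(ref _failed);
            throw; // let the caller see the failure
        }
        finally
        {
            Interlocked.Add(ref _totalElapsedMs, stopwatch.ElapsedMilliseconds);
        }
    }
}
```

If retries are in use, MeasureAsync would wrap the whole sequence of attempts for an item, so that each queued item is counted as finished only once.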
In addition to scaling horizontally (by adding more worker instances), you can also consider scaling vertically by adjusting the resources allocated to each worker (e.g., CPU, memory). Furthermore, you can explore other scaling options such as containerization and orchestration (e.g., Docker, Kubernetes) or using managed services like Azure Functions or AWS Lambda to handle background tasks.
With these approaches in place, you can efficiently scale your background task processing infrastructure to handle increasing workloads and maintain high performance under varying conditions.
In some cases, you may need to execute more critical tasks before less important ones. To implement task prioritization, we can extend the IBackgroundTaskQueue interface and its implementation to support task priorities.
First, let’s define a new class PriorityBackgroundTask that will hold the task function and its priority:
    public class PriorityBackgroundTask
    {
        public Func<CancellationToken, ValueTask> TaskFunction { get; set; }

        public int Priority { get; set; }
    }
Next, update the IBackgroundTaskQueue interface to support task priorities:
    public interface IBackgroundTaskQueue
    {
        void QueueBackgroundWorkItem(int priority, Func<CancellationToken, ValueTask> workItem);

        Task<PriorityBackgroundTask> DequeueAsync(CancellationToken cancellationToken);
    }
Now, modify the BackgroundTaskQueue class to use a priority queue instead of a concurrent queue:
    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private readonly ConcurrentPriorityQueue<int, PriorityBackgroundTask> _priorityQueue = new();
        private readonly SemaphoreSlim _signal = new(0);

        public void QueueBackgroundWorkItem(int priority, Func<CancellationToken, ValueTask> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }

            var priorityTask = new PriorityBackgroundTask { TaskFunction = workItem, Priority = priority };
            _priorityQueue.Enqueue(priority, priorityTask);
            _signal.Release();
        }

        public async Task<PriorityBackgroundTask> DequeueAsync(CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);
            _priorityQueue.TryDequeue(out var priorityTask);
            return priorityTask!;
        }
    }
You can use an existing priority queue implementation or create your own. In this example, we assume a ConcurrentPriorityQueue<TKey, TValue> class is available, which internally manages the priority order; the .NET base class library does not include a type with this name.
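If no such class is at hand, one way to obtain it is to wrap the PriorityQueue<TElement, TPriority> type from .NET 6, which is not thread-safe on its own, in a lock. The following sketch is only our assumption of what the ConcurrentPriorityQueue used above might look like, with Enqueue and TryDequeue shaped to match how BackgroundTaskQueue calls them. It also assumes that a lower number means a higher priority, which the text above does not specify.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

// Assumed shape of ConcurrentPriorityQueue: a lock around the
// non-thread-safe PriorityQueue<TElement, TPriority> (.NET 6 or later).
public class ConcurrentPriorityQueue<TKey, TValue> where TKey : IComparable<TKey>
{
    // The sequence number keeps FIFO order among items with equal priority.
    private readonly PriorityQueue<TValue, (TKey Priority, long Sequence)> _queue = new();
    private readonly object _lock = new();
    private long _sequence;

    public int Count
    {
        get { lock (_lock) { return _queue.Count; } }
    }

    public void Enqueue(TKey priority, TValue value)
    {
        lock (_lock)
        {
            _queue.Enqueue(value, (priority, _sequence++));
        }
    }

    // Removes the item with the smallest priority value, if any.
    public bool TryDequeue([MaybeNullWhen(false)] out TValue value)
    {
        lock (_lock)
        {
            return _queue.TryDequeue(out value, out _);
        }
    }
}
```

A single lock is the simplest choice and is usually adequate when the work items themselves take far longer than the queue operations.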
Update the BackgroundTaskService class to use the PriorityBackgroundTask:
    public class BackgroundTaskService : BackgroundService
    {
        private readonly IBackgroundTaskQueue _taskQueue;
        private readonly RetryPolicy _retryPolicy;

        public BackgroundTaskService(IBackgroundTaskQueue taskQueue, RetryPolicy retryPolicy)
        {
            _taskQueue = taskQueue;
            _retryPolicy = retryPolicy;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var priorityTask = await _taskQueue.DequeueAsync(stoppingToken);

                if (priorityTask != null)
                {
                    int retryCount = 0;

                    while (retryCount < _retryPolicy.MaxRetries)
                    {
                        try
                        {
                            await priorityTask.TaskFunction(stoppingToken);
                            break;
                        }
                        catch (Exception)
                        {
                            // Log the exception if necessary
                            retryCount++;

                            if (retryCount < _retryPolicy.MaxRetries)
                            {
                                await Task.Delay(_retryPolicy.DelayBetweenRetries, stoppingToken);
                            }
                        }
                    }
                }
            }
        }
    }
Update the API endpoint to use the background task queue with task priorities:
    public class MyController : ControllerBase
    {
        private readonly IBackgroundTaskQueue _backgroundTaskQueue;
        private readonly IVideoProcessingService _videoProcessingService;

        public MyController(
            IBackgroundTaskQueue backgroundTaskQueue,
            IVideoProcessingService videoProcessingService)
        {
            _backgroundTaskQueue = backgroundTaskQueue;
            _videoProcessingService = videoProcessingService;
        }

        [HttpPost("upload-video")]
        public IActionResult UploadVideo(UploadVideoRequest request, int priority)
        {
            _backgroundTaskQueue.QueueBackgroundWorkItem(priority, async token =>
            {
                await _videoProcessingService.UploadAsync(request, token);
                await _videoProcessingService.UpdateMetadataAsync(request, token);
                await _videoProcessingService.AddToArchiveAsync(request, token);
            });

            return Ok();
        }
    }
In this example, we added an int priority parameter to the UploadVideo method, which is then passed to the QueueBackgroundWorkItem method. You can adjust the source of the priority value as needed (e.g., from the request object, a query parameter, or based on user roles).
By introducing task prioritization, you can ensure that critical tasks are executed before less important ones, enabling a more efficient and responsive background task processing system.
Efficient load balancing and task distribution are essential for maximizing resource utilization and system throughput. In this section, we’ll explore different techniques for distributing tasks among multiple worker instances.
One simple approach to task distribution is the round-robin technique, where tasks are assigned to worker instances in a circular order. To implement this, we need to modify the BackgroundTaskQueue class to maintain a separate task queue for each worker instance and enqueue tasks in a round-robin fashion:
    public class BackgroundTaskQueue : IBackgroundTaskQueue
    {
        private readonly List<ConcurrentQueue<PriorityBackgroundTask>> _queues;
        private readonly SemaphoreSlim _signal;
        private int _currentIndex;

        public BackgroundTaskQueue(WorkerConfiguration workerConfiguration)
        {
            _queues = Enumerable.Range(0, workerConfiguration.WorkerCount)
                .Select(_ => new ConcurrentQueue<PriorityBackgroundTask>())
                .ToList();

            // Start at zero so DequeueAsync waits until an item has been queued
            _signal = new SemaphoreSlim(0);
            _currentIndex = -1;
        }

        public void QueueBackgroundWorkItem(int priority, Func<CancellationToken, ValueTask> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }

            var priorityTask = new PriorityBackgroundTask { TaskFunction = workItem, Priority = priority };
            _queues[GetNextIndex()].Enqueue(priorityTask);
            _signal.Release();
        }

        public async Task<PriorityBackgroundTask> DequeueAsync(CancellationToken cancellationToken)
        {
            await _signal.WaitAsync(cancellationToken);

            // Take the first available item, scanning the queues in order
            for (int i = 0; i < _queues.Count; i++)
            {
                if (_queues[i].TryDequeue(out var priorityTask))
                {
                    return priorityTask;
                }
            }

            return null!;
        }

        private int GetNextIndex()
        {
            // Interlocked keeps the counter consistent when requests enqueue concurrently;
            // the uint cast keeps the index non-negative after the counter overflows
            int next = Interlocked.Increment(ref _currentIndex);
            return (int)((uint)next % (uint)_queues.Count);
        }
    }
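This implementation assumes that you have registered the WorkerConfiguration class in the dependency injection container and that it is injected into the BackgroundTaskQueue constructor. Note that the per-worker queues are plain FIFO queues, so the Priority value is stored but does not affect ordering, and that DequeueAsync takes no worker identifier, so any worker can pick up an item from any queue.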
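To give each worker exclusive ownership of its queue, the dequeue operation needs to know which worker is asking. The sketch below illustrates that idea with one channel per worker; RoundRobinDispatcher and the workerIndex parameter are hypothetical and would require a matching change to the IBackgroundTaskQueue interface and to how each worker is constructed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical dispatcher: one channel per worker, filled in round-robin order.
public class RoundRobinDispatcher
{
    private readonly Channel<Func<CancellationToken, ValueTask>>[] _channels;
    private int _counter = -1;

    public RoundRobinDispatcher(int workerCount)
    {
        if (workerCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workerCount));
        }

        _channels = Enumerable.Range(0, workerCount)
            .Select(_ => Channel.CreateUnbounded<Func<CancellationToken, ValueTask>>())
            .ToArray();
    }

    // Assigns the item to the next worker and returns that worker's index.
    public int Enqueue(Func<CancellationToken, ValueTask> workItem)
    {
        // Interlocked keeps the counter correct under concurrent calls;
        // the uint cast keeps the index non-negative after overflow.
        int next = Interlocked.Increment(ref _counter);
        int index = (int)((uint)next % (uint)_channels.Length);
        _channels[index].Writer.TryWrite(workItem);
        return index;
    }

    // Each worker reads only from its own channel.
    public ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(int workerIndex, CancellationToken token)
    {
        return _channels[workerIndex].Reader.ReadAsync(token);
    }
}
```

The trade-off of strict per-worker queues is that a worker stuck on a slow item delays everything behind it in its own queue, even while other workers are idle.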
There are various other task distribution strategies that you can consider, such as:
Least Workload: Assign tasks to the worker instance with the smallest current workload (e.g., based on the number of tasks in the worker’s queue or estimated completion time).
Task Affinity: Assign tasks to worker instances based on specific criteria, such as data locality or resource requirements (e.g., CPU, memory). This can help minimize data movement and improve overall performance.
Consistent Hashing: Use a consistent hashing algorithm to distribute tasks among worker instances. This approach can minimize task redistribution when worker instances are added or removed, ensuring a more stable distribution.
Sharding: Divide tasks into shards based on specific attributes (e.g., task type, data partition) and assign each shard to a dedicated worker instance. This can help reduce contention and improve resource utilization.
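Two of these strategies are small enough to sketch as selection functions. The code below is illustrative only: TaskDistribution, SelectLeastLoaded, and SelectShard are hypothetical names, queue length stands in for workload, and the shard key is assumed to be a string such as a tenant or video identifier. The shard function uses the FNV-1a hash rather than string.GetHashCode, because the latter is not stable across process restarts in .NET.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;

// Hypothetical selection helpers for two of the strategies listed above.
public static class TaskDistribution
{
    // Least workload: index of the queue that currently holds the fewest items.
    public static int SelectLeastLoaded<T>(IReadOnlyList<ConcurrentQueue<T>> queues)
    {
        if (queues.Count == 0)
        {
            throw new ArgumentException("At least one queue is required.", nameof(queues));
        }

        int best = 0;
        int bestCount = queues[0].Count;

        for (int i = 1; i < queues.Count; i++)
        {
            int count = queues[i].Count;
            if (count < bestCount)
            {
                best = i;
                bestCount = count;
            }
        }

        return best;
    }

    // Sharding: maps a key to a shard with the 32-bit FNV-1a hash,
    // so the same key always lands on the same shard.
    public static int SelectShard(string key, int shardCount)
    {
        if (shardCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(shardCount));
        }

        uint hash = 2166136261;
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash *= 16777619;
        }

        return (int)(hash % (uint)shardCount);
    }
}
```

Note that plain modulo sharding reassigns most keys when the shard count changes, which is the problem consistent hashing is designed to reduce.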
By implementing advanced load balancing and task distribution techniques, you can further optimize the background task processing system, ensuring efficient resource utilization and improved performance under varying workloads.
In this post, we have covered various aspects of implementing background task processing in ASP.NET Core, including task progress and monitoring, error handling and retry, scaling, task prioritization, and load balancing and task distribution. By adopting these techniques, you can build a more robust, efficient, and scalable background task processing system that meets the demands of modern web applications.
We’d love to hear your feedback on this tutorial! If you have any questions or suggestions for improvement, please don’t hesitate to leave a comment below.
We’ll do our best to address any questions or concerns you may have. We look forward to hearing from you and helping you make the most of background task processing in ASP.NET Core!