PostgreSQL Queue Processing: How to Handle Concurrency Efficiently
Practical Guide to Concurrency-Friendly Queue Processing in PostgreSQL with Entity Framework Core
In modern applications, efficient queue processing is crucial for maintaining performance and ensuring that tasks are handled in a timely manner. When dealing with high volumes of tasks, managing concurrency becomes a key challenge. PostgreSQL, with its robust feature set, provides powerful tools to handle these challenges effectively.
In this article, we will explore how to manage concurrency in PostgreSQL queue processing. We will provide practical examples using C# and Entity Framework Core, demonstrating how to implement efficient and reliable queue processing systems. Whether you are building a new application or optimizing an existing one, these techniques will help you ensure that your queue processing is both performant and concurrency-friendly.
Let's dive in and see how you can leverage PostgreSQL and .NET to handle queue processing with ease.
Building the Playground: Deploying PostgreSQL via Docker and Connecting to it using Entity Framework Core
Let's start by deploying PostgreSQL on our localhost. Here's a compose.yml file that does just that:
services:
  postgres:
    image: postgres:13
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
After running docker compose up -d we should have our database deployed. Let's also create a new .NET project by running dotnet new console. Finally, we'll need the PostgreSQL provider package for Entity Framework, a package for the snake_case naming convention, and a console logger for observability. Here's the script to install all of them:
dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
dotnet add package EFCore.NamingConventions
dotnet add package Microsoft.Extensions.Logging.Console
Now, let's make a very simple context with just a single table. Note that, thanks to the snake_case naming convention, the Queue set will be mapped to a table named queue, which is how our raw SQL will refer to it later:
public class Db(DbContextOptions<Db> options) : DbContext(options)
{
    public DbSet<QueueItem> Queue { get; set; }
}

public class QueueItem
{
    public int Id { get; set; }
    public string Name { get; set; } = null!;
}
Let's also write a quick (and very much not production-ready) helper method for initializing our database using the packages we installed earlier:
public static Db Create(bool useLogger = true)
{
    var options = new DbContextOptionsBuilder<Db>()
        .UseNpgsql("Host=localhost;Database=playground;Username=postgres;Password=postgres")
        .UseSnakeCaseNamingConvention()
        .UseLoggerFactory(LoggerFactory.Create(builder =>
        {
            builder.SetMinimumLevel(LogLevel.Trace);
            builder.AddFilter("Microsoft.EntityFrameworkCore.ChangeTracking", LogLevel.Information);
            builder.AddFilter("Microsoft.EntityFrameworkCore.Query", LogLevel.Information);
            if (useLogger)
                builder.AddSimpleConsole(c => c.SingleLine = true);
        }))
        .Options;
    return new Db(options);
}
Let's also add another helper method that will instantly give us a brand-new database based on the schema we've described with our entity classes:
public static async Task<Db> CreateEmpty(bool useLogger = true)
{
    var db = Create(useLogger);
    await db.Database.EnsureDeletedAsync();
    await db.Database.EnsureCreatedAsync();
    return db;
}
The next thing we'll need is a thousand queue items for us to play around with. Let's add a helper method for them:
public async Task SaveThousandNewItems()
{
    var items = Enumerable.Range(1, 1000)
        .Select(i => new QueueItem { Name = $"Task {i}" });
    Queue.AddRange(items);
    await SaveChangesAsync();
}
Before moving to the interesting part, let's test what we have here:
using var db = await Db.CreateEmpty(useLogger: false);
await db.SaveThousandNewItems();
using var db2 = Db.Create(useLogger: true);
var count = await db2.Queue.CountAsync(); // should return 1000
I hope you were also able to get 1000 back, because now it's time to do some queue processing!
Introducing the FOR UPDATE Clause!
As you may see in the title, we'll use the FOR UPDATE clause for our task. It locks the selected rows for the duration of the current transaction, which is exactly what we need: exclusively acquiring a set of queue items for the time of their processing. Our goal in this article is just to get familiar with the basics, so we'll use the simplest example possible:
SELECT *
FROM queue
LIMIT 100
FOR UPDATE
Locking the rows is good for exclusivity, but pretty bad for performance, since other queries will simply wait for the transaction to release the lock. Thankfully, we can tell other queries to forget about the locked rows altogether using the FOR UPDATE SKIP LOCKED construct. Here's how we can use it in another query, calculating how many rows are currently available:
WITH unlocked_rows AS (
    SELECT 1
    FROM queue
    FOR UPDATE SKIP LOCKED
)
SELECT COUNT(*) AS "Value"
FROM unlocked_rows
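By the way, combining the two snippets gives the query a competing worker would typically run to claim its own batch without blocking. We won't need it for the experiments below, but it's the natural next step:
SELECT *
FROM queue
LIMIT 100
FOR UPDATE SKIP LOCKED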
Now that we know our SQL, let's put it together with Entity Framework and run some experiments!
Locking a Hundred Items in EF with the FOR UPDATE Clause
First things first, we'll initialize our database:
await (await Db.CreateEmpty(useLogger: false)).SaveThousandNewItems();
Then, using a new db instance, we'll lock a hundred items. We'll need to begin a transaction so that the retrieved rows stay locked not just for the duration of the query, but until we close the transaction. Here's how we can start the query:
Console.WriteLine("Starting Lock Hundred");
await using var db = Db.Create(useLogger: false);
await using var tx = db.Database.BeginTransaction();
_ = await db.Queue.FromSql(
    $"""
     SELECT *
     FROM queue
     LIMIT 100
     FOR UPDATE
     """
).ToListAsync();
Before exiting, let's sleep for a little while, so that we can run another query against the table while some items are still locked:
public async Task FinishAndSleep(int milliseconds)
{
    Console.WriteLine("Finished Lock Hundred Query, Sleeping");
    await Task.Delay(milliseconds);
    Console.WriteLine("Sleeping ended");
}
What we'll run in another thread is our familiar query counting the currently available items. Here's the code:
public async Task CheckCurrentlyAvailable(int milliseconds)
{
    await using var db = Db.Create(useLogger: false);
    await using var tx = db.Database.BeginTransaction();
    var count = await db.Database.SqlQuery<int>(
        $"""
         WITH unlocked_rows AS (
             SELECT 1
             FROM queue
             FOR UPDATE SKIP LOCKED
         )
         SELECT COUNT(*) AS "Value"
         FROM unlocked_rows
         """
    ).FirstAsync();
    await Task.Delay(milliseconds);
    Console.WriteLine($"Unlocked rows count: {count}");
}
Moreover, we won't just count once; we'll do it 10 times to track the dynamics. Here's a helper method that will let us achieve that:
public static async Task TenTimes(Func<Task> task)
{
    for (var checks = 0; checks < 10; checks++)
        await task();
}
Let's put it all together! After initializing the database, we'll start a task in the background, locking our items for about 500 milliseconds. In parallel, we'll run the check of currently available items. Here's how it all looks together:
await (await Db.CreateEmpty(useLogger: false)).SaveThousandNewItems();
_ = Task.Run(async () =>
{
    Console.WriteLine("Starting Lock Hundred");
    await using var db = Db.Create(useLogger: false);
    await using var tx = db.Database.BeginTransaction();
    _ = await db.Queue.FromSql(
        $"""
         SELECT *
         FROM queue
         LIMIT 100
         FOR UPDATE
         """
    ).ToListAsync();
    await FinishAndSleep(500);
});
await TenTimes(() => CheckCurrentlyAvailable(100));
And here's the result we might expect:
Starting Lock Hundred
Finished Lock Hundred Query, Sleeping
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Sleeping ended
Unlocked rows count: 900
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
As you may see, the query successfully kept a hundred items locked for the first five check iterations. After that, we got the whole thousand items unlocked!
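If we wanted to turn this into an actual worker, the same building blocks would be enough: claim a batch with FOR UPDATE SKIP LOCKED inside a transaction, process it, delete the processed rows, and commit. Here's a minimal sketch of what that might look like, with a hypothetical ProcessAsync standing in for the real handling logic:
public static async Task ProcessBatch()
{
    await using var db = Db.Create(useLogger: false);
    await using var tx = await db.Database.BeginTransactionAsync();
    // Claim up to 100 items; concurrent workers will skip the rows we hold
    var batch = await db.Queue.FromSql(
        $"""
         SELECT *
         FROM queue
         LIMIT 100
         FOR UPDATE SKIP LOCKED
         """
    ).ToListAsync();
    foreach (var item in batch)
        await ProcessAsync(item); // hypothetical placeholder for the real work
    // Remove the processed items, then commit to release the locks
    db.Queue.RemoveRange(batch);
    await db.SaveChangesAsync();
    await tx.CommitAsync();
}
Since the rows are deleted in the same transaction that holds the locks, a crash before CommitAsync rolls everything back, and the items simply become available to other workers again.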
And this is basically it for the PostgreSQL side! But if you are anything like me, you might be wondering how the transactions we've used get closed in EF. Let's experiment with that!
Bonus: Playing around with Entity Framework (EF) Transactions
First, let's try removing all the using statements we have:
var db = Db.Create(useLogger: false);
_ = db.Database.BeginTransaction();
Here's what we will get in this case:
Starting Lock Hundred
Unlocked rows count: 1000
Finished Lock Hundred Query, Sleeping
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Sleeping ended
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
As you might see, the lock was never released, since we never disposed the transaction in any way. But what happens if we dispose only the Db and not the transaction:
await using var db = Db.Create(useLogger: false);
_ = db.Database.BeginTransaction();
Well, it seems like this will still unlock our rows, since disposing the context closes the underlying connection and PostgreSQL rolls back the transaction left open on it:
Starting Lock Hundred
Finished Lock Hundred Query, Sleeping
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Sleeping ended
Unlocked rows count: 900
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
And what if we dispose just the transaction:
var db = Db.Create(useLogger: false);
await using var tx = db.Database.BeginTransaction();
As you might expect, that will work as well, since an uncommitted EF transaction is rolled back when disposed:
Starting Lock Hundred
Unlocked rows count: 1000
Finished Lock Hundred Query, Sleeping
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Unlocked rows count: 900
Sleeping ended
Unlocked rows count: 900
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
Unlocked rows count: 1000
The conclusion we end up with: don't forget to dispose... at least something.
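That said, in real code you wouldn't want to rely on disposal alone; the usual pattern is to dispose both and to commit explicitly once the work has succeeded. A minimal sketch of that shape:
await using var db = Db.Create(useLogger: false);
await using var tx = await db.Database.BeginTransactionAsync();
// ... lock and process the items ...
await tx.CommitAsync(); // releases the locks; disposing without a commit rolls back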
Wrapping Up!
By following the techniques and examples provided in this article, you should now have a solid understanding of how to handle concurrency in PostgreSQL queue processing using C# and Entity Framework. Efficient queue processing is essential for maintaining the performance and reliability of your applications, especially when dealing with high volumes of tasks.
You can find the code from this article here in the backi GitHub repository. The repository contains tools and best practices for various background processing operations like the one we've discussed in this article. Don't hesitate to give this repository a star! ⭐
And also ... claps are appreciated! 👏