Decouple systems with ASP.NET Core Hosted Service
The story of Hosted Service and Channel to build decoupled systems.
I have been working on a project for around two and a half years. The high architecture is the Event Sourcing-Like. I do not dare to claim it Event Sourcing because it will lead to many debates.
Here is the overview. There are 2 data stores. The primary one is an SQL database storing all events. This is the fact, the truth of the system. The second one is a Cosmos database. This stores the projected views from the events. So the writes go to the Event Store. The reads go to the Cosmos store.
The write side composes of Application Services. The read side composes of Projections. We have to send events to the Projections to project them and store the normalized views in Cosmos. The communication has to be asynchronous.
The normal reaction is some kind of a queue. However, we do not want to go that far if we can handle everything inside a single Web Application. Our application is not at StackOverflow scale so a single Web Application instance should be enough. There is a lot of data for sure, but the request rates are reasonable.
Projections as Background Service
The Projections must be run as a background service alongside with the WebApi. With the ASP.NET Core, it is fairly easy with Hosted Service. All good stuffs are documented by Microsoft. Go and check it out. Good stuff! Guaranteed!
We also use Hosted Service for other purposes such as Deleting Files.
Ok, we have the WebApi (the application services) and a background service, how are they communicating to each other?
Channel
It is well-documented, well-explained here DevBlog Channel, or a wonderful blog from Steve Gordon. These links cover everything you need to know about Channel.
To solve the communication issue, we use Channel to create a channel from Application Services to Projections. It is a multi-writers, single-reader channel. If we need a two-way ticket, another channel is created. We also have it for the scenario where the Application Services have to wait for the Projections.
It works beautifully.
Code
So it is time to see some code. All the code is on GitHub
I started a new project with the .NET 6.0 (the latest one). It is cool to use the latest even thought I do not use any special feature in the project.
Let start with a simple stupid API
[ApiController]
[Route("[controller]")]
public class CountersController : ControllerBase
{
private readonly CountersService _service;
public CountersController(CountersService service)
{
_service = service;
}
[HttpPost]
[Route("/increase")]
public async Task<IActionResult> Increase()
{
await _service.IncreaseCounter();
return Ok("Have a good day!");
}
}
The API exposes an endpoint that will make some changes to the system. The controller then dispatches the actual work to a service CountersService
. This approach keeps the controller simple, stupid, and clean.
The CountersService
handles the logic and finally publishes events.
public class CountersService
{
private readonly EventChannel _eventChannel;
public CountersService(EventChannel eventChannel)
{
_eventChannel = eventChannel;
}
public Task IncreaseCounter()
{
// Execute whatever the business logic here.
// And finally, write an event package into the event channel.
// Imagine there is an actual framework here which will publish events.
// Those events are then packaged into EventPackage and dispatch
return _eventChannel.Write(new EventPackage
{
EventName = "CounterIncreased"
});
}
}
That is the end of a service. It does a bunch of stuff and finally publish events (EventPackage).
Who is going to handle those package asynchronously? The Projection Service
public class ProjectionBackgroundService : BackgroundService
{
private readonly ILogger<ProjectionBackgroundService> _logger;
private readonly EventChannel _eventChannel;
public ProjectionBackgroundService(
ILogger<ProjectionBackgroundService> logger,
EventChannel eventChannel)
{
_logger = logger;
_eventChannel = eventChannel;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Keep running until asked to stop
while (!stoppingToken.IsCancellationRequested)
{
try
{
await foreach (var package in _eventChannel.ReadAllAsync(stoppingToken))
{
// Include the logic to proceed a package here.
// A package typical contains the actual event (or events) that the projection needs to project
_logger.LogInformation($"Package received: {package.EventName}");
}
}
catch (Exception e)
{
_logger.LogError(e, "Bad thing happened");
}
}
}
}
It picks up event packages from EventChannel
one by one and proceeds.
The final piece that connects them is the EventChannel
public class EventChannel
{
private readonly Channel<EventPackage> _channel;
public EventChannel()
{
var options = new BoundedChannelOptions(10_000)
{
SingleReader = true
};
_channel = Channel.CreateBounded<EventPackage>(options);
}
public IAsyncEnumerable<EventPackage> ReadAllAsync(CancellationToken cancellationToken)
=> _channel.Reader.ReadAllAsync(cancellationToken);
public async Task Write(EventPackage eventPackage)
{
if (await _channel.Writer.WaitToWriteAsync())
{
await _channel.Writer.WriteAsync(eventPackage);
}
}
}
public class EventPackage
{
public string EventName { get; init; }
}
And finally, register those components with ServiceCollection
(DI framework in .NET Core)
public static class ServiceCollectionExtensions
{
public static void RegisterComponents(this IServiceCollection services)
{
services.AddHostedService<ProjectionBackgroundService>();
services.AddScoped<CountersService>();
services.AddSingleton<EventChannel>();
}
}
Once run, you should see this in the console
And that is it. I have a decouple architecture that will work.