Toda aplicação de médio porte tem algum processamento que não deveria bloquear a requisição HTTP: enviar e-mails, indexar em Elasticsearch, emitir webhooks, processar imagens. A solução clássica é jogar em uma fila externa (RabbitMQ, SQS, Redis Streams), o que funciona — mas introduz latência de rede, custo de infraestrutura e complexidade operacional para casos que são puramente in-process.
System.Threading.Channels resolve essa categoria de problema. É uma fila assíncrona de alta performance que vive dentro do processo, com suporte nativo a backpressure, múltiplos produtores e consumidores, e integração natural com async/await. Disponível desde o .NET Core 2.1, mas ainda pouco explorado.
O que é um Channel
Um Channel<T> é uma estrutura de dados que conecta produtores e consumidores de forma assíncrona:
ChannelWriter<T>: o lado do produtor. Escreve itens de forma não-bloqueante.ChannelReader<T>: o lado do consumidor. Lê itens de forma assíncrona, esperando quando não há nada disponível.
Internamente, é uma implementação lock-free altamente otimizada — muito mais eficiente do que ConcurrentQueue<T> com polling ou BlockingCollection<T> que bloqueia threads.
Criando um Channel
Unbounded (sem limite de capacidade)
var channel = Channel.CreateUnbounded<EmailMessage>(new UnboundedChannelOptions
{
SingleReader = false, // múltiplos consumidores
SingleWriter = false, // múltiplos produtores
AllowSynchronousContinuations = false
});
ChannelWriter<EmailMessage> writer = channel.Writer;
ChannelReader<EmailMessage> reader = channel.Reader;
SingleReader/SingleWriter = true ativa otimizações internas — use sempre que possível.
Bounded (com backpressure)
var channel = Channel.CreateBounded<EmailMessage>(new BoundedChannelOptions(capacity: 1000)
{
FullMode = BoundedChannelFullMode.Wait, // bloqueia o produtor quando cheio
// FullMode = BoundedChannelFullMode.DropOldest, // descarta o mais antigo
// FullMode = BoundedChannelFullMode.DropNewest, // descarta o novo que chegou
SingleReader = true,
SingleWriter = false
});
BoundedChannelFullMode.Wait é o comportamento correto para backpressure real: o produtor aguarda (de forma assíncrona, sem bloquear thread) até haver espaço. Isso impede que a memória cresça indefinidamente se o consumidor não conseguir acompanhar.
Padrão básico: produtor e consumidor
// Produtor
async Task ProducerAsync(ChannelWriter<int> writer, CancellationToken ct)
{
for (int i = 0; i < 100; i++)
{
await writer.WriteAsync(i, ct);
}
writer.Complete(); // sinaliza que não virão mais itens
}
// Consumidor
async Task ConsumerAsync(ChannelReader<int> reader, CancellationToken ct)
{
// ReadAllAsync itera automaticamente até o canal ser completado
await foreach (var item in reader.ReadAllAsync(ct))
{
Console.WriteLine($"Processando: {item}");
await Task.Delay(10, ct); // simula trabalho
}
}
ReadAllAsync retorna um IAsyncEnumerable<T> — o consumidor aguarda novos itens sem bloquear thread, e encerra quando o produtor chama Complete().
Integração com ASP.NET Core e IHostedService
O padrão mais comum em APIs: a requisição HTTP escreve no channel, um BackgroundService consome e processa de forma independente.
Registrando o Channel como Singleton
// Program.cs
builder.Services.AddSingleton(_ =>
Channel.CreateBounded<EmailMessage>(new BoundedChannelOptions(500)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
})
);
builder.Services.AddSingleton(sp =>
sp.GetRequiredService<Channel<EmailMessage>>().Writer);
builder.Services.AddSingleton(sp =>
sp.GetRequiredService<Channel<EmailMessage>>().Reader);
builder.Services.AddHostedService<EmailDispatcherWorker>();
O Worker (consumidor)
public class EmailDispatcherWorker : BackgroundService
{
private readonly ChannelReader<EmailMessage> _reader;
private readonly IEmailClient _emailClient;
private readonly ILogger<EmailDispatcherWorker> _logger;
public EmailDispatcherWorker(
ChannelReader<EmailMessage> reader,
IEmailClient emailClient,
ILogger<EmailDispatcherWorker> logger)
{
_reader = reader;
_emailClient = emailClient;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var message in _reader.ReadAllAsync(stoppingToken))
{
try
{
await _emailClient.SendAsync(message, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Falha ao enviar e-mail para {To}", message.To);
// Decisão: recolocar na fila, descartar ou mover para dead-letter
}
}
}
}
O Controller/Endpoint (produtor)
app.MapPost("/users/register", async (
RegisterUserRequest request,
ChannelWriter<EmailMessage> emailChannel,
IUserRepository users) =>
{
var user = await users.CreateAsync(request);
// Enfileira o e-mail de boas-vindas — não bloqueia a resposta HTTP
await emailChannel.WriteAsync(new EmailMessage(
To: user.Email,
Subject: "Bem-vindo à plataforma",
Body: $"Olá {user.Name}, sua conta foi criada com sucesso!"
));
return Results.Created($"/users/{user.Id}", user);
});
A resposta HTTP retorna imediatamente. O e-mail é enviado de forma assíncrona pelo worker.
Múltiplos consumidores paralelos
Para aumentar throughput, basta iniciar múltiplas tarefas consumindo o mesmo reader:
public class ParallelEmailWorker : BackgroundService
{
private readonly ChannelReader<EmailMessage> _reader;
private readonly IEmailClient _emailClient;
private const int Parallelism = 4;
public ParallelEmailWorker(ChannelReader<EmailMessage> reader, IE
<p>####PUB1####</p>
mailClient emailClient)
{
_reader = reader;
_emailClient = emailClient;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var workers = Enumerable
.Range(0, Parallelism)
.Select(_ => ProcessLoopAsync(stoppingToken));
await Task.WhenAll(workers);
}
private async Task ProcessLoopAsync(CancellationToken ct)
{
await foreach (var message in _reader.ReadAllAsync(ct))
{
await _emailClient.SendAsync(message, ct);
}
}
}
Múltiplos loops consumindo o mesmo ChannelReader são thread-safe — o canal garante que cada item seja entregue a exatamente um consumidor.
Para este caso, configure SingleReader = false ao criar o channel.
Pipeline com múltiplos estágios
Channels permitem compor pipelines onde a saída de um estágio é a entrada do próximo:
// Estágio 1: recebe eventos brutos
var rawChannel = Channel.CreateBounded<RawEvent>(1000);
// Estágio 2: recebe eventos parseados
var parsedChannel = Channel.CreateBounded<ParsedEvent>(500);
// Estágio 3: recebe eventos enriquecidos
var enrichedChannel = Channel.CreateBounded<EnrichedEvent>(200);
// Stage 1 → 2: parser
_ = Task.Run(async () =>
{
await foreach (var raw in rawChannel.Reader.ReadAllAsync())
{
var parsed = Parse(raw);
await parsedChannel.Writer.WriteAsync(parsed);
}
parsedChannel.Writer.Complete();
});
// Stage 2 → 3: enricher
_ = Task.Run(async () =>
{
await foreach (var parsed in parsedChannel.Reader.ReadAllAsync())
{
var enriched = await EnrichAsync(parsed);
await enrichedChannel.Writer.WriteAsync(enriched);
}
enrichedChannel.Writer.Complete();
});
// Stage 3: persistência
await foreach (var enriched in enrichedChannel.Reader.ReadAllAsync())
{
await db.SaveAsync(enriched);
}
Cada estágio roda de forma independente. O backpressure se propaga naturalmente: se a persistência ficar lenta, enrichedChannel enche, que segura o enricher, que segura o parser, que segura a ingestão.
Tratamento de erros e drain no shutdown
Uma questão importante: quando a aplicação encerra, itens no channel podem ser perdidos. O shutdown deve drenar o canal antes de parar:
public class DrainableEmailWorker : BackgroundService
{
private readonly Channel<EmailMessage> _channel;
private readonly IEmailClient _emailClient;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
{
await _emailClient.SendAsync(message, CancellationToken.None);
// Usa CancellationToken.None no send para não cancelar mid-flight
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
// Sinaliza que não virão mais itens
_channel.Writer.TryComplete();
// Aguarda o drain (com timeout de segurança)
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
await base.StopAsync(cts.Token);
}
catch (OperationCanceledException)
{
// Timeout — aceita a perda ou registra para reprocessamento
}
}
}
Monitoramento: métricas do channel
Para observabilidade, exponha o tamanho da fila como métrica:
public class ChannelMetricsService : BackgroundService
{
private readonly Channel<EmailMessage> _channel;
private readonly IMeterFactory _meterFactory;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var meter = _meterFactory.Create("neryx.email");
meter.CreateObservableGauge(
"email_queue_depth",
() => _channel.Reader.Count,
description: "Itens aguardando processamento na fila de e-mail"
);
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
Com OpenTelemetry, esse gauge aparece no Grafana como um indicador de saúde do processamento.
Comparação: Channel vs alternativas
| Critério | Channel<T> | BlockingCollection<T> | ConcurrentQueue<T> + polling | RabbitMQ/SQS |
|---|---|---|---|---|
| Assíncrono (async/await) | ✅ Nativo | ❌ Bloqueia thread | ❌ Polling | ✅ Com cliente |
| Backpressure | ✅ BoundedChannel | ✅ Capacidade máxima | ❌ Sem limite | ✅ |
| Múltiplos produtores | ✅ | ✅ | ✅ | ✅ |
| Múltiplos consumidores | ✅ | ✅ | ✅ | ✅ |
| Persistência cross-process | ❌ In-memory | ❌ In-memory | ❌ In-memory | ✅ |
| Overhead de infraestrutura | Nenhum | Nenhum | Nenhum | Alto |
| Replay após crash | ❌ | ❌ | ❌ | ✅ |
Use Channel<T> quando:
- O processamento é in-process (produtor e consumidor no mesmo serviço).
- A perda de alguns itens no crash é aceitável (ou você tem outra durabilidade, como o Outbox Pattern).
- Você quer eliminar a latência e complexidade de uma fila externa para casos locais.
Use fila externa quando:
- Múltiplos serviços precisam consumir os mesmos eventos.
- Durabilidade e replay são requisitos.
- O volume exige escalabilidade horizontal dos consumidores em instâncias separadas.
Resumo
System.Threading.Channels é a ferramenta certa para desacoplar produção e consumo in-process no .NET:
Channel.CreateBoundedcomFullMode.Waitimplementa backpressure real sem polling.ReadAllAsyncé o loop idiomático para consumidores — não bloqueia thread, encerra naturalmente quando o canal é completado.- Múltiplos workers sobre o mesmo
ChannelReaderentregam paralelismo sem lock explícito. - Pipelines em estágios compõem bem: a saída de um channel é a entrada do próximo.
- Drain no shutdown evita perda de itens enfileirados quando a aplicação encerra.
Para a maioria das necessidades de processamento assíncrono local — notificações, indexação, webhooks, eventos de auditoria — Channel<T> elimina a necessidade de RabbitMQ com uma solução mais simples, mais rápida e com zero infraestrutura adicional.