## Buffered Channel Patterns
### Worker Pool Pattern
The worker pool pattern is a common use case for buffered channels: a fixed number of goroutines concurrently process tasks drawn from a shared jobs channel.
```go
// worker drains the shared jobs channel; each goroutine running this
// function is one member of the pool.
func worker(jobs <-chan int, results chan<- int) {
    for job := range jobs {
        results <- processJob(job)
    }
}

func processJob(job int) int {
    return job * 2 // placeholder for real work
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // Create a pool of 3 workers sharing the jobs channel
    for w := 1; w <= 3; w++ {
        go worker(jobs, results)
    }

    // Send jobs, then close so the workers' range loops terminate
    for j := 1; j <= 5; j++ {
        jobs <- j
    }
    close(jobs)

    // Collect exactly one result per job
    for a := 1; a <= 5; a++ {
        <-results
    }
}
```
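The version above counts results manually because it knows there are exactly 5 jobs. When the job count is not known up front, a `sync.WaitGroup` lets you close `results` once all workers exit, so the collector can simply range over the channel. A self-contained sketch (the `worker` and `sumSquares` helpers are illustrative names, not part of the original example):

```go
package main

import (
	"fmt"
	"sync"
)

// worker squares each job; squaring stands in for real work.
func worker(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		results <- job * job
	}
}

// sumSquares fans n jobs out to the given number of workers and sums
// the results collected from the shared results channel.
func sumSquares(n, workers int) int {
	jobs := make(chan int, n)
	results := make(chan int, n)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go worker(jobs, results, &wg)
	}

	for j := 1; j <= n; j++ {
		jobs <- j
	}
	close(jobs)

	// Close results only after every worker has exited, so the
	// range loop below terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(sumSquares(5, 3)) // 1+4+9+16+25 = 55
}
```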
### Rate Limiting Pattern
Buffered channels can implement effective rate limiting:
```mermaid
sequenceDiagram
    participant Sender
    participant RateLimiter
    participant Processor
    Sender->>RateLimiter: Send request
    RateLimiter-->>Processor: Allow processing
    Processor->>Sender: Return result
```
```go
func rateLimiter(requests <-chan int, limit chan struct{}) {
    for req := range requests {
        limit <- struct{}{} // Acquire a token; blocks once the limit is reached
        go func(r int) {
            processRequest(r)
            <-limit // Release the token
        }(req)
    }
}

func main() {
    requests := make(chan int, 10)
    limit := make(chan struct{}, 3) // Limit to 3 concurrent requests
    go rateLimiter(requests, limit)
    // ... send work on requests, then close it when finished
}
```
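The `main` above only starts the limiter; as written it would exit before any request is sent. A complete, self-contained sketch follows, with a `sync.WaitGroup` added so the program waits for in-flight work, and a counter standing in for `processRequest` (the `runLimited` helper and counter are illustrative additions, not part of the original example):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// rateLimiter caps in-flight work at cap(limit) concurrent goroutines.
func rateLimiter(requests <-chan int, limit chan struct{}, wg *sync.WaitGroup, processed *int64) {
	for req := range requests {
		limit <- struct{}{} // acquire a token; blocks once the limit is reached
		wg.Add(1)
		go func(r int) {
			defer wg.Done()
			atomic.AddInt64(processed, 1) // stand-in for real request processing
			<-limit                       // release the token
		}(req)
	}
}

// runLimited pushes n requests through the limiter and returns how
// many were processed.
func runLimited(n, maxConcurrent int) int64 {
	requests := make(chan int, n)
	limit := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup
	var processed int64

	done := make(chan struct{})
	go func() {
		rateLimiter(requests, limit, &wg, &processed) // returns when requests closes
		wg.Wait()                                     // then wait for in-flight work
		close(done)
	}()

	for i := 1; i <= n; i++ {
		requests <- i
	}
	close(requests)
	<-done
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(runLimited(10, 3)) // 10
}
```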
### Semaphore Pattern
A counting semaphore for managing access to a limited resource can be built from a buffered channel:
```go
type Semaphore struct {
    sem chan struct{}
}

func NewSemaphore(max int) *Semaphore {
    return &Semaphore{
        sem: make(chan struct{}, max),
    }
}

// Acquire blocks once max tokens are held.
func (s *Semaphore) Acquire() {
    s.sem <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.sem
}
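To see the semaphore at work, the sketch below (repeating the type for a self-contained program; the `peakConcurrency` helper is an illustrative addition) launches 20 goroutines through a 3-token semaphore and records the highest concurrency observed:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Semaphore struct {
	sem chan struct{}
}

func NewSemaphore(max int) *Semaphore {
	return &Semaphore{sem: make(chan struct{}, max)}
}

func (s *Semaphore) Acquire() { s.sem <- struct{}{} }
func (s *Semaphore) Release() { <-s.sem }

// peakConcurrency launches n goroutines gated by a semaphore of size
// limit and reports the highest number running at once.
func peakConcurrency(n, limit int) int {
	s := NewSemaphore(limit)
	var mu sync.Mutex
	cur, peak := 0, 0
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Acquire()
			mu.Lock()
			cur++
			if cur > peak {
				peak = cur
			}
			mu.Unlock()
			time.Sleep(time.Millisecond) // simulate holding the resource
			mu.Lock()
			cur--
			mu.Unlock()
			s.Release()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(20, 3) <= 3) // the semaphore caps concurrency
}
```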
### Buffering Strategies
| Strategy | Buffer Size | Use Case | Pros | Cons |
| --- | --- | --- | --- | --- |
| Fixed Size | Predefined | Consistent load | Predictable | Less flexible |
| Dynamic | Adjustable | Variable workload | Adaptive | More complex |
| Unbounded | Unlimited | Burst processing | No blocking | Memory intensive |
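Go's `make(chan T, n)` always has a fixed capacity, so the "Unbounded" row cannot be a plain buffered channel; one common sketch is a goroutine bridging two channels through a slice-backed queue (the `unbounded` helper below is an illustrative name, not a standard-library function):

```go
package main

import "fmt"

// unbounded bridges in to out through a slice-backed queue, so sends
// on in never block on buffer capacity (memory permitting).
func unbounded(in <-chan int, out chan<- int) {
	var queue []int
	for in != nil || len(queue) > 0 {
		var sendCh chan<- int // nil unless there is something to send
		var next int
		if len(queue) > 0 {
			sendCh = out
			next = queue[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed; keep draining the queue
				continue
			}
			queue = append(queue, v)
		case sendCh <- next: // a send on a nil channel blocks, so this
			queue = queue[1:] // case is disabled while the queue is empty
		}
	}
	close(out)
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go unbounded(in, out)

	for i := 1; i <= 4; i++ {
		in <- i // accepted immediately even though out is unbuffered
	}
	close(in)

	for v := range out {
		fmt.Println(v)
	}
}
```

The trade-off in the table's "Cons" column is visible here: `queue` grows without limit if the consumer falls behind.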
### Cancellation and Timeout Pattern
```go
func processWithTimeout(data <-chan int, done chan<- bool) {
    select {
    case <-data:
        // Process the received value
        done <- true
    case <-time.After(5 * time.Second):
        // Timed out waiting for data
        done <- false
    }
}
```
### Advanced Buffering Techniques
#### Backpressure Handling
```go
func handleBackpressure(input <-chan int, output chan<- int, maxBuffer int) {
    buffer := make(chan int, maxBuffer)
    go func() {
        for v := range input {
            select {
            case buffer <- v:
            default:
                // Buffer full: handle overflow (drop, log, retry, ...)
            }
        }
        close(buffer)
    }()
    for v := range buffer {
        output <- v
    }
}
```
LabEx recommends:
- Choose buffer size carefully
- Monitor channel utilization
- Avoid over-buffering
- Use profiling tools to optimize
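The "monitor channel utilization" recommendation can be implemented with the built-in `len` and `cap` functions; a minimal sketch (the `utilization` helper is an illustrative name):

```go
package main

import "fmt"

// utilization reports how full a buffered channel currently is, as a
// fraction of its capacity. Under concurrency, len and cap are only
// snapshots, so treat the result as a hint rather than an exact figure.
func utilization(ch chan int) float64 {
	if cap(ch) == 0 {
		return 0 // unbuffered channels have no buffer to fill
	}
	return float64(len(ch)) / float64(cap(ch))
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	ch <- 3
	fmt.Println(utilization(ch)) // 0.75
}
```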
### Error Handling in Buffered Channels
```go
func safeChannelSend(ch chan<- int, value int) (success bool) {
    defer func() {
        if recover() != nil {
            success = false // sending on a closed channel panics
        }
    }()
    select {
    case ch <- value:
        return true
    default:
        return false // buffer full: fail fast instead of blocking
    }
}
```
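A short check of the three outcomes, repeating the function so the program is self-contained: a send that fits the buffer, a send rejected by a full buffer, and a send on a closed channel caught by the deferred `recover`:

```go
package main

import "fmt"

func safeChannelSend(ch chan<- int, value int) (success bool) {
	defer func() {
		if recover() != nil {
			success = false // sending on a closed channel panics
		}
	}()
	select {
	case ch <- value:
		return true
	default:
		return false // buffer full: fail fast instead of blocking
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(safeChannelSend(ch, 1)) // true: buffer has room
	fmt.Println(safeChannelSend(ch, 2)) // false: buffer full
	close(ch)
	fmt.Println(safeChannelSend(ch, 3)) // false: channel closed, panic recovered
}
```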