Practical Concurrency Patterns
Concurrent Design Patterns with WaitGroup
Parallel File Processing
package main
import (
"fmt"
"io/ioutil"
"log"
"sync"
)
func processFile(filepath string, wg *sync.WaitGroup, results chan string) {
defer wg.Done()
content, err := ioutil.ReadFile(filepath)
if err != nil {
log.Printf("Error reading file %s: %v", filepath, err)
return
}
results <- fmt.Sprintf("%s: %d bytes", filepath, len(content))
}
func main() {
files := []string{
"/etc/passwd",
"/etc/hosts",
"/etc/resolv.conf",
}
var wg sync.WaitGroup
results := make(chan string, len(files))
for _, file := range files {
wg.Add(1)
go processFile(file, &wg, results)
}
// Close results when all goroutines complete
go func() {
wg.Wait()
close(results)
}()
// Collect and print results
for result := range results {
fmt.Println(result)
}
}
Concurrency Patterns Comparison
Pattern |
Description |
Use Case |
Parallel Processing |
Distribute work across goroutines |
CPU-intensive tasks |
Worker Pool |
Limit concurrent workers |
Resource-constrained environments |
Fan-out/Fan-in |
Multiple goroutines producing, single collecting |
Complex data pipelines |
Worker Pool Implementation
func workerPool(jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
// Simulate processing
results <- job * job
}
}
func main() {
const (
jobCount = 100
workerCount = 5
)
var wg sync.WaitGroup
jobs := make(chan int, jobCount)
results := make(chan int, jobCount)
// Create worker pool
for w := 0; w < workerCount; w++ {
wg.Add(1)
go workerPool(jobs, results, &wg)
}
// Send jobs
go func() {
for i := 0; i < jobCount; i++ {
jobs <- i
}
close(jobs)
}()
// Wait for workers and close results
go func() {
wg.Wait()
close(results)
}()
// Collect results
for result := range results {
fmt.Println(result)
}
}
Concurrency Flow Visualization
graph TD
A[Main Goroutine] --> B[Create Channels]
B --> C[Spawn Worker Goroutines]
C --> D[Distribute Jobs]
D --> E[Workers Process Jobs]
E --> F[Collect Results]
F --> G[Final Processing]
Advanced Synchronization Techniques
- Use channels for communication
- Implement graceful shutdown
- Handle errors in goroutines
- Limit concurrent operations
Error Handling Pattern
func robustConcurrentOperation(wg *sync.WaitGroup, errChan chan<- error) {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
errChan <- fmt.Errorf("panic recovered: %v", r)
}
}()
// Concurrent operation logic
}
LabEx recommends practicing these patterns to build scalable and efficient concurrent Go applications.