Real-World Examples
Web Crawler with Rate Limiting
package main
import (
"fmt"
"io"
"net/http"
"sync"
"time"
)
// Crawler fetches a set of URLs while limiting how many HTTP
// requests run concurrently.
type Crawler struct {
concurrencyLimit int // maximum simultaneous fetches
semaphore chan struct{} // counting semaphore; capacity equals concurrencyLimit
urls []string // targets to crawl
}
// NewCrawler builds a Crawler over urls that allows at most limit
// requests to be in flight at the same time.
func NewCrawler(urls []string, limit int) *Crawler {
	c := &Crawler{
		concurrencyLimit: limit,
		urls:             urls,
	}
	// A buffered channel of size limit acts as a counting semaphore.
	c.semaphore = make(chan struct{}, limit)
	return c
}
// Crawl fetches every configured URL concurrently, using the semaphore
// channel to cap the number of in-flight requests at concurrencyLimit.
// Errors are reported to stdout; Crawl blocks until all URLs are done.
func (c *Crawler) Crawl() {
	var wg sync.WaitGroup
	for _, url := range c.urls {
		wg.Add(1)
		// url is passed as an argument so each goroutine gets its own
		// copy (required before Go 1.22; harmless after).
		go func(url string) {
			defer wg.Done()
			// Acquire a concurrency slot; release it when done.
			c.semaphore <- struct{}{}
			defer func() { <-c.semaphore }()
			resp, err := http.Get(url)
			if err != nil {
				fmt.Printf("Error crawling %s: %v\n", url, err)
				return
			}
			defer resp.Body.Close()
			// Fix: the original discarded this error with `body, _ :=`,
			// silently reporting a truncated read as success.
			body, err := io.ReadAll(resp.Body)
			if err != nil {
				fmt.Printf("Error reading %s: %v\n", url, err)
				return
			}
			fmt.Printf("Crawled %s: %d bytes\n", url, len(body))
			time.Sleep(time.Second) // Simulate processing
		}(url)
	}
	wg.Wait()
}
// main demonstrates the crawler against a handful of public sites,
// allowing at most 3 requests in flight at once.
func main() {
	targets := []string{
		"https://example.com",
		"https://golang.org",
		"https://github.com",
		"https://stackoverflow.com",
		"https://medium.com",
	}
	NewCrawler(targets, 3).Crawl()
}
Distributed Task Queue
package main
import (
"fmt"
"sync"
"time"
)
// TaskQueue runs submitted tasks on a fixed pool of worker goroutines.
type TaskQueue struct {
tasks chan func() // buffered queue of pending tasks
workers int // number of worker goroutines launched by Start
waitGroup sync.WaitGroup // tracks running workers so Shutdown can wait
}
// NewTaskQueue creates a queue served by the given number of workers,
// with room for bufferSize pending tasks before Submit blocks.
func NewTaskQueue(workers int, bufferSize int) *TaskQueue {
	tq := &TaskQueue{workers: workers}
	tq.tasks = make(chan func(), bufferSize)
	return tq
}
// Start launches the worker goroutines. Each worker pulls tasks from
// the channel until Shutdown closes it.
func (tq *TaskQueue) Start() {
	for n := tq.workers; n > 0; n-- {
		tq.waitGroup.Add(1)
		go func() {
			defer tq.waitGroup.Done()
			// range exits once the tasks channel is closed and drained.
			for job := range tq.tasks {
				job()
			}
		}()
	}
}
// Submit enqueues task for execution by a worker.
// It blocks when the task buffer is full.
func (tq *TaskQueue) Submit(task func()) {
	tq.tasks <- task
}
// Shutdown stops accepting new tasks and blocks until every worker
// has drained the queue and exited. Call Submit no more after this.
func (tq *TaskQueue) Shutdown() {
	// Closing the channel signals each worker's range loop to finish.
	close(tq.tasks)
	tq.waitGroup.Wait()
}
// main pushes 10 tasks through a queue of 3 workers, then shuts the
// queue down and waits for all work to drain.
func main() {
	q := NewTaskQueue(3, 100)
	q.Start()
	for id := 0; id < 10; id++ {
		id := id // per-iteration copy for the closure (pre-Go 1.22)
		q.Submit(func() {
			fmt.Printf("Processing task %d\n", id)
			time.Sleep(time.Second)
		})
	}
	q.Shutdown()
}
API Rate Limiter
package main
import (
"fmt"
"sync"
"time"
)
// APIRateLimiter allows at most limit requests per interval using a
// fixed-window token count.
type APIRateLimiter struct {
limit int // tokens granted at the start of each window
interval time.Duration // length of one window
tokens int // tokens remaining in the current window
mutex sync.Mutex // guards tokens and lastReset
lastReset time.Time // start time of the current window
}
// NewAPIRateLimiter returns a limiter permitting limit requests per
// interval, starting with a full token allowance.
func NewAPIRateLimiter(limit int, interval time.Duration) *APIRateLimiter {
	rl := &APIRateLimiter{
		limit:    limit,
		interval: interval,
	}
	rl.tokens = limit
	rl.lastReset = time.Now()
	return rl
}
// TryRequest reports whether a request is permitted right now.
// The token pool refills to limit once interval has elapsed since the
// last reset (fixed-window strategy). Safe for concurrent use.
func (rl *APIRateLimiter) TryRequest() bool {
	rl.mutex.Lock()
	defer rl.mutex.Unlock()

	now := time.Now()
	// Open a new window once the previous one has fully elapsed.
	if now.Sub(rl.lastReset) >= rl.interval {
		rl.tokens = rl.limit
		rl.lastReset = now
	}

	if rl.tokens <= 0 {
		return false
	}
	rl.tokens--
	return true
}
// main issues 10 requests one second apart against a limiter allowing
// 5 requests per minute; requests beyond the quota are rejected.
func main() {
	limiter := NewAPIRateLimiter(5, time.Minute)
	for i := 0; i < 10; i++ {
		verdict := "rejected"
		if limiter.TryRequest() {
			verdict = "allowed"
		}
		fmt.Printf("Request %d %s\n", i, verdict)
		time.Sleep(time.Second)
	}
}
Workflow of Concurrent Processing
graph TD
A[Input Data] --> B[Task Queue]
B --> C{Rate Limiter}
C -->|Allowed| D[Worker Pool]
D --> E[Process Task]
E --> F[Output Result]
C -->|Rejected| G[Wait/Retry]
Practical Considerations
| Scenario | Rate Control Mechanism | Key Considerations |
| --- | --- | --- |
| Web Crawling | Semaphore/Token Bucket | Respect website limits |
| API Interactions | Sliding Window | Prevent rate limit errors |
| Distributed Systems | Centralized Rate Limiter | Consistent across nodes |
| Background Jobs | Worker Pool | Manage system resources |
Key Takeaways
- Choose appropriate rate control based on specific requirements
- Balance between concurrency and system resources
- Implement graceful degradation
- Use context and timeouts
- Monitor and adjust dynamically
At LabEx, we recommend careful design of concurrent systems with intelligent rate control mechanisms to ensure optimal performance and reliability.