## Practical Application Examples

### Web Crawler with Rate Limiting
```go
package main

import (
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"
)

// Crawler bounds how many URLs are fetched concurrently by using a
// buffered channel as a counting semaphore.
type Crawler struct {
    concurrencyLimit int
    semaphore        chan struct{}
    urls             []string
}

func NewCrawler(urls []string, limit int) *Crawler {
    return &Crawler{
        concurrencyLimit: limit,
        semaphore:        make(chan struct{}, limit),
        urls:             urls,
    }
}

func (c *Crawler) Crawl() {
    var wg sync.WaitGroup
    for _, url := range c.urls {
        wg.Add(1)
        go func(url string) {
            defer wg.Done()

            // Acquire a semaphore slot; release it when this goroutine finishes.
            c.semaphore <- struct{}{}
            defer func() { <-c.semaphore }()

            resp, err := http.Get(url)
            if err != nil {
                fmt.Printf("Error crawling %s: %v\n", url, err)
                return
            }
            defer resp.Body.Close()

            body, err := io.ReadAll(resp.Body)
            if err != nil {
                fmt.Printf("Error reading %s: %v\n", url, err)
                return
            }
            fmt.Printf("Crawled %s: %d bytes\n", url, len(body))

            time.Sleep(time.Second) // simulate processing
        }(url)
    }
    wg.Wait()
}

func main() {
    urls := []string{
        "https://example.com",
        "https://golang.org",
        "https://github.com",
        "https://stackoverflow.com",
        "https://medium.com",
    }

    crawler := NewCrawler(urls, 3)
    crawler.Crawl()
}
```
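As a side note, the same concurrency cap can be expressed with the `errgroup` package from `golang.org/x/sync`. The sketch below is a minimal alternative to the hand-rolled semaphore above, assuming that module is available; it is an illustration, not part of the original example.

```go
package main

import (
    "fmt"
    "io"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    urls := []string{"https://example.com", "https://golang.org"}

    var g errgroup.Group
    g.SetLimit(3) // at most 3 fetches in flight, like the semaphore above

    for _, url := range urls {
        url := url // capture the loop variable (needed before Go 1.22)
        g.Go(func() error {
            resp, err := http.Get(url)
            if err != nil {
                return err
            }
            defer resp.Body.Close()

            n, err := io.Copy(io.Discard, resp.Body)
            if err != nil {
                return err
            }
            fmt.Printf("Crawled %s: %d bytes\n", url, n)
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        fmt.Println("crawl error:", err)
    }
}
```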
### Distributed Task Queue
```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// TaskQueue spreads queued tasks across a fixed pool of worker goroutines.
type TaskQueue struct {
    tasks     chan func()
    workers   int
    waitGroup sync.WaitGroup
}

func NewTaskQueue(workers int, bufferSize int) *TaskQueue {
    return &TaskQueue{
        tasks:   make(chan func(), bufferSize),
        workers: workers,
    }
}

// Start launches the worker goroutines; each one drains tasks until the
// channel is closed.
func (tq *TaskQueue) Start() {
    for i := 0; i < tq.workers; i++ {
        tq.waitGroup.Add(1)
        go func() {
            defer tq.waitGroup.Done()
            for task := range tq.tasks {
                task()
            }
        }()
    }
}

// Submit enqueues a task; it blocks when the buffer is full.
func (tq *TaskQueue) Submit(task func()) {
    tq.tasks <- task
}

// Shutdown closes the queue and waits for the workers to finish
// the remaining tasks.
func (tq *TaskQueue) Shutdown() {
    close(tq.tasks)
    tq.waitGroup.Wait()
}

func main() {
    queue := NewTaskQueue(3, 100)
    queue.Start()

    for i := 0; i < 10; i++ {
        taskID := i
        queue.Submit(func() {
            fmt.Printf("Processing task %d\n", taskID)
            time.Sleep(time.Second)
        })
    }

    queue.Shutdown()
}
```
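One possible extension, not part of the original example: `Submit` blocks when the buffer is full. A non-blocking variant could be sketched as follows, reusing the `TaskQueue` type above (`TrySubmit` is a hypothetical helper name).

```go
// TrySubmit attempts to enqueue a task without blocking and reports
// whether the task was accepted. When the buffer is full, the caller
// can decide to wait, retry, or drop the task.
func (tq *TaskQueue) TrySubmit(task func()) bool {
    select {
    case tq.tasks <- task:
        return true
    default:
        return false
    }
}
```

Note that, with either variant, submitting after `Shutdown` panics because the channel has already been closed; `Shutdown` itself is safe, since closing the channel lets each worker's `range` loop drain the remaining tasks before exiting.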
### API Rate Limiter
```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// APIRateLimiter allows at most `limit` requests per `interval` by
// refilling a token count at the start of each window
// (a fixed-window limiter).
type APIRateLimiter struct {
    limit     int
    interval  time.Duration
    tokens    int
    mutex     sync.Mutex
    lastReset time.Time
}

func NewAPIRateLimiter(limit int, interval time.Duration) *APIRateLimiter {
    return &APIRateLimiter{
        limit:     limit,
        interval:  interval,
        tokens:    limit,
        lastReset: time.Now(),
    }
}

// TryRequest reports whether a request is allowed in the current window
// and consumes a token if so.
func (rl *APIRateLimiter) TryRequest() bool {
    rl.mutex.Lock()
    defer rl.mutex.Unlock()

    now := time.Now()
    if now.Sub(rl.lastReset) >= rl.interval {
        // A new window has started: refill the tokens.
        rl.tokens = rl.limit
        rl.lastReset = now
    }

    if rl.tokens > 0 {
        rl.tokens--
        return true
    }
    return false
}

func main() {
    rateLimiter := NewAPIRateLimiter(5, time.Minute)

    for i := 0; i < 10; i++ {
        if rateLimiter.TryRequest() {
            fmt.Printf("Request %d allowed\n", i)
        } else {
            fmt.Printf("Request %d rejected\n", i)
        }
        time.Sleep(time.Second)
    }
}
```
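For production code, the `golang.org/x/time/rate` package provides a token-bucket limiter in the same spirit. The sketch below is a rough equivalent of the 5-requests-per-minute configuration above; the exact refill rate is an interpretation, since a token bucket smooths requests over time rather than resetting a fixed window.

```go
package main

import (
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

func main() {
    // Token bucket: refill one token every 12 seconds, allow bursts of up to 5.
    limiter := rate.NewLimiter(rate.Every(12*time.Second), 5)

    for i := 0; i < 10; i++ {
        if limiter.Allow() {
            fmt.Printf("Request %d allowed\n", i)
        } else {
            fmt.Printf("Request %d rejected\n", i)
        }
        time.Sleep(time.Second)
    }
}
```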
### Concurrent Processing Workflow
```mermaid
graph TD
    A[Input Data] --> B[Task Queue]
    B --> C{Rate Limiter}
    C -->|Allowed| D[Worker Pool]
    D --> E[Process Task]
    E --> F[Output Result]
    C -->|Rejected| G[Wait/Retry]
```
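A minimal sketch of how these stages could be wired together, assuming the `TaskQueue` and `APIRateLimiter` types from the earlier examples live in the same package; `processWithRateLimit` is a hypothetical helper, not part of the examples above.

```go
// processWithRateLimit submits a job to the queue; the worker then either
// runs it immediately (the "Allowed" branch) or polls the limiter until a
// token becomes available (the "Rejected" wait/retry branch).
func processWithRateLimit(queue *TaskQueue, limiter *APIRateLimiter, job func()) {
    queue.Submit(func() {
        for !limiter.TryRequest() {
            time.Sleep(100 * time.Millisecond) // wait, then retry
        }
        job()
    })
}
```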
### Practical Considerations
| Scenario | Rate Control Mechanism | Key Considerations |
|----------|------------------------|--------------------|
| Web crawler | Semaphore / token bucket | Respect site limits |
| API interaction | Sliding window (sketched below) | Avoid rate-limit errors |
| Distributed systems | Centralized rate limiter | Consistency across nodes |
| Background tasks | Worker pool | Manage system resources |
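The "sliding window" mechanism mentioned in the table keeps the timestamps of recent requests and allows a new one only while fewer than `limit` of them fall inside the rolling window. The following is an illustrative sketch, not a production implementation.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// SlidingWindowLimiter allows at most `limit` requests within any
// rolling window of length `window`.
type SlidingWindowLimiter struct {
    mu      sync.Mutex
    window  time.Duration
    limit   int
    history []time.Time // timestamps of recently allowed requests
}

func NewSlidingWindowLimiter(limit int, window time.Duration) *SlidingWindowLimiter {
    return &SlidingWindowLimiter{window: window, limit: limit}
}

func (l *SlidingWindowLimiter) Allow() bool {
    l.mu.Lock()
    defer l.mu.Unlock()

    cutoff := time.Now().Add(-l.window)

    // Drop timestamps that have slid out of the window.
    kept := l.history[:0]
    for _, t := range l.history {
        if t.After(cutoff) {
            kept = append(kept, t)
        }
    }
    l.history = kept

    if len(l.history) >= l.limit {
        return false
    }
    l.history = append(l.history, time.Now())
    return true
}

func main() {
    limiter := NewSlidingWindowLimiter(3, time.Second)
    for i := 0; i < 5; i++ {
        fmt.Printf("request %d allowed: %v\n", i, limiter.Allow())
    }
}
```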
### Key Takeaways

- Choose a rate-control mechanism that matches the specific requirements
- Balance concurrency against available system resources
- Implement graceful degradation when limits are hit
- Use contexts and timeouts (see the sketch after this list)
- Monitor and tune limits dynamically
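For the context-and-timeout point, a minimal sketch of a per-request timeout using only the standard library; `fetchWithTimeout` is a hypothetical helper and the 5-second value is an arbitrary example.

```go
package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "time"
)

func fetchWithTimeout(url string, timeout time.Duration) error {
    // Cancel the request automatically if it takes longer than `timeout`.
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    n, err := io.Copy(io.Discard, resp.Body)
    if err != nil {
        return err
    }
    fmt.Printf("Fetched %s: %d bytes\n", url, n)
    return nil
}

func main() {
    if err := fetchWithTimeout("https://example.com", 5*time.Second); err != nil {
        fmt.Println("fetch failed:", err)
    }
}
```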
At LabEx, we recommend carefully designing concurrent systems with intelligent rate-control mechanisms to ensure optimal performance and reliability.