Go開源:Gores - Redis 的消息隊列系統
Gores
An asynchronous job execution system based on Redis
Installation
Get the package
$ go get github.com/wang502/gores/gores
Import the package
import "github.com/wang502/gores/gores"
Usage
Configuration
Add a config.json in your project folder
{
"REDISURL": "127.0.0.1:6379",
"REDIS_PW": "mypassword",
"BLPOP_MAX_BLOCK_TIME" : 1,
"MAX_WORKERS": 2,
"Queues": ["queue1", "queue2"],
"DispatcherTimeout": 5,
"WorkerTimeout": 5
}
- REDISURL : Redis server address. If you run in a local Redis, the dafault host is 127.0.0.1:6379
- REDIS_PW : Redis password. If the password is not set, then password can be any string.
- BLPOP_MAX_BLOCK_TIME : Blocking time when calling BLPOP command in Redis.
- MAX_WORKERS : Maximum number of concurrent workers, each worker is a separate goroutine that execute specific task on the fetched item.
- Queues : Array of queue names on Redis message broker.
- DispatcherTimeout : Duration dispatcher will wait to dispatch new job before quitting.
- WorkerTimeout : Duration worker will wait to process new job before quitting.
Initialize config
configPath := flag.String("c", "config.json", "path to configuration file")
flag.Parse()
config, err := gores.InitConfig(*configPath)
Enqueue item to Redis queue
An item is a Go map. It is required to have several keys:
- Name : name of the item to enqueue, items with different names are mapped to different tasks.
- Queue : name of the queue you want to put the item in.
- Args : the required arguments that you need in order for the workers to execute those tasks.
- Enqueue_timestamp : the Unix timestamp of when the item is enqueued.
resq := gores.NewResQ(config)
item := map[string]interface{}{
"Name": "Rectangle",
"Queue": "TestJob",
"Args": map[string]interface{}{
"Length": 10,
"Width": 10,
},
"Enqueue_timestamp": time.Now().Unix(),
}
err = resq.Enqueue(item)
if err != nil {
log.Fatalf("ERROR Enqueue item to ResQ")
}</code></pre>
$ go run main.go -c ./config.json -o produce
Define tasks
package tasks
// task for item with 'Name' = 'Rectangle'
// calculating the area of an rectangle by multiplying Length with Width
func CalculateArea(args map[string]interface{}) error {
var err error
length := args["Length"]
width := args["Width"]
if length == nil || width == nil {
err = errors.New("Map has no required attributes")
return err
}
fmt.Printf("The area is %d\n", int(length.(float64)) * int(width.(float64)))
return err
}</code></pre>
Launch workers to consume items and execute tasks
tasks := map[string]interface{}{
"Item": tasks.PrintItem,
"Rectangle": tasks.CalculateArea,
}
gores.Launch(config, &tasks)
$ go run main.go -c ./config.json -o consume
The output will be:
The rectangle area is 100
Info about processed/failed job
resq := gores.NewResQ(config)
if resq == nil {
log.Fatalf("resq is nil")
}
info := resq.Info()
for k, v := range info {
switch v.(type) {
case string:
fmt.Printf("%s : %s\n", k, v)
case int:
fmt.Printf("%s : %d\n", k, v)
case int64:
fmt.Printf("%s : %d\n", k, v)
}
}
The output will be:
Gores Info:
queues : 2
workers : 0
failed : 0
host : 127.0.0.1:6379
pending : 0
processed : 1
Contribution
Please feel free to suggest new features. Also open to pull request!
本文由用戶 k5135 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!