Go開發的輕量級的異步定時任務系統:kingtask
1. kingtask簡介
kingtask是一個由Go開發的輕量級的異步定時任務系統。主要特性包含以下幾個部分:
- 支持定時的異步任務。
- 支持失敗重試機制,重試時刻和次數可自定義。
- 任務執行結果可查詢。
- 一個異步任務由一個可執行文件組成,開發語言不限。
- 任務是無狀態的,執行異步任務之前,不需要向kingtask注冊任務。
- broker和worker通過redis解耦。
- 通過配置redis為master-slave架構,可實現kingtask的高可用,因為worker是無狀態的,redis的master宕機后,可以修改worker配置將其連接到slave上。
2. kingtask架構
kingtask的實現步驟如下所述:
- broker收到client發送過來的異步任務(一個異步任務由一個唯一的uuid標示)之后,判斷異步任務是否定時,如果未定時,則直接將異步任務封裝成一個結構體,存入redis。如果定時,則通過定時器觸發,將異步任務封裝成一個結構體,存入redis。
- worker從redis中獲取異步任務,或者到任務之后,執行該任務,并將任務結果存入redis。
- 對于失敗的任務,如果該任務有重試機制,broker會重新發送該任務到redis,然后worker會重新執行。
3. kingtask使用
3.1 編譯和安裝
1. 安裝Godep go get github.com/tools/godep 2.執行 sh ./dev.sh 3.make 在bin目錄下就會生成可執行文件
3.2 配置broker
#broker地址 addr : 0.0.0.0:9595 #redis地址 redis : 127.0.0.1:6379 #log輸出到文件,可不配置 #log_path: /Users/flike/src #日志級別 log_level: debug
3.3 配置worker
#broker地址 broker : 127.0.0.1:9595 #redis地址 redis : 127.0.0.1:6379 #異步任務可執行文件目錄 bin_path : /Users/flike/src #日志輸出目錄,可不配置 #log_path : /Users/flike/src #日志級別 log_level: debug #每個任務執行時間間隔,單位為秒 period : 1 #結果保存時間,單位為秒 result_keep_time : 1000 #任務執行最長時間,單位秒 task_run_time: 30
3.4 運行broker和worker
#將異步任務的可執行文件放到bin_path目錄 cp example /Users/flike/src #轉到kingtask目錄 cd kingtask #啟動broker ./bin/broker -config=etc/broker.yaml #啟動worker ./bin/worker -config=etc/worker.yaml
3.5 example異步任務源碼
異步任務的結果需要輸出到標準輸出(os.Stdout),出錯信息需要輸出到標準出錯輸出(os.Stderr)。
//example.go
package main
import (
"fmt"
"os"
"strconv"
)
func main() {
if len(os.Args) != 3 {
fmt.Fprintf(os.Stderr, "args count must be two")
return
}
left, err := strconv.ParseInt(os.Args[1], 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "err:%s", err.Error())
return
}
right, err := strconv.ParseInt(os.Args[2], 10, 64)
if err != nil {
fmt.Fprintf(os.Stderr, "err:%s", err.Error())
return
}
sum := left + right
fmt.Fprintf(os.Stdout, "%d", sum)
} 3.6 調用異步任務源碼
//mytask.go
package main
import (
"fmt"
"time"
"github.com/flike/kingtask/task"
)
func main() {
brokerAddr := "127.0.0.1:9595"
//example異步任務的參數
args := []string{"12", "45"}
brokerClient, err := task.NewBrokerClient(brokerAddr)
if err != nil {
fmt.Println(err.Error())
return
}
//失敗重試的時間間隔序列,也就是失敗后隔5s后重試這個異步任務,如果再次失敗就8s后再重試
timeInterval := []int{5, 8, 9}
//第一個參數:可執行文件名
//第二個參數:異步任務參數,必須是string類型
//第三個參數:異步任務的開始時間戳,如果是未來的一個時刻,則到時后執行異步任務。如果為0則立即執行
//第四個參數:失敗重試時間序列
t, err := task.NewTaskRequest("example", args, 0, timeInterval)
if err != nil {
fmt.Printf("NewTaskRequest error:%s\n", err.Error())
return
}
err = brokerClient.Delay(t)
if err != nil {
fmt.Printf("Delay error:%s\n", err.Error())
return
}
time.Sleep(time.Second * 2)
reply, err := brokerClient.GetResult(t)
if err != nil {
fmt.Printf("GetResult error:%s\n", err.Error())
return
}
fmt.Println(reply)
brokerClient.Close()
} 執行結果:
//第一個1表示結果存在,因為異步任務有可能還未執行,所以結果有可能不存在
//第二個1表示異步任務執行成功
//第三個參數表示異步任務的結果
&{1 1 57} 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!
