RxGo —— Go 語言的 Reactive 擴展

EzeRinehart 7年前發布 | 11K 次閱讀 Reactive Go語言 Google Go/Golang開發

RxGo 是 Go 語言的 Reactive 擴展。

安裝

go get -u github.com/jochasinga/rxgo

用法

watcher := observer.Observer{

    // Register a handler function for every next available item.
    NextHandler: func(item interface{}) {
        fmt.Printf("Processing: %v\n", item)
    },

    // Register a handler for any emitted error.
    ErrHandler: func(err error) {
        fmt.Printf("Encountered error: %v\n", err)
    },

    // Register a handler when a stream is completed.
    DoneHandler: func() {
        fmt.Println("Done!")
    },
}

it, _ := iterable.New([]interface{}{1, 2, 3, 4, errors.New("bang"), 5})
source := observable.From(it)
sub := source.Subscribe(watcher)

// wait for the async operation
<-sub

以上將:

  • 將切片中每個數字的格式字符串 print 為4。

  • print 錯誤“bang”

重要的是要記住,只有一個 OnError 或 OnDone 可以在 stream 中調用。 如果 stream 中有錯誤,處理停止,OnDone 將永遠不會被調用,反之亦然。

概念是將所有“side effects”分組到這些處理程序中,讓一個 Observer 或任何 EventHandler 處理它們。

package main
import (
    "fmt"
    "time"

    "github.com/jochasinga/rx"
    "github.com/jochasinga/rx/handlers"
)

func main() {

    score := 9

    onNext := handlers.NextFunc(func(item interface{}) {
        if num, ok := item.(int); ok {
            score += num
        }
    })

    onDone := handlers.DoneFunc(func() {
        score *= 2
    })

    watcher := observer.New(onNext, onDone)

    // Create an `Observable` from a single item and subscribe to the observer.
    sub := observable.Just(1).Subscribe(watcher)
    <-sub

    fmt.Println(score) // 20
}

 

 

 

 本文由用戶 EzeRinehart 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!