Streaming
Panoramica
Goa permette anche la definizione di servizi unidirezionali di streaming dove un endpoint può ricevere un payload e ricevere un flusso di risposta. I risultati nello stream sono instanze dello stesso tipo.
Questo documento descrive un modo per creare uno stream di risultati e mostra cosa procuce il generatore, sia di transport-indipendent che di transport-specific.
Progettazione
Il DSL StreamingResult
può essere definito su un metodo per impostare un endpoint che produce sequenze di
risultati. Il DSL StreamingResult
ha una sintassi simile a quella del DSL Result
.
StreamingResult
e Result
sono mutualmente esclusivi: solo uno dei due può essere
usato all’interno di una espressione Method
.
var _ = Service("cellar", func() {
Method("list", func() {
// StoredBottle è inviata al client attraverso uno stream.
StreamingResult(StoredBottle)
})
})
Il codice generato produce le seguenti interfacce per l’endpoint list
nel package del servizio.
// Interfaccia che il server deve implementare.
type ListServerStream interface {
// Manda in uno stream istanze di "StoredBottle".
Send(*StoredBottle) error
// Chiudi lo stream.
Close() error
}
// Interfaccia che il client deve implementare.
type ListClientStream interface {
// Recv legge un'instanza di "StoredBottle" dallo stream.
Recv() (*StoredBottle, error)
}
Send
può essere chiamata 0 o più volte per spedire le varie istanze del risultato al client. SeSend
ritorna un errore tutte le seguenti chiamate aSend
falliranno anch’esse, e non ci sarà bisogno di chiamareClose
.Close
chiude lo stream. Tutte le chiamate successive aSend
restituiranno un errore.Recv
legge il prossimo risultato in arrivo dallo stream. Ritornaio.EOF
se il server ha chiuso lo stream.
La firma del metodo List
nell’interfaccia Service
accetta l’interfaccia di
stream del server come uno degli argomenti. Il client generato da Goa ritorna
invece l’interfaccia di stream del client.
La logica transport-specific implementa entrambe le interfacce con logiche di streaming specifiche.
Ecco un esempio di un servizio che manda uno stream di StoredBottle
e chiude lo stream dopo l’invio.
// Lists elenca le bottiglie in magazzino.
func (s *cellarSvc) List(ctx context.Context, stream cellarsvc.ListServerStream) (err error) {
bottles := loadStoredBottles()
for _, c := range bottles {
if err := stream.Send(c); err != nil {
return err
}
}
return stream.Close()
}
Streaming via HTTP
Streaming in HTTP sfrutta i websockets. goa usa i gorilla websocket per implementare le interfacce di streaming sia di server che di client.
Il package goa http
fornisce due interfacce Upgrader e Dialer e un configuratore
di connessioni websocket che può essere usato per personalizzare la connessione
websocket, ottenuta appunto con Upgrader e Dialer.
Ecco un esempio che fornisce una connessione websocket personalizzata per stream di server e client.
/* service main.go */
// Default upgrader generated by goa
//upgrader := &websocket.Upgrader{}
// upgrader personalizzato
upgrader := &websocket.Upgrader {
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// Configuratore della connessione Websocket
connConfigurer := func(conn *websocket.Conn) *websocket.Conn {
conn.SetPingHandler(...)
conn.SetPongHandler(...)
conn.SetCloseHandler(...)
return conn
}
cellarServer = cellarsvcsvr.New(cellarEndpoints, mux, dec, enc, eh, upgrader, connConfigurer)
/* client main.go */
// Default dialer generated by goa
//dialer = websocket.DefaultDialer
// dialer personalizzato
dialer = &websocket.Dialer {
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
endpoint, payload, err := cli.ParseEndpoint(
scheme,
host,
doer,
goahttp.RequestEncoder,
goahttp.ResponseDecoder,
debug,
dialer,
connConfigurer,
)
Result Type with Multiple Views
Se il metodo ritorna un result type con più viste, un metodo SetView
è generato
in entrambe le interfacce con la seguente firma:
SetView(view string)
Lo sviluppatore della soluzione deve chiamare questo metodo nell’implementazione
prima di inviare i dati nello stream, in modo che la vista sia correttamente
impostata.
Se tale metodo non viene mai invocato, la vista default
viene usata.
Ecco un esempio che usa una vista differente prima di inviare le “bottiglie” nello stream.
// Lists elenca le bottiglie in magazzino.
func (s *cellarSvc) List(ctx context.Context, p *cellarsvc.ListPayload, stream cellarsvc.ListServerStream) (err error) {
stream.SetView(p.View)
bottles := loadStoredBottles()
for _, c := range bottles {
if err := stream.Send(c); err != nil {
return err
}
}
return stream.Close()
}