Streaming


Panoramica

Goa permette anche la definizione di servizi unidirezionali di streaming dove un endpoint può ricevere un payload e ricevere un flusso di risposta. I risultati nello stream sono instanze dello stesso tipo.

Questo documento descrive un modo per creare uno stream di risultati e mostra cosa procuce il generatore, sia di transport-indipendent che di transport-specific.

Progettazione

Il DSL StreamingResult può essere definito su un metodo per impostare un endpoint che produce sequenze di risultati. Il DSL StreamingResult ha una sintassi simile a quella del DSL Result. StreamingResult e Result sono mutualmente esclusivi: solo uno dei due può essere usato all’interno di una espressione Method.

var _ = Service("cellar", func() {
    Method("list", func() {
        // StoredBottle è inviata al client attraverso uno stream.
        StreamingResult(StoredBottle)
    })
})

Il codice generato produce le seguenti interfacce per l’endpoint list nel package del servizio.

// Interfaccia che il server deve implementare.
type ListServerStream interface {
    // Manda in uno stream istanze di "StoredBottle".
    Send(*StoredBottle) error
    // Chiudi lo stream.
    Close() error
}

// Interfaccia che il client deve implementare.
type ListClientStream interface {
    // Recv legge un'instanza di "StoredBottle" dallo stream.
    Recv() (*StoredBottle, error)
}
  • Send può essere chiamata 0 o più volte per spedire le varie istanze del risultato al client. Se Send ritorna un errore tutte le seguenti chiamate a Send falliranno anch’esse, e non ci sarà bisogno di chiamare Close.
  • Close chiude lo stream. Tutte le chiamate successive a Send restituiranno un errore.
  • Recv legge il prossimo risultato in arrivo dallo stream. Ritorna io.EOF se il server ha chiuso lo stream.

La firma del metodo List nell’interfaccia Service accetta l’interfaccia di stream del server come uno degli argomenti. Il client generato da Goa ritorna invece l’interfaccia di stream del client.

La logica transport-specific implementa entrambe le interfacce con logiche di streaming specifiche.

Ecco un esempio di un servizio che manda uno stream di StoredBottle e chiude lo stream dopo l’invio.

// Lists elenca le bottiglie in magazzino.
func (s *cellarSvc) List(ctx context.Context, stream cellarsvc.ListServerStream) (err error) {
    bottles := loadStoredBottles()
    for _, c := range bottles {
        if err := stream.Send(c); err != nil {
            return err
        }
    }
    return stream.Close()
}

Streaming via HTTP

Streaming in HTTP sfrutta i websockets. goa usa i gorilla websocket per implementare le interfacce di streaming sia di server che di client.

Il package goa http fornisce due interfacce Upgrader e Dialer e un configuratore di connessioni websocket che può essere usato per personalizzare la connessione websocket, ottenuta appunto con Upgrader e Dialer.

Ecco un esempio che fornisce una connessione websocket personalizzata per stream di server e client.

/* service main.go */

// Default upgrader generated by goa
//upgrader := &websocket.Upgrader{}
// upgrader personalizzato
upgrader := &websocket.Upgrader {
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

// Configuratore della connessione Websocket
connConfigurer := func(conn *websocket.Conn) *websocket.Conn {
    conn.SetPingHandler(...)
    conn.SetPongHandler(...)
    conn.SetCloseHandler(...)
    return conn
}

cellarServer = cellarsvcsvr.New(cellarEndpoints, mux, dec, enc, eh, upgrader, connConfigurer)

/* client main.go */

// Default dialer generated by goa
//dialer = websocket.DefaultDialer
// dialer personalizzato
dialer = &websocket.Dialer {
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

endpoint, payload, err := cli.ParseEndpoint(
    scheme,
    host,
    doer,
    goahttp.RequestEncoder,
    goahttp.ResponseDecoder,
    debug,
    dialer,
    connConfigurer,
)

Result Type with Multiple Views

Se il metodo ritorna un result type con più viste, un metodo SetView è generato in entrambe le interfacce con la seguente firma:

SetView(view string)

Lo sviluppatore della soluzione deve chiamare questo metodo nell’implementazione prima di inviare i dati nello stream, in modo che la vista sia correttamente impostata. Se tale metodo non viene mai invocato, la vista default viene usata.

Ecco un esempio che usa una vista differente prima di inviare le “bottiglie” nello stream.

// Lists elenca le bottiglie in magazzino.
func (s *cellarSvc) List(ctx context.Context, p *cellarsvc.ListPayload, stream cellarsvc.ListServerStream) (err error) {
    stream.SetView(p.View)
    bottles := loadStoredBottles()
    for _, c := range bottles {
        if err := stream.Send(c); err != nil {
            return err
        }
    }
    return stream.Close()
}