L’integrazione WebSocket in Goa permette ai tuoi servizi di gestire comunicazioni bidirezionali in tempo reale. Questa sezione copre come implementare e gestire connessioni WebSocket nei tuoi servizi Goa.
WebSocket è un protocollo che fornisce comunicazione full-duplex su una singola connessione TCP. In Goa, il supporto WebSocket è implementato attraverso il DSL di streaming e viene utilizzato principalmente per:
Per creare un endpoint WebSocket, usa il DSL di streaming di Goa:
var _ = Service("logger", func() {
Method("monitor", func() {
// Payload per l'inizializzazione della connessione
Payload(func() {
Field(1, "topic", String, "Argomento da monitorare")
Field(2, "api_key", String, "Chiave API per l'autenticazione")
Required("topic", "api_key")
})
// Stream bidirezionale
StreamingPayload(LogFilter) // Filtri dal client
StreamingResult(LogEntry) // Log entries al client
HTTP(func() {
GET("/logs/{topic}/monitor")
Param("api_key")
Response(StatusOK)
})
})
})
Goa supporta tre pattern di streaming WebSocket:
Streaming Server-Side
Streaming Client-Side
Streaming Bidirezionale
L’implementazione del server gestisce sia l’inizializzazione della connessione che lo streaming dei dati:
func (s *loggerSvc) Monitor(ctx context.Context, p *logger.MonitorPayload, stream logger.MonitorServerStream) error {
// Validazione iniziale
if err := validateAPIKey(p.APIKey); err != nil {
return err
}
// Gestione del flusso bidirezionale
for {
// Ricevi filtri dal client
filter, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return fmt.Errorf("errore nella ricezione del filtro: %w", err)
}
// Applica i filtri e invia i log corrispondenti
logs := s.filterLogs(p.Topic, filter)
for _, log := range logs {
if err := stream.Send(log); err != nil {
return fmt.Errorf("errore nell'invio del log: %w", err)
}
}
}
}
Il client deve gestire sia l’invio che la ricezione dei dati:
func monitorLogs(client logger.Client) error {
stream, err := client.Monitor(context.Background(), &logger.MonitorPayload{
Topic: "database",
APIKey: "your-api-key",
})
if err != nil {
return fmt.Errorf("errore nell'inizializzazione dello stream: %w", err)
}
defer stream.Close()
// Goroutine per la ricezione dei log
go func() {
for {
log, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("errore nella ricezione: %v", err)
return
}
processLog(log)
}
}()
// Invio di filtri
for {
filter := createFilter()
if err := stream.Send(filter); err != nil {
return fmt.Errorf("errore nell'invio del filtro: %w", err)
}
time.Sleep(time.Second * 30) // Aggiorna i filtri ogni 30 secondi
}
}
La gestione degli errori è cruciale per le connessioni WebSocket:
Errori di Connessione
Errori di Validazione
Errori di Timeout
Gestione delle Risorse
Sicurezza
Affidabilità