Goa provides comprehensive support for gRPC streaming, enabling you to build services that handle continuous data transmission in real time. This guide covers the different streaming patterns available in gRPC and how to implement them using Goa.
gRPC supports three streaming patterns: server-side streaming, client-side streaming, and bidirectional streaming.
In server-side streaming, the client sends a single request and receives a stream of responses. This pattern is useful for scenarios like live metric monitoring, event feeds, and progress updates, where the server keeps pushing data after a single request.
Here’s how to define a server streaming method:
var _ = Service("monitor", func() {
Method("watch", func() {
Description("Stream system metrics")
Payload(func() {
Field(1, "interval", Int, "Sampling interval in seconds")
Required("interval")
})
StreamingResult(func() {
Field(1, "cpu", Float32, "CPU usage percentage")
Field(2, "memory", Float32, "Memory usage percentage")
Required("cpu", "memory")
})
GRPC(func() {
Response(CodeOK)
})
})
})
Client-side streaming allows the client to send a stream of requests while receiving a single response. This is ideal for uploading data in chunks or sending batches of events that the server aggregates into a single result.
Example definition:
var _ = Service("analytics", func() {
Method("process", func() {
Description("Process stream of analytics events")
StreamingPayload(func() {
Field(1, "event_type", String, "Type of event")
Field(2, "timestamp", String, "Event timestamp")
Field(3, "data", Bytes, "Event data")
Required("event_type", "timestamp", "data")
})
Result(func() {
Field(1, "processed_count", Int64, "Number of events processed")
Required("processed_count")
})
GRPC(func() {
Response(CodeOK)
})
})
})
Bidirectional streaming enables both the client and server to send streams of messages simultaneously. This pattern is perfect for interactive, real-time exchanges such as chat.
Example definition:
var _ = Service("chat", func() {
Method("connect", func() {
Description("Establish bidirectional chat connection")
StreamingPayload(func() {
Field(1, "message", String, "Chat message")
Field(2, "user_id", String, "User identifier")
Required("message", "user_id")
})
StreamingResult(func() {
Field(1, "message", String, "Chat message")
Field(2, "user_id", String, "User identifier")
Field(3, "timestamp", String, "Message timestamp")
Required("message", "user_id", "timestamp")
})
GRPC(func() {
Response(CodeOK)
})
})
})
The implementation of gRPC streaming in Goa involves both server-side and client-side code. Goa generates the necessary interfaces and types based on your service definition, and you’ll need to implement these interfaces to handle the streaming logic.
On the server side, you’ll need to implement methods that handle the streaming communication. Each streaming pattern requires a different approach to handle the data flow. Let’s look at each pattern in detail:
In this example, we’ll implement a service that streams system metrics (CPU and memory usage) to clients at regular intervals. The server maintains an open connection and continuously sends data to the client.
The monitor.WatchServerStream interface provided by Goa gives us two main capabilities:

Send(*WatchResult) error
: Sends a single result to the client

Context() context.Context
: Returns the stream's context
Here’s how we use these capabilities:
// Server-side streaming
func (s *monitorService) Watch(ctx context.Context, p *monitor.WatchPayload, stream monitor.WatchServerStream) error {
// Create a ticker that fires at the interval specified by the client
ticker := time.NewTicker(time.Duration(p.Interval) * time.Second)
// Ensure the ticker is cleaned up when we're done
defer ticker.Stop()
// Infinite loop to keep sending metrics
for {
select {
// Check if the client has cancelled the request using the context
case <-ctx.Done():
return ctx.Err()
// Wait for the next tick
case <-ticker.C:
// Get the current system metrics (implementation not shown)
metrics := getSystemMetrics()
// Use the stream's Send method to send metrics to the client
// Each call to Send transmits one message in the stream
if err := stream.Send(&monitor.WatchResult{
CPU: metrics.CPU,
Memory: metrics.Memory,
}); err != nil {
return err
}
}
}
}
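The getSystemMetrics helper used above is not part of the generated code; gathering the actual numbers is up to your service. A minimal placeholder, assuming a small systemMetrics struct with CPU and Memory fields, could look like this (a real implementation would sample the operating system or use a monitoring library):

// systemMetrics holds a single sample of system usage. Both the struct and
// the values returned below are placeholders.
type systemMetrics struct {
	CPU    float32
	Memory float32
}

func getSystemMetrics() systemMetrics {
	// Placeholder values; replace with real CPU and memory sampling.
	return systemMetrics{CPU: 42.0, Memory: 63.5}
}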
This example shows how to handle a stream of events from the client. The analytics.ProcessServerStream interface provides three key methods:

Recv() (*ProcessPayload, error)
: Receives the next message from the client

SendAndClose(*ProcessResult) error
: Sends a final response and closes the stream

Context() context.Context
: Returns the stream's context
Here’s how we use these capabilities:
// Client-side streaming
func (s *analyticsService) Process(ctx context.Context, stream analytics.ProcessServerStream) error {
// Keep track of how many events we've processed
var count int64
// Continue reading events from the stream until it's closed
for {
// Use Recv() to get the next message in the stream
// Recv blocks until a message is received or the stream is closed
event, err := stream.Recv()
if err == io.EOF {
// Client has finished sending data
// Use SendAndClose to send the final result and close the stream
// This is specific to client-streaming - we can only send one response
return stream.SendAndClose(&analytics.ProcessResult{
ProcessedCount: count,
})
}
if err != nil {
return err
}
// Process the received event (implementation not shown)
if err := processEvent(event); err != nil {
return err
}
count++
}
}
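Similarly, processEvent is left to the service to implement. A sketch, assuming the generated ProcessPayload exposes EventType, Timestamp, and Data fields as defined in the design and that timestamps use RFC 3339, might be:

// processEvent validates and records a single analytics event. The handling
// here is illustrative only; a real service would forward the event to its
// storage or processing backend.
func processEvent(event *analytics.ProcessPayload) error {
	if event.EventType == "" {
		return fmt.Errorf("missing event type")
	}
	// Make sure the timestamp is well formed before accepting the event.
	if _, err := time.Parse(time.RFC3339, event.Timestamp); err != nil {
		return fmt.Errorf("invalid timestamp %q: %w", event.Timestamp, err)
	}
	log.Printf("processing %s event (%d bytes)", event.EventType, len(event.Data))
	return nil
}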
This example demonstrates a chat service where both sides can send messages at any time. The chat.ConnectServerStream interface combines capabilities of both streaming types:

Recv() (*ConnectPayload, error)
: Receives messages from the client

Send(*ConnectResult) error
: Sends messages to the client

Context() context.Context
: Returns the stream's context
Here’s how we use these capabilities:
// Bidirectional streaming
func (s *chatService) Connect(ctx context.Context, stream chat.ConnectServerStream) error {
// Continue processing messages until the client disconnects
for {
// Use Recv() to wait for and receive the next client message
// This blocks until a message arrives or the client closes the stream
msg, err := stream.Recv()
if err == io.EOF {
// Client has closed their send stream
return nil
}
if err != nil {
return err
}
// Create a response with the current timestamp
response := &chat.ConnectResult{
Message: msg.Message,
UserID: msg.UserID,
Timestamp: time.Now().Format(time.RFC3339),
}
// Use Send() to send a message back to the client
// In bidirectional streaming, we can send and receive in any order
if err := stream.Send(response); err != nil {
return err
}
}
}
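Because sends and receives are independent, the server does not have to wait for a client message before writing. The following sketch is an alternative Connect implementation that pushes periodic server-originated announcements while a goroutine drains incoming messages; the 30-second interval and the announcement text are purely illustrative. Keeping every Send call in a single goroutine avoids concurrent writes to the stream:

func (s *chatService) Connect(ctx context.Context, stream chat.ConnectServerStream) error {
	// errc receives the first error from the receive loop (io.EOF when the
	// client has finished sending).
	errc := make(chan error, 1)

	// Receive loop: drain client messages in a separate goroutine.
	go func() {
		for {
			msg, err := stream.Recv()
			if err != nil {
				errc <- err
				return
			}
			log.Printf("received %q from %s", msg.Message, msg.UserID)
		}
	}()

	// The server writes on its own schedule, independently of the client.
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errc:
			if err == io.EOF {
				// Client closed its send stream; end the call normally.
				return nil
			}
			return err
		case <-ticker.C:
			// All Send calls happen here, in the handler goroutine.
			if err := stream.Send(&chat.ConnectResult{
				Message:   "server announcement",
				UserID:    "server",
				Timestamp: time.Now().Format(time.RFC3339),
			}); err != nil {
				return err
			}
		}
	}
}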
The client side interfaces mirror the server side but from the client’s perspective. Let’s look at each type:
The client receives a monitor.WatchClient interface that provides:

Recv() (*WatchResult, error)
: Receives the next metrics update

Close() error
: Closes the stream

// Server-side streaming client
func watchMetrics(ctx context.Context, client *monitor.Client) error {
// Start the streaming connection with initial parameters
// This returns a stream interface for receiving metrics
stream, err := client.Watch(ctx, &monitor.WatchPayload{
Interval: 5, // Request metrics every 5 seconds
})
if err != nil {
return err
}
// Continue receiving metrics until the stream ends
for {
// Use Recv() to get the next metrics update
// This blocks until new metrics arrive or the server closes the stream
metrics, err := stream.Recv()
if err == io.EOF {
// Server has closed the stream
break
}
if err != nil {
return err
}
// Process the received metrics (in this case, just log them)
log.Printf("CPU: %.2f%%, Memory: %.2f%%", metrics.CPU, metrics.Memory)
}
return nil
}
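The server's Watch loop exits when the client's context is cancelled, so the simplest way for a client to stop watching is to derive a cancellable context before calling Watch. A small sketch (the one-minute timeout is arbitrary); note that Recv reports the cancellation as an error rather than io.EOF:

// watchMetricsForAMinute watches metrics until the deadline expires, at which
// point the stream ends and watchMetrics returns the cancellation error.
func watchMetricsForAMinute(client *monitor.Client) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	return watchMetrics(ctx, client)
}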
The client receives an analytics.ProcessClient interface that provides:

Send(*ProcessPayload) error
: Sends an event to the server

CloseAndRecv() (*ProcessResult, error)
: Closes the send stream and waits for the final response

// Client-side streaming client
func uploadEvents(ctx context.Context, client *analytics.Client, events []*analytics.ProcessPayload) error {
// Initialize the streaming connection
// This returns a stream interface for sending events
stream, err := client.Process(ctx)
if err != nil {
return err
}
// Send each event to the server using the stream's Send method
for _, event := range events {
if err := stream.Send(event); err != nil {
return err
}
}
// Use CloseAndRecv to close our send stream and get the server's response
// This blocks until the server processes all events and sends the result
result, err := stream.CloseAndRecv()
if err != nil {
return err
}
log.Printf("Processed %d events", result.ProcessedCount)
return nil
}
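The events passed to uploadEvents are simply instances of the generated payload type. For example, given a context and client in scope (the field values below are illustrative and assume the EventType, Timestamp, and Data field names generated from the design):

// Build a small batch of events and stream them to the server.
events := []*analytics.ProcessPayload{
	{EventType: "page_view", Timestamp: time.Now().Format(time.RFC3339), Data: []byte(`{"path":"/"}`)},
	{EventType: "click", Timestamp: time.Now().Format(time.RFC3339), Data: []byte(`{"target":"signup"}`)},
}
if err := uploadEvents(ctx, client, events); err != nil {
	log.Fatal(err)
}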
The client receives a chat.ConnectClient interface that combines both capabilities:

Send(*ConnectPayload) error
: Sends messages to the server

Recv() (*ConnectResult, error)
: Receives messages from the server

CloseSend() error
: Closes the send stream

// Bidirectional streaming client
func startChat(ctx context.Context, client *chat.Client, userID string) error {
// Initialize the bidirectional stream
// This returns a stream interface for both sending and receiving
stream, err := client.Connect(ctx)
if err != nil {
return err
}
// Start a separate goroutine to send messages
// This demonstrates how we can send and receive concurrently
go func() {
for {
// Use Send to transmit messages to the server
if err := stream.Send(&chat.ConnectPayload{
Message: "Hello",
UserID: userID,
}); err != nil {
log.Printf("Send error: %v", err)
return
}
time.Sleep(time.Second)
}
}()
// Use the main goroutine to receive messages
for {
// Use Recv to get the next message from the server
msg, err := stream.Recv()
if err == io.EOF {
// Server has closed the stream
break
}
if err != nil {
return err
}
// Process the received message
log.Printf("Received: %s from %s at %s",
msg.Message, msg.UserID, msg.Timestamp)
}
return nil
}
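The example above keeps sending until the server closes the stream. When the client is the one ending the conversation, it should close its send side and then drain any remaining server messages. A sketch using the chat.ConnectClient interface described above (the farewell message is illustrative):

// sayGoodbye sends a final message, closes the client's send side, and drains
// remaining server messages before returning.
func sayGoodbye(stream chat.ConnectClient, userID string) error {
	if err := stream.Send(&chat.ConnectPayload{
		Message: "bye",
		UserID:  userID,
	}); err != nil {
		return err
	}
	// Tell the server no more messages are coming.
	if err := stream.CloseSend(); err != nil {
		return err
	}
	// Keep receiving until the server closes its side of the stream.
	for {
		if _, err := stream.Recv(); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
}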
When implementing streaming endpoints, proper error handling is crucial: