Implementing Server-Side Streaming
Once you’ve designed your server streaming endpoints using Goa’s
StreamingResult
DSL, the next step is to implement both the server-side logic
that handles the streaming of results and the client-side code that consumes the
stream. This guide walks through implementing both sides of a streaming endpoint
in Goa.
Server-Side Implementation
When you define a server streaming method in the DSL, Goa generates specific stream interfaces for the server to implement. These interfaces facilitate the sending of streamed data to clients.
Server Stream Interface
Assuming the following design:
var _ = Service("logger", func() {
Method("subscribe", func() {
StreamingResult(LogEntry)
HTTP(func() {
GET("/logs/stream")
Response(StatusOK)
})
})
})
The server stream interface includes methods for sending data and closing the stream:
// Interface that the server must satisfy
type ListServerStream interface {
// Send streams instances of "StoredBottle"
Send(*LogEntry) error
// Close the stream
Close() error
}
Key Methods
- Send: Sends an instance of the specified type (
LogEntry
) to the client. This method can be called multiple times to stream multiple results. - Close: Closes the stream, signaling the end of the data transmission.
After calling
Close
, any subsequent calls toSend
will result in an error.
Example Implementation
Here’s an example of implementing a server-side streaming endpoint:
// Lists streams the log entries to the client
func (s *loggerSvc) Subscribe(ctx context.Context, stream logger.SubscribeServerStream) error {
logEntries, err := loadLogEntries()
if err != nil {
return fmt.Errorf("failed to load log entries: %w", err)
}
for _, logEntry := range logEntries {
if err := stream.Send(logEntry); err != nil {
return fmt.Errorf("failed to send log entry: %w", err)
}
}
return stream.Close()
}
Error Handling
Proper error handling ensures robust streaming behavior:
- Always check the return value of
Send
to handle potential transmission errors - The
Send
method will return an error if the client disconnects or the context is canceled - Ensure that errors are wrapped with appropriate context for debugging
- Consider implementing retry logic for transient failures if appropriate
Client-Side Implementation
The client-side implementation involves receiving and processing the streamed data. Goa generates client interfaces that make it easy to consume streams.
Client Stream Interface
The generated client interface includes methods for receiving data and handling the stream:
// Interface that the client uses to receive the stream
type ListClientStream interface {
// Recv returns the next result in the stream
Recv() (*LogEntry, error)
// Close closes the stream
Close() error
}
Example Client Implementation
Here’s how to consume a stream from the client side:
func processLogEntryStream(client logger.Client) error {
stream, err := client.List(context.Background())
if err != nil {
return fmt.Errorf("failed to start stream: %w", err)
}
defer stream.Close()
for {
logEntry, err := stream.Recv()
if err == io.EOF {
// Stream is finished
return nil
}
if err != nil {
return fmt.Errorf("error receiving log entry: %w", err)
}
// Process the received log entry
processLogEntry(logEntry)
}
}
Key Considerations for Clients
Stream Initialization:
- Create the stream using the generated client method
- Check for initialization errors before proceeding
- Use
defer stream.Close()
to ensure proper cleanup
Receiving Data:
- Use a loop to continuously receive data until EOF or error
- Handle
io.EOF
as the normal end-of-stream condition - Process other errors appropriately based on your application needs
Resource Management:
- Always close the stream when done
- Consider using timeouts or deadlines via context if needed
- Implement proper error handling and logging
Summary
Implementing streaming in Goa involves both server-side streaming of data and client-side consumption of the stream. By following these patterns and best practices for error handling and resource management, you can build robust streaming endpoints that enhance the responsiveness and scalability of your APIs.
The server implementation focuses on efficiently sending data and handling errors, while the client implementation provides a clean interface for receiving and processing the streamed data. Together, they create a powerful mechanism for handling real-time or large datasets in your Goa services.