Implementing Client-Side Streaming
Once you’ve designed your client streaming endpoints using Goa’s StreamingPayload DSL, the next step is to implement both the client-side logic that handles the streaming of data and the server-side code that processes the stream. This guide walks through implementing both sides of a streaming endpoint in Goa.
Client-Side Implementation
When you define a client streaming method in the DSL, Goa generates a stream interface that your client code uses to send streamed data to the server.
Client Stream Interface
Assuming the following design:
var _ = Service("logger", func() {
	Method("upload", func() {
		StreamingPayload(LogEntry)
		HTTP(func() {
			GET("/logs/upload")
			Response(StatusOK)
		})
		GRPC(func() {})
	})
})
The client stream interface includes methods for sending data and closing the stream:
// Interface that the client must satisfy
type UploadClientStream interface {
	// Send streams instances of "LogEntry"
	Send(*LogEntry) error
	// Close closes the stream
	Close() error
}
Key Methods
- Send: Sends an instance of the specified type (LogEntry) to the server. This method can be called multiple times to stream multiple payloads.
- Close: Closes the stream. Send returns an error once Close has been called.
Example Implementation
Here’s an example of implementing a client-side streaming endpoint:
func uploadLogEntries(client *logger.Client, logEntries []*LogEntry) error {
	stream, err := client.Upload(context.Background())
	if err != nil {
		return fmt.Errorf("failed to start upload stream: %w", err)
	}
	for _, logEntry := range logEntries {
		if err := stream.Send(logEntry); err != nil {
			return fmt.Errorf("failed to send log entry: %w", err)
		}
	}
	if err := stream.Close(); err != nil {
		return fmt.Errorf("failed to close stream: %w", err)
	}
	return nil
}
Error Handling
Proper error handling ensures robust streaming behavior:
- Always check the return value of Send to handle potential transmission errors
- The Send method will return an error if the server disconnects or the context is canceled
- Ensure that errors are wrapped with appropriate context for debugging
- Consider implementing retry logic for transient failures if appropriate
Server-Side Implementation
The server-side implementation involves receiving and processing the streamed data. Goa generates server interfaces that make it easy to handle incoming streams.
Server Stream Interface
The generated server interface includes methods for receiving data and handling the stream:
// Interface that the server uses to receive the stream
type UploadServerStream interface {
	// Recv returns the next payload in the stream
	Recv() (*LogEntry, error)
	// Close closes the stream
	Close() error
}
Example Server Implementation
Here’s how to process a stream on the server side:
func (s *loggerSvc) Upload(ctx context.Context, stream logger.UploadServerStream) error {
	for {
		logEntry, err := stream.Recv()
		if err == io.EOF {
			// Stream is finished
			return nil
		}
		if err != nil {
			return fmt.Errorf("error receiving log entry: %w", err)
		}
		// Process the received log entry
		if err := s.processLogEntry(logEntry); err != nil {
			return fmt.Errorf("error processing log entry: %w", err)
		}
	}
}
Key Considerations for Servers
Stream Processing:
- Use a loop to continuously receive data until EOF or error
- Handle io.EOF as the normal end-of-stream condition
- Process incoming data as it arrives
Resource Management:
- Consider implementing rate limiting for incoming data
- Monitor memory usage when processing large streams
- Implement proper error handling and logging
Error Handling:
- Return appropriate errors for validation failures
- Handle context cancellation appropriately
- Consider implementing partial success responses
Summary
Implementing client-side streaming in Goa involves both client-side sending of data and server-side processing of the stream. By following these patterns and best practices for error handling and resource management, you can build robust streaming endpoints that send many payloads over a single connection.
The client implementation focuses on efficiently sending data and handling errors, while the server implementation provides a clean interface for receiving and processing the streamed data. Together, they create a powerful mechanism for handling uploads or real-time data ingestion in your Goa services.