Implementing Bidirectional Streaming
Once you’ve designed your bidirectional streaming endpoints using Goa’s
StreamingPayload
and StreamingResult
DSL, the next step is to implement both
sides of the streaming connection. This guide walks through implementing both
client and server components of a bidirectional streaming endpoint in Goa.
Design
Assuming the following design:
var _ = Service("logger", func() {
Method("monitor", func() {
StreamingPayload(LogFilter)
StreamingResult(LogEntry)
HTTP(func() {
GET("/logs/monitor")
Response(StatusOK)
})
GRPC(func() {})
})
})
Client-Side Implementation
When you define a bidirectional streaming method, Goa generates specific stream interfaces for the client to implement. These interfaces facilitate both sending and receiving streamed data.
Client Stream Interface
The client stream interface includes methods for both sending and receiving data:
// Interface that the client must satisfy
type MonitorClientStream interface {
// Send streams instances of "LogFilter"
Send(*LogFilter) error
// Recv returns the next result in the stream
Recv() (*LogEntry, error)
// Close closes the stream
Close() error
}
Key Methods
- Send: Sends filter updates to the server. Can be called multiple times to update filtering criteria.
- Recv: Receives log entries from the server that match the current filters.
- Close: Closes the bidirectional stream. After calling Close, both Send and Recv will return errors.
Example Implementation
Here’s an example of implementing a client-side bidirectional streaming endpoint:
func monitorLogs(client logger.Client, initialFilter *LogFilter) error {
stream, err := client.Monitor(context.Background())
if err != nil {
return fmt.Errorf("failed to start monitor stream: %w", err)
}
defer stream.Close()
// Start a goroutine to handle receiving logs
go func() {
for {
logEntry, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("error receiving log entry: %v", err)
return
}
processLogEntry(logEntry)
}
}()
// Send initial filter
if err := stream.Send(initialFilter); err != nil {
return fmt.Errorf("failed to send initial filter: %w", err)
}
// Update filters based on some condition
for {
newFilter := waitForFilterUpdate()
if err := stream.Send(newFilter); err != nil {
return fmt.Errorf("failed to update filter: %w", err)
}
}
}
Server-Side Implementation
The server-side implementation handles both incoming filter updates and streams matching log entries back to the client.
Server Stream Interface
// Interface that the server must satisfy
type MonitorServerStream interface {
// Send streams instances of "LogEntry"
Send(*LogEntry) error
// Recv returns the next filter in the stream
Recv() (*LogFilter, error)
// Close closes the stream
Close() error
}
Example Server Implementation
Here’s how to implement bidirectional streaming on the server side:
func (s *loggerSvc) Monitor(ctx context.Context, stream logger.MonitorServerStream) error {
// Start a goroutine to handle filter updates
filterCh := make(chan *LogFilter, 1)
go func() {
defer close(filterCh)
for {
filter, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("error receiving filter update: %v", err)
return
}
filterCh <- filter
}
}()
// Main loop for processing logs and applying filters
var currentFilter *LogFilter
for {
select {
case filter, ok := <-filterCh:
if !ok {
// Channel closed, stop processing
return nil
}
currentFilter = filter
case <-ctx.Done():
// Context cancelled, stop processing
return ctx.Err()
default:
if currentFilter != nil {
logEntry := s.getNextMatchingLog(currentFilter)
if err := stream.Send(logEntry); err != nil {
return fmt.Errorf("error sending log entry: %w", err)
}
}
}
}
}
Key Considerations
Concurrent Operations:
- Use goroutines to handle sending and receiving independently
- Implement proper synchronization for shared state
- Handle graceful shutdown of both directions
Resource Management:
- Monitor memory usage for both incoming and outgoing streams
- Implement rate limiting in both directions
- Clean up resources when either side closes the stream
Error Handling:
- Handle errors from both Send and Recv operations
- Propagate errors appropriately to both sides
- Consider implementing reconnection logic for transient failures
Context Management:
- Honor context cancellation for both directions
- Implement appropriate timeouts
- Clean up resources when context is cancelled
Summary
Implementing bidirectional streaming in Goa requires careful coordination of both sending and receiving operations on both client and server sides. By following these patterns and best practices for concurrent operations, error handling, and resource management, you can build robust bidirectional streaming endpoints that enable real-time, interactive communication between client and server.
The implementation allows for dynamic updates to streaming behavior through client-sent filters while maintaining a continuous stream of server responses, creating a flexible and powerful mechanism for real-time data exchange in your Goa services.