Stream Interceptors
Stream gRPC Interceptors
Stream interceptors handle streaming RPCs in gRPC services. They’re used when either the client, server, or both send multiple messages over a single connection. This guide shows you how to implement effective stream interceptors for your Goa services.
Basic Structure
A stream interceptor follows this pattern:
func StreamInterceptor(srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
// 1. Pre-stream operations
// - Extract metadata
// - Validate protocol requirements
// - Initialize stream state
// 2. Wrap the stream for monitoring
wrappedStream := &wrappedServerStream{
ServerStream: ss,
// Add fields for tracking stream state
}
// 3. Handle the stream
err := handler(srv, wrappedStream)
// 4. Post-stream operations
// - Record metrics
// - Clean up resources
// - Handle errors
return err
}
This structure allows you to:
- Set up stream-wide context and state
- Monitor message flow
- Handle stream lifecycle events
- Manage stream-specific resources
Stream Wrapper
The gRPC server stream interface provides basic message handling capabilities, but
interceptors often need to add functionality without modifying the original stream.
This is where stream wrappers become essential. A stream wrapper implements the
grpc.ServerStream interface while adding custom behavior through composition.
Here’s a standard implementation pattern:
type wrappedServerStream struct {
grpc.ServerStream // Embed the original interface
msgCount int64 // Track message count
startTime time.Time // Track stream duration
method string // Store RPC method name
}
func (w *wrappedServerStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
if err == nil {
atomic.AddInt64(&w.msgCount, 1) // Thread-safe counter
}
return err
}
func (w *wrappedServerStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
if err == nil {
atomic.AddInt64(&w.msgCount, 1) // Track received messages too
}
return err
}
This wrapper pattern serves several important purposes:
Message Tracking: The wrapper intercepts every message sent or received, allowing you to:
- Count total messages processed
- Implement rate limiting
- Log message sizes or contents
- Apply transformations
State Management: The wrapper maintains stream-specific state:
- Track timing information
- Store stream metadata
- Manage resource usage
- Coordinate multiple goroutines
Error Handling: The wrapper can enhance error handling by:
- Adding context to errors
- Implementing retry logic
- Recording error metrics
- Cleaning up resources
Here’s an example of a more sophisticated wrapper that adds functionality commonly needed in production environments:
type enhancedServerStream struct {
grpc.ServerStream
ctx context.Context // Enhanced context
method string // RPC method name
startTime time.Time // Stream start time
msgCount int64 // Message counter
msgSize int64 // Total bytes processed
metadata metadata.MD // Cached metadata
mu sync.RWMutex // Protect concurrent access
logger *zap.Logger // Structured logging
}
func newEnhancedServerStream(ss grpc.ServerStream, method string) *enhancedServerStream {
return &enhancedServerStream{
ServerStream: ss,
ctx: ss.Context(),
method: method,
startTime: time.Now(),
metadata: metadata.MD{},
logger: zap.L().With(zap.String("method", method)),
}
}
func (s *enhancedServerStream) Context() context.Context {
return s.ctx
}
func (s *enhancedServerStream) SendMsg(m interface{}) error {
// Pre-send processing
msgSize := proto.Size(m.(proto.Message))
s.mu.Lock()
s.msgSize += int64(msgSize)
s.mu.Unlock()
// Log large messages
if msgSize > maxMessageSize {
s.logger.Warn("large message detected",
zap.Int("size", msgSize))
}
// Send with timing
start := time.Now()
err := s.ServerStream.SendMsg(m)
duration := time.Since(start)
// Post-send processing
if err == nil {
atomic.AddInt64(&s.msgCount, 1)
metrics.RecordMessageMetrics(s.method, "send",
msgSize, duration)
} else {
s.logger.Error("send failed",
zap.Error(err))
}
return err
}
func (s *enhancedServerStream) RecvMsg(m interface{}) error {
// Similar enhancement pattern for receive...
}
func (s *enhancedServerStream) Stats() StreamStats {
s.mu.RLock()
defer s.mu.RUnlock()
return StreamStats{
Method: s.method,
Duration: time.Since(s.startTime),
MessageCount: atomic.LoadInt64(&s.msgCount),
TotalBytes: s.msgSize,
}
}
This enhanced wrapper demonstrates several production-ready features:
Metrics Collection: The wrapper automatically records:
- Message counts and sizes
- Processing durations
- Error rates
- Custom business metrics
Logging Integration: It provides structured logging with:
- Method-level context
- Size warnings
- Error details
- Timing information
Resource Tracking: The wrapper maintains:
- Total bytes processed
- Stream duration
- Message statistics
- Resource usage patterns
Thread Safety: It properly handles concurrent access through:
- Atomic operations for counters
- Mutex protection for shared state
- Safe context management
- Thread-safe logging
You can use these wrappers in your interceptors like this:
func StreamInterceptor(srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
// Create enhanced stream
ws := newEnhancedServerStream(ss, info.FullMethod)
// Use wrapper in handler
err := handler(srv, ws)
// Record final statistics
stats := ws.Stats()
metrics.RecordStreamStats(stats)
return err
}
These wrapper patterns are standard practice in gRPC services, and you’ll find similar implementations in many production systems. The specific enhancements you add will depend on your service’s requirements, but the basic pattern of wrapping the stream to add functionality remains consistent.
Common Patterns
1. Stream Monitoring
Monitor streaming RPC performance and behavior:
func MonitoringStreamInterceptor(srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
// Create wrapped stream
ws := &wrappedServerStream{
ServerStream: ss,
startTime: time.Now(),
method: info.FullMethod,
}
// Extract peer information
peer, _ := peer.FromContext(ss.Context())
// Handle stream
err := handler(srv, ws)
// Record metrics
duration := time.Since(ws.startTime)
msgCount := atomic.LoadInt64(&ws.msgCount)
status := status.Code(err)
metrics.RecordStreamMetrics(ws.method, peer.Addr.String(),
status, duration, msgCount)
return err
}
This pattern demonstrates comprehensive stream monitoring capabilities. The interceptor tracks the duration of each stream from start to finish, maintaining an accurate count of messages processed. It extracts and records peer information from the context, enabling you to identify and monitor client behavior. The interceptor properly handles stream errors, ensuring that failure scenarios are captured and recorded. All of this information is collected into stream-specific metrics, providing valuable insights into your service’s streaming behavior.
2. Resource Management
Manage resources for long-lived streams:
func ResourceManagementInterceptor(srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
// Create resource pool
pool := acquireResourcePool()
defer releaseResourcePool(pool)
// Create stream context with cancel
ctx, cancel := context.WithCancel(ss.Context())
defer cancel()
// Create wrapped stream with resource context
ws := &wrappedServerStream{
ServerStream: wrapStreamContext(ss, ctx),
resources: pool,
}
// Monitor resource usage
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if pool.Usage() > maxUsage {
cancel() // Terminate stream if resources exceeded
return
}
}
}
}()
return handler(srv, ws)
}
This example showcases essential resource management techniques for streaming RPCs. The interceptor creates and manages a dedicated resource pool for each stream, ensuring proper allocation and cleanup of resources. It implements active monitoring of resource usage through a background goroutine, which periodically checks consumption levels. When resource limits are exceeded, the interceptor gracefully terminates the stream using context cancellation. Throughout the stream’s lifecycle, it maintains proper cleanup through strategic use of defer statements, guaranteeing that resources are released even in error scenarios.
3. Flow Control
Implement flow control for streaming RPCs:
func FlowControlInterceptor(maxMsgsPerSecond int) grpc.StreamServerInterceptor {
return func(srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
limiter := rate.NewLimiter(rate.Limit(maxMsgsPerSecond), 1)
ws := &wrappedServerStream{
ServerStream: ss,
sendMsg: func(m interface{}) error {
if err := limiter.Wait(ss.Context()); err != nil {
return err
}
return ss.SendMsg(m)
},
recvMsg: func(m interface{}) error {
if err := limiter.Wait(ss.Context()); err != nil {
return err
}
return ss.RecvMsg(m)
},
}
return handler(srv, ws)
}
}
This pattern illustrates sophisticated flow control for streaming RPCs. The interceptor employs a token bucket algorithm to enforce rate limits on message flow, preventing resource exhaustion from high-volume streams. It carefully respects context cancellation, ensuring that rate limiting doesn’t block indefinitely when streams are terminated. The implementation handles both send and receive operations uniformly, providing consistent flow control in both directions. This approach allows for fine-grained control over message processing rates while maintaining responsiveness to cancellation and shutdown signals.
Testing
Testing streaming interceptors requires careful consideration of stream lifecycle, message flow, and state management. Here’s how to use Clue’s mock package to test stream interceptors effectively:
// Mock implementation of grpc.ServerStream
type mockServerStream struct {
*mock.Mock
t *testing.T
}
func newMockServerStream(t *testing.T) *mockServerStream {
return &mockServerStream{mock.New(), t}
}
func (m *mockServerStream) Context() context.Context {
if f := m.Next("Context"); f != nil {
return f.(func() context.Context)()
}
return context.Background()
}
func (m *mockServerStream) SendMsg(msg interface{}) error {
if f := m.Next("SendMsg"); f != nil {
return f.(func(interface{}) error)(msg)
}
return nil
}
func (m *mockServerStream) RecvMsg(msg interface{}) error {
if f := m.Next("RecvMsg"); f != nil {
return f.(func(interface{}) error)(msg)
}
return nil
}
func TestMonitoringStreamInterceptor(t *testing.T) {
tests := []struct {
name string
setup func(*mockServerStream)
msgCount int
wantErr bool
}{
{
name: "successful stream with multiple messages",
setup: func(s *mockServerStream) {
// Set up context call
s.Set("Context", func() context.Context {
return context.Background()
})
// Set up successful message sends
for i := 0; i < 10; i++ {
s.Add("SendMsg", func(msg interface{}) error {
return nil
})
}
},
msgCount: 10,
wantErr: false,
},
{
name: "stream with error",
setup: func(s *mockServerStream) {
s.Set("Context", func() context.Context {
return context.Background()
})
// First few messages succeed
for i := 0; i < 3; i++ {
s.Add("SendMsg", func(msg interface{}) error {
return nil
})
}
// Then error occurs
s.Add("SendMsg", func(msg interface{}) error {
return status.Error(codes.Internal, "stream error")
})
},
msgCount: 4,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create mock stream
stream := newMockServerStream(t)
if tt.setup != nil {
tt.setup(stream)
}
// Create test handler
handler := func(srv interface{}, stream grpc.ServerStream) error {
for i := 0; i < tt.msgCount; i++ {
if err := stream.SendMsg(i); err != nil {
return err
}
}
return nil
}
// Call interceptor
err := MonitoringStreamInterceptor(nil, stream,
&grpc.StreamServerInfo{}, handler)
// Verify error behavior
if (err != nil) != tt.wantErr {
t.Errorf("MonitoringStreamInterceptor() error = %v, wantErr %v",
err, tt.wantErr)
}
// Verify all expected calls were made
if stream.HasMore() {
t.Error("not all expected stream operations were performed")
}
})
}
}
// Testing resource management with Clue mocks
func TestResourceManagementInterceptor(t *testing.T) {
tests := []struct {
name string
setup func(*mockServerStream)
resources *ResourcePool
wantErr bool
}{
{
name: "respects resource limits",
setup: func(s *mockServerStream) {
s.Set("Context", func() context.Context {
return context.Background()
})
// Simulate message processing until resource limit
s.Add("SendMsg", func(msg interface{}) error {
return nil
})
},
resources: NewResourcePool(100),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stream := newMockServerStream(t)
tt.setup(stream)
err := ResourceManagementInterceptor(nil, stream,
&grpc.StreamServerInfo{}, testHandler)
if (err != nil) != tt.wantErr {
t.Errorf("ResourceManagementInterceptor() error = %v, wantErr %v",
err, tt.wantErr)
}
if stream.HasMore() {
t.Error("not all expected stream operations were performed")
}
})
}
}
// Testing flow control with Clue mocks
func TestFlowControlInterceptor(t *testing.T) {
tests := []struct {
name string
setup func(*mockServerStream)
rate int
wantErr bool
}{
{
name: "throttles rapid messages",
setup: func(s *mockServerStream) {
s.Set("Context", func() context.Context {
return context.Background()
})
// Set up sequence of messages with timing checks
start := time.Now()
for i := 0; i < 3; i++ {
s.Add("SendMsg", func(msg interface{}) error {
if elapsed := time.Since(start); elapsed < time.Second/2 {
return fmt.Errorf("message sent too quickly: %v", elapsed)
}
return nil
})
}
},
rate: 2, // messages per second
wantErr: false,
},
}
// Test implementation...
}
This testing approach using Clue’s mock package offers several advantages:
Sequence Control: The
Addmethod allows precise control over the sequence of stream operations, making it easy to test different message patterns and error scenarios.Permanent Behaviors: The
Setmethod defines default behaviors for stream operations that don’t need to vary, reducing test setup code.Verification: The
HasMoremethod provides a simple way to verify that all expected operations were performed, catching missing or unexpected calls.Flexibility: The mock implementation can be easily extended to handle new stream behaviors or test different aspects of interceptor functionality.
The tests demonstrate several key patterns:
Setup Functions: Each test case includes a setup function that configures the mock stream’s behavior, making the test cases clear and self-contained.
Error Scenarios: The tests cover both successful operations and various error conditions, ensuring robust error handling.
Resource Management: Tests verify proper resource allocation, usage tracking, and cleanup.
Flow Control: Tests validate rate limiting and backpressure mechanisms using timing-aware mock implementations.
Best Practices
- Resource Management: Always clean up resources, even on errors.
- Context Handling: Respect context cancellation for stream operations.
- Flow Control: Implement rate limiting for high-volume streams.
- Error Handling: Use appropriate gRPC status codes for stream errors.
- Testing: Test stream lifecycle events and error conditions.
- Monitoring: Track stream health and performance metrics.
- Documentation: Document stream behavior and resource requirements.
Next Steps
- Review Error Handling
- Explore Observability
- Learn about Load Balancing