Stream interceptors handle streaming RPCs in gRPC services. They apply whenever the client, the server, or both send multiple messages over a single connection. This guide shows you how to implement effective stream interceptors for your Goa services.
A stream interceptor follows this pattern:
func StreamInterceptor(srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler) error {

	// 1. Pre-stream operations
	//    - Extract metadata
	//    - Validate protocol requirements
	//    - Initialize stream state

	// 2. Wrap the stream for monitoring
	wrappedStream := &wrappedServerStream{
		ServerStream: ss,
		// Add fields for tracking stream state
	}

	// 3. Handle the stream
	err := handler(srv, wrappedStream)

	// 4. Post-stream operations
	//    - Record metrics
	//    - Clean up resources
	//    - Handle errors
	return err
}
This structure lets you run setup before any messages flow, observe every message through the wrapped stream, and perform cleanup and reporting after the handler returns.
The gRPC server stream interface provides basic message handling capabilities, but interceptors often need to add functionality without modifying the original stream. This is where stream wrappers become essential. A stream wrapper implements the grpc.ServerStream interface while adding custom behavior through composition.
Here’s a standard implementation pattern:
type wrappedServerStream struct {
	grpc.ServerStream // Embed the original interface

	msgCount  int64     // Track message count
	startTime time.Time // Track stream duration
	method    string    // Store RPC method name
}

func (w *wrappedServerStream) SendMsg(m interface{}) error {
	err := w.ServerStream.SendMsg(m)
	if err == nil {
		atomic.AddInt64(&w.msgCount, 1) // Thread-safe counter
	}
	return err
}

func (w *wrappedServerStream) RecvMsg(m interface{}) error {
	err := w.ServerStream.RecvMsg(m)
	if err == nil {
		atomic.AddInt64(&w.msgCount, 1) // Track received messages too
	}
	return err
}
This wrapper pattern serves several important purposes:

Message Tracking: The wrapper intercepts every message sent or received, letting you count messages, measure payload sizes, and time individual operations.

State Management: The wrapper maintains stream-specific state such as message counters, the stream's start time, and the RPC method name.

Error Handling: The wrapper can enhance error handling by inspecting errors as they occur and attaching stream context to them; see the sketch after this list.
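For example, a variant of RecvMsg can separate the normal end of the client's send stream (io.EOF) from genuine failures and annotate real errors with the method name. A minimal sketch, assuming the fields defined above plus the standard io and fmt packages:

func (w *wrappedServerStream) RecvMsg(m interface{}) error {
	err := w.ServerStream.RecvMsg(m)
	if err == nil {
		atomic.AddInt64(&w.msgCount, 1)
		return nil
	}
	if err == io.EOF {
		// io.EOF signals the normal end of the client's send stream,
		// not a failure, so pass it through untouched.
		return err
	}
	// Attach stream context to genuine errors.
	return fmt.Errorf("%s: recv failed after %d messages: %w",
		w.method, atomic.LoadInt64(&w.msgCount), err)
}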
Here’s an example of a more sophisticated wrapper that adds functionality commonly needed in production environments:
type enhancedServerStream struct {
	grpc.ServerStream

	ctx       context.Context // Enhanced context
	method    string          // RPC method name
	startTime time.Time       // Stream start time
	msgCount  int64           // Message counter
	msgSize   int64           // Total bytes processed
	metadata  metadata.MD     // Cached metadata
	mu        sync.RWMutex    // Protect concurrent access
	logger    *zap.Logger     // Structured logging
}
func newEnhancedServerStream(ss grpc.ServerStream, method string) *enhancedServerStream {
	return &enhancedServerStream{
		ServerStream: ss,
		ctx:          ss.Context(),
		method:       method,
		startTime:    time.Now(),
		metadata:     metadata.MD{},
		logger:       zap.L().With(zap.String("method", method)),
	}
}

func (s *enhancedServerStream) Context() context.Context {
	return s.ctx
}
func (s *enhancedServerStream) SendMsg(m interface{}) error {
	// Pre-send processing: measure the payload when it is a proto message.
	// A checked type assertion avoids a panic on non-proto payloads.
	var msgSize int
	if pm, ok := m.(proto.Message); ok {
		msgSize = proto.Size(pm)
	}
	s.mu.Lock()
	s.msgSize += int64(msgSize)
	s.mu.Unlock()

	// Log large messages
	if msgSize > maxMessageSize {
		s.logger.Warn("large message detected",
			zap.Int("size", msgSize))
	}

	// Send with timing
	start := time.Now()
	err := s.ServerStream.SendMsg(m)
	duration := time.Since(start)

	// Post-send processing
	if err == nil {
		atomic.AddInt64(&s.msgCount, 1)
		metrics.RecordMessageMetrics(s.method, "send", msgSize, duration)
	} else {
		s.logger.Error("send failed", zap.Error(err))
	}
	return err
}
func (s *enhancedServerStream) RecvMsg(m interface{}) error {
	// Similar enhancement pattern for receive: time the call and, on
	// success, count the message and add its decoded size to the totals.
	start := time.Now()
	err := s.ServerStream.RecvMsg(m)
	if err != nil {
		return err
	}
	var msgSize int
	if pm, ok := m.(proto.Message); ok {
		msgSize = proto.Size(pm)
	}
	s.mu.Lock()
	s.msgSize += int64(msgSize)
	s.mu.Unlock()
	atomic.AddInt64(&s.msgCount, 1)
	metrics.RecordMessageMetrics(s.method, "recv", msgSize, time.Since(start))
	return nil
}
func (s *enhancedServerStream) Stats() StreamStats {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return StreamStats{
		Method:       s.method,
		Duration:     time.Since(s.startTime),
		MessageCount: atomic.LoadInt64(&s.msgCount),
		TotalBytes:   s.msgSize,
	}
}
This enhanced wrapper demonstrates several production-ready features:

Metrics Collection: The wrapper automatically records message counts, payload sizes, and per-message send latency.

Logging Integration: It provides structured logging scoped to the RPC method, warning on oversized messages and logging send failures with the underlying error.

Resource Tracking: The wrapper maintains running totals of bytes processed and the stream's lifetime, exposed through the Stats method.

Thread Safety: It properly handles concurrent access through atomic counters on the hot path and a read-write mutex guarding the byte totals.
You can use these wrappers in your interceptors like this:
func StreamInterceptor(srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler) error {

	// Create enhanced stream
	ws := newEnhancedServerStream(ss, info.FullMethod)

	// Use wrapper in handler
	err := handler(srv, ws)

	// Record final statistics
	stats := ws.Stats()
	metrics.RecordStreamStats(stats)
	return err
}
These wrapper patterns are standard practice in gRPC services, and you’ll find similar implementations in many production systems. The specific enhancements you add will depend on your service’s requirements, but the basic pattern of wrapping the stream to add functionality remains consistent.
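To put an interceptor into effect, register it when constructing the gRPC server. A minimal sketch using the StreamInterceptor defined above; grpc.ChainStreamInterceptor composes multiple stream interceptors in the order given:

import "google.golang.org/grpc"

func newServer() *grpc.Server {
	return grpc.NewServer(
		// Interceptors run in the order listed; add more as needed.
		grpc.ChainStreamInterceptor(
			StreamInterceptor,
		),
	)
}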
Monitor streaming RPC performance and behavior:
func MonitoringStreamInterceptor(srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler) error {

	// Create wrapped stream
	ws := &wrappedServerStream{
		ServerStream: ss,
		startTime:    time.Now(),
		method:       info.FullMethod,
	}

	// Extract peer information, guarding against a missing peer so the
	// interceptor never panics on contexts without one
	peerAddr := "unknown"
	if p, ok := peer.FromContext(ss.Context()); ok {
		peerAddr = p.Addr.String()
	}

	// Handle stream
	err := handler(srv, ws)

	// Record metrics
	duration := time.Since(ws.startTime)
	msgCount := atomic.LoadInt64(&ws.msgCount)
	code := status.Code(err)
	metrics.RecordStreamMetrics(ws.method, peerAddr, code, duration, msgCount)
	return err
}
This pattern demonstrates comprehensive stream monitoring capabilities. The interceptor tracks the duration of each stream from start to finish, maintaining an accurate count of messages processed. It extracts and records peer information from the context, enabling you to identify and monitor client behavior. The interceptor properly handles stream errors, ensuring that failure scenarios are captured and recorded. All of this information is collected into stream-specific metrics, providing valuable insights into your service’s streaming behavior.
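The metrics.RecordStreamMetrics helper is application-specific. As one possibility, here is a hedged sketch of what it could look like with the Prometheus client library; the metric names are illustrative, and the peer address is deliberately kept out of the labels to avoid unbounded label cardinality (log it instead):

package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc/codes"
)

var (
	streamDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: "grpc_stream_duration_seconds",
			Help: "Duration of streaming RPCs.",
		},
		[]string{"method", "code"},
	)
	streamMessages = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "grpc_stream_messages_total",
			Help: "Messages processed per streaming method.",
		},
		[]string{"method"},
	)
)

func init() {
	prometheus.MustRegister(streamDuration, streamMessages)
}

// RecordStreamMetrics records the outcome of one completed stream.
func RecordStreamMetrics(method, peerAddr string, code codes.Code,
	duration time.Duration, msgCount int64) {
	streamDuration.WithLabelValues(method, code.String()).Observe(duration.Seconds())
	streamMessages.WithLabelValues(method).Add(float64(msgCount))
}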
Manage resources for long-lived streams:
func ResourceManagementInterceptor(srv interface{},
	ss grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler) error {

	// Create resource pool (acquireResourcePool and releaseResourcePool
	// are application-specific helpers)
	pool := acquireResourcePool()
	defer releaseResourcePool(pool)

	// Create stream context with cancel
	ctx, cancel := context.WithCancel(ss.Context())
	defer cancel()

	// Create wrapped stream with resource context. This assumes a wrapper
	// variant with a resources field and a wrapStreamContext helper whose
	// Context() returns the cancellable context.
	ws := &wrappedServerStream{
		ServerStream: wrapStreamContext(ss, ctx),
		resources:    pool,
	}

	// Monitor resource usage in the background
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if pool.Usage() > maxUsage {
					cancel() // Terminate stream if resources exceeded
					return
				}
			}
		}
	}()

	return handler(srv, ws)
}
This example showcases essential resource management techniques for streaming RPCs. The interceptor creates and manages a dedicated resource pool for each stream, ensuring proper allocation and cleanup of resources. It implements active monitoring of resource usage through a background goroutine, which periodically checks consumption levels. When resource limits are exceeded, the interceptor gracefully terminates the stream using context cancellation. Throughout the stream’s lifecycle, it maintains proper cleanup through strategic use of defer statements, guaranteeing that resources are released even in error scenarios.
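The example leaves the pool itself undefined. Here is a minimal sketch of what it might look like; the names (ResourcePool, Usage, maxUsage) are illustrative and would be replaced by whatever resources your service actually pools. With this shape, maxUsage would be a fraction such as 0.9:

type ResourcePool struct {
	mu       sync.Mutex
	used     int
	capacity int
}

func NewResourcePool(capacity int) *ResourcePool {
	return &ResourcePool{capacity: capacity}
}

// Acquire reserves n units, reporting whether capacity allowed it.
func (p *ResourcePool) Acquire(n int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.used+n > p.capacity {
		return false
	}
	p.used += n
	return true
}

// Release returns n units to the pool.
func (p *ResourcePool) Release(n int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.used -= n
}

// Usage reports the fraction of capacity in use, between 0.0 and 1.0.
func (p *ResourcePool) Usage() float64 {
	p.mu.Lock()
	defer p.mu.Unlock()
	return float64(p.used) / float64(p.capacity)
}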
Implement flow control for streaming RPCs:
// rateLimitedStream applies a shared limiter to both directions of flow.
// (The basic wrappedServerStream has no override hooks, so flow control
// gets its own wrapper type.)
type rateLimitedStream struct {
	grpc.ServerStream
	limiter *rate.Limiter
}

func (s *rateLimitedStream) SendMsg(m interface{}) error {
	// Wait blocks until a token is available or the context is done.
	if err := s.limiter.Wait(s.Context()); err != nil {
		return err
	}
	return s.ServerStream.SendMsg(m)
}

func (s *rateLimitedStream) RecvMsg(m interface{}) error {
	if err := s.limiter.Wait(s.Context()); err != nil {
		return err
	}
	return s.ServerStream.RecvMsg(m)
}

func FlowControlInterceptor(maxMsgsPerSecond int) grpc.StreamServerInterceptor {
	return func(srv interface{},
		ss grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler) error {
		// One limiter per stream: a token bucket refilled at the
		// requested rate with a burst of one message.
		limiter := rate.NewLimiter(rate.Limit(maxMsgsPerSecond), 1)
		return handler(srv, &rateLimitedStream{
			ServerStream: ss,
			limiter:      limiter,
		})
	}
}
This pattern illustrates sophisticated flow control for streaming RPCs. The interceptor employs a token bucket algorithm to enforce rate limits on message flow, preventing resource exhaustion from high-volume streams. It carefully respects context cancellation, ensuring that rate limiting doesn’t block indefinitely when streams are terminated. The implementation handles both send and receive operations uniformly, providing consistent flow control in both directions. This approach allows for fine-grained control over message processing rates while maintaining responsiveness to cancellation and shutdown signals.
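A burst of 1 enforces strictly even pacing. If short bursts are acceptable, a larger burst value preserves the average rate while reducing latency for bursty producers. A hypothetical variant (the name and parameters are illustrative, not part of gRPC):

func FlowControlInterceptorWithBurst(maxMsgsPerSecond, burst int) grpc.StreamServerInterceptor {
	return func(srv interface{},
		ss grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler) error {
		limiter := rate.NewLimiter(rate.Limit(maxMsgsPerSecond), burst)
		return handler(srv, &rateLimitedStream{ServerStream: ss, limiter: limiter})
	}
}

// Usage: up to 100 msgs/sec on average, with bursts of up to 10.
// srv := grpc.NewServer(grpc.StreamInterceptor(FlowControlInterceptorWithBurst(100, 10)))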
Testing streaming interceptors requires careful consideration of stream lifecycle, message flow, and state management. Here’s how to use Clue’s mock package to test stream interceptors effectively:
// Mock implementation of grpc.ServerStream built on Clue's mock package
type mockServerStream struct {
	*mock.Mock
	t *testing.T
}

func newMockServerStream(t *testing.T) *mockServerStream {
	return &mockServerStream{mock.New(), t}
}

func (m *mockServerStream) Context() context.Context {
	if f := m.Next("Context"); f != nil {
		return f.(func() context.Context)()
	}
	return context.Background()
}

func (m *mockServerStream) SendMsg(msg interface{}) error {
	if f := m.Next("SendMsg"); f != nil {
		return f.(func(interface{}) error)(msg)
	}
	return nil
}

func (m *mockServerStream) RecvMsg(msg interface{}) error {
	if f := m.Next("RecvMsg"); f != nil {
		return f.(func(interface{}) error)(msg)
	}
	return nil
}

// No-op stubs so the mock satisfies the full grpc.ServerStream interface.
func (m *mockServerStream) SetHeader(metadata.MD) error  { return nil }
func (m *mockServerStream) SendHeader(metadata.MD) error { return nil }
func (m *mockServerStream) SetTrailer(metadata.MD)       {}
func TestMonitoringStreamInterceptor(t *testing.T) {
	tests := []struct {
		name     string
		setup    func(*mockServerStream)
		msgCount int
		wantErr  bool
	}{
		{
			name: "successful stream with multiple messages",
			setup: func(s *mockServerStream) {
				// Set up context call
				s.Set("Context", func() context.Context {
					return context.Background()
				})
				// Set up successful message sends
				for i := 0; i < 10; i++ {
					s.Add("SendMsg", func(msg interface{}) error {
						return nil
					})
				}
			},
			msgCount: 10,
			wantErr:  false,
		},
		{
			name: "stream with error",
			setup: func(s *mockServerStream) {
				s.Set("Context", func() context.Context {
					return context.Background()
				})
				// First few messages succeed
				for i := 0; i < 3; i++ {
					s.Add("SendMsg", func(msg interface{}) error {
						return nil
					})
				}
				// Then error occurs
				s.Add("SendMsg", func(msg interface{}) error {
					return status.Error(codes.Internal, "stream error")
				})
			},
			msgCount: 4,
			wantErr:  true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create mock stream
			stream := newMockServerStream(t)
			if tt.setup != nil {
				tt.setup(stream)
			}

			// Create test handler
			handler := func(srv interface{}, stream grpc.ServerStream) error {
				for i := 0; i < tt.msgCount; i++ {
					if err := stream.SendMsg(i); err != nil {
						return err
					}
				}
				return nil
			}

			// Call interceptor
			err := MonitoringStreamInterceptor(nil, stream,
				&grpc.StreamServerInfo{}, handler)

			// Verify error behavior
			if (err != nil) != tt.wantErr {
				t.Errorf("MonitoringStreamInterceptor() error = %v, wantErr %v",
					err, tt.wantErr)
			}

			// Verify all expected calls were made
			if stream.HasMore() {
				t.Error("not all expected stream operations were performed")
			}
		})
	}
}
// Testing resource management with Clue mocks. testHandler is assumed to be
// a package-level helper that keeps sending messages until the stream's
// context is cancelled.
func TestResourceManagementInterceptor(t *testing.T) {
	tests := []struct {
		name      string
		setup     func(*mockServerStream)
		resources *ResourcePool
		wantErr   bool
	}{
		{
			name: "respects resource limits",
			setup: func(s *mockServerStream) {
				s.Set("Context", func() context.Context {
					return context.Background()
				})
				// Simulate message processing until resource limit
				s.Add("SendMsg", func(msg interface{}) error {
					return nil
				})
			},
			resources: NewResourcePool(100),
			wantErr:   true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			stream := newMockServerStream(t)
			tt.setup(stream)

			err := ResourceManagementInterceptor(nil, stream,
				&grpc.StreamServerInfo{}, testHandler)
			if (err != nil) != tt.wantErr {
				t.Errorf("ResourceManagementInterceptor() error = %v, wantErr %v",
					err, tt.wantErr)
			}
			if stream.HasMore() {
				t.Error("not all expected stream operations were performed")
			}
		})
	}
}
// Testing flow control with Clue mocks
func TestFlowControlInterceptor(t *testing.T) {
	tests := []struct {
		name    string
		setup   func(*mockServerStream)
		rate    int
		wantErr bool
	}{
		{
			name: "throttles rapid messages",
			setup: func(s *mockServerStream) {
				s.Set("Context", func() context.Context {
					return context.Background()
				})
				// Set up a sequence of messages with timing checks.
				// With a burst of 1 the first send passes immediately;
				// send i should be delayed by roughly i/rate seconds.
				start := time.Now()
				for i := 0; i < 3; i++ {
					i := i // capture loop variable
					s.Add("SendMsg", func(msg interface{}) error {
						if i == 0 {
							return nil
						}
						minElapsed := time.Duration(i) * time.Second / 2
						if elapsed := time.Since(start); elapsed < minElapsed-50*time.Millisecond {
							return fmt.Errorf("message %d sent too quickly: %v", i, elapsed)
						}
						return nil
					})
				}
			},
			rate:    2, // messages per second
			wantErr: false,
		},
	}
	// Test implementation...
}
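The elided body would follow the same table-driven shape as the earlier tests. A sketch of the loop that could replace the "Test implementation" comment inside TestFlowControlInterceptor, under that assumption:

// Inside TestFlowControlInterceptor, in place of the elided comment:
for _, tt := range tests {
	t.Run(tt.name, func(t *testing.T) {
		stream := newMockServerStream(t)
		tt.setup(stream)

		// Handler that pushes three messages through the rate-limited stream.
		handler := func(srv interface{}, s grpc.ServerStream) error {
			for i := 0; i < 3; i++ {
				if err := s.SendMsg(i); err != nil {
					return err
				}
			}
			return nil
		}

		err := FlowControlInterceptor(tt.rate)(nil, stream,
			&grpc.StreamServerInfo{}, handler)
		if (err != nil) != tt.wantErr {
			t.Errorf("FlowControlInterceptor() error = %v, wantErr %v",
				err, tt.wantErr)
		}
		if stream.HasMore() {
			t.Error("not all expected stream operations were performed")
		}
	})
}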
This testing approach using Clue's mock package offers several advantages:

Sequence Control: The Add method allows precise control over the sequence of stream operations, making it easy to test different message patterns and error scenarios.

Permanent Behaviors: The Set method defines default behaviors for stream operations that don't need to vary, reducing test setup code.

Verification: The HasMore method provides a simple way to verify that all expected operations were performed, catching missing or unexpected calls.

Flexibility: The mock implementation can be easily extended to handle new stream behaviors or test different aspects of interceptor functionality.
The tests demonstrate several key patterns:
Setup Functions: Each test case includes a setup function that configures the mock stream’s behavior, making the test cases clear and self-contained.
Error Scenarios: The tests cover both successful operations and various error conditions, ensuring robust error handling.
Resource Management: Tests verify proper resource allocation, usage tracking, and cleanup.
Flow Control: Tests validate rate limiting and backpressure mechanisms using timing-aware mock implementations.