Server-Sent Events (SSE) is an HTTP-based server-to-client streaming protocol that enables real-time updates from the server to the client. Goa provides native support for implementing SSE endpoints, making it easy to add real-time streaming capabilities to your services.
SSE is particularly useful for scenarios where you need to push updates from the server to the client. Think of it like a one-way radio broadcast - the server sends messages, and clients receive them. This makes it perfect for:
The protocol is built on standard HTTP, which means it’s simple to implement and works well with modern web browsers and HTTP clients. When a connection is lost, clients automatically attempt to reconnect, making it reliable for real-time applications.
Goa provides three main streaming options:
Choose SSE when:
Let’s create a complete SSE service. First, create a new file in your design package (e.g., design/sse.go):
package design
import . "goa.design/goa/v3/dsl"
// Event represents a message sent via SSE
var Event = Type("Event", func() {
Description("A notification message sent via SSE")
Attribute("message", String, "Message body")
Attribute("timestamp", Int, "Unix timestamp")
Required("message", "timestamp")
})
// SSEService defines the SSE service
var _ = Service("sse", func() {
Description("Service that demonstrates Server-Sent Events")
Method("stream", func() {
Description("Stream events using Server-Sent Events")
StreamingResult(Event) // SSE methods must use StreamingResult
HTTP(func() {
GET("/events/stream")
ServerSentEvents() // Use SSE instead of WebSocket
})
})
})
After defining the design, generate the service code:
goa gen github.com/yourusername/yourproject/design
This will create:
Create a new file for your service implementation (e.g., sse.go):
package sse
import (
"context"
"time"
"github.com/yourusername/yourproject/gen/sse"
)
type Service struct {
// Add any dependencies here
}
// NewService creates a new SSE service
func NewService() *Service {
return &Service{}
}
// Stream implements the SSE endpoint
func (s *Service) Stream(ctx context.Context, stream sse.StreamServerStream) error {
// Send a message every second
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Create and send an event
event := &sse.Event{
Message: "Hello from server!",
Timestamp: int(time.Now().Unix()), // Int attributes map to Go int; Unix() returns int64
}
if err := stream.Send(event); err != nil {
return err
}
case <-ctx.Done():
return nil
}
}
}
The SSE protocol gives us several ways to customize how events are sent. Think of these as different channels on our radio broadcast - we can send different types of messages, keep track of message order, and control how clients reconnect.
Here’s how we can customize our events:
ServerSentEvents(func() {
SSEEventData("message") // The actual message content
SSEEventType("type") // What kind of message it is
SSEEventID("id") // A unique identifier for the message
SSEEventRetry("retry") // How long to wait before reconnecting
})
Let’s break down what each field does:
Data Field (SSEEventData): This is the main content of your message. It can be any type of data that can be converted to JSON. If you don’t specify this, the entire event object will be sent as the data.
Event Type (SSEEventType): This lets you categorize your messages. For example, you might have “notification” messages and “alert” messages. Clients can listen for specific types of messages.
Event ID (SSEEventID): This is like a message number. It helps clients keep track of which messages they’ve received, which is especially useful if the connection is lost and needs to be restored.
Retry Interval (SSEEventRetry): This tells clients how long to wait before trying to reconnect if the connection is lost.
Here’s a complete example that uses all these features:
var Event = Type("Event", func() {
Description("A notification message sent via SSE")
Attribute("message", String, "Message body")
Attribute("type", String, "Event type (e.g., 'notification', 'alert')")
Attribute("id", String, "Unique event identifier")
Attribute("retry", Int, "Reconnection delay in milliseconds")
Required("message", "type", "id")
})
Method("stream", func() {
Description("Stream events using Server-Sent Events")
StreamingResult(Event)
HTTP(func() {
GET("/events/stream")
ServerSentEvents(func() {
SSEEventData("message")
SSEEventType("type")
SSEEventID("id")
SSEEventRetry("retry") // Only sent if the retry field is not nil
})
})
})
When this endpoint sends events, they’ll look like this:
event: notification
id: 123
data: {"message": "Hello"}

event: alert
id: 124
data: {"message": "Warning"}
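To make the wire format above concrete, here is a minimal sketch in plain Go of how one event could be rendered as text/event-stream. It is not Goa's generated encoder: the Event struct and the formatEvent function are hypothetical names introduced only for illustration, and the sketch assumes that an empty type or ID, or a zero retry value, means the field is omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a hypothetical stand-in for the generated result type.
type Event struct {
	Type  string // written as the "event:" field
	ID    string // written as the "id:" field
	Retry int    // written as the "retry:" field, in milliseconds; 0 omits it
	Data  string // written as one or more "data:" lines
}

// formatEvent renders ev in the text/event-stream wire format.
func formatEvent(ev Event) string {
	var b strings.Builder
	if ev.Type != "" {
		fmt.Fprintf(&b, "event: %s\n", ev.Type)
	}
	if ev.ID != "" {
		fmt.Fprintf(&b, "id: %s\n", ev.ID)
	}
	if ev.Retry > 0 {
		fmt.Fprintf(&b, "retry: %d\n", ev.Retry)
	}
	// A payload that contains newlines is split across several data lines.
	for _, line := range strings.Split(ev.Data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // a blank line terminates the event
	return b.String()
}

func main() {
	fmt.Print(formatEvent(Event{
		Type: "notification",
		ID:   "123",
		Data: `{"message": "Hello"}`,
	}))
}
```

The blank line at the end of each event matters: it is what tells the client that the event is complete and can be dispatched.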
One of the most powerful features of SSE is the ability to resume streaming from where you left off if the connection is lost. This is handled through the Last-Event-Id header. Think of it like a bookmark - when a client reconnects, it can tell the server “I last received message number X, please send me everything after that.”
The Last-Event-Id feature is crucial for building reliable real-time applications. It ensures that clients don’t miss any messages when their connection drops, and it helps maintain the correct order of messages. This is especially important for applications where missing or out-of-order messages could cause problems.
Let’s implement Last-Event-Id support in our service:
Method("stream", func() {
Description("Stream events using Server-Sent Events")
Payload(func() {
Attribute("startID", String, "ID of the last event received", func() {
Description("Used to resume streaming from a specific event")
Example("123")
})
})
StreamingResult(Event)
HTTP(func() {
GET("/events/stream")
ServerSentEvents(func() {
SSERequestID("startID") // Maps the Last-Event-Id header to startID
})
})
})
func (s *Service) Stream(ctx context.Context, p *sse.StreamPayload, stream sse.StreamServerStream) error {
// startID is optional, so the generated payload field is a *string
// If we have a last ID, skip events until we find it
if p.StartID != nil && *p.StartID != "" {
lastID := *p.StartID
// Skip events until we find the last received event
for ev := range s.events {
if ev.ID == lastID {
break
}
}
}
// Start streaming new events
for ev := range s.events {
if err := stream.Send(ev); err != nil {
return err
}
}
return nil
}
// The browser's EventSource reconnects on its own and sends the ID of the
// last event it received in the Last-Event-ID request header, so no custom
// reconnection logic is needed. The native constructor does not accept
// custom headers, so the header cannot be set manually.
const eventSource = new EventSource('/events/stream');
// Log the last event ID seen so far (useful when debugging resumption)
eventSource.addEventListener('message', (event) => {
  if (event.lastEventId) {
    console.log('Last event ID:', event.lastEventId);
  }
});
When implementing Last-Event-Id, keep these points in mind:
Event ID Format: Choose a format that makes sense for your application. Sequential numbers are good for maintaining order, while UUIDs are better for uniqueness. Make sure your IDs contain enough information to uniquely identify events.
Storage Considerations: Decide how long you want to keep track of old events. You might want to store the last event ID in the browser’s localStorage to survive page refreshes, or you might want to implement a cleanup mechanism for old event IDs.
Error Handling: Plan for cases where the last event ID is no longer available. Maybe the server has cleaned up old events, or maybe the ID is invalid. Have a fallback mechanism for these situations.
Performance: Be mindful of how you store and look up events. You might want to use a sliding window approach, where you only keep the most recent events in memory.
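The points above (sequential IDs, a sliding window of recent events, and a fallback when the requested ID is gone) can be sketched as a small in-memory replay buffer. This is only an illustration under stated assumptions: ReplayBuffer, Add, and Since are hypothetical names, Event is a stand-in for the generated type, and returning the whole window is just one possible fallback policy.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// Event is a hypothetical stand-in for the generated result type.
type Event struct {
	ID      string
	Message string
}

// ReplayBuffer keeps the most recent events so reconnecting clients can catch up.
type ReplayBuffer struct {
	mu     sync.Mutex
	max    int     // size of the sliding window
	nextID int     // next sequential event ID
	events []Event // oldest first
}

func NewReplayBuffer(max int) *ReplayBuffer {
	return &ReplayBuffer{max: max, nextID: 1}
}

// Add assigns the next sequential ID, stores the event, and evicts the
// oldest event once the window is full.
func (b *ReplayBuffer) Add(message string) Event {
	b.mu.Lock()
	defer b.mu.Unlock()
	ev := Event{ID: strconv.Itoa(b.nextID), Message: message}
	b.nextID++
	b.events = append(b.events, ev)
	if len(b.events) > b.max {
		copy(b.events, b.events[1:])
		b.events = b.events[:b.max]
	}
	return ev
}

// Since returns a copy of the events that follow lastID. ok is false when
// lastID is not in the window (evicted, invalid, or empty); in that case
// the whole window is returned as a fallback.
func (b *ReplayBuffer) Since(lastID string) (events []Event, ok bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for i, ev := range b.events {
		if ev.ID == lastID {
			return append([]Event(nil), b.events[i+1:]...), true
		}
	}
	return append([]Event(nil), b.events...), false
}

func main() {
	buf := NewReplayBuffer(3)
	for _, m := range []string{"a", "b", "c", "d"} {
		buf.Add(m)
	}
	missed, ok := buf.Since("3") // ID still inside the window
	fmt.Println(missed, ok)
	missed, ok = buf.Since("1") // ID already evicted: fallback path
	fmt.Println(len(missed), ok)
}
```

A Stream implementation could call Since with the payload's start ID, send the returned events first, and then switch to live events. When ok is false, the service might instead choose to return an error or send a "reset" event; which policy fits depends on the application.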
Connecting to an SSE endpoint is straightforward. Here’s a basic example using the browser’s EventSource API:
const eventSource = new EventSource('/events/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Received event:', data);
};
eventSource.onerror = (error) => {
console.error('EventSource failed:', error);
eventSource.close();
};
Goa generates client code that you can use in your Go applications:
package main
import (
"context"
"log"
"github.com/yourusername/yourproject/gen/sse"
"github.com/yourusername/yourproject/gen/sse/client"
)
func main() {
// Create a new client
c := client.NewClient("http://localhost:8080")
// Create a context
ctx := context.Background()
// Start streaming
stream, err := c.Stream(ctx)
if err != nil {
log.Fatal(err)
}
// Receive events
for {
event, err := stream.Recv()
if err != nil {
log.Fatal(err)
}
log.Printf("Received: %+v", event)
}
}
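For Go programs that read an SSE stream without the generated client, the parsing side of the protocol can be sketched as a small line-oriented decoder. This is a simplified illustration, not Goa's implementation: Decoder, Feed, and the Event struct are hypothetical names, the retry field is ignored, and the sample body stands in for an HTTP response body.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// Event is a hypothetical, simplified view of a received SSE event.
type Event struct {
	Type string
	ID   string
	Data string
}

// Decoder assembles events from the individual lines of an event stream.
type Decoder struct {
	eventType string
	data      []string
	lastID    string // persists across events, like the browser's last event ID
}

// Feed consumes one line (without its trailing newline). It returns a
// complete event and true when the line is the blank line ending an event.
func (d *Decoder) Feed(line string) (Event, bool) {
	if line == "" {
		defer func() { d.eventType, d.data = "", nil }()
		if len(d.data) == 0 {
			return Event{}, false // events without data are not dispatched
		}
		ev := Event{Type: d.eventType, ID: d.lastID, Data: strings.Join(d.data, "\n")}
		if ev.Type == "" {
			ev.Type = "message" // default event type
		}
		return ev, true
	}
	if strings.HasPrefix(line, ":") {
		return Event{}, false // comment line, often used as a keep-alive
	}
	field, value, _ := strings.Cut(line, ":")
	value = strings.TrimPrefix(value, " ")
	switch field {
	case "event":
		d.eventType = value
	case "id":
		d.lastID = value
	case "data":
		d.data = append(d.data, value)
	}
	return Event{}, false
}

func main() {
	// In a real client this reader would be the HTTP response body.
	body := "event: notification\nid: 123\ndata: {\"message\": \"Hello\"}\n\n"
	var d Decoder
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		if ev, ok := d.Feed(sc.Text()); ok {
			fmt.Printf("type=%s id=%s data=%s\n", ev.Type, ev.ID, ev.Data)
		}
	}
}
```

Keeping the last seen ID in the decoder is what makes resumption possible: a reconnecting client can send that value back in the Last-Event-Id header.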
Here’s how to test your SSE endpoint:
func TestStream(t *testing.T) {
// Create a new service
svc := NewService()
// Create a test context that stops the stream when the test returns
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create a test stream
stream := &TestStream{
events: make(chan *sse.Event),
errors: make(chan error),
}
// Start streaming in a goroutine
go func() {
err := svc.Stream(ctx, stream)
if err != nil {
stream.errors <- err
}
}()
// Receive events
for i := 0; i < 5; i++ {
select {
case event := <-stream.events:
if event.Message == "" {
t.Error("Expected message, got empty string")
}
case err := <-stream.errors:
t.Fatal(err)
case <-time.After(2 * time.Second): // the service sends one event per second, so wait longer than that
t.Fatal("Timeout waiting for event")
}
}
}
type TestStream struct {
events chan *sse.Event
errors chan error
}
func (s *TestStream) Send(event *sse.Event) error {
s.events <- event
return nil
}
func (s *TestStream) Close() error {
close(s.events)
close(s.errors)
return nil
}
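Because the channel-based TestStream above blocks in Send until the test reads the event, a slice-backed fake can be easier to work with in some tests. The following is a sketch under assumptions: Recorder and its Events method are hypothetical, Event is a stand-in for the generated type, and the real generated stream interface may require additional methods beyond Send and Close.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for the generated result type.
type Event struct {
	Message string
}

// Recorder is a fake stream that stores sent events instead of blocking.
type Recorder struct {
	mu     sync.Mutex
	events []*Event
	closed bool
}

// Send records the event, or fails if the stream was already closed.
func (r *Recorder) Send(ev *Event) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.closed {
		return errors.New("send on closed stream")
	}
	r.events = append(r.events, ev)
	return nil
}

// Close marks the stream as closed; later calls to Send return an error.
func (r *Recorder) Close() error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.closed = true
	return nil
}

// Events returns a copy of the events recorded so far.
func (r *Recorder) Events() []*Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]*Event(nil), r.events...)
}

func main() {
	rec := &Recorder{}
	_ = rec.Send(&Event{Message: "one"})
	_ = rec.Close()
	err := rec.Send(&Event{Message: "two"})
	fmt.Println(len(rec.Events()), err)
}
```

With a fake like this, a test can run the service method with a cancelable context, cancel it after a short delay, and then assert on the recorded events without coordinating channels.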
For client-side testing, you can use a mock server:
func TestClient(t *testing.T) {
// Create a test server
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Set SSE headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// Send test events
for i := 0; i < 5; i++ {
fmt.Fprintf(w, "data: {\"message\":\"test %d\"}\n\n", i)
w.(http.Flusher).Flush()
time.Sleep(100 * time.Millisecond)
}
}))
defer server.Close()
// Create a client
c := client.NewClient(server.URL)
// Start streaming
stream, err := c.Stream(context.Background())
if err != nil {
t.Fatal(err)
}
// Receive events
for i := 0; i < 5; i++ {
event, err := stream.Recv()
if err != nil {
t.Fatal(err)
}
if event.Message != fmt.Sprintf("test %d", i) {
t.Errorf("Expected message 'test %d', got '%s'", i, event.Message)
}
}
}
While SSE is powerful, it does have some limitations to be aware of: