NATS, xRegistry, and Avro: A Routed Event Ingestion Plane
Table of Contents
- Overview
- Problem Statement
- Why JSON Config Alone Breaks Down
- Target Architecture
- Local NATS Setup
- Automatic Routing Loop
- How the Pieces Fit
- Avro Event Contract
- Go Producer Example
- Go Router Implementation
- xRegistry Lookup Example
- Avro Validation Example
- Dead Letter Replay Example
- Suggested Package Layout
- Routing and Dead Letters
- Operational Benefits
- Comparison With a JSON Router
- When to Use a Workflow Engine Instead
- Conclusion
Overview
This design treats event ingestion as a small, explicit control system:
- NATS delivers events by subject.
- xRegistry describes what the event is, how it is encoded, and where it should go.
- Avro enforces a stable schema for producers and consumers.
- A Go router looks up policy, validates the payload, and sends the event to the correct sink.
The result is a system that is easier to evolve than a hand-written JSON router because routing metadata, schema contracts, and delivery rules are separated.
Problem Statement
Suppose a producer emits an event described by this registry entry:
```json
{
  "type": "metric.cpu.usage",
  "version": "v1",
  "transport": {
    "kind": "nats",
    "subject": "metrics.cpu.usage.v1"
  },
  "schema": {
    "format": "avro",
    "uri": "xregistry://schemas/metric-cpu-usage/v1"
  },
  "ingestion": {
    "sink": "timescaledb",
    "table": "metric_events",
    "deadletter_subject": "metrics.deadletter"
  }
}
```
The routing questions are simple:
- Is this event valid?
- Where should it go?
- What should happen if delivery fails?
A JSON router can answer those questions, but only by baking validation and policy into application code. That approach works early, then becomes fragile when producers, schemas, and destinations multiply.
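To make the three questions concrete, here is a minimal Go sketch. It decodes the descriptor above and reads each answer off its fields:
- the schema URI says how to validate;
- the sink and table say where the event goes;
- the dead-letter subject says where failures go.

The `Descriptor` type and the `answer` function are illustrative names, not part of any xRegistry API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Descriptor mirrors the registry entry shown above (illustrative type).
type Descriptor struct {
	Type      string `json:"type"`
	Version   string `json:"version"`
	Transport struct {
		Kind    string `json:"kind"`
		Subject string `json:"subject"`
	} `json:"transport"`
	Schema struct {
		Format string `json:"format"`
		URI    string `json:"uri"`
	} `json:"schema"`
	Ingestion struct {
		Sink              string `json:"sink"`
		Table             string `json:"table"`
		DeadLetterSubject string `json:"deadletter_subject"`
	} `json:"ingestion"`
}

// answer returns the schema used for validation, the destination,
// and the subject that receives failed events.
func answer(raw []byte) (string, string, string, error) {
	var d Descriptor
	if err := json.Unmarshal(raw, &d); err != nil {
		return "", "", "", err
	}
	destination := fmt.Sprintf("%s/%s", d.Ingestion.Sink, d.Ingestion.Table)
	return d.Schema.URI, destination, d.Ingestion.DeadLetterSubject, nil
}
```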
Why JSON Config Alone Breaks Down
A JSON definition router usually starts out clean:
```json
{
  "type": "metric.cpu.usage",
  "version": "v1",
  "subject": "metrics.cpu.usage.v1",
  "sink": "timescaledb",
  "table": "metric_events"
}
```
That looks manageable until the system grows.
Problems appear quickly:
- Field validation is manual.
- Type changes can silently break consumers.
- Routing rules and payload shape drift apart.
- Every language re-implements the same checks.
- Old producers keep sending malformed payloads until runtime failures show up.
JSON is fine for transport and configuration. It is weak as a contract boundary when the system needs compatibility guarantees.
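The silent-breakage problem is easy to reproduce with Go's standard `encoding/json`. In this sketch, a producer changes `version` from the string `"v1"` to the number `1`. The `RouteConfig`, `looseVersion`, and `strictConfig` names are hypothetical.
- A router that decodes into a generic map accepts the change without complaint.
- Only a typed decode catches it, and every language that consumes the config has to write that check itself.

```go
package main

import "encoding/json"

// RouteConfig is a hypothetical typed view of the flat JSON config above.
type RouteConfig struct {
	Type    string `json:"type"`
	Version string `json:"version"`
	Subject string `json:"subject"`
	Sink    string `json:"sink"`
	Table   string `json:"table"`
}

// looseVersion decodes into a generic map, as many quick routers do.
// A numeric "version" still decodes; the mismatch surfaces later,
// wherever code assumed a string.
func looseVersion(raw []byte) (any, error) {
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return nil, err
	}
	return m["version"], nil
}

// strictConfig decodes into a typed struct, so the same change fails here.
func strictConfig(raw []byte) (RouteConfig, error) {
	var c RouteConfig
	err := json.Unmarshal(raw, &c)
	return c, err
}
```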
Target Architecture
The request path has four steps:
- Producer publishes an Avro-encoded event to a NATS subject.
- Router consumes the subject and extracts event identity.
- Router looks up the xRegistry entry for policy and destination.
- Router writes to the sink or dead-letters the event.
```text
Producer -> NATS subject -> Go router -> TimescaleDB / API / Dead letter
                               |
                               +-> xRegistry lookup
                               |
                               +-> Avro payload validation
```
The key idea is separation of concerns:
- NATS moves messages.
- Avro describes message shape.
- xRegistry describes routing and schema metadata.
- Go implements the runtime decision loop.
Local NATS Setup
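For local development, run NATS in Docker and keep the broker reachable on the default client port.
```shell
docker run --rm -p 4222:4222 -p 8222:8222 nats -js -m 8222
```
What this gives you:
- `4222` for client connections.
- `8222` for the HTTP monitoring endpoint.
- JetStream (`-js`) for durable streams, consumers, and replay.

The router examples below use core NATS subscriptions. These only receive messages published while the subscriber is connected. Durable delivery and replay need a JetStream stream and consumer.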
If you prefer Docker Compose, use a small service definition:
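```yaml
services:
  nats:
    image: nats
    command: ["-js", "-m", "8222"]
    ports:
      - "4222:4222"
      - "8222:8222"
```
To verify the server is reachable, check the connection, then publish and subscribe with the NATS CLI:
```shell
# Confirm the server accepts client connections
nats server check connection --server nats://localhost:4222

# Terminal 1: subscribe (blocks and prints incoming messages)
nats sub metrics.cpu.usage.v1

# Terminal 2: publish a test message. Plain JSON only proves connectivity;
# a running router would dead-letter it because it is not Avro.
nats pub metrics.cpu.usage.v1 '{"hello":"world"}'
```
If you are building the Go router locally, wire it up in this order: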
- Start NATS.
- Start the router subscriber.
- Publish a test event.
- Confirm the router looks up xRegistry, validates Avro, and writes to the sink or dead letter.
Automatic Routing Loop
Routing becomes automatic when the consumer derives the route key from the message subject and then applies the registry record, so no person chooses the destination.
```go
package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/nats-io/nats.go"
)

// Router routes NATS messages using registry metadata. Event, Route, Registry,
// and the sink helpers are defined in the Go Router Implementation section.
type Router struct {
	Registry Registry
}

// Consume subscribes once; every message on matching subjects is routed.
// This is a core NATS subscription, so messages published while the router
// is down are not redelivered.
func (r Router) Consume(ctx context.Context, nc *nats.Conn, subject string) (*nats.Subscription, error) {
	return nc.Subscribe(subject, func(msg *nats.Msg) {
		if err := r.routeMessage(ctx, msg.Subject, msg.Data); err != nil {
			log.Printf("route failed for %s: %v", msg.Subject, err)
		}
	})
}

func (r Router) routeMessage(ctx context.Context, subject string, payload []byte) error {
	typeName, version, err := subjectToKey(subject)
	if err != nil {
		return err
	}

	event := Event{Type: typeName, Version: version, Payload: payload}
	route, err := r.Registry.Lookup(ctx, typeName+":"+version)
	if err != nil {
		return sendToDeadLetter(event, "route lookup failed")
	}
	return routeEvent(ctx, route, event)
}

// subjectToKey treats the last token as the version and the rest as the type:
// metrics.cpu.usage.v1 -> ("metrics.cpu.usage", "v1"). The registry must index
// routes under this subject-derived name. In this example it differs from the
// event's type field (metric.cpu.usage), so either align the two names or keep
// the subject-to-type mapping in xRegistry.
func subjectToKey(subject string) (string, string, error) {
	parts := strings.Split(subject, ".")
	if len(parts) < 2 {
		return "", "", fmt.Errorf("invalid subject: %s", subject)
	}
	last := len(parts) - 1
	return strings.Join(parts[:last], "."), parts[last], nil
}

// routeEvent validates the payload against the route's schema and
// dispatches it to the sink named in the registry record.
func routeEvent(ctx context.Context, route Route, event Event) error {
	if route.Schema.Format != "avro" {
		return sendToDeadLetter(event, "unsupported schema format")
	}

	if err := decodeAndValidateAvro(event.Payload, route.Schema.URI); err != nil {
		return sendToDeadLetter(event, "avro validation failed")
	}

	switch route.Ingestion.Sink {
	case "timescaledb":
		return writeToTimescale(event, route.Ingestion.Table)
	case "api":
		return postToAPI(event, route.Ingestion.Endpoint)
	default:
		return sendToDeadLetter(event, "unknown sink")
	}
}
```
This loop makes routing automatic in the practical sense:
- The consumer subscribes once.
- Each message subject is turned into a route key.
- xRegistry resolves the sink and schema.
- The router validates and dispatches the event.
The human does not pick timescaledb or api at runtime. The registry record does.
How the Pieces Fit
Think of the system in terms of responsibilities:
- NATS is the highway.
- xRegistry is the map.
- The router is the driver.
- Avro sets the traffic rules for payload structure.
That structure gives you a cleaner operational model:
- NATS delivers by subject, so producers only need to publish to the right channel.
- xRegistry makes routing discoverable, versioned, and centrally managed.
- Avro ensures the payload matches the expected schema before data reaches storage.
- The router stays small because it does not need to understand every producer-specific payload variant.
Avro Event Contract
The event should have a stable envelope and a typed payload.
```json
{
  "type": "record",
  "name": "MetricCpuUsageEvent",
  "namespace": "com.example.metrics",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "version", "type": "string" },
    { "name": "source", "type": "string" },
    { "name": "time", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "MetricCpuUsageData",
        "fields": [
          { "name": "name", "type": "string" },
          { "name": "value", "type": "double" },
          { "name": "unit", "type": ["null", "string"], "default": null },
          { "name": "tags", "type": { "type": "map", "values": "string" }, "default": {} }
        ]
      }
    }
  ]
}
```
Why this helps:
- `value` is a `double`, not an arbitrary JSON value.
- Optional fields are explicit.
- Schema changes can be managed with compatibility rules.
- Multiple languages can share the same contract.
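Compatibility rules come from Avro schema resolution. A reader schema can decode data written with an older writer schema when every field the reader adds has a default, because the reader fills that field in from the default. The sketch below checks only that rule, plus an unchanged type for shared fields, and only on top-level record fields. It ignores:
- type promotions (such as `int` to `long`);
- aliases;
- nested records.

Treat it as an illustration of the rule rather than a replacement for a registry's compatibility checker. `checkBackward` is a hypothetical name.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

type avroField struct {
	Name    string          `json:"name"`
	Type    json.RawMessage `json:"type"`
	Default json.RawMessage `json:"default"` // nil when the key is absent; "null" when set to null
}

type avroRecord struct {
	Fields []avroField `json:"fields"`
}

// compact normalizes whitespace so equivalent type declarations compare equal.
func compact(raw json.RawMessage) string {
	var buf bytes.Buffer
	if err := json.Compact(&buf, raw); err != nil {
		return string(raw)
	}
	return buf.String()
}

// checkBackward reports whether readerJSON can read data written with writerJSON,
// checking only top-level fields: added fields need defaults, and shared fields
// must keep an identical type (promotions are conservatively rejected).
func checkBackward(writerJSON, readerJSON string) error {
	var writer, reader avroRecord
	if err := json.Unmarshal([]byte(writerJSON), &writer); err != nil {
		return err
	}
	if err := json.Unmarshal([]byte(readerJSON), &reader); err != nil {
		return err
	}
	old := make(map[string]avroField, len(writer.Fields))
	for _, f := range writer.Fields {
		old[f.Name] = f
	}
	for _, f := range reader.Fields {
		prev, existed := old[f.Name]
		switch {
		case !existed && f.Default == nil:
			return fmt.Errorf("field %q added without a default", f.Name)
		case existed && compact(prev.Type) != compact(f.Type):
			return fmt.Errorf("field %q changed type", f.Name)
		}
	}
	return nil
}
```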
Go Producer Example
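The producer should not know the routing table. It should know only the event contract and the subject.
```go
package main

import (
	"context"
	"encoding/binary"
	"time"

	"github.com/nats-io/nats.go"
)

// Struct tags map Go fields to Avro field names. The `avro` tag key is what
// github.com/hamba/avro/v2 reads; other codecs may use a different convention.
type MetricCpuUsageData struct {
	Name  string            `avro:"name"`
	Value float64           `avro:"value"`
	Unit  *string           `avro:"unit"` // ["null","string"] union: nil encodes as null
	Tags  map[string]string `avro:"tags"`
}

type MetricCpuUsageEvent struct {
	ID      string             `avro:"id"`
	Type    string             `avro:"type"`
	Version string             `avro:"version"`
	Source  string             `avro:"source"`
	Time    time.Time          `avro:"time"` // timestamp-millis
	Data    MetricCpuUsageData `avro:"data"`
}

// AvroCodec hides the Avro library; it must encode with the
// MetricCpuUsageEvent schema shown above.
type AvroCodec interface {
	Marshal(v any) ([]byte, error)
}

func publishMetric(ctx context.Context, nc *nats.Conn, codec AvroCodec, subject string, event MetricCpuUsageEvent) error {
	// Core NATS Publish takes no context, so honor cancellation up front.
	if err := ctx.Err(); err != nil {
		return err
	}

	payload, err := codec.Marshal(event)
	if err != nil {
		return err
	}

	return nc.Publish(subject, payload)
}

func exampleProducer(nc *nats.Conn, codec AvroCodec) error {
	unit := "percent"
	event := MetricCpuUsageEvent{
		ID:      "evt-123",
		Type:    "metric.cpu.usage",
		Version: "v1",
		Source:  "collector-a",
		Time:    time.UnixMilli(1735689600000),
		Data: MetricCpuUsageData{
			Name:  "cpu_usage",
			Value: 72.5,
			Unit:  &unit,
			Tags:  map[string]string{"host": "node-1"},
		},
	}

	return publishMetric(context.Background(), nc, codec, "metrics.cpu.usage.v1", event)
}

// encodeWireFormat optionally prefixes the payload with a 4-byte big-endian
// schema ID. publishMetric does not call it; if producers adopt it, the router
// must strip the prefix before decoding.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	buf := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(buf[:4], schemaID)
	copy(buf[4:], payload)
	return buf
}
```
The producer path stays small: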
- Build a typed event.
- Encode it with the Avro schema.
- Publish it to the agreed subject.
The subject is a delivery concern. The schema is a contract concern.
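The producer example defines `encodeWireFormat` but never calls it, and nothing on the router side removes the prefix. If producers do adopt that framing, the router needs the inverse before it decodes Avro. Below is a minimal stdlib sketch of both directions; `decodeWireFormat` is a hypothetical helper. This 4-byte layout is not the Confluent wire format, which puts a zero magic byte before the schema ID, so the two are not interchangeable.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// encodeWireFormat repeats the producer's framing: a 4-byte big-endian
// schema ID followed by the Avro body.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	buf := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(buf[:4], schemaID)
	copy(buf[4:], payload)
	return buf
}

// decodeWireFormat splits the schema ID from the Avro body so the router
// can resolve the exact writer schema before decoding.
func decodeWireFormat(frame []byte) (uint32, []byte, error) {
	if len(frame) < 4 {
		return 0, nil, errors.New("frame shorter than the 4-byte schema ID")
	}
	return binary.BigEndian.Uint32(frame[:4]), frame[4:], nil
}
```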
Go Router Implementation
The router stays simple because the runtime logic is driven by metadata.
```go
package main

import (
	"context"
	"fmt"
	"log"
)

type Event struct {
	Type    string
	Version string
	ID      string
	Source  string
	Payload []byte
}

// Route is the routing record the router needs from xRegistry.
type Route struct {
	Transport struct {
		Kind    string
		Subject string
	}
	Schema struct {
		Format string
		URI    string
	}
	Ingestion struct {
		Sink              string
		Table             string
		Endpoint          string
		DeadLetterSubject string
	}
}

type Registry interface {
	Lookup(ctx context.Context, key string) (Route, error)
}

func handleEvent(ctx context.Context, registry Registry, event Event) error {
	key := event.Type + ":" + event.Version
	route, err := registry.Lookup(ctx, key)
	if err != nil {
		return sendToDeadLetter(event, "route lookup failed")
	}

	if route.Schema.Format != "avro" {
		return sendToDeadLetter(event, "unsupported schema format")
	}

	if err := decodeAndValidateAvro(event.Payload, route.Schema.URI); err != nil {
		return sendToDeadLetter(event, "avro validation failed")
	}

	switch route.Ingestion.Sink {
	case "timescaledb":
		return writeToTimescale(event, route.Ingestion.Table)
	case "api":
		return postToAPI(event, route.Ingestion.Endpoint)
	default:
		return sendToDeadLetter(event, "unknown sink")
	}
}

// Stub that accepts every payload. The Avro Validation Example below is a real
// implementation; keep only one definition of this function in the package.
func decodeAndValidateAvro(payload []byte, schemaURI string) error {
	_, _ = payload, schemaURI
	return nil
}

func writeToTimescale(event Event, table string) error {
	log.Printf("write event %s to table %s", event.ID, table)
	return nil
}

func postToAPI(event Event, endpoint string) error {
	log.Printf("post event %s to endpoint %s", event.ID, endpoint)
	return nil
}

// Stub: reports the failure as an error. A production version would publish
// the event and reason to route.Ingestion.DeadLetterSubject.
func sendToDeadLetter(event Event, reason string) error {
	return fmt.Errorf("dead letter event %s: %s", event.ID, reason)
}
```
The important part is the shape of the control flow:
- Build a routing key from `type` and `version`.
- Load route metadata from xRegistry.
- Validate the payload using the Avro schema.
- Send the event to the configured sink.
- Dead-letter anything that fails routing or validation.
xRegistry Lookup Example
The router should ask a registry service for the route record instead of embedding route tables in code.
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// RegistryClient fetches route records over HTTP. The /routes/{key} path is a
// simplified, hypothetical endpoint; a real xRegistry deployment exposes its
// own group/resource/version URL structure.
type RegistryClient struct {
	BaseURL string
	Client  *http.Client // nil falls back to http.DefaultClient
}

type RegistryRecord struct {
	Type      string `json:"type"`
	Version   string `json:"version"`
	Transport struct {
		Kind    string `json:"kind"`
		Subject string `json:"subject"`
	} `json:"transport"`
	Schema struct {
		Format string `json:"format"`
		URI    string `json:"uri"`
	} `json:"schema"`
	Ingestion struct {
		Sink              string `json:"sink"`
		Table             string `json:"table"`
		Endpoint          string `json:"endpoint"`
		DeadLetterSubject string `json:"deadletter_subject"`
	} `json:"ingestion"`
}

func (c RegistryClient) Lookup(ctx context.Context, key string) (Route, error) {
	endpoint := fmt.Sprintf("%s/routes/%s", c.BaseURL, url.PathEscape(key))
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return Route{}, err
	}

	client := c.Client
	if client == nil {
		client = http.DefaultClient
	}
	resp, err := client.Do(req)
	if err != nil {
		return Route{}, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return Route{}, fmt.Errorf("registry lookup %q failed: %s", key, resp.Status)
	}

	var record RegistryRecord
	if err := json.NewDecoder(resp.Body).Decode(&record); err != nil {
		return Route{}, err
	}

	// Copy the wire record into the router's internal Route type.
	var route Route
	route.Transport.Kind = record.Transport.Kind
	route.Transport.Subject = record.Transport.Subject
	route.Schema.Format = record.Schema.Format
	route.Schema.URI = record.Schema.URI
	route.Ingestion.Sink = record.Ingestion.Sink
	route.Ingestion.Table = record.Ingestion.Table
	route.Ingestion.Endpoint = record.Ingestion.Endpoint
	route.Ingestion.DeadLetterSubject = record.Ingestion.DeadLetterSubject
	return route, nil
}
```
That lookup gives you one place to manage:
- Subject naming.
- Schema URI or fingerprint.
- Sink selection.
- Dead-letter routing.
- Future version migrations.
Avro Validation Example
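The router should reject payloads that do not match the expected schema before they touch the sink.
```go
package main

import (
	"fmt"

	"github.com/hamba/avro/v2"
)

func decodeAndValidateAvro(payload []byte, schemaURI string) error {
	schemaText, err := loadSchema(schemaURI)
	if err != nil {
		return err
	}

	// In production, parse once per schema URI and reuse the result.
	schema, err := avro.Parse(schemaText)
	if err != nil {
		return err
	}

	// Decoding fails when the bytes do not fit the writer schema.
	decoded := map[string]any{}
	if err := avro.Unmarshal(schema, payload, &decoded); err != nil {
		return fmt.Errorf("decode with %s: %w", schemaURI, err)
	}

	// A missing key yields nil, not "", so check via type assertion.
	if s, _ := decoded["type"].(string); s == "" {
		return fmt.Errorf("missing type")
	}
	if s, _ := decoded["version"].(string); s == "" {
		return fmt.Errorf("missing version")
	}
	return nil
}

// loadSchema is a placeholder. A real version fetches the writer schema (for
// example, the MetricCpuUsageEvent schema above) from xRegistry by URI; Avro
// binary cannot be decoded correctly without the schema it was written with.
func loadSchema(schemaURI string) (string, error) {
	return "", fmt.Errorf("loadSchema %s: not implemented", schemaURI)
}
```
The exact Avro library can vary, but the behavior should stay the same: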
- Resolve the schema from xRegistry.
- Parse the schema.
- Decode the payload.
- Reject the event if required fields or types are wrong.
This is the point where schema enforcement replaces hand-written type assertions.
Dead Letter Replay Example
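Dead letters should not be a black hole. They should be a recoverable queue with enough metadata to reprocess safely.
```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

// DeadLetterEvent is the envelope on the dead-letter subject. JSON encoding is
// an assumption here; []byte fields are base64 in JSON.
type DeadLetterEvent struct {
	OriginalSubject string    `json:"original_subject"`
	Reason          string    `json:"reason"`
	EventID         string    `json:"event_id"`
	Payload         []byte    `json:"payload"`
	FailedAt        time.Time `json:"failed_at"`
}

// replayDeadLetters feeds dead letters back through handler until ctx ends.
// handler gets the original subject, so Router.routeMessage can be passed as-is.
// A core NATS subscription only sees dead letters published while it is active;
// replaying a stored backlog needs a JetStream stream on the dead-letter subject.
func replayDeadLetters(ctx context.Context, nc *nats.Conn, subject string, handler func(ctx context.Context, subject string, payload []byte) error) error {
	sub, err := nc.SubscribeSync(subject)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()

	for {
		msg, err := sub.NextMsgWithContext(ctx)
		if err != nil {
			return err // includes context cancellation
		}

		var deadLetter DeadLetterEvent
		if err := decodeDeadLetter(msg.Data, &deadLetter); err != nil {
			log.Printf("skip invalid dead letter payload: %v", err)
			continue
		}

		if err := handler(ctx, deadLetter.OriginalSubject, deadLetter.Payload); err != nil {
			log.Printf("replay failed for event %s: %v", deadLetter.EventID, err)
			continue
		}

		log.Printf("replayed dead letter event %s", deadLetter.EventID)
	}
}

func decodeDeadLetter(payload []byte, target *DeadLetterEvent) error {
	return json.Unmarshal(payload, target)
}
```
This adds a practical recovery path: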
- Capture the original subject and reason.
- Preserve the original payload.
- Re-run the same validation and routing logic after a fix.
- Keep manual intervention out of the hot path.
Suggested Package Layout
Keep the code split by responsibility so the router remains readable:
```text
cmd/router/main.go
internal/router/router.go
internal/router/consumer.go
internal/router/deadletter.go
internal/registry/client.go
internal/schema/avro.go
internal/producer/publish.go
```
Recommended boundaries:
- `internal/producer` builds typed events and publishes to NATS.
- `internal/schema` resolves and validates Avro schemas.
- `internal/registry` talks to xRegistry.
- `internal/router` converts NATS messages into sink operations.
- `internal/router/deadletter.go` handles retries, DLQ writes, and replay hooks.
That structure keeps the routing plane easy to test and prevents the registry or NATS details from leaking into sink-specific code.
Routing and Dead Letters
The original JSON-router logic is the right mental model:
```go
// Inside a func(event Event) error, with registry as a map[string]Route.
key := event.Type + ":" + event.Version
route, ok := registry[key]
if !ok {
	return sendToDeadLetter(event, "unknown route key")
}

switch route.Ingestion.Sink {
case "timescaledb":
	return writeToTimescale(event, route.Ingestion.Table)
case "api":
	return postToAPI(event, route.Ingestion.Endpoint)
default:
	return sendToDeadLetter(event, "unknown sink")
}
```
That becomes stronger when the router also checks schema compatibility.
Recommended failure policy:
- Unknown route key -> dead letter.
- Unknown sink -> dead letter.
- Avro decode error -> dead letter.
- Schema mismatch -> dead letter.
- Temporary sink outage -> retry with backoff, then dead letter after the retry budget.
This keeps bad or stale events out of primary storage.
Operational Benefits
Compared with a plain JSON router, this design improves the system in several ways:
- Stronger validation: Avro rejects malformed payloads before ingestion.
- Clearer evolution: schema compatibility can be enforced before deploy.
- Smaller payloads: binary Avro avoids repeating field names on every message.
- Better discoverability: xRegistry becomes a shared catalog for events, schemas, and routes.
- Lower router complexity: the router acts on metadata instead of hard-coded per-event logic.
- Better multi-language support: Go producers and consumers can share the same Avro contract with other runtimes.
The practical outcome is less runtime guessing and fewer breakages during schema changes.
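The smaller-payload claim can be checked by hand-encoding the `data` record from the schema above, using the binary rules in the Avro specification:
- Longs are zig-zag varints, the same scheme as Go's `binary.PutVarint`.
- Strings are a length followed by UTF-8 bytes.
- Doubles are 8 little-endian bytes.
- A union starts with the branch index.
- A map is a counted block closed by a zero count.

This is for illustration only; production code should use an Avro library. For the sample values from the producer example, the Avro body is 41 bytes versus 75 bytes of compact JSON. The ratio depends on the payload.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"math"
	"sort"
)

// appendLong writes an Avro long: a zig-zag variable-length integer.
func appendLong(buf []byte, v int64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutVarint(tmp[:], v)
	return append(buf, tmp[:n]...)
}

// appendString writes an Avro string: length as a long, then UTF-8 bytes.
func appendString(buf []byte, s string) []byte {
	return append(appendLong(buf, int64(len(s))), s...)
}

// appendDouble writes an Avro double: 8 bytes, little-endian IEEE 754.
func appendDouble(buf []byte, f float64) []byte {
	var tmp [8]byte
	binary.LittleEndian.PutUint64(tmp[:], math.Float64bits(f))
	return append(buf, tmp[:]...)
}

// encodeCPUData hand-encodes MetricCpuUsageData in schema field order.
// Field names never appear on the wire; readers get them from the schema.
func encodeCPUData(name string, value float64, unit *string, tags map[string]string) []byte {
	buf := appendString(nil, name)
	buf = appendDouble(buf, value)
	if unit == nil {
		buf = appendLong(buf, 0) // union branch 0: null
	} else {
		buf = appendLong(buf, 1) // union branch 1: string
		buf = appendString(buf, *unit)
	}
	if len(tags) > 0 {
		keys := make([]string, 0, len(tags))
		for k := range tags {
			keys = append(keys, k)
		}
		sort.Strings(keys) // deterministic output
		buf = appendLong(buf, int64(len(keys))) // one block holding every entry
		for _, k := range keys {
			buf = appendString(buf, k)
			buf = appendString(buf, tags[k])
		}
	}
	return appendLong(buf, 0) // zero-count block ends the map
}

// jsonCPUData encodes the same values as compact JSON for comparison.
func jsonCPUData(name string, value float64, unit *string, tags map[string]string) ([]byte, error) {
	return json.Marshal(struct {
		Name  string            `json:"name"`
		Value float64           `json:"value"`
		Unit  *string           `json:"unit"`
		Tags  map[string]string `json:"tags"`
	}{name, value, unit, tags})
}
```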
Comparison With a JSON Router
| Area | JSON Router | xRegistry + Avro |
|---|---|---|
| Routing | Manual config | Central metadata lookup |
| Validation | Ad hoc code | Schema-driven |
| Compatibility | Easy to break | Explicit rules |
| Payload size | Larger | Smaller |
| Discoverability | Weak | Strong |
| Multi-language support | Repeated effort | Shared contract |
| Dead-letter handling | Custom per service | Standardized |
| Versioning | Convention-based | First-class |
The biggest difference is not message format. It is control of change.
When to Use a Workflow Engine Instead
This design is for event ingestion and routing, not multi-step orchestration.
Use a workflow engine when you need:
- Step dependencies.
- Retries across multiple stages.
- Backfills and manual reruns.
- Durable workflow state.
- Long-running pipelines.
For example:
- Event router: `metric.cpu.usage.v1` arrives; validate it and write it to TimescaleDB.
- Workflow engine: ingest raw metrics, roll them up hourly, compute anomalies, publish a report, and retry partial failures across the whole pipeline.
If the question is “where should this event go,” a router is enough. If the question is “what sequence of jobs should run over time,” use an orchestrator.
Conclusion
NATS, xRegistry, and Avro solve different parts of the problem:
- NATS handles delivery.
- xRegistry handles routing metadata.
- Avro handles payload structure and compatibility.
- Go handles the routing decision at runtime.
That combination gives you a clean ingestion plane where producers publish once, the router stays small, and schemas remain explicit. The system scales better than a JSON-definition router because the contract is enforced outside the business logic.