
NATS, xRegistry, and Avro: A Routed Event Ingestion Plane


This design treats event ingestion as a small, explicit control system:

  1. NATS delivers events by subject.
  2. xRegistry describes what the event is, how it is encoded, and where it should go.
  3. Avro enforces a stable schema for producers and consumers.
  4. A Go router validates, looks up policy, and sends the event to the correct sink.

The result is a system that is easier to evolve than a hand-written JSON router because routing metadata, schema contracts, and delivery rules are separated.

Suppose producers emit metric.cpu.usage v1 events, and the registry describes that event type with this record:

```json
{
  "type": "metric.cpu.usage",
  "version": "v1",
  "transport": {
    "kind": "nats",
    "subject": "metrics.cpu.usage.v1"
  },
  "schema": {
    "format": "avro",
    "uri": "xregistry://schemas/metric-cpu-usage/v1"
  },
  "ingestion": {
    "sink": "timescaledb",
    "table": "metric_events",
    "deadletter_subject": "metrics.deadletter"
  }
}
```

The routing question is simple:

  1. Is this event valid?
  2. Where should it go?
  3. What should happen if delivery fails?
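Each of those questions maps to a part of the registry record. The schema block answers the first, the ingestion sink answers the second, and the dead-letter subject answers the third.

A registry could therefore refuse incomplete records at publish time. The sketch below shows one way to do that with encoding/json, decoding the record format shown above. The function name checkRecord is hypothetical. The per-sink required fields (table for timescaledb, endpoint for api) are assumptions drawn from the fields this design uses, not part of any xRegistry API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// routeRecord mirrors the registry record format shown above.
type routeRecord struct {
	Type    string `json:"type"`
	Version string `json:"version"`
	Schema  struct {
		Format string `json:"format"`
		URI    string `json:"uri"`
	} `json:"schema"`
	Ingestion struct {
		Sink              string `json:"sink"`
		Table             string `json:"table"`
		Endpoint          string `json:"endpoint"`
		DeadLetterSubject string `json:"deadletter_subject"`
	} `json:"ingestion"`
}

// checkRecord (hypothetical) returns the first reason a record cannot answer
// "is it valid?", "where does it go?", or "what happens on failure?".
func checkRecord(raw []byte) error {
	var r routeRecord
	if err := json.Unmarshal(raw, &r); err != nil {
		return fmt.Errorf("decode record: %w", err)
	}
	if r.Type == "" || r.Version == "" {
		return fmt.Errorf("record needs type and version")
	}
	// Question 1: validity requires a schema the router can enforce.
	if r.Schema.Format != "avro" || r.Schema.URI == "" {
		return fmt.Errorf("%s:%s: needs an avro schema uri", r.Type, r.Version)
	}
	// Question 2: the destination must be complete for the chosen sink.
	switch r.Ingestion.Sink {
	case "timescaledb":
		if r.Ingestion.Table == "" {
			return fmt.Errorf("%s:%s: timescaledb sink needs a table", r.Type, r.Version)
		}
	case "api":
		if r.Ingestion.Endpoint == "" {
			return fmt.Errorf("%s:%s: api sink needs an endpoint", r.Type, r.Version)
		}
	default:
		return fmt.Errorf("%s:%s: unknown sink %q", r.Type, r.Version, r.Ingestion.Sink)
	}
	// Question 3: failed deliveries need somewhere to go.
	if r.Ingestion.DeadLetterSubject == "" {
		return fmt.Errorf("%s:%s: needs a deadletter_subject", r.Type, r.Version)
	}
	return nil
}
```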

A JSON router can answer those questions, but only by baking validation and policy into application code. That approach works early, then becomes fragile when producers, schemas, and destinations multiply.

A JSON definition router usually starts out clean:

```json
{
  "type": "metric.cpu.usage",
  "version": "v1",
  "subject": "metrics.cpu.usage.v1",
  "sink": "timescaledb",
  "table": "metric_events"
}
```

That looks manageable until the system grows.

Problems appear quickly:

  1. Field validation is manual.
  2. Type changes can silently break consumers.
  3. Routing rules and payload shape drift apart.
  4. Every language re-implements the same checks.
  5. Old producers keep sending malformed payloads until runtime failures show up.
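The first two problems are easy to reproduce. In this minimal Go sketch, encoding/json decodes a payload whose value is a string without complaint. Only a hand-written type assertion catches the problem.

The cpuValueLoose helper is hypothetical. It stands in for checks that every consumer, in every language, would have to repeat.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// cpuValueLoose decodes into a generic map, as an untyped JSON router often
// does, so every field type has to be checked by hand.
func cpuValueLoose(raw []byte) (float64, error) {
	var m map[string]any
	if err := json.Unmarshal(raw, &m); err != nil {
		return 0, err
	}
	// Unmarshal succeeded whatever type "value" had; this check is manual.
	v, ok := m["value"].(float64)
	if !ok {
		return 0, fmt.Errorf("value has type %T, want number", m["value"])
	}
	return v, nil
}
```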

JSON is fine for transport and configuration. It is weak as a contract boundary when the system needs compatibility guarantees.

The request path has four steps:

  1. Producer publishes an Avro-encoded event to a NATS subject.
  2. Router consumes the subject and extracts event identity.
  3. Router looks up the xRegistry entry for policy and destination.
  4. Router writes to the sink or dead-letters the event.

```
Producer -> NATS subject -> Go router -> TimescaleDB / API / Dead letter
                              |   |
                              |   +-> xRegistry lookup
                              |
                              +-> Avro payload validation
```

The key idea is separation of concerns:

  1. NATS moves messages.
  2. Avro describes message shape.
  3. xRegistry describes routing and schema metadata.
  4. Go implements the runtime decision loop.

For local development, run NATS in Docker and keep the broker reachable on the default client port.

```sh
docker run --rm -p 4222:4222 -p 8222:8222 nats -js -m 8222
```

What this gives you:

  1. 4222 for client connections.
  2. 8222 for the monitoring endpoint.
  3. JetStream enabled for durable streams, consumers, and replayable workflows.

If you prefer Docker Compose, use a small service definition:

```yaml
services:
  nats:
    image: nats
    command: ["-js", "-m", "8222"]
    ports:
      - "4222:4222"
      - "8222:8222"
```

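To verify the server is reachable, check the monitoring endpoint. Then, using the NATS CLI, subscribe in one terminal and publish from another:

```sh
curl http://localhost:8222/healthz
nats sub metrics.cpu.usage.v1
nats pub metrics.cpu.usage.v1 '{"hello":"world"}'
```

The JSON payload is only a connectivity check. A running router would dead-letter it because it is not Avro.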

If you are building the Go router locally, wire it up in this order:

  1. Start NATS.
  2. Start the router subscriber.
  3. Publish a test event.
  4. Confirm the router looks up xRegistry, validates Avro, and writes to the sink or dead letter.

Routing becomes automatic when the consumer derives the route key from the message itself and then applies the registry record without a human choosing the destination.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/nats-io/nats.go"
)

// Router turns NATS messages into sink writes. Registry, Route, Event, and the
// sink helpers are defined in the router sketch further down.
type Router struct {
	Registry Registry
}

// Consume subscribes once. subject may be a wildcard such as "metrics.>";
// each message is routed by its own concrete subject.
func (r Router) Consume(ctx context.Context, nc *nats.Conn, subject string) (*nats.Subscription, error) {
	return nc.Subscribe(subject, func(msg *nats.Msg) {
		if err := r.routeMessage(ctx, msg.Subject, msg.Data); err != nil {
			log.Printf("route failed for %s: %v", msg.Subject, err)
		}
	})
}

func (r Router) routeMessage(ctx context.Context, subject string, payload []byte) error {
	typeName, version, err := subjectToKey(subject)
	if err != nil {
		return err
	}
	event := Event{Type: typeName, Version: version, Payload: payload}
	route, err := r.Registry.Lookup(ctx, typeName+":"+version)
	if err != nil {
		return sendToDeadLetter(event, "route lookup failed")
	}
	return routeEvent(ctx, route, event)
}

// subjectToKey splits "metrics.cpu.usage.v1" into ("metrics.cpu.usage", "v1"):
// the last token is the version and everything before it is the type stem.
// The registry must be indexed by the same stem; if it is indexed by event
// type ("metric.cpu.usage"), translate the stem here.
func subjectToKey(subject string) (string, string, error) {
	i := strings.LastIndex(subject, ".")
	if i <= 0 || i == len(subject)-1 {
		return "", "", fmt.Errorf("invalid subject: %s", subject)
	}
	return subject[:i], subject[i+1:], nil
}

// routeEvent validates the payload, then dispatches on the sink named in the registry record.
func routeEvent(ctx context.Context, route Route, event Event) error {
	if route.Schema.Format != "avro" {
		return sendToDeadLetter(event, "unsupported schema format")
	}
	if err := decodeAndValidateAvro(event.Payload, route.Schema.URI); err != nil {
		return sendToDeadLetter(event, "avro validation failed")
	}
	switch route.Ingestion.Sink {
	case "timescaledb":
		return writeToTimescale(event, route.Ingestion.Table)
	case "api":
		return postToAPI(event, route.Ingestion.Endpoint)
	default:
		return sendToDeadLetter(event, "unknown sink")
	}
}
```

This loop makes routing automatic in the practical sense:

  1. The consumer subscribes once.
  2. Each message subject is turned into a route key.
  3. xRegistry resolves the sink and schema.
  4. The router validates and dispatches the event.

The human does not pick timescaledb or api at runtime. The registry record does.

Think of the system in terms of responsibilities:

  1. NATS is the highway.
  2. xRegistry is the map.
  3. The router is the driver.
  4. Avro supplies the traffic rules for payload structure.

That structure gives you a cleaner operational model:

  1. NATS delivers by subject, so producers only need to publish to the right channel.
  2. xRegistry makes routing discoverable, versioned, and centrally managed.
  3. Avro ensures the payload matches the expected schema before data reaches storage.
  4. The router stays small because it does not need to understand every producer-specific payload variant.

The event should have a stable envelope and a typed payload.

```json
{
  "type": "record",
  "name": "MetricCpuUsageEvent",
  "namespace": "com.example.metrics",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "version", "type": "string" },
    { "name": "source", "type": "string" },
    {
      "name": "time",
      "type": { "type": "long", "logicalType": "timestamp-millis" }
    },
    {
      "name": "data",
      "type": {
        "type": "record",
        "name": "MetricCpuUsageData",
        "fields": [
          { "name": "name", "type": "string" },
          { "name": "value", "type": "double" },
          { "name": "unit", "type": ["null", "string"], "default": null },
          {
            "name": "tags",
            "type": {
              "type": "map",
              "values": "string"
            },
            "default": {}
          }
        ]
      }
    }
  ]
}
```

Why this helps:

  1. value must be encoded as a double, so a producer cannot put a string or an object in its place.
  2. Optional fields are explicit.
  3. Schema changes can be managed with compatibility rules.
  4. Multiple languages can share the same contract.
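Compatibility rules can be checked by machine. One rule from Avro schema resolution: if a reader's record schema has a field that the writer's schema lacks, the reader uses that field's default. If the field has no default, the read fails. So a new schema version can read old data only if every added field has a default.

The sketch below checks just that rule, and only on top-level fields. addedFieldsHaveDefaults is a hypothetical helper. A full compatibility check also covers removed fields, type promotions, and nested records.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// avroRecord keeps each field as raw JSON so the presence of "default"
// (including "default": null) can be detected.
type avroRecord struct {
	Fields []map[string]json.RawMessage `json:"fields"`
}

func fieldName(f map[string]json.RawMessage) string {
	var name string
	_ = json.Unmarshal(f["name"], &name) // empty if missing or not a string
	return name
}

// addedFieldsHaveDefaults reports a field that exists in the new (reader)
// schema, is absent from the old (writer) schema, and has no default.
func addedFieldsHaveDefaults(oldSchema, newSchema []byte) error {
	var oldRec, newRec avroRecord
	if err := json.Unmarshal(oldSchema, &oldRec); err != nil {
		return fmt.Errorf("old schema: %w", err)
	}
	if err := json.Unmarshal(newSchema, &newRec); err != nil {
		return fmt.Errorf("new schema: %w", err)
	}
	existing := make(map[string]bool, len(oldRec.Fields))
	for _, f := range oldRec.Fields {
		existing[fieldName(f)] = true
	}
	for _, f := range newRec.Fields {
		_, hasDefault := f["default"]
		if name := fieldName(f); !existing[name] && !hasDefault {
			return fmt.Errorf("field %q added without a default", name)
		}
	}
	return nil
}
```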

The producer should not know the routing table. It should know only the event contract and the subject.

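```go
package main

import (
	"context"
	"encoding/binary"
	"time"

	"github.com/nats-io/nats.go"
)

// Struct tags map Go fields to the lowercase Avro field names. The "avro" tag
// key follows hamba/avro; use whatever your codec expects.
type MetricCpuUsageData struct {
	Name  string            `avro:"name"`
	Value float64           `avro:"value"`
	Unit  *string           `avro:"unit"` // nil encodes the null branch of ["null", "string"]
	Tags  map[string]string `avro:"tags"`
}

type MetricCpuUsageEvent struct {
	ID      string             `avro:"id"`
	Type    string             `avro:"type"`
	Version string             `avro:"version"`
	Source  string             `avro:"source"`
	Time    time.Time          `avro:"time"` // timestamp-millis
	Data    MetricCpuUsageData `avro:"data"`
}

// AvroCodec encodes a value with the event's Avro schema.
type AvroCodec interface {
	Marshal(v any) ([]byte, error)
}

func publishMetric(ctx context.Context, nc *nats.Conn, codec AvroCodec, subject string, event MetricCpuUsageEvent) error {
	// Core NATS Publish takes no context; ctx is kept so the signature can
	// grow into a context-aware publish later.
	_ = ctx
	payload, err := codec.Marshal(event)
	if err != nil {
		return err
	}
	return nc.Publish(subject, payload)
}

func exampleProducer(nc *nats.Conn, codec AvroCodec) error {
	unit := "percent"
	event := MetricCpuUsageEvent{
		ID:      "evt-123",
		Type:    "metric.cpu.usage",
		Version: "v1",
		Source:  "collector-a",
		Time:    time.UnixMilli(1735689600000),
		Data: MetricCpuUsageData{
			Name:  "cpu_usage",
			Value: 72.5,
			Unit:  &unit,
			Tags: map[string]string{
				"host": "node-1",
			},
		},
	}
	return publishMetric(context.Background(), nc, codec, "metrics.cpu.usage.v1", event)
}

// encodeWireFormat is optional framing that publishMetric does not use: a
// 4-byte big-endian schema ID before the Avro body, so consumers can pick the
// writer schema. This is not the Confluent wire format, which also starts
// with a magic byte.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	buf := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(buf[:4], schemaID)
	copy(buf[4:], payload)
	return buf
}
```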

The producer path stays small:

  1. Build a typed event.
  2. Encode it with the Avro schema.
  3. Publish it to the agreed subject.

The subject is a delivery concern. The schema is a contract concern.
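The producer snippet defines encodeWireFormat but does not call it. If producers adopt that framing, the router needs the inverse before it can pick a schema.

Here is a minimal sketch, assuming the same 4-byte big-endian schema ID header. decodeWireFormat is a hypothetical name. This header layout is specific to this design and is not interchangeable with the Confluent wire format.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// encodeWireFormat matches the producer helper: a 4-byte big-endian schema
// ID followed by the Avro-encoded body.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	buf := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(buf[:4], schemaID)
	copy(buf[4:], payload)
	return buf
}

// decodeWireFormat is the consumer-side inverse: it splits off the header so
// the router can resolve the writer schema by ID before decoding the body.
func decodeWireFormat(msg []byte) (uint32, []byte, error) {
	if len(msg) < 4 {
		return 0, nil, errors.New("message shorter than schema id header")
	}
	return binary.BigEndian.Uint32(msg[:4]), msg[4:], nil
}
```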

The router stays simple because the runtime logic is driven by metadata.

```go
package main

import (
	"context"
	"fmt"
	"log"
)

// Event carries routing identity plus the raw Avro payload. ID and Source
// are only known after decoding; the subject-based path leaves them empty.
type Event struct {
	Type    string
	Version string
	ID      string
	Source  string
	Payload []byte
}

// Route is the router's view of one registry record.
type Route struct {
	Transport struct {
		Kind    string
		Subject string
	}
	Schema struct {
		Format string
		URI    string
	}
	Ingestion struct {
		Sink              string
		Table             string
		Endpoint          string
		DeadLetterSubject string
	}
}

// Registry resolves a "type:version" key to its route record.
type Registry interface {
	Lookup(ctx context.Context, key string) (Route, error)
}

func handleEvent(ctx context.Context, registry Registry, event Event) error {
	key := event.Type + ":" + event.Version
	route, err := registry.Lookup(ctx, key)
	if err != nil {
		return sendToDeadLetter(event, "route lookup failed")
	}
	if route.Schema.Format != "avro" {
		return sendToDeadLetter(event, "unsupported schema format")
	}
	if err := decodeAndValidateAvro(event.Payload, route.Schema.URI); err != nil {
		return sendToDeadLetter(event, "avro validation failed")
	}
	switch route.Ingestion.Sink {
	case "timescaledb":
		return writeToTimescale(event, route.Ingestion.Table)
	case "api":
		return postToAPI(event, route.Ingestion.Endpoint)
	default:
		return sendToDeadLetter(event, "unknown sink")
	}
}

// The functions below are stubs so the sketch compiles.

// decodeAndValidateAvro is replaced by the hamba/avro version shown later.
func decodeAndValidateAvro(payload []byte, schemaURI string) error {
	_, _ = payload, schemaURI
	return nil
}

func writeToTimescale(event Event, table string) error {
	log.Printf("write event %s to table %s", event.ID, table)
	return nil
}

func postToAPI(event Event, endpoint string) error {
	log.Printf("post event %s to endpoint %s", event.ID, endpoint)
	return nil
}

// sendToDeadLetter only reports the failure; a real version would publish the
// event to route.Ingestion.DeadLetterSubject.
func sendToDeadLetter(event Event, reason string) error {
	return fmt.Errorf("dead letter event %s: %s", event.ID, reason)
}
```

The important part is the shape of the control flow:

  1. Build a routing key from type and version.
  2. Load route metadata from xRegistry.
  3. Validate the payload using the Avro schema.
  4. Send the event to the configured sink.
  5. Dead-letter anything that fails routing or validation.
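Each step in that flow is a separate decision. Making the steps injectable lets you test the control flow without NATS, a registry, or a database.

This sketch restates handleEvent with function fields. Dispatcher, newTestDispatcher, and the trimmed Event and Route types are hypothetical. Unlike the stub above, the fake dead-letter function returns nil, to model a successful dead-letter publish.

```go
package main

import "errors"

type Event struct {
	Type, Version, ID string
	Payload           []byte
}

type Route struct {
	SchemaFormat, SchemaURI string
	Sink, Target            string // Target is a table or endpoint, depending on Sink
}

// Dispatcher holds each step of the control flow as a swappable function.
type Dispatcher struct {
	Lookup     func(key string) (Route, error)
	Validate   func(payload []byte, schemaURI string) error
	Sinks      map[string]func(Event, string) error
	DeadLetter func(Event, string) error
}

func (d Dispatcher) Handle(e Event) error {
	route, err := d.Lookup(e.Type + ":" + e.Version)
	if err != nil {
		return d.DeadLetter(e, "route lookup failed")
	}
	if route.SchemaFormat != "avro" {
		return d.DeadLetter(e, "unsupported schema format")
	}
	if err := d.Validate(e.Payload, route.SchemaURI); err != nil {
		return d.DeadLetter(e, "avro validation failed")
	}
	sink, ok := d.Sinks[route.Sink]
	if !ok {
		return d.DeadLetter(e, "unknown sink")
	}
	return sink(e, route.Target)
}

var errNoRoute = errors.New("no route")

// newTestDispatcher wires fakes that record writes and dead letters.
func newTestDispatcher(written, dead *[]string) Dispatcher {
	routes := map[string]Route{
		"metric.cpu.usage:v1": {SchemaFormat: "avro", SchemaURI: "xregistry://schemas/metric-cpu-usage/v1", Sink: "timescaledb", Target: "metric_events"},
	}
	return Dispatcher{
		Lookup: func(key string) (Route, error) {
			r, ok := routes[key]
			if !ok {
				return Route{}, errNoRoute
			}
			return r, nil
		},
		// Fake validator: rejects only empty payloads.
		Validate: func(p []byte, _ string) error {
			if len(p) == 0 {
				return errors.New("empty payload")
			}
			return nil
		},
		Sinks: map[string]func(Event, string) error{
			"timescaledb": func(e Event, table string) error {
				*written = append(*written, e.ID+"->"+table)
				return nil
			},
		},
		DeadLetter: func(e Event, reason string) error {
			*dead = append(*dead, e.ID+": "+reason)
			return nil
		},
	}
}
```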

The router should ask a registry service for the route record instead of embedding route tables in code.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// RegistryClient fetches route records over HTTP. The /routes/{key} path is
// this design's own convention, not an endpoint defined by the xRegistry
// specification; adapt it to how your registry exposes records.
type RegistryClient struct {
	BaseURL string
	Client  *http.Client // nil falls back to http.DefaultClient
}

// RegistryRecord mirrors the JSON record format used throughout this design.
type RegistryRecord struct {
	Type      string `json:"type"`
	Version   string `json:"version"`
	Transport struct {
		Kind    string `json:"kind"`
		Subject string `json:"subject"`
	} `json:"transport"`
	Schema struct {
		Format string `json:"format"`
		URI    string `json:"uri"`
	} `json:"schema"`
	Ingestion struct {
		Sink              string `json:"sink"`
		Table             string `json:"table"`
		Endpoint          string `json:"endpoint"`
		DeadLetterSubject string `json:"deadletter_subject"`
	} `json:"ingestion"`
}

func (c RegistryClient) Lookup(ctx context.Context, key string) (Route, error) {
	client := c.Client
	if client == nil {
		client = http.DefaultClient
	}
	endpoint := fmt.Sprintf("%s/routes/%s", c.BaseURL, url.PathEscape(key))
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return Route{}, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return Route{}, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return Route{}, fmt.Errorf("registry lookup %s failed: %s", key, resp.Status)
	}
	var record RegistryRecord
	if err := json.NewDecoder(resp.Body).Decode(&record); err != nil {
		return Route{}, err
	}
	// Copy the wire record into the router's Route type so routing code does
	// not depend on the registry's JSON layout.
	var route Route
	route.Transport.Kind = record.Transport.Kind
	route.Transport.Subject = record.Transport.Subject
	route.Schema.Format = record.Schema.Format
	route.Schema.URI = record.Schema.URI
	route.Ingestion.Sink = record.Ingestion.Sink
	route.Ingestion.Table = record.Ingestion.Table
	route.Ingestion.Endpoint = record.Ingestion.Endpoint
	route.Ingestion.DeadLetterSubject = record.Ingestion.DeadLetterSubject
	return route, nil
}
```

That lookup gives you one place to manage:

  1. Subject naming.
  2. Schema URI or fingerprint.
  3. Sink selection.
  4. Dead-letter routing.
  5. Future version migrations.

The router should reject payloads that do not match the expected schema before they touch the sink.

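```go
package main

import (
	"fmt"

	"github.com/hamba/avro/v2"
)

// decodeAndValidateAvro replaces the stub in the router sketch.
func decodeAndValidateAvro(payload []byte, schemaURI string) error {
	schemaJSON, err := loadSchema(schemaURI)
	if err != nil {
		return err
	}
	// Parsing on every message is wasteful; cache parsed schemas by URI in production.
	schema, err := avro.Parse(schemaJSON)
	if err != nil {
		return err
	}
	// Unmarshal fails when the bytes cannot be read with this schema (for example, truncated data).
	decoded := map[string]any{}
	if err := avro.Unmarshal(schema, payload, &decoded); err != nil {
		return fmt.Errorf("avro decode: %w", err)
	}
	// The schema guarantees these are strings but not that they are non-empty.
	// The type assertion also treats an absent field as empty.
	for _, field := range []string{"type", "version"} {
		if s, _ := decoded[field].(string); s == "" {
			return fmt.Errorf("missing %s", field)
		}
	}
	return nil
}

// loadSchema is a placeholder: a real version fetches the schema document from
// xRegistry. The empty record below only lets the sketch compile and will fail
// the field checks above.
func loadSchema(schemaURI string) (string, error) {
	_ = schemaURI
	return `{"type":"record","name":"MetricCpuUsageEvent","fields":[]}`, nil
}
```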

The exact Avro library can vary, but the behavior should stay the same:

  1. Resolve the schema from xRegistry.
  2. Parse the schema.
  3. Decode the payload.
  4. Reject the event if required fields or types are wrong.

This is the point where schema enforcement replaces hand-written type assertions.
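Resolving and parsing the schema on every message repeats work that only changes when a new schema version is published. A small cache keyed by schema URI avoids that.

This is a sketch under several assumptions. SchemaCache is a hypothetical type, and the load function stands in for "fetch from xRegistry, then avro.Parse". Holding the lock during a load keeps the code simple, at the cost of serializing first-time loads.

```go
package main

import "sync"

// SchemaCache resolves and parses each schema URI once and reuses the result.
// T would be the parsed schema type of your Avro library.
type SchemaCache[T any] struct {
	mu     sync.Mutex
	load   func(uri string) (T, error)
	parsed map[string]T
}

func NewSchemaCache[T any](load func(string) (T, error)) *SchemaCache[T] {
	return &SchemaCache[T]{load: load, parsed: make(map[string]T)}
}

// Get returns the cached schema for uri, loading it on first use. Failed
// loads are not cached, so a later call can retry.
func (c *SchemaCache[T]) Get(uri string) (T, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.parsed[uri]; ok {
		return s, nil
	}
	s, err := c.load(uri)
	if err != nil {
		var zero T
		return zero, err
	}
	c.parsed[uri] = s
	return s, nil
}
```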

Dead letters should not be a black hole. They should be a recoverable queue with enough metadata to reprocess safely.

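```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

type DeadLetterEvent struct {
	OriginalSubject string
	Reason          string
	EventID         string
	Payload         []byte
	FailedAt        time.Time
}

// replayDeadLetters feeds dead-lettered payloads back through handler, for
// example Router.routeMessage. A core NATS subscription only sees messages
// published after it starts; to replay older failures, capture the
// dead-letter subject in a JetStream stream and consume from that instead.
func replayDeadLetters(ctx context.Context, nc *nats.Conn, subject string, handler func(ctx context.Context, subject string, payload []byte) error) error {
	sub, err := nc.SubscribeSync(subject)
	if err != nil {
		return err
	}
	defer sub.Unsubscribe()
	for {
		// Returns an error once ctx is cancelled, which ends the replay loop.
		msg, err := sub.NextMsgWithContext(ctx)
		if err != nil {
			return err
		}
		var deadLetter DeadLetterEvent
		if err := decodeDeadLetter(msg.Data, &deadLetter); err != nil {
			log.Printf("skip invalid dead letter payload: %v", err)
			continue
		}
		// Re-route against the original subject so the same key and checks apply.
		if err := handler(ctx, deadLetter.OriginalSubject, deadLetter.Payload); err != nil {
			log.Printf("replay failed for event %s: %v", deadLetter.EventID, err)
			continue
		}
		log.Printf("replayed dead letter event %s", deadLetter.EventID)
	}
}

// decodeDeadLetter assumes dead letters are published as JSON; []byte fields
// round-trip as base64, so binary Avro payloads survive.
func decodeDeadLetter(payload []byte, target *DeadLetterEvent) error {
	return json.Unmarshal(payload, target)
}
```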

This adds a practical recovery path:

  1. Capture the original subject and reason.
  2. Preserve the original payload.
  3. Re-run the same validation and routing logic after a fix.
  4. Keep manual intervention out of the hot path.

Keep the code split by responsibility so the router remains readable:

cmd/router/main.go
internal/router/router.go
internal/router/consumer.go
internal/router/deadletter.go
internal/registry/client.go
internal/schema/avro.go
internal/producer/publish.go

Recommended boundaries:

  1. internal/producer builds typed events and publishes to NATS.
  2. internal/schema resolves and validates Avro schemas.
  3. internal/registry talks to xRegistry.
  4. internal/router converts NATS messages into sink operations.
  5. internal/router/deadletter.go handles retries, DLQ writes, and replay hooks.

That structure keeps the routing plane easy to test and prevents the registry or NATS details from leaking into sink-specific code.

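The core routing logic is a key lookup followed by a switch on the sink:

```go
key := event.Type + ":" + event.Version
route, ok := registry[key] // simplified: registry as a map[string]Route
if !ok {
	return sendToDeadLetter(event, "unknown route key")
}
switch route.Ingestion.Sink {
case "timescaledb":
	return writeToTimescale(event, route.Ingestion.Table)
case "api":
	return postToAPI(event, route.Ingestion.Endpoint)
default:
	return sendToDeadLetter(event, "unknown sink")
}
```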

It becomes much safer when the router also validates each payload against the registered Avro schema before dispatching it.

Recommended failure policy:

  1. Unknown route key -> dead letter.
  2. Unknown sink -> dead letter.
  3. Avro decode error -> dead letter.
  4. Schema mismatch -> dead letter.
  5. Temporary sink outage -> retry with backoff, then dead letter after the retry budget.

This keeps bad or stale events out of primary storage.

Compared with a plain JSON router, this design improves the system in several ways:

  1. Stronger validation: Avro rejects malformed payloads before ingestion.
  2. Clearer evolution: schema compatibility can be enforced before deploy.
  3. Smaller payloads: binary Avro avoids repeating field names on every message.
  4. Better discoverability: xRegistry becomes a shared catalog for events, schemas, and routes.
  5. Lower router complexity: the router acts on metadata instead of hard-coded per-event logic.
  6. Better multi-language support: Go producers and consumers can share the same Avro contract with other runtimes.

The practical outcome is less runtime guessing and fewer breakages during schema changes.
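To see where the size savings come from, here is the Avro binary encoding for longs and strings, written out by hand. It is a sketch of what a codec does internally; appendLong and appendString are illustrative names, not a library API.

Per the Avro specification, a long is zigzag-encoded and then written as a variable-length integer. A string is its byte length (encoded as a long) followed by its UTF-8 bytes. No field name is written, because the reader and writer share the schema.

```go
package main

// appendLong zigzag-maps n so small negative numbers stay small, then writes
// it 7 bits at a time, setting the high bit on every byte except the last.
func appendLong(buf []byte, n int64) []byte {
	u := uint64((n << 1) ^ (n >> 63))
	for u >= 0x80 {
		buf = append(buf, byte(u)|0x80)
		u >>= 7
	}
	return append(buf, byte(u))
}

// appendString writes the byte length as a long, then the UTF-8 bytes.
func appendString(buf []byte, s string) []byte {
	buf = appendLong(buf, int64(len(s)))
	return append(buf, s...)
}
```

For example, the name field set to cpu_usage costs 10 bytes in Avro: one length byte plus nine characters. The JSON fragment "name":"cpu_usage" costs 18 bytes.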

| Area | JSON Router | xRegistry + Avro |
|---|---|---|
| Routing | Manual config | Central metadata lookup |
| Validation | Ad hoc code | Schema-driven |
| Compatibility | Easy to break | Explicit rules |
| Payload size | Larger | Smaller |
| Discoverability | Weak | Strong |
| Multi-language support | Repeated effort | Shared contract |
| Dead-letter handling | Custom per service | Standardized |
| Versioning | Convention-based | First-class |

The biggest difference is not message format. It is control of change.

This design is for event ingestion and routing, not multi-step orchestration.

Use a workflow engine when you need:

  1. Step dependencies.
  2. Retries across multiple stages.
  3. Backfills and manual reruns.
  4. Durable workflow state.
  5. Long-running pipelines.

For example:

  1. Event router: metric.cpu.usage.v1 arrives, validate it, write it to TimescaleDB.
  2. Workflow engine: ingest raw metrics, roll them up hourly, compute anomalies, publish a report, and retry partial failures across the whole pipeline.

If the question is “where should this event go,” a router is enough. If the question is “what sequence of jobs should run over time,” use an orchestrator.

NATS, xRegistry, and Avro solve different parts of the problem:

  1. NATS handles delivery.
  2. xRegistry handles routing metadata.
  3. Avro handles payload structure and compatibility.
  4. Go handles the routing decision at runtime.

That combination gives you a clean ingestion plane where producers publish once, the router stays small, and schemas remain explicit. The system scales better than a JSON-definition router because the contract is enforced outside the business logic.