Getting Metrics from Your OpenZiti Controller over WebSockets

What I learned subscribing to real-time controller events from Go

A topic came up on the OpenZiti Discourse about subscribing to controller events over WebSockets. I've been doing exactly this in a side project that needed to show some usage data for a Ziti network, so I figured it was worth writing up what I learned along the way.

The short version: the Ziti controller exposes a management WebSocket at /fabric/v1/ws-api that streams real-time events - usage metrics, session lifecycle, circuit events, etc. It speaks a binary "channel V2" protocol over the WebSocket, which is the same protocol the Ziti infrastructure components use to talk to each other. Once you know the wire format, it's pretty straightforward to connect, subscribe, and start processing events.

The Discourse thread above includes a working JavaScript implementation if that's your language of choice. The rest of this post focuses on Go, since that's what I used, but the concepts are the same regardless of language.

If you're using Go, you can skip the manual wire protocol handling entirely. The OpenZiti project provides the channel library, which handles all the binary framing, message routing, and connection lifecycle. The Ziti CLI's own stream events command uses these libraries: you build your subscription as JSON, send it as a channel message, and register a receive handler. The library takes care of everything else. This post still works through the wire protocol by hand (with Go examples), which is useful if you're working in another language or want to avoid the dependency.

Authenticating

First you need a session token. The event stream endpoint uses the same auth as the rest of the management API. You have a couple of options:

Username/password:

POST https://<controller>/edge/management/v1/authenticate?method=password
Content-Type: application/json

{"username": "admin", "password": "your-password"}

Certificate-based:

POST https://<controller>/edge/management/v1/authenticate?method=cert

No body needed - the client certificate from the TLS handshake is the credential. You'd use the cert and key from a Ziti identity file for this.

Either way, the response gives you a token in data.token. Hold on to it.

If you're using Go, the Ziti SDK handles this for you:

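auth := rest_util.NewAuthenticatorUpdb("admin", "password")
auth.RootCas = caPool // CAs to trust for the controller's TLS certificate
session, err := auth.Authenticate(controllerURL)
if err != nil {
    log.Fatalf("authenticate: %v", err)
}
token := *session.Token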

Opening the WebSocket

Connect to wss://<controller>/fabric/v1/ws-api with your session token in a zt-session header.

One thing that tripped me up: the controller's WebSocket handler uses http.Hijacker, which isn't available over HTTP/2. You need to force HTTP/1.1 for the WebSocket handshake. In Go:

tlsCfg := &tls.Config{
    RootCAs:    rootCAs,
    NextProtos: []string{"http/1.1"}, // offer only HTTP/1.1 during ALPN
}
httpClient := &http.Client{
    Transport: &http.Transport{
        TLSClientConfig:   tlsCfg,
        ForceAttemptHTTP2: false, // the default; with a custom TLSClientConfig this keeps HTTP/2 off
    },
}

opts := &websocket.DialOptions{
    HTTPClient: httpClient,
    HTTPHeader: http.Header{
        "zt-session":      []string{token},
        "Accept-Encoding": []string{"identity"},
    },
    CompressionMode: websocket.CompressionDisabled,
}
conn, _, err := websocket.Dial(ctx, "wss://controller:1280/fabric/v1/ws-api", opts)
if err != nil {
    log.Fatalf("dial event stream: %v", err)
}
defer conn.Close(websocket.StatusNormalClosure, "")

I'm using github.com/coder/websocket here, but any WebSocket library that lets you set headers and force HTTP/1.1 will work.

The Channel V2 Wire Protocol

This is where it gets interesting. The WebSocket doesn't carry plain JSON - each binary message is a "channel V2" message with this layout:

Offset  Size  Field
------  ----  -----
0       4     Magic bytes: 0x03 0x06 0x09 0x0C
4       4     Content type (uint32 LE)
8       4     Sequence number (uint32 LE)
12      4     Headers length (uint32 LE)
16      4     Body length (uint32 LE)
20      var   Headers
20+H    var   Body (JSON)

Headers are sequential key-value pairs: [4B key LE] [4B value_len LE] [value bytes].
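
To make the header layout concrete, here's a small sketch that encodes a set of headers. Sorting the keys is my own choice so the output is deterministic (Go map iteration order is random); I'm assuming the receiver doesn't care about header order. `encodeHeaders` is a hypothetical name. Encoding key 2 with the single byte 1, for example, produces `02 00 00 00 01 00 00 00 01`.

```go
package main

import (
	"encoding/binary"
	"sort"
)

// encodeHeaders serializes channel V2 headers as consecutive
// [4-byte key LE][4-byte value length LE][value bytes] records,
// with keys in ascending order.
func encodeHeaders(headers map[uint32][]byte) []byte {
	keys := make([]uint32, 0, len(headers))
	for k := range headers {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

	var out []byte
	for _, k := range keys {
		v := headers[k]
		out = binary.LittleEndian.AppendUint32(out, k)
		out = binary.LittleEndian.AppendUint32(out, uint32(len(v)))
		out = append(out, v...)
	}
	return out
}
```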

The content types you care about:

Content type         Value  What it is
-------------------  -----  -------------------------------------
Result               2      Response to your subscription request
StreamEventsRequest  10040  Your subscription message
StreamEventsEvent    10041  An incoming event

And two header keys:

Key            Value  Meaning
-------------  -----  ----------------------------------------
ReplyFor       1      Which sequence number this result is for
ResultSuccess  2      Single byte, 1 = success

Building and parsing these messages is mechanical. Here's Go code for both:

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

var channelMagic = []byte{0x03, 0x06, 0x09, 0x0c}

func channelMarshal(contentType, sequence uint32, headers map[uint32][]byte, body []byte) []byte {
    var hdrs bytes.Buffer
    for k, v := range headers {
        binary.Write(&hdrs, binary.LittleEndian, k)
        binary.Write(&hdrs, binary.LittleEndian, uint32(len(v)))
        hdrs.Write(v)
    }
    hdrBytes := hdrs.Bytes()

    var buf bytes.Buffer
    buf.Write(channelMagic)
    binary.Write(&buf, binary.LittleEndian, contentType)
    binary.Write(&buf, binary.LittleEndian, sequence)
    binary.Write(&buf, binary.LittleEndian, uint32(len(hdrBytes)))
    binary.Write(&buf, binary.LittleEndian, uint32(len(body)))
    buf.Write(hdrBytes)
    buf.Write(body)
    return buf.Bytes()
}

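func channelUnmarshal(data []byte) (contentType, sequence uint32, headers map[uint32][]byte, body []byte, err error) {
    if len(data) < 20 || !bytes.Equal(data[:4], channelMagic) {
        return 0, 0, nil, nil, fmt.Errorf("invalid channel message")
    }

    contentType = binary.LittleEndian.Uint32(data[4:8])
    sequence = binary.LittleEndian.Uint32(data[8:12])
    // Convert lengths to int so they can be mixed with int slice indices.
    hdrLen := int(binary.LittleEndian.Uint32(data[12:16]))
    bodyLen := int(binary.LittleEndian.Uint32(data[16:20]))
    // Reject frames whose declared lengths run past the end of the buffer.
    if hdrLen < 0 || bodyLen < 0 || 20+hdrLen+bodyLen > len(data) {
        return 0, 0, nil, nil, fmt.Errorf("truncated channel message")
    }

    headers = make(map[uint32][]byte)
    hdrData := data[20 : 20+hdrLen]
    for i := 0; i+8 <= len(hdrData); {
        key := binary.LittleEndian.Uint32(hdrData[i : i+4])
        vlen := int(binary.LittleEndian.Uint32(hdrData[i+4 : i+8]))
        if vlen < 0 || i+8+vlen > len(hdrData) {
            return 0, 0, nil, nil, fmt.Errorf("truncated header %d", key)
        }
        headers[key] = hdrData[i+8 : i+8+vlen]
        i += 8 + vlen
    }

    body = data[20+hdrLen : 20+hdrLen+bodyLen]
    return contentType, sequence, headers, body, nil
}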

Subscribing to Events

Send a StreamEventsRequest (content type 10040) with a JSON body listing what you want:

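sub := map[string]any{
    "format": "json",
    "subscriptions": []map[string]any{
        {"type": "fabric.usage", "options": map[string]any{"version": 3}},
        {"type": "edge.sessions"},
    },
}
body, err := json.Marshal(sub)
if err != nil {
    log.Fatalf("marshal subscription: %v", err)
}
// Sequence 1 is what the Result's ReplyFor header will point back to.
msg := channelMarshal(10040, 1, nil, body)
if err := conn.Write(ctx, websocket.MessageBinary, msg); err != nil {
    log.Fatalf("send subscription: %v", err)
}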

Then read messages until you get a Result (content type 2) that replies to your sequence number. Check that header key 2 has a first byte of 1 for success.
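
Here's one way to do that check, as a sketch. It works on the headers and body you get back from `channelUnmarshal` for a content type 2 message. Two assumptions on my part: ReplyFor holds the request's sequence number as a 4-byte little-endian integer (matching the other integers in the frame), and a failed Result carries its error text in the body. `checkResult` is a hypothetical helper name.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Header keys from the header table earlier in this post.
const (
	headerReplyFor      = 1
	headerResultSuccess = 2
)

// checkResult reports whether a Result message answers request wantSeq
// (matched) and, if it does, returns an error when the request failed.
func checkResult(headers map[uint32][]byte, body []byte, wantSeq uint32) (matched bool, err error) {
	replyFor, ok := headers[headerReplyFor]
	if !ok || len(replyFor) != 4 || binary.LittleEndian.Uint32(replyFor) != wantSeq {
		return false, nil // not the reply we're waiting for; keep reading
	}
	success := headers[headerResultSuccess]
	if len(success) == 0 || success[0] != 1 {
		return true, fmt.Errorf("subscription rejected: %s", body)
	}
	return true, nil
}
```

In the read loop, call it for each content type 2 message until `matched` is true, then move on to handling events.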

Available event types include fabric.usage, edge.sessions, fabric.circuits, edge.routers, services, fabric.links, and fabric.routers. For usage events, request version 3 - it gives you a cleaner format with separate tags and usage maps.
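
If you'd rather not build the subscription out of nested `map[string]any`, typed structs produce the same JSON and let the compiler catch typos. A sketch; the struct and function names are mine, and the field names simply mirror the map used for the subscription above, including version 3 for `fabric.usage`.

```go
package main

import "encoding/json"

// streamEventsRequest is the JSON body of a StreamEventsRequest (content type 10040).
type streamEventsRequest struct {
	Format        string         `json:"format"`
	Subscriptions []subscription `json:"subscriptions"`
}

type subscription struct {
	Type    string         `json:"type"`
	Options map[string]any `json:"options,omitempty"`
}

// buildSubscription subscribes to each event type, requesting the
// version 3 format for fabric.usage.
func buildSubscription(types ...string) ([]byte, error) {
	req := streamEventsRequest{Format: "json"}
	for _, t := range types {
		s := subscription{Type: t}
		if t == "fabric.usage" {
			s.Options = map[string]any{"version": 3}
		}
		req.Subscriptions = append(req.Subscriptions, s)
	}
	return json.Marshal(req)
}
```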

Reading Events

From here it's a read loop. Filter for content type 10041 and parse the JSON body:

for {
    _, data, err := conn.Read(ctx)
    if err != nil {
        break // reconnect
    }
    ct, _, _, body, err := channelUnmarshal(data)
    if err != nil || ct != 10041 {
        continue
    }

    var event struct {
        Namespace string            `json:"namespace"`
        Tags      map[string]string `json:"tags"`
        Usage     map[string]uint64 `json:"usage"`
    }
    if err := json.Unmarshal(body, &event); err != nil {
        continue // skip events we can't decode
    }

    if event.Namespace == "fabric.usage" {
        fmt.Printf("service=%s client=%s tx=%d rx=%d\n",
            event.Tags["serviceId"], event.Tags["clientId"],
            event.Usage["ingress.tx"], event.Usage["ingress.rx"])
    }
}

Usage events include both client and host identity IDs in the tags (clientId and hostId), along with the serviceId. The usage map has directional counters: ingress.tx and egress.rx represent client upload, ingress.rx and egress.tx represent client download.
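
Given that mapping, the ingress.* and egress.* counters describe the same two directions measured at different routers, so adding them together would double-count. This sketch picks one pair. It relies on an assumption of mine: a single event may carry only the ingress or only the egress counters, so it prefers the ingress pair and falls back to egress. `clientTraffic` is a hypothetical name.

```go
package main

// clientTraffic converts a version 3 usage map into client upload and
// download byte counts: ingress.tx/ingress.rx if either is present,
// otherwise egress.rx/egress.tx.
func clientTraffic(usage map[string]uint64) (upload, download uint64) {
	_, hasTx := usage["ingress.tx"]
	_, hasRx := usage["ingress.rx"]
	if hasTx || hasRx {
		return usage["ingress.tx"], usage["ingress.rx"]
	}
	return usage["egress.rx"], usage["egress.tx"]
}
```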

Things to Know

The controller doesn't buffer events. If your WebSocket disconnects, events that occur during the gap are lost. Build your client to reconnect with backoff and re-subscribe. Session tokens expire, so you'll want to re-authenticate on reconnect too.

Usage events arrive roughly every minute. The controller aggregates and emits them on an interval, so don't expect sub-second granularity.

HTTP/2 will silently break the handshake. I learned this the hard way: the controller requires HTTP/1.1 for the WebSocket upgrade, so force it explicitly in your client.

For self-hosted controllers that use Ziti's internal CA for TLS, you can fetch the CA bundle from the controller's well-known endpoint at /edge/client/v1/.well-known/est/cacerts and add those CAs to your trust store alongside system CAs. The Go SDK has rest_util.GetControllerWellKnownCas() for this.
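
A sketch of building that trust store with the standard library, assuming you already have the controller's CA certificates as PEM. EST `cacerts` endpoints return a base64-encoded PKCS#7 bundle (RFC 7030), which the standard library can't parse, so `rest_util.GetControllerWellKnownCas()` is the easier way to fetch them in Go. The function also pins ALPN to HTTP/1.1 for the WebSocket upgrade; pass nil to trust only the system CAs. `controllerTLSConfig` is a hypothetical name.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
)

// controllerTLSConfig returns a client TLS config that trusts the system
// CAs plus any CA certificates in extraPEM, offering only HTTP/1.1 in ALPN.
func controllerTLSConfig(extraPEM []byte) (*tls.Config, error) {
	pool, err := x509.SystemCertPool()
	if err != nil {
		pool = x509.NewCertPool() // no usable system store; rely on extraPEM
	}
	if len(extraPEM) > 0 && !pool.AppendCertsFromPEM(extraPEM) {
		return nil, errors.New("no CA certificates found in extraPEM")
	}
	return &tls.Config{
		RootCAs:    pool,
		NextProtos: []string{"http/1.1"},
	}, nil
}
```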

A note on NetFoundry-hosted controllers. If you're connecting to a NetFoundry controller with a DNS name ending in -p, you're talking to a controller configured with publicly signed TLS certificates. Standard system CA trust stores will work; no custom CA bundles needed. Some NetFoundry environments also require connecting to the management API through Ziti itself (i.e., the management API isn't directly reachable on the internet). If you can't reach the Ziti management API on a NetFoundry-hosted controller, this is probably why; check with your NetFoundry contact if you're not sure which setup applies to you.

That's pretty much it. The wire protocol looks intimidating at first, but it's simple once you've written the marshal/unmarshal functions. From there, it's just JSON.

If you have questions or run into issues, the OpenZiti Discourse is the best place to ask. And if you do something interesting with the event stream, I'd love to hear about it.