Architecture
This document describes the internal architecture of ZigBolt, covering module dependencies, data structures, memory layouts, threading model, and data flow.
Layer Diagram
ZigBolt is organized into seven layers, each depending only on layers below it:

+================================================================+
|                       Application Layer                        |
|  Transport, Publisher(T), Subscriber(T), RawPublisher/Sub      |
|  AgentRunner, CompositeAgent, DutyCycleTracker                 |
+================================================================+
|                         Channel Layer                          |
|  IpcChannel (shm)   UdpChannel   NetworkChannel (reliable)     |
|  CongestionControl   FlowControl(Min/Max/Tagged)               |
+================================================================+
|                         Protocol Layer                         |
|  Reliability (NAK)   Fragmenter/Reassembler   NakController    |
|  DataHeaderFlyweight   StatusMessageFlyweight   NakFlyweight   |
|  SetupFlyweight   RttMeasurementFlyweight   ErrorFlyweight     |
+================================================================+
|                          Codec Layer                           |
|  WireCodec(T)   SbeEncoder/SbeDecoder   FIX Messages           |
|  FrameHeader   TickMessage   OrderMessage   Decimal64          |
+================================================================+
|                           Core Layer                           |
|  SpscRingBuffer   MpscRingBuffer   LogBuffer   Sequencer       |
|  BroadcastBuffer (1-to-N)   CounterSet   GlobalCounters        |
|  IdleStrategy (BusySpin/Yielding/Sleeping/Backoff/NoOp)        |
+================================================================+
|                    Cluster / Archive Layer                     |
|  RaftNode   Cluster   WriteAheadLog   SnapshotManager          |
|  Archive   Catalog   SparseIndex   Compressor/Decompressor     |
+================================================================+
|                         Platform Layer                         |
|  config.zig (cache lines, timestamps)   memory.zig (shm/mmap)  |
+================================================================+

Module Dependency Graph
root.zig
  |
  +-- platform/config.zig  (constants, timestampNs)
  +-- platform/memory.zig  (SharedRegion, mmap/shm)
  |
  +-- core/frame.zig  (FrameHeader, alignment)
  +-- core/spsc.zig  <-- frame, config
  +-- core/mpsc.zig  <-- frame, config
  +-- core/log_buffer.zig  <-- frame, config
  |
  +-- codec/wire.zig  (WireCodec, TickMessage, OrderMessage)
  +-- codec/sbe.zig  (SbeEncoder, SbeDecoder, MessageHeader, GroupHeader, Decimal64)
  +-- codec/fix_messages.zig  (NewOrderSingle, ExecutionReport, MarketData, etc.) <-- sbe
  |
  +-- protocol/flyweight.zig  (DataHeader, StatusMessage, NAK, Setup, RTT, Error flyweights)
  |
  +-- core/broadcast.zig  (BroadcastTransmitter, BroadcastReceiver) <-- config
  +-- core/idle_strategy.zig  (BusySpin, Yielding, Sleeping, Backoff, NoOp)
  +-- core/agent.zig  (AgentFn, AgentRunner, CompositeAgent) <-- idle_strategy
  +-- core/counters.zig  (Counter, CounterSet, GlobalCounters)
  |
  +-- channel/ipc.zig  <-- memory, frame, config
  +-- channel/udp.zig  <-- frame, config
  +-- channel/reliability.zig  <-- frame, config
  +-- channel/fragment.zig
  +-- channel/network.zig  <-- udp, reliability, fragment
  +-- channel/congestion.zig  (CongestionControl, RttEstimator, NakController)
  +-- channel/flow_control.zig  (MinFlowControl, MaxFlowControl, TaggedFlowControl)
  |
  +-- api/publisher.zig  <-- ipc, wire
  +-- api/subscriber.zig  <-- ipc, wire
  +-- api/transport.zig  <-- ipc, publisher, subscriber
  |
  +-- archive/segment.zig
  +-- archive/archive.zig  <-- segment
  +-- archive/catalog.zig  (Catalog, CatalogEntry)
  +-- archive/index.zig  (SparseIndex, IndexEntry)
  +-- archive/compression.zig  (Compressor, Decompressor, compressFrame/decompressFrame)
  |
  +-- sequencer/sequencer.zig
  |
  +-- cluster/raft_log.zig
  +-- cluster/raft.zig  <-- raft_log
  +-- cluster/cluster.zig  <-- raft, raft_log
  +-- cluster/wal.zig  (WriteAheadLog, WalEntry, VoteState)
  +-- cluster/snapshot.zig  (SnapshotManager, SnapshotData)
  |
  +-- ffi/exports.zig  <-- zigbolt (root)

Key Data Structures
FrameHeader (8 bytes)
Every message in a ring buffer or log buffer is prefixed by this header:

Offset  Size  Field         Description
------  ----  -----         -----------
0       4     frame_length  i32: >0 data, <0 padding, =0 uncommitted
4       4     msg_type_id   i32: user-defined message type

The total frame size is alignUp(8 + payload_len, 8), so it is always 8-byte aligned.
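The header layout and the alignment rule can be checked with a few lines of Python. This is only a sketch of the arithmetic, not ZigBolt code: the names `align_up`, `frame_size`, and `encode_frame` are hypothetical, little-endian byte order is assumed, and so is the convention that frame_length stores the unpadded header-plus-payload length.

```python
import struct

# Assumed encoding: two little-endian i32 fields, matching the table above.
FRAME_HEADER = struct.Struct("<ii")  # frame_length, msg_type_id


def align_up(value: int, alignment: int = 8) -> int:
    """Round value up to the next multiple of a power-of-two alignment."""
    return (value + alignment - 1) & ~(alignment - 1)


def frame_size(payload_len: int) -> int:
    """Bytes one frame occupies in the buffer: header + payload, 8-byte aligned."""
    return align_up(FRAME_HEADER.size + payload_len, 8)


def encode_frame(msg_type_id: int, payload: bytes) -> bytes:
    """Build one frame, zero-padded to its aligned size.

    Assumption: frame_length records the unpadded header + payload length.
    """
    frame_length = FRAME_HEADER.size + len(payload)
    frame = FRAME_HEADER.pack(frame_length, msg_type_id) + payload
    return frame.ljust(frame_size(len(payload)), b"\x00")
```

For example, a 3-byte payload occupies 16 bytes in the buffer: 8 bytes of header, 3 of payload, and 5 of padding.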
SpscRingBuffer Memory Layout
   Cache Line 0          Cache Line 1
+------------------+  +------------------+
| head (atomic u64)|  | tail (atomic u64)|
| + padding        |  | + padding        |
+------------------+  +------------------+
|                                        |
|            buffer[capacity]            |
|    (cache-line aligned, power of 2)    |
+----------------------------------------+

head and tail are on separate cache lines to prevent false sharing between the producer (writes head) and consumer (writes tail).
- head: write position, modified only by producer, stored with .release
- tail: read position, modified only by consumer, stored with .release
- mask: capacity - 1 (comptime constant, capacity must be power of 2)
- Wrap-around uses modular arithmetic: pos & mask
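The mask trick works because, for a power-of-two capacity, `pos & (capacity - 1)` equals `pos % capacity`. The sketch below illustrates the index arithmetic only. It assumes head and tail are monotonically increasing positions that are masked on access, and the helper names are hypothetical.

```python
CAPACITY = 1024          # must be a power of two
MASK = CAPACITY - 1


def is_power_of_two(n: int) -> bool:
    """True when exactly one bit is set."""
    return n > 0 and (n & (n - 1)) == 0


def buffer_index(pos: int) -> int:
    """Map an ever-increasing position onto an offset inside the buffer."""
    return pos & MASK


def used_bytes(head: int, tail: int) -> int:
    """Bytes written by the producer (head) but not yet consumed (tail)."""
    return head - tail


def free_bytes(head: int, tail: int) -> int:
    """Space the producer may still claim before catching up with the consumer."""
    return CAPACITY - used_bytes(head, tail)
```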
MpscRingBuffer Memory Layout
Same structure as SPSC, but:
- head is advanced via CAS (compare-and-swap) for multiple producers
- tail is a plain usize (single consumer only)
- Two-phase commit: CAS claims space, then frame_length is stored with .release to commit
LogBuffer Memory Layout (Aeron-style)
+-------------------+-------------------+-------------------+
|      Term 0       |      Term 1       |      Term 2       |
|   (term_length)   |   (term_length)   |   (term_length)   |
+-------------------+-------------------+-------------------+

tail_position (atomic u64) -- absolute byte offset, wraps across terms
head_position (atomic u64) -- consumer read position

Term rotation: when a message doesn't fit in the current term,
a padding frame is inserted and tail advances to the next term.
Term index  = (position / term_length) % 3
Term offset = position % term_length

The Claim API provides two-phase publishing:
- claim(length): atomically reserves space, returns Claim
- Write payload into claim.term_buffer[claim.term_offset + 8 ..]
- commit(claim, msg_type_id): release-stores frame_length to make visible
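The term arithmetic and the rotation rule can be modelled in a few lines. This is a single-threaded sketch under stated assumptions: the real claim is an atomic operation, and the function `claim` here is a hypothetical stand-in that only computes where a frame would land.

```python
PARTITIONS = 3  # Term 0, Term 1, Term 2


def term_index(position: int, term_length: int) -> int:
    """Which of the three term buffers an absolute position falls in."""
    return (position // term_length) % PARTITIONS


def term_offset(position: int, term_length: int) -> int:
    """Byte offset of an absolute position inside its term."""
    return position % term_length


def claim(tail: int, aligned_len: int, term_length: int):
    """Return (padding_bytes, frame_position, new_tail) for an aligned frame.

    If the frame does not fit in the rest of the current term, the rest is
    covered by a padding frame and the message starts at the next term.
    """
    remaining = term_length - term_offset(tail, term_length)
    padding = remaining if aligned_len > remaining else 0
    start = tail + padding
    return padding, start, start + aligned_len
```

With a 64 KiB term, a 32-byte frame claimed 16 bytes before the end of Term 0 yields 16 bytes of padding and a frame at offset 0 of Term 1.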
IPC Channel Shared Memory Layout
Offset      Size         Content
------      ----         -------
0           4096         Metadata (cache-line padded)
  +0        8              magic: 0x5A49_4742_4F4C_5421 ("ZIGBOLT!")
  +8        4              version: 1
  +12       4              term_length
  +CL       8              tail_position (atomic u64)
  +2*CL     8              head_position (atomic u64)
4096        term_length  Term 0
4096+TL     term_length  Term 1
4096+2*TL   term_length  Term 2

Total size: 4096 + 3 * term_length
CL = cache_line_size (128 bytes on modern CPUs)

NetworkHeader (Network Protocol)

Offset  Size  Field           Description
------  ----  -----           -----------
0       1     version         Protocol version (1)
1       1     header_type     data(0), nak(1), heartbeat(2), setup(3), teardown(4)
2       4     session_id      Publisher-subscriber pair identifier
6       4     stream_id       Topic/channel within session
10      8     sequence        Monotonically increasing per stream
18      4     payload_length  Bytes following this header
22      3     _reserved       Padding

WireCodec Packed Message Layout
Messages must be packed structs with no pointers; this is validated entirely at comptime.
TickMessage (32 bytes):
Offset  Size  Field
------  ----  -----
0       8     timestamp_ns (u64)
8       4     symbol_id (u32)
12      8     price (i64)
20      8     volume (u64)
28      1     side (enum u8: bid=0, ask=1)
29      3     _padding (u24)

OrderMessage (48 bytes):

Offset  Size  Field
------  ----  -----
0       8     timestamp_ns (u64)
8       8     order_id (u64)
16      4     symbol_id (u32)
20      8     price (i64)
28      8     quantity (u64)
36      1     side (enum u8: buy=0, sell=1)
37      1     order_type (enum u8: limit=0, market=1, cancel=2)
38      2     _padding (u16)

Wire size must be a multiple of 8 bytes. Encoding is a direct @memcpy of the
packed representation, so it adds no per-field serialization work.
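To make the TickMessage layout concrete, the sketch below reproduces its 32-byte image with Python's `struct` module. It mirrors the offsets in the table only. Little-endian byte order is an assumption, and `encode_tick` and `decode_tick` are hypothetical helpers rather than the WireCodec API.

```python
import struct

# "<" means little-endian with no implicit padding.
# Q = u64, I = u32, q = i64, B = u8, 3x = three explicit padding bytes.
TICK = struct.Struct("<QIqQB3x")

SIDE_BID, SIDE_ASK = 0, 1


def encode_tick(timestamp_ns: int, symbol_id: int, price: int,
                volume: int, side: int) -> bytes:
    """Pack the five fields into the fixed 32-byte wire image."""
    return TICK.pack(timestamp_ns, symbol_id, price, volume, side)


def decode_tick(buf: bytes, offset: int = 0):
    """Read the fields back from a buffer at a given offset."""
    return TICK.unpack_from(buf, offset)
```

The format string yields exactly 32 bytes, which satisfies the multiple-of-8 rule stated above.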
Threading Model
IPC Channel (SPSC)

Process A (Publisher)        Shared Memory         Process B (Subscriber)
+-------------------+    +-------------------+    +-------------------+
| publish()         | -> | tail_position     | <- | poll()            |
| writes payload    |    | [Term Buffers]    |    | reads frames      |
| stores frame_len  |    | head_position     |    | advances head     |
| advances tail     |    +-------------------+    +-------------------+
+-------------------+

- Publisher writes payload, then release-stores frame_length
- Subscriber acquire-loads frame_length, reads payload, advances head
- No locks, no CAS -- pure acquire/release ordering

MPSC Ring Buffer

Thread 1 (Producer)    Thread 2 (Producer)    Thread 3 (Consumer)
       |                      |                      |
       v                      v                      |
   CAS(head)              CAS(head)                  |
       |                      |                      |
 write payload          write payload                |
       |                      |                      |
 release-store          release-store                |
 frame_length           frame_length                 |
                                                     v
                                         acquire-load frame_length
                                              read payload
                                              advance tail

Network Channel
Single-threaded event loop:
- publish(): encode, fragment if needed, send via UDP
- poll(): receive UDP datagrams, track sequences, reassemble, deliver
- NAK generation happens at the end of each poll cycle
Raft Cluster
Each node runs a single-threaded tick loop:
- Receive messages from peers
- Handle via handleMessage() (state transitions, log replication)
- tick() applies committed entries to the state machine
- Heartbeats sent periodically by the leader
Data Flow: Publish to Receive
IPC Path (lowest latency)

Publisher.offer(&msg)
    |
    v
WireCodec.encode()         -- @memcpy packed struct to bytes
    |
    v
IpcChannel.publish()       -- write FrameHeader + payload into term buffer
    |                         release-store frame_length, advance tail
    v
  --- shared memory ---
    |
    v
IpcChannel.poll()          -- acquire-load tail, read frames
    |
    v
WireCodec.decode()         -- pointer cast into shared memory (zero-copy)
    |
    v
Subscriber handler(msg)    -- user callback with *const MsgType

Total copies: 1 (encode). Decode is zero-copy (pointer cast).
Network Path (reliable UDP)
NetworkChannel.publish(data)
    |
    v
FlowControl.tryConsume()     -- check credit window
    |
    v
Fragmenter (if needed)       -- split into MTU-sized chunks
    |
    v
sendWithReliability()        -- assign sequence number
    |                           store copy in SendBuffer
    v                           prepend NetworkHeader
UdpChannel.send()            -- sendto() syscall
    |
    v
  --- network (UDP datagram) ---
    |
    v
UdpChannel.recv()            -- recvfrom() syscall
    |
    v
NetworkChannel.poll()        -- parse NetworkHeader
    |                           RecvTracker.recordReceived()
    v                           handle NAKs, heartbeats
Reassembler (if fragmented)  -- collect fragments, deliver complete
    |
    v
handler(data)                -- user callback

Archive Path

Archive.record(stream_id, msg_type_id, data, timestamp_ns)
    |
    v
SegmentManager.write(Record)   -- append to current segment file
    |                             rotate segment when full
    v
[disk: /tmp/zigbolt/archive/segment_NNNN.dat]

Archive.replay(params, handler)
    |
    v
SegmentManager.openSegment()   -- memory-map segment file
    |
    v
Segment.readRecord()           -- sequential scan with offset tracking
    |                             optional stream_id filter
    v
handler(Record)                -- user callback per archived message

Wire Protocol Flyweights
Aeron-compatible wire protocol frames. Each flyweight wraps a raw []u8 buffer
and provides typed accessor methods at fixed byte offsets (little-endian).
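The flyweight idea is easiest to see in a small model: the object owns no data and every accessor reads or writes the caller's buffer in place. The sketch below uses the DataHeaderFlyweight offsets listed in the next section. The `_field` helper and the Python class are hypothetical illustrations, not the Zig implementation.

```python
import struct


def _field(fmt: str, rel: int):
    """Build a property that reads and writes one little-endian field in place."""
    def getter(self):
        return struct.unpack_from(fmt, self._buf, self._off + rel)[0]

    def setter(self, value):
        struct.pack_into(fmt, self._buf, self._off + rel, value)

    return property(getter, setter)


class DataHeaderFlyweight:
    """Typed view over a caller-owned buffer; nothing is copied or allocated."""

    LENGTH = 32
    FLAG_BEGIN, FLAG_END, FLAG_EOS = 0x80, 0x40, 0x20

    frame_length = _field("<i", 0)
    version = _field("<B", 4)
    flags = _field("<B", 5)
    frame_type = _field("<H", 6)
    term_offset = _field("<I", 8)
    session_id = _field("<i", 12)
    stream_id = _field("<i", 16)
    term_id = _field("<i", 20)
    reserved_value = _field("<q", 24)

    def __init__(self, buffer: bytearray, offset: int = 0):
        self._buf = buffer
        self._off = offset
```

Two flyweights created over the same buffer and offset observe each other's writes, because the buffer is the only state.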
DataHeaderFlyweight (32 bytes)
Offset  Size  Field           Description
------  ----  -----           -----------
0       4     frame_length    i32: total frame size including header
4       1     version         u8: protocol version
5       1     flags           u8: BEGIN(0x80), END(0x40), EOS(0x20)
6       2     frame_type      u16: FrameType enum (DATA=0x01)
8       4     term_offset     u32: offset within the term buffer
12      4     session_id      i32: publication session identifier
16      4     stream_id       i32: channel stream identifier
20      4     term_id         i32: term buffer identifier
24      8     reserved_value  i64: user-defined metadata

StatusMessageFlyweight (36 bytes)

Offset  Size  Field                    Description
------  ----  -----                    -----------
0       8     [HeaderFlyweight]        Base header (frame_length, version, flags, type=SM)
8       4     session_id               i32
12      4     stream_id                i32
16      4     consumption_term_id      i32: term consumed up to
20      4     consumption_term_offset  i32: offset within consumption term
24      4     receiver_window_length   i32: advertised window (bytes)
28      8     receiver_id              i64: unique receiver identifier

NakFlyweight (28 bytes)

Offset  Size  Field        Description
------  ----  -----        -----------
0       8     [Header]     Base header (type=NAK)
8       4     session_id   i32
12      4     stream_id    i32
16      4     term_id      i32: term containing missing data
20      4     term_offset  i32: start of missing range
24      4     nak_length   i32: length of missing range

Other Flyweights
- SetupFlyweight (40 bytes): session establishment with term_length, MTU, TTL
- RttMeasurementFlyweight (40 bytes): echo_timestamp_ns + reception_delta_ns
- ErrorFlyweight (28+ bytes): error_code + variable-length error string
Frame Types
PAD=0x00, DATA=0x01, NAK=0x02, SM=0x03, ERR=0x04, SETUP=0x05, RTTM=0x06, RES=0x07

SBE Message Format
SBE (Simple Binary Encoding) messages follow the FIX Trading Community standard:

[MessageHeader: 8 bytes]
  block_length: u16   -- root block size in bytes
  template_id:  u16   -- message type ID
  schema_id:    u16   -- schema identifier
  version:      u16   -- schema version
[Root block: block_length bytes]
  Fixed fields in schema-defined order (zero-copy access)
[Groups: variable]
  [GroupHeader: 4 bytes] (block_length: u16, num_in_group: u16)
  [Entry x num_in_group: block_length bytes each]
  (groups may nest -- each entry can contain sub-groups)
[VarData: variable]
  [length: u32][data: length bytes]

SbeEncoder writes into caller-provided buffers with zero heap allocations.
SbeDecoder reads via zero-copy: getBytes() returns pointers directly into the
underlying buffer. Decimal64 represents fixed-point prices (mantissa only on wire,
exponent in schema).
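A short sketch shows how the 8-byte MessageHeader and a mantissa-only Decimal64 field fit together. Several things here are assumptions for illustration: little-endian byte order, a schema exponent of -4, an i64 mantissa, and every function name. None of them is taken from SbeEncoder or SbeDecoder.

```python
import struct
from decimal import Decimal

# block_length, template_id, schema_id, version: four u16 fields.
MESSAGE_HEADER = struct.Struct("<HHHH")
PRICE_EXPONENT = -4  # hypothetical schema constant: four decimal places


def encode_message(template_id: int, schema_id: int, version: int,
                   root_block: bytes) -> bytes:
    """Prefix a fixed-size root block with the SBE message header."""
    header = MESSAGE_HEADER.pack(len(root_block), template_id, schema_id, version)
    return header + root_block


def decode_header(buf: bytes):
    """Return (block_length, template_id, schema_id, version)."""
    return MESSAGE_HEADER.unpack_from(buf, 0)


def price_to_mantissa(price: Decimal) -> int:
    """Scale by the schema exponent; extra precision is truncated."""
    return int(price.scaleb(-PRICE_EXPONENT))


def mantissa_to_price(mantissa: int) -> Decimal:
    """Reapply the schema exponent to a mantissa read off the wire."""
    return Decimal(mantissa).scaleb(PRICE_EXPONENT)
```

Because the exponent lives in the schema, only the 8-byte mantissa travels on the wire, and both sides must agree on the schema version to interpret it.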
BroadcastBuffer Memory Layout
1-to-N fan-out buffer for market data distribution. One transmitter, many receivers.

+-------------------------------------------+---------------------------+
|  Buffer Region (capacity bytes)           |  Trailer (4 cache lines)  |
|  [record][record][record]...              |  tail_intent (CL 0)       |
|                                           |  tail (CL 1)              |
|  capacity must be a power of 2            |  latest (CL 2)            |
+-------------------------------------------+---------------------------+

Record format: [i32 payload_length][i32 msg_type_id][payload...][padding to 8-byte alignment]
  payload_length: actual payload size (excluding header)
  msg_type_id: user-defined type; 0 = padding record (skip on read)

- BroadcastTransmitter: writes with two-phase commit (tail_intent, then tail)
- BroadcastReceiver: tracks its own cursor; detects lapping via tail_intent
- CopyBroadcastReceiver: copies payload to scratch buffer for safe retention
- Max message size: (capacity / 8) - 8 bytes
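The sizing rules above reduce to a few integer expressions. The sketch below restates them in Python. The function names are hypothetical, and the lapping test simply assumes cursor and tail are monotonically increasing byte positions.

```python
RECORD_HEADER_LENGTH = 8   # i32 payload_length + i32 msg_type_id
RECORD_ALIGNMENT = 8
PADDING_MSG_TYPE_ID = 0    # records of this type are skipped on read


def aligned_record_length(payload_length: int) -> int:
    """Header plus payload, rounded up to the 8-byte record alignment."""
    raw = RECORD_HEADER_LENGTH + payload_length
    return (raw + RECORD_ALIGNMENT - 1) & ~(RECORD_ALIGNMENT - 1)


def max_message_size(capacity: int) -> int:
    """Largest payload accepted for a buffer of the given capacity."""
    if capacity <= 0 or capacity & (capacity - 1):
        raise ValueError("capacity must be a power of 2")
    return capacity // 8 - 8


def is_lapped(cursor: int, tail: int, capacity: int) -> bool:
    """True when the transmitter has overwritten data the receiver has not read."""
    return tail > cursor + capacity
```

For a 64 KiB buffer the payload limit works out to 8184 bytes, which means at least eight maximum-size records fit before the transmitter can wrap onto unread data.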
Flow Control Architecture
Three flow control strategies matching Aeron’s design:

         Receiver Status Messages (position + window)
Sender ─────────────────────────────────────────── Receivers
  |                                                    |
  v                                                    |
FlowControl.onStatusMessage()                          |
  |
  +── MinFlowControl: sender_limit = min(all positions + windows)
  |     Reliable multicast -- sender waits for slowest receiver
  |
  +── MaxFlowControl: sender_limit = sender_position + window
  |     Best-effort -- sender never blocked, slow receivers lose data
  |
  +── TaggedFlowControl: sender_limit = min(tagged receivers only)
        Group-based -- risk checkers constrain, market data does not

Each strategy tracks up to 16 receivers in a fixed-size array (zero allocation
on hot path). Stale receivers are timed out after receiver_timeout_ns.
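The three limit rules differ only in which receivers they consider. The sketch below computes each one from a list of receiver reports. The `Receiver` record, the helper names, and the choice to fall back to the sender position when no receiver qualifies are all assumptions made for the example.

```python
from dataclasses import dataclass


@dataclass
class Receiver:
    receiver_id: int
    position: int        # last consumed position reported in a status message
    window: int          # advertised receiver window in bytes
    last_seen_ns: int    # when the last status message arrived
    tagged: bool = False


def live(receivers, now_ns: int, timeout_ns: int):
    """Drop receivers whose last status message is older than the timeout."""
    return [r for r in receivers if now_ns - r.last_seen_ns <= timeout_ns]


def min_limit(receivers, sender_position: int) -> int:
    """MinFlowControl: the slowest receiver bounds the sender."""
    return min((r.position + r.window for r in receivers), default=sender_position)


def max_limit(sender_position: int, window: int) -> int:
    """MaxFlowControl: the sender is never held back by a receiver."""
    return sender_position + window


def tagged_limit(receivers, sender_position: int) -> int:
    """TaggedFlowControl: only tagged receivers bound the sender."""
    return min_limit([r for r in receivers if r.tagged], sender_position)
```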
Agent / IdleStrategy Threading Model
AgentRunner (dedicated thread)
    |
    v
  while (running) {
      work_count = agent.doWork()       // poll channels, process messages
      idle_strategy.idle(work_count)    // back off when idle
  }

IdleStrategy state machine (BackoffIdleStrategy):
  NOT_IDLE ──(no work)──> SPINNING ──(max_spins)──> YIELDING ──(max_yields)──> PARKING
     ^                                                                            |
     +────────────────────────────(work_count > 0)────────────────────────────────+

CompositeAgent: aggregates work from multiple sub-agents into a single runner.
DutyCycleTracker: measures cycle duration, max cycle time, work ratio.

Congestion Control
AIMD (Additive Increase / Multiplicative Decrease) with RTT estimation:

Slow Start:            cwnd += MSS per ACK (exponential growth)
Congestion Avoidance:  cwnd += MSS*MSS/cwnd (linear growth, ~1 MSS per RTT)
On Loss (NAK):         ssthresh = cwnd/2, cwnd = ssthresh
On Timeout:            ssthresh = cwnd/2, cwnd = min_window, re-enter slow start

RTT Estimator (RFC 6298 EWMA):
  SRTT   = 7/8 * SRTT + 1/8 * sample
  RTTVAR = 3/4 * RTTVAR + 1/4 * |SRTT - sample|
  RTO    = SRTT + max(G, 4 * RTTVAR)
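The update rules above translate directly into a small model. This is a sketch, not the CongestionControl or RttEstimator code: the class names are hypothetical, the window starts at min_window by assumption, and integer division is used for the halving and the congestion-avoidance step. Following RFC 6298, the first sample sets SRTT to the sample and RTTVAR to half of it, and later samples update RTTVAR from the previous SRTT before SRTT itself changes.

```python
class RttEstimator:
    """RFC 6298 smoothed RTT; all times share one unit (for example ns)."""

    def __init__(self, granularity: float = 1.0):
        self.granularity = granularity  # G, the clock granularity
        self.srtt = None
        self.rttvar = 0.0

    def update(self, sample: float) -> float:
        """Fold in one RTT sample and return the new RTO."""
        if self.srtt is None:
            # First sample (RFC 6298, section 2.2).
            self.srtt = sample
            self.rttvar = sample / 2
        else:
            # RTTVAR is updated first, using the SRTT from before this sample.
            self.rttvar = 0.75 * self.rttvar + 0.25 * abs(self.srtt - sample)
            self.srtt = 0.875 * self.srtt + 0.125 * sample
        return self.srtt + max(self.granularity, 4 * self.rttvar)


class AimdWindow:
    """Congestion window in bytes, driven by ACK, NAK, and timeout events."""

    def __init__(self, mss: int, min_window: int, ssthresh: int):
        self.mss = mss
        self.min_window = min_window
        self.ssthresh = ssthresh
        self.cwnd = min_window  # assumed starting window

    def on_ack(self) -> None:
        if self.cwnd < self.ssthresh:
            self.cwnd += self.mss                          # slow start
        else:
            self.cwnd += self.mss * self.mss // self.cwnd  # congestion avoidance

    def on_loss(self) -> None:
        """A NAK arrived: halve the window and continue in avoidance mode."""
        self.ssthresh = self.cwnd // 2
        self.cwnd = self.ssthresh

    def on_timeout(self) -> None:
        """Timeout: halve ssthresh and restart slow start from min_window."""
        self.ssthresh = self.cwnd // 2
        self.cwnd = self.min_window
```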
NakController: exponential backoff for NAK timing (delay = base * 2^backoff)

WAL Record Format
Write-Ahead Log for Raft consensus durability:

Offset  Size      Field          Description
------  ----      -----          -----------
0       4         record_length  u32: term(8) + index(8) + payload + crc(4)
4       8         term           u64: Raft term
12      8         index          u64: Raft log index
20      variable  payload        Entry data
20+N    4         crc32          u32: CRC32 over term + index + payload

Total per-entry overhead: 24 bytes.
Sync policies: every_entry (safest), every_n_entries (batched), explicit.
Recovery: sequential scan, CRC validation, truncation of corrupt tail.
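The record format and the recovery scan can be sketched together. The code below is an illustration under assumptions: little-endian fields, the standard CRC-32 as computed by `zlib.crc32` (the text only says CRC32), and hypothetical function names. It stops at the first short or corrupt record and reports how many bytes of the log are valid, which is the point a real implementation would truncate to.

```python
import struct
import zlib

MIN_RECORD_LENGTH = 8 + 8 + 4  # term + index + crc, with an empty payload


def encode_entry(term: int, index: int, payload: bytes) -> bytes:
    """Serialize one WAL record: length, term, index, payload, crc32."""
    body = struct.pack("<QQ", term, index) + payload
    crc = zlib.crc32(body)
    return struct.pack("<I", len(body) + 4) + body + struct.pack("<I", crc)


def recover(log: bytes):
    """Scan sequentially; return (entries, valid_length)."""
    entries, pos = [], 0
    while pos + 4 <= len(log):
        (record_length,) = struct.unpack_from("<I", log, pos)
        end = pos + 4 + record_length
        if record_length < MIN_RECORD_LENGTH or end > len(log):
            break  # torn write at the tail
        body = log[pos + 4:end - 4]
        (crc,) = struct.unpack_from("<I", log, end - 4)
        if zlib.crc32(body) != crc:
            break  # corrupt record
        term, index = struct.unpack_from("<QQ", body, 0)
        entries.append((term, index, body[16:]))
        pos = end
    return entries, pos
```

An entry with an empty payload encodes to exactly 24 bytes, matching the per-entry overhead stated above.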
Snapshot Format
Raft state snapshots for log compaction:

Offset  Size      Field                Description
------  ----      -----                -----------
0       4         magic                u32: 0x5A425350 ("ZBSP")
4       2         version              u16: 1
6       8         last_included_term   u64
14      8         last_included_index  u64
22      4         state_size           u32
26      variable  state_data           Application state machine bytes
26+N    4         crc32                u32: CRC32 over header + state_data

File naming: snapshot_{last_index}.zbsp. SnapshotManager triggers after
a configurable number of committed entries and supports old snapshot cleanup.
Data Flow: Broadcast Path
BroadcastTransmitter.transmit(msg_type_id, payload)
    |
    v
Calculate aligned record length
    |
    v
Check wrap-around:
    |-- fits:  write record at current offset
    |-- wraps: insert padding record, write at offset 0
    |
    v
Two-phase commit:
    1. tail_intent_counter.store(new_tail, .release)
    2. Write RecordHeader + payload into buffer
    3. tail_counter.store(new_tail, .release)
    |
    v
BroadcastReceiver.receiveNext()    [each receiver independently]
    |
    v
Check tail vs cursor:
    |-- cursor >= tail: return null (no new data)
    |-- lapped (tail > cursor + capacity): skip forward, increment lapped_count
    |
    v
Read RecordHeader:
    |-- padding (msg_type_id == 0): skip, loop
    |-- data: extract payload, advance cursor
    |
    v
Validate (tail_intent <= cursor + capacity): data not overwritten
    |
    v
Return Message { msg_type_id, payload }