Skip to content

API Reference

All public types are exported from src/root.zig and accessible via @import("zigbolt").



const cache_line_size: usize; // 128 on modern CPUs
const page_size: usize; // 4096
const is_linux: bool;
const is_macos: bool;
const supports_hugepages: bool; // true on Linux
const supports_io_uring: bool; // true on Linux
const frame_alignment: u32 = 8;
const default_term_length: usize; // 1 << 20 (1 MB)
const default_ring_capacity: usize; // 1 << 16 (64K)
fn timestampNs() u64; // nanosecond timestamp
fn alignUp(size: u32, alignment: u32) u32;
const SharedRegion = struct {
base: [*]u8,
size: usize,
fn deinit(self: *SharedRegion) void;
};
const MemoryConfig = struct {
use_hugepages: bool = false,
pre_fault: bool = true,
};
fn createShared(name: [*:0]const u8, size: usize, config: MemoryConfig) !SharedRegion;
fn openShared(name: [*:0]const u8, size: usize) !SharedRegion;
fn prefault(region: SharedRegion) void;

Lock-free single-producer single-consumer ring buffer. capacity must be a power of 2.

const RB = zigbolt.SpscRingBuffer(1024);
var rb = RB.init();
| Method | Signature | Description |
| --- | --- | --- |
| init | `fn init() Self` | Create a zeroed ring buffer |
| write | `fn write(self: *Self, data: []const u8, msg_type_id: i32) bool` | Write a framed message. Returns false if full. |
| read | `fn read(self: *Self) ?ReadResult` | Read the next message. Returns null if empty. |

ReadResult:

pub const ReadResult = struct {
data: []const u8,
msg_type_id: i32,
};

Lock-free multi-producer single-consumer ring buffer using CAS. capacity must be a power of 2.

const RB = zigbolt.MpscRingBuffer(1024);
var rb = RB.init();
| Method | Signature | Description |
| --- | --- | --- |
| init | `fn init() Self` | Create a zeroed ring buffer |
| write | `fn write(self: *Self, data: []const u8, msg_type_id: i32) bool` | Thread-safe write via CAS. Returns false if full. |
| read | `fn read(self: *Self) ?ReadResult` | Single-consumer read. Returns null if empty or uncommitted. |

Aeron-style triple-buffered log with term rotation.

const Buf = zigbolt.LogBuffer(.{ .term_length = 1 << 20 });
var buf = Buf.init();

LogBufferConfig:

pub const LogBufferConfig = struct {
term_length: usize = 1 << 20, // must be power of 2
};
MethodSignatureDescription
initfn init() SelfCreate a zeroed log buffer
claimfn claim(self: *Self, length: u32) ?ClaimClaim space for a message. Returns null if consumer is too far behind.
commitfn commit(self: *Self, c: Claim, msg_type_id: i32) voidCommit a claimed frame, making it visible to readers.
readfn read(self: *Self, handler: *const fn([]const u8, i32) void, limit: u32) u32Read committed frames, calling handler for each. Returns count.

Claim:

pub const Claim = struct {
term_buffer: [*]u8,
term_offset: u32,
length: u32,
term_id: u32,
};
pub const FrameHeader = extern struct {
frame_length: i32 = 0, // >0: data, <0: padding, =0: uncommitted
msg_type_id: i32 = 0,
pub const SIZE: u32 = 8;
};
fn alignedFrameLength(payload_length: u32) u32;
fn isPaddingFrame(frame_length: i32) bool;
fn isDataFrame(frame_length: i32) bool;
fn isUncommitted(frame_length: i32) bool;
const MAX_PAYLOAD_SIZE: u32 = 1 << 24; // 16 MB

Comptime-generated zero-copy codec for packed structs. T must be a packed struct with no pointer or slice fields. Wire size must be a multiple of 8 bytes.

const Codec = zigbolt.WireCodec(zigbolt.TickMessage);
| Member | Type | Description |
| --- | --- | --- |
| wire_size | `usize` | Size of the wire representation in bytes |
| Type | `type` | The underlying message type |

| Method | Signature | Description |
| --- | --- | --- |
| encode | `fn encode(msg: *const T, buf: []u8) void` | Copy message bytes into buffer |
| decode | `fn decode(buf: []const u8) *align(1) const T` | Zero-copy: returns pointer into buffer |
| decodeMut | `fn decodeMut(buf: []u8) *align(1) T` | Mutable zero-copy decode |
| batchDecode | `fn batchDecode(buf: []const u8, out: []T) u32` | Decode multiple messages |
| batchEncode | `fn batchEncode(msgs: []const T, buf: []u8) u32` | Encode multiple messages |

TickMessage (32 bytes):

pub const TickMessage = packed struct {
timestamp_ns: u64,
symbol_id: u32,
price: i64,
volume: u64,
side: enum(u8) { bid = 0, ask = 1 },
_padding: u24 = 0,
};

OrderMessage (48 bytes):

pub const OrderMessage = packed struct {
timestamp_ns: u64,
order_id: u64,
symbol_id: u32,
price: i64,
quantity: u64,
side: enum(u8) { buy = 0, sell = 1 },
order_type: enum(u8) { limit = 0, market = 1, cancel = 2 },
_padding: u16 = 0,
};

pub const IpcConfig = struct {
term_length: usize = default_term_length, // power of 2
use_hugepages: bool = false, // Linux only
pre_fault: bool = true, // pre-fault pages
};

Shared-memory IPC channel. SPSC: one publisher, one subscriber.

MethodSignatureDescription
createfn create(name: [*:0]const u8, config: IpcConfig) !IpcChannelCreate a new channel (publisher side)
openfn open(name: [*:0]const u8, config: IpcConfig) !IpcChannelOpen an existing channel (subscriber side)
publishfn publish(self: *IpcChannel, data: []const u8, msg_type_id: i32) !voidPublish a message
pollfn poll(self: *IpcChannel, handler: *const fn(ReadResult) void, limit: u32) u32Poll for messages. Returns count.
deinitfn deinit(self: *IpcChannel) voidClose and release resources

ReadResult:

pub const ReadResult = struct {
data: []const u8,
msg_type_id: i32,
};

Errors:

  • error.InvalidChannel — magic number mismatch on open
  • error.UnsupportedVersion — protocol version mismatch
  • error.MessageTooLarge — payload exceeds MAX_PAYLOAD_SIZE

pub const UdpConfig = struct {
bind_address: std.net.Address,
remote_address: ?std.net.Address = null,
multicast_group: ?[4]u8 = null,
send_buffer_size: u32 = 2 * 1024 * 1024, // 2 MB
recv_buffer_size: u32 = 2 * 1024 * 1024, // 2 MB
non_blocking: bool = true,
};

UDP unicast and multicast channel.

MethodSignatureDescription
initfn init(config: UdpConfig) !UdpChannelCreate and bind a UDP socket
deinitfn deinit(self: *UdpChannel) voidClose the socket
sendfn send(self: *UdpChannel, data: []const u8, dest: ?net.Address) !usizeSend a raw datagram
recvfn recv(self: *UdpChannel, buf: []u8) !?RecvResultReceive a raw datagram (non-blocking)
sendFramefn sendFrame(self: *UdpChannel, data: []const u8, msg_type_id: i32, dest: ?net.Address) !voidSend a framed message (FrameHeader + payload)
recvFramefn recvFrame(self: *UdpChannel, buf: []u8) !?FrameRecvResultReceive and parse a framed message

RecvResult:

pub const RecvResult = struct {
data: []const u8,
from: std.net.Address,
};

FrameRecvResult:

pub const FrameRecvResult = struct {
payload: []const u8,
msg_type_id: i32,
from: std.net.Address,
};

pub const NetworkConfig = struct {
udp: UdpConfig,
session_id: u32 = 1,
stream_id: u32 = 1,
send_buffer_capacity: usize = 4096,
recv_window_size: u64 = 4096,
flow_control_window: i64 = 4 * 1024 * 1024, // 4 MB
mtu: u32 = 1472,
max_message_size: u32 = 1 << 20,
heartbeat_interval_ns: u64 = 100_000_000, // 100 ms
nak_delay_ns: u64 = 1_000_000, // 1 ms
};

Reliable, ordered network channel. Combines UDP, NAK reliability, flow control, and fragmentation.

MethodSignatureDescription
initfn init(allocator: Allocator, config: NetworkConfig) !NetworkChannelInitialize all sub-components
deinitfn deinit(self: *NetworkChannel) voidRelease all resources
publishfn publish(self: *NetworkChannel, data: []const u8, msg_type_id: i32) !voidPublish with reliability and flow control
pollfn poll(self: *NetworkChannel, handler: *const fn([]const u8) void, limit: u32) !u32Poll for complete messages

Errors:

  • error.BackPressured — flow control window exhausted

pub const NetworkHeader = extern struct {
version: u8 = 1,
header_type: HeaderType,
session_id: u32,
stream_id: u32,
sequence: u64,
payload_length: u32,
_reserved: [3]u8 = .{0, 0, 0},
pub const HeaderType = enum(u8) { data, nak, heartbeat, setup, teardown };
pub const SIZE: usize;
};
pub const NakMessage = extern struct {
session_id: u32,
stream_id: u32,
from_sequence: u64,
count: u32,
_padding: [4]u8,
};

Stores sent payloads for retransmission on NAK.

MethodSignatureDescription
initfn init(allocator: Allocator, capacity: usize) !SendBufferAllocate entry ring
deinitfn deinit(self: *SendBuffer, allocator: Allocator) voidFree all entries
storefn store(self: *SendBuffer, sequence: u64, data: []const u8, allocator: Allocator) !voidStore a copy for retransmit
getfn get(self: *SendBuffer, sequence: u64) ?*SendEntryLook up by sequence
releasefn release(self: *SendBuffer, up_to_sequence: u64) voidRelease acknowledged entries

Bitmap-based gap detection.

MethodSignatureDescription
initfn init(allocator: Allocator, window_size: u64) !RecvTrackerAllocate bitmap
deinitfn deinit(self: *RecvTracker) voidFree bitmap
recordReceivedfn recordReceived(self: *RecvTracker, sequence: u64) ?GapInfoRecord a sequence, return gap if detected
getMissingfn getMissing(self: *RecvTracker, allocator: Allocator) ![]u64List all missing sequences in window
slideWindowfn slideWindow(self: *RecvTracker, new_base: u64) voidAdvance the window forward

Credit-based flow control.

MethodSignatureDescription
initfn init(window_size: i64) FlowControlInitialize with credit window
tryConsumefn tryConsume(self: *FlowControl, bytes: usize) boolAtomically consume credits
replenishfn replenish(self: *FlowControl, bytes: usize) voidAdd credits back
availablefn available(self: *FlowControl) i64Current available credits

Splits large messages into MTU-sized fragments.

Collects fragments and delivers complete messages.

pub const FragmentConfig = struct {
mtu: u32 = 1472,
max_message_size: u32 = 1 << 20,
};

Typed publisher using WireCodec(MsgType) over IPC.

var pub = zigbolt.Publisher(TickMessage).init(&channel, 1);
try pub.offer(&tick_msg);
MethodSignatureDescription
initfn init(channel: *IpcChannel, msg_type_id: i32) SelfBind to a channel
offerfn offer(self: *Self, msg: *const MsgType) !voidPublish a typed message
tryOfferfn tryOffer(self: *Self, msg: *const MsgType) boolNon-blocking publish, returns false on back-pressure
offerRawfn offerRaw(self: *Self, data: []const u8) !voidPublish pre-encoded bytes

Untyped publisher for raw byte messages.

MethodSignatureDescription
initfn init(channel: *IpcChannel, msg_type_id: i32) RawPublisherBind to a channel
offerfn offer(self: *RawPublisher, data: []const u8) !voidPublish raw bytes

Typed subscriber using WireCodec(MsgType) over IPC.

var sub = zigbolt.Subscriber(TickMessage).init(&channel, 1);
_ = sub.poll(&handleTick, 100);
MethodSignatureDescription
initfn init(channel: *IpcChannel, msg_type_id: i32) SelfBind to a channel
pollfn poll(self: *Self, handler: *const fn(*const MsgType) void, limit: u32) u32Poll and decode messages
pollRawfn pollRaw(self: *Self, handler: *const fn(IpcChannel.ReadResult) void, limit: u32) u32Poll raw frames

Untyped subscriber for raw byte messages.

MethodSignatureDescription
initfn init(channel: *IpcChannel) RawSubscriberBind to a channel
pollfn poll(self: *RawSubscriber, handler: *const fn(IpcChannel.ReadResult) void, limit: u32) u32Poll raw frames

pub const TransportConfig = struct {
term_length: usize = 1 << 20,
use_hugepages: bool = false,
pre_fault: bool = true,
};

Main entry point. Manages IPC channels and creates typed publishers/subscribers.

MethodSignatureDescription
initfn init(allocator: Allocator, config: TransportConfig) TransportCreate a transport
deinitfn deinit(self: *Transport) voidShut down all channels
addPublicationfn addPublication(self, comptime MsgType, name: [:0]const u8, msg_type_id: i32) !Publisher(MsgType)Create a typed publisher
addSubscriptionfn addSubscription(self, comptime MsgType, name: [:0]const u8, msg_type_id: i32) !Subscriber(MsgType)Create a typed subscriber
addRawPublicationfn addRawPublication(self, name: [:0]const u8, msg_type_id: i32) !RawPublisherCreate a raw publisher
addRawSubscriptionfn addRawSubscription(self, name: [:0]const u8) !RawSubscriberCreate a raw subscriber

pub const ArchiveConfig = struct {
segment_size: usize = 256 * 1024 * 1024, // 256 MB
base_path: []const u8 = "/tmp/zigbolt/archive",
sync_policy: SyncPolicy = .periodic,
sync_interval_ms: u32 = 1000,
compression: ?CompressionAlgo = null,
pub const SyncPolicy = enum { none, periodic, every_segment };
pub const CompressionAlgo = enum { lz4, zstd };
};

Segment-based message recording and replay.

MethodSignatureDescription
initfn init(allocator: Allocator, config: ArchiveConfig) !ArchiveInitialize archive
deinitfn deinit(self: *Archive) voidRelease resources
recordfn record(self: *Archive, stream_id: u32, msg_type_id: i32, data: []const u8, timestamp_ns: u64) !voidRecord a message
replayfn replay(self: *Archive, params: ReplayParams, handler: *const fn(Record) void) !u64Replay messages. Returns count.
statsfn stats(self: *const Archive) StatsGet archive statistics

ReplayParams:

pub const ReplayParams = struct {
stream_id: ?u32 = null, // null = all streams
from_segment: u64 = 0,
from_offset: u64 = 0,
limit: ?u64 = null,
};

Stats:

pub const Stats = struct {
total_records: u64,
total_bytes: u64,
segment_count: u64,
};

Atomic total-order sequence assignment.

var seq = zigbolt.Sequencer.init(.{ .initial_sequence = 0 });
const event = seq.sequence(stream_id, payload);
MethodSignatureDescription
initfn init(config: SequencerConfig) SequencerInitialize sequencer
sequencefn sequence(self: *Sequencer, stream_id: u32, payload: []const u8) SequencedEventAssign next sequence number (thread-safe)
peekNextSequencefn peekNextSequence(self: *const Sequencer) u64Read next sequence without consuming
resetfn reset(self: *Sequencer, initial_sequence: u64) voidReset for testing/replay

SequencedEvent:

pub const SequencedEvent = struct {
sequence: u64,
timestamp_ns: u64,
stream_id: u32,
payload: []const u8,
};

Merges multiple input streams into one globally ordered output.

MethodSignatureDescription
initfn init(config: SequencerConfig) MultiStreamSequencerInitialize
sequenceFromfn sequenceFrom(self, stream_id: u32, payload: []const u8) SequencedEventSequence from a specific stream
getStreamStatsfn getStreamStats(self, stream_id: u32) StreamStatsPer-stream statistics
totalEventsfn totalEvents(self) u64Total events across all streams

Maps sequence numbers to stream/offset for replay.

MethodSignatureDescription
initfn init(allocator: Allocator) SequenceIndexInitialize
deinitfn deinit(self: *SequenceIndex) voidFree memory
addfn add(self, entry: IndexEntry) !voidAdd an index entry
lookupfn lookup(self, seq: u64) ?IndexEntryLook up by sequence number
rangeFromfn rangeFrom(self, from_sequence: u64) []const IndexEntryGet all entries from a sequence

pub const RaftConfig = struct {
node_id: u32,
peer_count: u32,
election_timeout_min_ms: u32 = 150,
election_timeout_max_ms: u32 = 300,
heartbeat_interval_ms: u32 = 50,
};

Full Raft consensus implementation.

MethodSignatureDescription
initfn init(allocator: Allocator, config: RaftConfig) !RaftNodeInitialize as follower
deinitfn deinit(self: *RaftNode) voidFree resources
handleMessagefn handleMessage(self, from: u32, msg: RaftMessage) ?MessageResponseHandle incoming Raft message
startElectionfn startElection(self) RaftMessageBegin leader election
proposefn propose(self, data: []const u8) !u64Propose a log entry (leader only)
createAppendEntriesfn createAppendEntries(self, peer_id: u32) AppendEntriesCreate replication message for peer
createHeartbeatfn createHeartbeat(self) AppendEntriesCreate empty heartbeat
getApplicableEntriesfn getApplicableEntries(self) []const StoredEntryGet committed but unapplied entries
markAppliedfn markApplied(self, up_to: u64) voidMark entries as applied
updateCommitIndexfn updateCommitIndex(self) voidRecalculate commit index from match_index

NodeState: enum { follower, candidate, leader }

RaftMessage:

pub const RaftMessage = union(enum) {
request_vote: RequestVote,
request_vote_response: RequestVoteResponse,
append_entries: AppendEntries,
append_entries_response: AppendEntriesResponse,
};
pub const ClusterConfig = struct {
node_id: u32,
peer_count: u32,
election_timeout_min_ms: u32 = 150,
election_timeout_max_ms: u32 = 300,
heartbeat_interval_ms: u32 = 50,
};

User-implemented interface for applying committed entries.

pub const StateMachine = struct {
apply_fn: *const fn (entry: []const u8) void,
snapshot_fn: ?*const fn () []const u8 = null,
restore_fn: ?*const fn (snapshot: []const u8) void = null,
};

High-level cluster that wraps RaftNode and a StateMachine.

MethodSignatureDescription
initfn init(allocator: Allocator, config: ClusterConfig, sm: ?StateMachine) !ClusterInitialize
deinitfn deinit(self: *Cluster) voidShut down
proposefn propose(self, data: []const u8) !u64Propose command (leader only)
handleMessagefn handleMessage(self, from: u32, msg: RaftMessage) ?MessageResponseProcess message
tickfn tick(self: *Cluster) voidApply committed entries to state machine
isLeaderfn isLeader(self) boolCheck leadership
getStatefn getState(self) NodeStateCurrent Raft state

Persistent WAL for Raft consensus. Each entry is CRC32-validated on disk.

var wal = try WriteAheadLog.init(allocator, .{
.path = "zigbolt_raft.wal",
.sync_policy = .every_n_entries,
.sync_interval = 100,
});
defer wal.deinit();

WalConfig:

pub const WalConfig = struct {
path: []const u8 = "zigbolt_raft.wal",
sync_policy: SyncPolicy = .every_n_entries,
sync_interval: u32 = 100,
};
pub const SyncPolicy = enum { every_entry, every_n_entries, explicit };
MethodSignatureDescription
initfn init(allocator: Allocator, config: WalConfig) !WriteAheadLogCreate or open a WAL file
deinitfn deinit(self: *WriteAheadLog) voidSync and close
appendfn append(self, term: u64, index: u64, data: []const u8) !voidAppend a CRC32-validated entry
readEntryfn readEntry(self, log_index: u64) !?WalEntryRead entry by log index
truncateFromfn truncateFrom(self, from_index: u64) !voidRemove entries >= from_index
recoverfn recover(self) ![]WalEntryScan file, rebuild index, return valid entries
flushfn flush(self) !voidForce fsync to disk
lastIndexfn lastIndex(self) u64Last written log index
lastTermfn lastTerm(self) u64Term of last entry
entryCountfn entryCount(self) u64Number of entries

WalEntry:

pub const WalEntry = struct {
term: u64,
index: u64,
data: []const u8,
};

Persistent Raft vote state (16-byte file).

MethodSignatureDescription
savefn save(self: VoteState, path: []const u8) !voidAtomically save to file
loadfn load(path: []const u8) !?VoteStateLoad from file, null if missing

Manages Raft snapshots on disk with CRC32 validation.

var mgr = SnapshotManager.init(allocator, .{
.base_path = "/var/lib/zigbolt/snapshots",
.snapshot_interval = 10000,
});
defer mgr.deinit();
MethodSignatureDescription
initfn init(allocator: Allocator, config: SnapshotConfig) SnapshotManagerInitialize
deinitfn deinit(self: *SnapshotManager) voidCleanup
shouldSnapshotfn shouldSnapshot(self) boolTrue if interval reached
onEntryCommittedfn onEntryCommitted(self) voidTrack committed entries
takeSnapshotfn takeSnapshot(self, last_term: u64, last_index: u64, state_data: []const u8) !voidWrite snapshot to disk
loadLatestSnapshotfn loadLatestSnapshot(self) !?SnapshotDataLoad newest snapshot
getLatestMetafn getLatestMeta(self) ?SnapshotMetaMetadata without loading state
cleanOldSnapshotsfn cleanOldSnapshots(self, keep_count: usize) !voidDelete all but N newest

SnapshotData (caller must call deinit()):

pub const SnapshotData = struct {
last_included_term: u64,
last_included_index: u64,
data: []u8,
allocator: std.mem.Allocator,
pub fn deinit(self: *SnapshotData) void;
};

Encodes SBE messages into caller-provided byte buffers. Zero heap allocations.

var buf: [4096]u8 = undefined;
var enc = SbeEncoder.init(&buf);
const hdr_pos = try enc.putMessageHeader(42, 1, 1);
try enc.putU64(timestamp);
try enc.putI64(price);
enc.finishHeader(hdr_pos);
const wire_bytes = buf[0..enc.encodedLength()];
MethodSignatureDescription
initfn init(buf: []u8) SbeEncoderInitialize over buffer
encodedLengthfn encodedLength(self) usizeBytes written so far
putMessageHeaderfn putMessageHeader(self, template_id: u16, schema_id: u16, version: u16) !usizeWrite 8-byte header, returns position for finishHeader
finishHeaderfn finishHeader(self, header_pos: usize) voidPatch block_length after root fields
putU8..putU64fn putU64(self, val: u64) !voidWrite unsigned integers
putI8..putI64fn putI64(self, val: i64) !voidWrite signed integers
putF32/putF64fn putF64(self, val: f64) !voidWrite floats
putCharfn putChar(self, val: u8) !voidWrite character
putBytesfn putBytes(self, data: []const u8) !voidWrite fixed-length bytes
putEnumfn putEnum(self, comptime E: type, val: E) !voidWrite enum as integer
beginGroupfn beginGroup(self, block_length: u16, count: u16) !voidWrite group header
putVarDatafn putVarData(self, data: []const u8) !voidWrite [u32 len][data]

Zero-copy SBE decoder. Returns pointers directly into the underlying buffer.

MethodSignatureDescription
initfn init(buf: []const u8) SbeDecoderInitialize over buffer
positionfn position(self) usizeCurrent read position
remainingfn remaining(self) usizeBytes left
skipfn skip(self, n: usize) !voidAdvance position
getMessageHeaderfn getMessageHeader(self) !MessageHeaderRead 8-byte header
getGroupHeaderfn getGroupHeader(self) !GroupHeaderRead 4-byte group header
getU8..getU64fn getU64(self) !u64Read unsigned integers
getI8..getI64fn getI64(self) !i64Read signed integers
getF32/getF64fn getF64(self) !f64Read floats
getBytesfn getBytes(self, comptime N: usize) !*const [N]u8Zero-copy fixed bytes
getBytesSlicefn getBytesSlice(self, n: usize) ![]const u8Zero-copy runtime-length bytes
getEnumfn getEnum(self, comptime E: type) !ERead enum
getVarDatafn getVarData(self) ![]const u8Zero-copy variable-length data

Fixed-point decimal for financial prices. Only the mantissa is transmitted on the wire.

MethodSignatureDescription
fromFloatfn fromFloat(val: f64, exp: i8) Decimal64Construct from float
toFloatfn toFloat(self) f64Convert to f64
isNullfn isNull(self) boolCheck null sentinel
nullValuefn nullValue() Decimal64Create null sentinel

SBE-encoded FIX protocol messages in src/codec/fix_messages.zig.

pub const Side = enum(u8) { buy = 1, sell = 2 };
pub const OrdType = enum(u8) { market = 1, limit = 2, stop = 3, stop_limit = 4 };
pub const TimeInForce = enum(u8) { day = 0, gtc = 1, ioc = 3, gtd = 6 };
pub const ExecType = enum(u8) { new = 0, fill = 1, partial_fill = 2, canceled = 4, rejected = 8 };
pub const OrdStatus = enum(u8) { new = 0, partially_filled = 1, filled = 2, canceled = 4, rejected = 8 };
pub const MDUpdateAction = enum(u8) { new = 0, change = 1, delete = 2 };
pub const MDEntryType = enum(u8) { bid = 0, offer = 1, trade = 2 };
| Message | Template ID | Block Size | Fields |
| --- | --- | --- | --- |
| NewOrderSingle | 1 | 57 bytes | cl_ord_id, account, symbol, side, transact_time, order_qty, ord_type, price, stop_px, time_in_force |
| ExecutionReport | 2 | 89 bytes | order_id, cl_ord_id, exec_id, ord_status, exec_type, symbol, side, leaves_qty, cum_qty, avg_px, transact_time, text_len |
| Heartbeat | 5 | 16 bytes | test_req_id, timestamp_ns |
| Logon | 6 | 20 bytes | heart_bt_int, encrypt_method, reset_seq_num_flag, timestamp_ns |
| Message | Template ID | Description |
| --- | --- | --- |
| MarketDataIncrementalRefresh | 3 | MD entries group (action, type, symbol, price, size, etc.) |
| MassQuote | 4 | Quote sets group, each with nested quote entries |

Each group-based message provides an encode() method (returns SbeEncoder for streaming) and a decode() method (returns SbeDecoder positioned after the root block).


Aeron-compatible flyweights in src/protocol/flyweight.zig. Each wraps a []u8 buffer.

| Method | Signature | Description |
| --- | --- | --- |
| wrap | `fn wrap(buf: []u8) DataHeaderFlyweight` | Wrap existing buffer |
| init | `fn init(buf: []u8) DataHeaderFlyweight` | Wrap and set type=DATA |
| frameLength / setFrameLength | `i32` | Total frame size |
| flags / setFlags | `u8` | BEGIN/END/EOS flags |
| termOffset / setTermOffset | `u32` | Offset in term |
| sessionId / setSessionId | `i32` | Session identifier |
| streamId / setStreamId | `i32` | Stream identifier |
| termId / setTermId | `i32` | Term identifier |
| reservedValue / setReservedValue | `i64` | User metadata |
| payload | `fn payload(self) []u8` | Payload region after header |
| isBeginMessage / isEndMessage / isEndOfStream | `bool` | Flag checks |

| Method | Signature | Description |
| --- | --- | --- |
| sessionId / streamId | `i32` | Identifiers |
| consumptionTermId / consumptionTermOffset | `i32` | Consumption position |
| receiverWindowLength | `i32` | Advertised window |
| receiverId | `i64` | Unique receiver ID |

| Method | Signature | Description |
| --- | --- | --- |
| sessionId / streamId / termId | `i32` | Identifiers |
| termOffset | `i32` | Start of missing range |
| nakLength | `i32` | Length of missing range |

SetupFlyweight (40 bytes), RttMeasurementFlyweight (40 bytes), ErrorFlyweight (28+ bytes)

All follow the same pattern: wrap(buf), init(buf), typed getters/setters.

| Function | Signature | Description |
| --- | --- | --- |
| computePosition | `fn computePosition(term_offset, term_id, shift, initial_term_id) i64` | Absolute position from term addressing |
| computeTermIdFromPosition | `fn computeTermIdFromPosition(position, shift, initial_term_id) i32` | Term ID from position |
| computeTermOffsetFromPosition | `fn computeTermOffsetFromPosition(position, shift) i32` | Offset from position |

Single-producer transmitter for 1-to-N messaging.

var buf: [1024 + TRAILER_LENGTH]u8 align(cache_line_size) = [_]u8{0} ** (1024 + TRAILER_LENGTH);
var tx = BroadcastTransmitter.init(&buf);
tx.transmit(42, "market data update");
MethodSignatureDescription
initfn init(buf: []u8) BroadcastTransmitterInitialize (capacity must be power of 2)
transmitfn transmit(self, msg_type_id: i32, msg: []const u8) voidTransmit a message (always succeeds, old data overwritten)
calculateMaxMessageLengthfn calculateMaxMessageLength(self) u32Max payload size: (capacity / 8) - 8

Per-consumer receiver. Each maintains its own cursor.

MethodSignatureDescription
initfn init(buf: []const u8) BroadcastReceiverJoin from current tail position
receiveNextfn receiveNext(self) ?MessageRead next message, or null if none
validatefn validate(self) boolCheck data not overwritten
lappedCountfn lappedCount(self) u64Times receiver was lapped

Wrapper that copies payload to internal scratch buffer for safe retention.

MethodSignatureDescription
initfn init(buf: []const u8) CopyBroadcastReceiverInitialize
receiveNextfn receiveNext(self) ?MessageReceive with copy to scratch
lappedCountfn lappedCount(self) u64Times lapped

Tagged union dispatching to concrete strategies via idle(work_count) and reset().

var strategy = idle_strategy.backoff();
strategy.idle(0); // no work -> back off
strategy.idle(1); // work done -> reset to active
| Strategy | Latency | CPU | Description |
| --- | --- | --- | --- |
| BusySpinIdleStrategy | Lowest | Highest | Hardware PAUSE instruction |
| YieldingIdleStrategy | Low | High | Thread.yield() |
| SleepingIdleStrategy | Medium | Low | Thread.sleep(N) |
| BackoffIdleStrategy | Adaptive | Adaptive | NOT_IDLE -> SPINNING -> YIELDING -> PARKING |
| NoOpIdleStrategy | N/A | N/A | Does nothing |

Convenience constructors: busySpin(), yielding(), sleeping(ns), backoff(), noOp().


Function-pointer-based agent interface for composable units of work.

pub const AgentFn = struct {
doWorkFn: *const fn (ctx: *anyopaque) u32, // returns work count
onStartFn: ?*const fn (ctx: *anyopaque) void, // lifecycle start
onCloseFn: ?*const fn (ctx: *anyopaque) void, // lifecycle close
ctx: *anyopaque,
name: []const u8,
};

Runs an agent on a dedicated thread with an idle strategy.

MethodSignatureDescription
initfn init(agent: AgentFn, idle: IdleStrategy) AgentRunnerCreate runner
startfn start(self) !voidStart agent on new thread
stopfn stop(self) voidStop agent and join thread
isRunningfn isRunning(self) boolCheck if running
errorCountfn errorCount(self) u64Error counter

Combines multiple agents. Returns sum of work from all sub-agents.

MethodSignatureDescription
initfn init(agents: []const AgentFn) CompositeAgentCreate composite
agentFnfn agentFn(self) AgentFnGet AgentFn interface

Measures cycle performance for monitoring and tuning.

| Method | Signature | Description |
| --- | --- | --- |
| cycleStart | `fn cycleStart(self) void` | Record cycle start |
| cycleEnd | `fn cycleEnd(self, work_count: u32) void` | Record cycle end |
| averageCycleNs | `fn averageCycleNs(self) u64` | Recent cycle duration |
| workRatio | `fn workRatio(self) f64` | Ratio of busy vs idle cycles (0.0–1.0) |

Lightweight atomic i64 counter handle for hot-path instrumentation.

MethodSignatureDescription
incrementfn increment(self) voidAtomic +1 (monotonic)
incrementByfn incrementBy(self, n: i64) voidAtomic +n
decrementfn decrement(self) voidAtomic -1
getfn get(self) i64Load (acquire)
setfn set(self, val: i64) voidStore (release)
getAndResetfn getAndReset(self) i64Swap to 0 (acq_rel)

Fixed-capacity set of named atomic counters (max 64).

MethodSignatureDescription
initfn init() CounterSetZero-initialized set
allocatefn allocate(self, counter_type: CounterType, name: []const u8) ?CounterAllocate counter slot
getByTypefn getByType(self, counter_type: CounterType) ?CounterLook up by type
forEachfn forEach(self, callback) voidIterate all active counters
snapshotfn snapshot(self, out: []CounterSnapshot) u32Copy all values
resetAllfn resetAll(self) voidReset all to zero

System-wide registry organized by subsystem (IPC, Network, Reliability, Archive, Cluster, Sequencer, System).

MethodSignatureDescription
initfn init() GlobalCountersEmpty counter sets
initWithDefaultsfn initWithDefaults() GlobalCountersPre-register all standard counters
formatReportfn formatReport(self, buf: []u8) []const u8Human-readable report

AIMD congestion control with slow start and congestion avoidance phases.

var cc = CongestionControl.init(.{
.initial_window = 64 * 1024,
.max_window = 16 * 1024 * 1024,
.min_window = 4 * 1024,
.mss = 1460,
.initial_ssthresh = 1024 * 1024,
});
MethodSignatureDescription
initfn init(cfg: CongestionConfig) CongestionControlInitialize
onAckfn onAck(self, bytes_acked: u64) voidWindow increase (slow start or CA)
onLossfn onLoss(self) voidMultiplicative decrease
onTimeoutfn onTimeout(self) voidReset to min_window, re-enter slow start
canSendfn canSend(self, bytes: u64) boolCheck window allows sending
onSendfn onSend(self, bytes: u64) voidRecord bytes in flight
availableWindowfn availableWindow(self) u64Bytes available in window

RFC 6298 EWMA-based RTT estimation.

MethodSignatureDescription
initfn init() RttEstimatorInitialize (1s initial RTO)
updatefn update(self, rtt_ns: u64) voidRecord RTT sample
retransmitTimeoutfn retransmitTimeout(self) u64Current RTO (ns)
smoothedRttfn smoothedRtt(self) u64Current SRTT (ns)

Exponential backoff for NAK timing.

MethodSignatureDescription
initfn init(config: NakConfig) NakControllerInitialize
shouldSendNakfn shouldSendNak(self, now_ns: u64) boolCheck if enough time elapsed
onNakSentfn onNakSent(self, now_ns: u64) voidRecord NAK sent, increase backoff
onGapFilledfn onGapFilled(self) voidReset state for reuse
isExhaustedfn isExhausted(self) boolMax retransmits exceeded
currentDelayfn currentDelay(self) u64Current delay with backoff (ns)

Unified flow control dispatching to Min, Max, or Tagged strategy.

var fc = FlowControl.init(.{ .strategy = .min, .receiver_timeout_ns = 5_000_000_000 });
const new_limit = fc.onStatusMessage(status, sender_limit, initial_term_id, shift, now_ns);
MethodSignatureDescription
initfn init(cfg: FlowControlConfig) FlowControlCreate from config
onStatusMessagefn onStatusMessage(self, status, sender_limit, initial_term_id, shift, now_ns) i64Process receiver status, return new sender limit
onIdlefn onIdle(self, now_ns, sender_limit, sender_position, is_eos) i64Remove stale receivers, return current limit
hasRequiredReceiversfn hasRequiredReceivers(self) boolCheck for active receivers

Sender limit = minimum position across all active receivers. Guarantees no receiver left behind.

Sender always advances. No back-pressure. Suitable for market data where stale quotes are worthless.

Only receivers matching required_group_tag constrain the sender. Untagged receivers are tracked but do not limit.

pub const ReceiverStatus = struct {
session_id: i32,
stream_id: i32,
consumption_term_id: i32,
consumption_term_offset: i32,
receiver_window_length: i32,
receiver_id: i64,
timestamp_ns: u64,
};

Tracks segment metadata with time-range and stream queries.

MethodSignatureDescription
initfn init(allocator: Allocator, base_path: []const u8) !CatalogInitialize
deinitfn deinit(self: *Catalog) voidFree entries
addEntryfn addEntry(self, entry: CatalogEntry) !voidAdd segment metadata
updateEntryfn updateEntry(self, segment_id: u32, entry: CatalogEntry) !voidUpdate existing
getEntryfn getEntry(self, segment_id: u32) ?CatalogEntryLook up by ID
findByTimestampfn findByTimestamp(self, from_ns: u64, to_ns: u64) []const CatalogEntryTime range query
findByStreamfn findByStream(self, stream_id: u32) ![]CatalogEntryStream filter (caller frees)
savefn save(self) !voidPersist to disk
loadfn load(allocator: Allocator, path: []const u8) !CatalogLoad from disk
totalRecordsfn totalRecords(self) u64Sum of record counts
totalBytesfn totalBytes(self) u64Sum of payload bytes
segmentCountfn segmentCount(self) u32Number of segments

CatalogEntry (56 bytes serialized):

pub const CatalogEntry = struct {
segment_id: u32,
start_offset: u64,
end_offset: u64,
start_timestamp_ns: u64,
end_timestamp_ns: u64,
stream_id: u32,
record_count: u32,
total_bytes: u64,
closed: bool,
};

Indexes every Nth record for fast binary-search lookup within segments.

MethodSignatureDescription
initfn init(allocator: Allocator, segment_id: u32, interval: u32) SparseIndexInitialize
deinitfn deinit(self: *SparseIndex) voidFree entries
recordfn record(self, seq: u32, offset: u64, timestamp_ns: u64, stream_id: u32) !voidRecord an entry (indexes every Nth)
findByTimestampfn findByTimestamp(self, timestamp_ns: u64) ?IndexEntryBinary search by timestamp
findBySequencefn findBySequence(self, record_seq: u32) ?IndexEntryBinary search by sequence
savefn save(self, base_path: []const u8) !voidSave to disk
loadfn load(allocator: Allocator, base_path: []const u8, segment_id: u32) !SparseIndexLoad from disk
rebuildfn rebuild(allocator, segment_file, segment_id, interval) !SparseIndexRebuild by scanning segment

IndexEntry (24 bytes serialized):

pub const IndexEntry = struct {
record_seq: u32,
file_offset: u64,
timestamp_ns: u64,
stream_id: u32,
};

LZ4-style compression with hash-table-based matching. 64 KB sliding window.

| Method | Signature | Description |
| --- | --- | --- |
| init | `fn init(allocator: Allocator) !Compressor` | Allocate hash table |
| deinit | `fn deinit(self, allocator: Allocator) void` | Free hash table |
| compress | `fn compress(self, src: []const u8, dst: []u8) !usize` | Compress into buffer, returns bytes written |
| maxCompressedSize | `fn maxCompressedSize(input_size: usize) usize` | Worst-case output size |

| Method | Signature | Description |
| --- | --- | --- |
| decompress | `fn decompress(src: []const u8, dst: []u8) !usize` | Decompress, returns bytes written |

| Function | Signature | Description |
| --- | --- | --- |
| compressFrame | `fn compressFrame(allocator, src: []const u8) ![]u8` | Compress with 16-byte header + CRC32 |
| decompressFrame | `fn decompressFrame(allocator, frame_data: []const u8) ![]u8` | Decompress and validate checksum |

CompressedFrame (16-byte header):

pub const CompressedFrame = struct {
magic: u32, // 0x5A424C5A ("ZBLZ")
original_size: u32,
compressed_size: u32,
checksum: u32, // CRC32 of original data
};

C-ABI functions exported from src/ffi/exports.zig:

| Function | Signature | Description |
| --- | --- | --- |
| zigbolt_transport_create | `(term_length: u32, use_hugepages: u8, pre_fault: u8) ?*anyopaque` | Create transport |
| zigbolt_transport_destroy | `(handle: ?*anyopaque) void` | Destroy transport |
| zigbolt_ipc_create | `(name: ?[*:0]const u8, term_length: u32) ?*anyopaque` | Create IPC channel |
| zigbolt_ipc_open | `(name: ?[*:0]const u8, term_length: u32) ?*anyopaque` | Open IPC channel |
| zigbolt_ipc_destroy | `(handle: ?*anyopaque) void` | Destroy IPC channel |
| zigbolt_publish | `(handle: ?*anyopaque, data: ?[*]const u8, len: u32, msg_type_id: i32) i32` | Publish (0=success) |
| zigbolt_poll | `(handle: ?*anyopaque, callback: ?FragmentHandlerFn, limit: u32) u32` | Poll messages |
| zigbolt_version_major | `() u32` | Major version (0) |
| zigbolt_version_minor | `() u32` | Minor version (1) |
| zigbolt_version_patch | `() u32` | Patch version (0) |