
Telemetry streaming

Telemetry streaming is the SDK’s high-rate, one-way egress path. Your code appends records to an embedded buffer, either persistent (on disk) or in memory. A background export engine batches those records, exports them to Kinesis or Kafka, retries on failure, and advances a checkpoint once the sink acknowledges them. The buffer keeps accepting records while the network or sink is down, so a cloud disconnect does not block your hot loop (unless the buffer fills under onFull: block) and does not lose durably buffered data.

All four languages bind the same shared Rust core (ggstreamlog): Rust links it directly, while Java (FFM / Panama), Python (PyO3), and TypeScript (napi-rs) reach it through native bindings.

Reach for streaming when you have high-rate, fire-and-forget telemetry (sensor readings, metrics, events) that must reach Kinesis or Kafka with at-least-once delivery and must survive intermittent connectivity. It is deliberately one-way: there is no request/reply and no server-to-device path. For command-and-control or low-rate messaging, use the Messaging guide instead.

  1. Add a streaming section to your component config. Each entry under streaming.streams[] requires a name and a sink; buffer, batch, and delivery are optional and default to sensible values.

    {
      "component": { "name": "com.example.Telemetry" },
      "streaming": {
        "streams": [
          {
            "name": "telemetry",
            "sink": { "type": "kinesis", "streamName": "edge-{ThingName}" },
            "buffer": {
              "type": "disk",
              "path": "/var/lib/ggcommons/streams/telemetry",
              "maxDiskBytes": 1073741824,
              "onFull": "block",
              "fsync": "perBatch"
            }
          }
        ]
      }
    }

    Template variables ({ThingName}, {ComponentName}) in buffer.path, the Kinesis streamName, and the Kafka bootstrapServers / topic are resolved before the stream opens. A {"$secret": ...} reference anywhere in the streaming section is resolved from the credentials vault first.

  2. Make the native core available for your language (see the table below).

  3. Get the stream service from the runtime and append records (next section).
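The SDK resolves the template variables in step 1 for you before a stream opens. As a rough mental model only, the hypothetical TemplateVars class below replaces {Name} placeholders from a map of values. The class name, the sample values, and its choice to leave unknown placeholders untouched are assumptions for illustration. The real resolver may behave differently, for example by rejecting an unknown variable.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch, not the SDK's resolver: replaces {Name} placeholders from a map.
// Unknown placeholders are left as-is (an assumption made for this illustration).
final class TemplateVars {
    private static final Pattern VAR = Pattern.compile("\\{([A-Za-z]+)\\}");

    static String resolve(String template, Map<String, String> vars) {
        Matcher m = VAR.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = vars.get(m.group(1));
            // Keep the original "{Name}" text when no value is known.
            String replacement = value != null ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of("ThingName", "pump-station-3",
                "ComponentName", "com.example.Telemetry");
        System.out.println(resolve("edge-{ThingName}", vars)); // edge-pump-station-3
    }
}
```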

| Language | How the core is provided | Runtime requirement |
|---|---|---|
| Java | cdylib ggstreamlog, loaded once per process (via -Dggstreamlog.library.path, java.library.path, or a bundled classpath resource) | Run with --enable-native-access=ALL-UNNAMED (the sink/log bridges use upcall stubs) |
| Python | PyO3 wheel ggstreamlog_native, imported lazily | Install the wheel; a missing wheel surfaces on first use as GgStreamError(-1, ...) |
| TypeScript | napi addon @mbreissi/ggstreamlog-node, loaded lazily with require() on first open | Install the addon; a component that never streams never loads it |
| Rust | Linked directly; selected at compile time via cargo features | Build with streaming plus a sink feature (below) |

For Rust, the streaming feature enables the module and accessor; real export also needs a sink feature: streaming-kinesis and/or streaming-kafka. Without a sink feature, a stream still buffers durably but never exports (see Buffer-only fallback).

Cargo.toml
[dependencies]
ggcommons = { version = "*", features = ["streaming", "streaming-kinesis"] }

Java exposes a capability probe as a static method on the public StreamService. Python’s equivalent, native_available(), lives in the ggcommons.streaming.service module instead. TypeScript has no probe (loading is lazy and reported on first use), and Rust availability is decided at compile time, so a cfg check is the equivalent.

// nativeAvailable() returns false, instead of throwing, when the cdylib is missing
if (StreamService.nativeAvailable()) {
    // safe to open streams
}

In a component you get the service from the runtime accessor, fetch a StreamHandle by name, then append / flush and optionally read stats. The runtime opens every configured stream for you and closes them on shutdown.

StreamService streams = gg.getStreams(); // null if there is no `streaming` config
if (streams != null) {
    StreamHandle h = streams.stream("telemetry"); // throws GgStreamException (ERR_UNKNOWN_STREAM) if absent
    h.append("pump-7", System.currentTimeMillis(),
            "reading-0".getBytes(StandardCharsets.UTF_8)); // partitionKey <= 65535 bytes
    h.flush(); // force the buffer durably to disk (does NOT wait for export)
    StreamStats s = streams.stats("telemetry");
    log.info("appended={} backlog={}", s.appendedTotal(), s.backlog());
}
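The partitionKey limit in the comment above is a byte count, not a character count. If keys come from user or device input, a pre-append check like the hypothetical PartitionKeys helper below can catch oversize keys early. It assumes the key is encoded as UTF-8 when it crosses into the Rust core, which is likely but not stated on this page; the SDK's own check and error code remain authoritative.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical pre-append guard, not an SDK API. The limit is counted in bytes, so
// (assuming UTF-8) a key made of non-ASCII characters hits it with fewer characters.
final class PartitionKeys {
    static final int MAX_KEY_BYTES = 65_535; // from the partitionKey comment above

    static boolean fits(String partitionKey) {
        return partitionKey.getBytes(StandardCharsets.UTF_8).length <= MAX_KEY_BYTES;
    }
}
```

For example, 40,000 copies of "é" take 80,000 UTF-8 bytes, so such a key would be rejected even though it is well under 65,535 characters.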

When you use the runtime accessor, the SDK owns the service lifecycle and closes streams on shutdown. If you open a StreamService yourself (for standalone testing or a custom buffer), close it to flush all streams and stop the export engines. close() is idempotent.

String cfg = """
    {"streams":[{"name":"telemetry","sink":{"type":"kinesis","streamName":"x"},
    "buffer":{"path":"/tmp/telemetry","maxDiskBytes":1073741824,"onFull":"block"}}]}""";
try (StreamService svc = StreamService.open(cfg);
     StreamHandle h = svc.stream("telemetry")) {
    h.append("pump-7", 1000L, "reading-0".getBytes(StandardCharsets.UTF_8));
    h.flush();
} // try-with-resources closes the handle first, then the service

A stream’s sink is tagged by type. Two sink types are declarable in component config: kinesis and kafka. The same config is used by all four languages.

Kinesis sink:

{
  "name": "telemetry",
  "sink": {
    "type": "kinesis",
    "streamName": "edge-{ThingName}",
    "region": "us-east-1",
    "endpointUrl": "http://localhost:4566"
  }
}

| Field | Required | Notes |
|---|---|---|
| streamName | yes | Target Kinesis data stream; supports template variables. |
| region | no | AWS region; falls back to the ambient AWS region when omitted. |
| endpointUrl | no | Override the endpoint for LocalStack / floci / a VPC endpoint. |

Kafka sink:

{
  "name": "telemetry",
  "sink": {
    "type": "kafka",
    "bootstrapServers": "broker-1:9092,broker-2:9092",
    "topic": "edge.telemetry",
    "properties": { "compression.type": "lz4", "acks": "all" }
  }
}

| Field | Required | Notes |
|---|---|---|
| bootstrapServers | yes | host:port[,host:port...]; supports template variables. |
| topic | yes | Target Kafka topic; supports template variables. |
| properties | no | Open map of librdkafka producer properties, passed through verbatim. |
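When bootstrapServers is assembled from templates or secrets, a quick local sanity check can catch a malformed list before the stream opens. The hypothetical BootstrapServers parser below only checks the host:port[,host:port...] shape from the table. It is not part of the SDK, does not handle IPv6 literals, and does not replace librdkafka's own validation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape check for "host:port[,host:port...]", run after template resolution.
// IPv6 literals such as [::1]:9092 are out of scope for this sketch.
final class BootstrapServers {
    record HostPort(String host, int port) {}

    static List<HostPort> parse(String servers) {
        List<HostPort> out = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String s = entry.trim();
            int colon = s.lastIndexOf(':');
            // Reject a missing colon, an empty host, or an empty port.
            if (colon <= 0 || colon == s.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got '" + s + "'");
            }
            int port = Integer.parseInt(s.substring(colon + 1)); // NumberFormatException is an IllegalArgumentException
            if (port < 1 || port > 65_535) {
                throw new IllegalArgumentException("port out of range: " + port);
            }
            out.add(new HostPort(s.substring(0, colon), port));
        }
        return out;
    }
}
```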

Each stream owns an embedded buffer that decouples your producer loop from the network. Configure it under buffer.

  • disk (default) — a durable, segmented on-disk log. path is required. Survives process restarts and powers store-and-forward across cloud disconnects.
  • memory — a non-durable in-RAM ring. path must be omitted. maxDiskBytes is reinterpreted as the in-RAM byte budget. Records are lost on restart — use only when durability is not needed.

| Field | Default | Meaning |
|---|---|---|
| type | disk | disk (durable) or memory (non-durable RAM ring). |
| path | (none) | Required for disk; must be omitted for memory. |
| segmentBytes | 67108864 (64 MiB) | On-disk segment size. |
| maxDiskBytes | 1073741824 (1 GiB) | Total buffer budget (on disk, or in RAM for memory). Must be >= segmentBytes. |
| maxAgeSecs | null | Records older than this become eligible for dropOldest. |
| onFull | dropOldest | Behavior when the budget is reached (below). |
| fsync | perBatch | Durability flush policy: always, perBatch, or interval. |
| fsyncIntervalMs | 1000 | Flush cadence when fsync is interval. |
| maxBufferedRecords | 10000 | In-memory ingest-queue bound; this is the actual backpressure point. |
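The table implies a few cross-field rules. The hypothetical BufferConfigCheck below restates them as a pre-flight check you might run in tests. The core presumably enforces the same rules itself (an invalid config maps to ERR_CONFIG), so treat this as a sketch of the rules, not the SDK's validator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check mirroring the buffer table; not the SDK's validator.
final class BufferConfigCheck {
    static List<String> validate(String type, String path, long segmentBytes, long maxDiskBytes) {
        List<String> errors = new ArrayList<>();
        if (!type.equals("disk") && !type.equals("memory")) {
            errors.add("type must be disk or memory");
        }
        if (type.equals("disk") && (path == null || path.isBlank())) {
            errors.add("path is required for disk");
        }
        if (type.equals("memory") && path != null) {
            errors.add("path must be omitted for memory");
        }
        if (maxDiskBytes < segmentBytes) {
            errors.add("maxDiskBytes must be >= segmentBytes");
        }
        return errors;
    }
}
```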

The onFull policy decides what happens when the buffer budget is exhausted:

  • dropOldest (default): never blocks the producer; drops the oldest records to stay within budget. Drops are metered (droppedTotal), never silent.
  • block: the producer blocks in append until the export engine reclaims space. Lossless.
  • rejectNew: append fails immediately with ERR_FULL / BufferFull.
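To make the three policies concrete, here is a hypothetical in-memory model. It budgets by record count rather than bytes to keep the behavior visible, and it is not how the SDK's buffer is implemented; the class and method names are illustrative only. A call to reclaim() stands in for the export engine freeing space after an acknowledged batch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the onFull policies over a record-count budget (the real
// buffer budgets bytes and queue depth). Not the SDK's buffer implementation.
final class BoundedBuffer {
    enum OnFull { DROP_OLDEST, BLOCK, REJECT_NEW }

    private final Deque<byte[]> records = new ArrayDeque<>();
    private final int maxRecords;
    private final OnFull onFull;
    private long droppedTotal;

    BoundedBuffer(int maxRecords, OnFull onFull) {
        if (maxRecords < 1) throw new IllegalArgumentException("maxRecords must be >= 1");
        this.maxRecords = maxRecords;
        this.onFull = onFull;
    }

    /** Returns false under REJECT_NEW when full, or if interrupted while blocked. */
    synchronized boolean append(byte[] payload) {
        while (records.size() >= maxRecords) {
            switch (onFull) {
                case DROP_OLDEST -> { records.pollFirst(); droppedTotal++; } // metered, never silent
                case REJECT_NEW -> { return false; }                        // the ERR_FULL case
                case BLOCK -> {
                    try {
                        wait(); // released by reclaim()
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
        }
        records.addLast(payload);
        return true;
    }

    /** Stand-in for the exporter freeing space after an acknowledged batch. */
    synchronized void reclaim(int n) {
        for (int i = 0; i < n && !records.isEmpty(); i++) records.pollFirst();
        notifyAll();
    }

    synchronized int size() { return records.size(); }

    synchronized long droppedTotal() { return droppedTotal; }
}
```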

These optional sections tune how the background engine batches and retries. The defaults are production-ready; tune only when you have a reason.

{
"batch": { "maxRecords": 500, "maxBytes": 4194304, "maxLatencyMs": 1000, "compression": "none" },
"delivery": { "maxRetries": -1, "backoffBaseMs": 50, "backoffMaxMs": 30000, "pollIntervalMs": 100 }
}

| Section.field | Default | Meaning |
|---|---|---|
| batch.maxRecords | 500 | Max records per export batch. |
| batch.maxBytes | 4194304 (4 MiB) | Max bytes per export batch. |
| batch.maxLatencyMs | 1000 | Max time to wait before flushing a partial batch. |
| batch.compression | none | none or zstd. |
| delivery.maxRetries | -1 | Retry limit; -1 means retry forever. |
| delivery.backoffBaseMs | 50 | Initial retry backoff. |
| delivery.backoffMaxMs | 30000 | Backoff ceiling. |
| delivery.pollIntervalMs | 100 | Export poll cadence. |
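The table gives defaults but not the exact formulas. The hypothetical ExportPolicy below is one plausible reading: a partial batch is cut as soon as any limit is reached, and retry delays double from backoffBaseMs up to backoffMaxMs. The doubling rule and the absence of jitter are assumptions, not documented behavior.

```java
// Hypothetical reading of the batch.* and delivery.* defaults; the formulas are assumptions.
final class ExportPolicy {
    static final int MAX_RECORDS = 500;
    static final long MAX_BYTES = 4_194_304L; // 4 MiB
    static final long MAX_LATENCY_MS = 1_000L;
    static final int MAX_RETRIES = -1;        // -1 = retry forever
    static final long BACKOFF_BASE_MS = 50L;
    static final long BACKOFF_MAX_MS = 30_000L;

    /** Send the pending batch as soon as any one limit is reached. */
    static boolean shouldCut(int records, long bytes, long oldestPendingAgeMs) {
        return records >= MAX_RECORDS || bytes >= MAX_BYTES || oldestPendingAgeMs >= MAX_LATENCY_MS;
    }

    /** Assumed delay before retry number `attempt` (1-based): base doubled per attempt, capped. */
    static long backoffMs(int attempt) {
        long delay = BACKOFF_BASE_MS;
        for (int i = 1; i < attempt && delay < BACKOFF_MAX_MS; i++) {
            delay *= 2;
        }
        return Math.min(delay, BACKOFF_MAX_MS);
    }

    /** With maxRetries = -1 there is no retry limit. */
    static boolean mayRetry(int retriesSoFar) {
        return MAX_RETRIES < 0 || retriesSoFar < MAX_RETRIES;
    }
}
```

Under this assumed rule, retries wait 50, 100, 200 ms and so on, reaching the 30 s ceiling from the 11th retry onward.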

Streaming is at-least-once. The export engine advances the buffer checkpoint past a batch only when every record is acknowledged:

  • All records acked → the checkpoint advances; the batch is durably committed as delivered.
  • Partial ack → only the failed offsets are re-delivered.
  • Retryable failure (or a sink callback that throws/raises) → the whole batch is re-delivered and the checkpoint is not advanced, so a transient outage cannot lose buffered data.
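The commit rule above can be modeled in a few lines. The hypothetical InFlightBatch class tracks which offsets of one batch are still unacknowledged and only lets the checkpoint move past the batch once none remain. It illustrates the rule as stated on this page, not the engine's internals.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the at-least-once commit rule; not the export engine's code.
final class InFlightBatch {
    private final long firstOffset;
    private final long lastOffset;
    private final TreeSet<Long> pending = new TreeSet<>(); // offsets not yet acknowledged

    InFlightBatch(long firstOffset, long lastOffset) {
        if (lastOffset < firstOffset) throw new IllegalArgumentException("empty batch");
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        for (long o = firstOffset; o <= lastOffset; o++) pending.add(o);
    }

    /**
     * Records one export attempt and returns the offsets to send next time.
     * A retryable failure acknowledges nothing, so the whole batch goes again;
     * a partial ack removes the acknowledged offsets and returns only the failed ones.
     */
    Set<Long> onAttempt(Set<Long> ackedOffsets, boolean retryableFailure) {
        if (retryableFailure) {
            TreeSet<Long> whole = new TreeSet<>();
            for (long o = firstOffset; o <= lastOffset; o++) whole.add(o);
            return whole;
        }
        pending.removeAll(ackedOffsets);
        return new TreeSet<>(pending);
    }

    /** True once every record in the batch has been acknowledged. */
    boolean committed() {
        return pending.isEmpty();
    }

    /** The checkpoint moves past this batch only when it is fully committed. */
    long advance(long checkpoint) {
        return committed() ? Math.max(checkpoint, lastOffset) : checkpoint;
    }
}
```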

Records carry (partitionKey, timestampMs, payload); the engine assigns each one an internal log offset. De-duplicate downstream on partitionKey plus that offset — there is no producer-supplied sequence number.
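A downstream consumer can drop redeliveries by remembering recently seen (partitionKey, offset) pairs. This page does not say how the internal offset reaches consumers, so the hypothetical Deduplicator below simply assumes each record arrives with it. The bounded LinkedHashMap keeps memory flat; size the window to cover the longest redelivery gap you expect.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical consumer-side filter. Assumes the engine's log offset is available per
// record downstream, which this page does not specify.
final class Deduplicator {
    private record Key(String partitionKey, long offset) {}

    private final Map<Key, Boolean> seen;

    Deduplicator(int capacity) {
        // Insertion-ordered map that evicts the oldest key once capacity is exceeded.
        this.seen = new LinkedHashMap<Key, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Key, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** True the first time a (partitionKey, offset) pair is seen; false for a redelivery. */
    boolean firstDelivery(String partitionKey, long offset) {
        return seen.putIfAbsent(new Key(partitionKey, offset), Boolean.TRUE) == null;
    }
}
```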

flush() forces the buffer durably to disk; it does not wait for export to the sink. To confirm delivery, watch exportedTotal / ackedOffset in stats.
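One way to wait for delivery is to poll ackedOffset until it reaches the offset of your last record. The helper below is hypothetical and takes the counter as a LongSupplier so it stays independent of the SDK. With the Java API it might be called as awaitAcked(() -> streams.stats("telemetry").ackedOffset(), target, 30_000, 100); the ackedOffset() accessor name is inferred from the appendedTotal() accessor used earlier, and taking target as nextOffset - 1 right after your own flush assumes a single producer and densely assigned offsets.

```java
import java.util.function.LongSupplier;

// Hypothetical helper: polls a counter until it reaches a target or the timeout passes.
final class DeliveryWait {
    /** Returns true once ackedOffset >= targetOffset; false on timeout or interrupt. */
    static boolean awaitAcked(LongSupplier ackedOffset, long targetOffset, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (ackedOffset.getAsLong() < targetOffset) {
            if (System.nanoTime() - deadline >= 0) return false;
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```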

A stream with no usable sink still persists appends durably and reports stats, but nothing exports. This happens in Rust when built without streaming-kinesis / streaming-kafka, and in Java / Python / TypeScript when a callback stream is opened with no sink registered. It is useful for local capture and tests.

stats(name) returns a snapshot of counters. Field names are camelCase in Java and TypeScript, and snake_case in Python and Rust.

| Field | Meaning |
|---|---|
| appendedTotal | Records accepted into the buffer. |
| exportedTotal | Records sent to the sink. |
| droppedTotal | Records dropped (e.g. dropOldest). |
| retriesTotal | Export retry attempts. |
| failedTotal | Records that failed permanently. |
| backlog | Records buffered but not yet exported. |
| diskBytes | Bytes currently on disk. |
| ackedOffset | Highest acknowledged log offset. |
| nextOffset | Next offset to be assigned. |
| oldestUnackedAgeMs | Age of the oldest un-acked record. |
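Two snapshots taken some time apart are enough for a basic health check. The hypothetical StatsView record below mirrors only the counters the check reads (it is not the SDK's StreamStats type), and the age threshold is yours to choose.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stats snapshot, holding only the counters read below.
record StatsView(long droppedTotal, long failedTotal, long oldestUnackedAgeMs) {}

final class StreamHealth {
    /** Compares an earlier and a later snapshot and lists anything worth alerting on. */
    static List<String> check(StatsView before, StatsView now, long maxUnackedAgeMs) {
        List<String> issues = new ArrayList<>();
        long dropped = now.droppedTotal() - before.droppedTotal();
        if (dropped > 0) issues.add("dropped " + dropped + " records");
        long failed = now.failedTotal() - before.failedTotal();
        if (failed > 0) issues.add(failed + " records failed permanently");
        if (now.oldestUnackedAgeMs() > maxUnackedAgeMs) {
            issues.add("oldest un-acked record is " + now.oldestUnackedAgeMs() + " ms old");
        }
        return issues;
    }
}
```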

Rust’s Stats carries one extra field, last_export_error: Option<String>, which is currently always None in the wrapper. Java, Python, and TypeScript expose exactly the ten counters above.

The error type mirrors the core’s status codes across languages. Java exposes the codes as named constants; Python and TypeScript carry a numeric code; Rust wraps a message string.

| Code | Name | When |
|---|---|---|
| 0 | OK | Success |
| 1 | ERR_CONFIG | Invalid streaming config / bad JSON |
| 2 | ERR_IO | Durable-store I/O error |
| 3 | ERR_CORRUPT | Corrupt on-disk segment or checkpoint |
| 4 | ERR_FULL | Ingest queue full under rejectNew |
| 5 | ERR_UNKNOWN_STREAM | stream() / stats() called with an unconfigured name |
| 6 | ERR_SINK | Export sink error |
| 7 | ERR_PANIC | A Rust panic crossed the FFI boundary |
| 8 | ERR_INVALID_ARG | Null / invalid native-ABI argument |

try {
    StreamHandle h = streams.stream("telemetry");
    byte[] payload = "reading-0".getBytes(StandardCharsets.UTF_8);
    h.append("pump-7", System.currentTimeMillis(), payload);
} catch (GgStreamException e) {
    if (e.code() == GgStreamException.ERR_UNKNOWN_STREAM) {
        log.error("no such stream", e);
    } else if (e.code() == GgStreamException.ERR_FULL) {
        log.warn("buffer rejected the record (onFull: rejectNew)");
    } else {
        log.error("streaming error, code={}", e.code(), e); // don't swallow other codes silently
    }
}
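The Python and TypeScript bindings carry the numeric code, so logs from mixed-language fleets may contain bare numbers. The hypothetical enum below decodes them using the table's numbering. It relies on the constants being declared in table order 0 to 8, and maps anything else (such as the -1 the Python binding reports when the wheel cannot load) to empty.

```java
import java.util.Optional;

// Hypothetical decoder for numeric status codes; the SDK's Java constants live on
// GgStreamException. Declaration order must match the table (ordinal == code).
enum StreamErrorCode {
    OK, ERR_CONFIG, ERR_IO, ERR_CORRUPT, ERR_FULL,
    ERR_UNKNOWN_STREAM, ERR_SINK, ERR_PANIC, ERR_INVALID_ARG;

    /** Maps 0..8 to a name; any other value (for example -1) is unknown. */
    static Optional<StreamErrorCode> fromCode(int code) {
        StreamErrorCode[] all = values();
        return code >= 0 && code < all.length ? Optional.of(all[code]) : Optional.empty();
    }
}
```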

| Aspect | Java | Python | Rust | TypeScript |
|---|---|---|---|---|
| Accessor | getStreams() → StreamService or null | get_streams() → service or None | streams() → always Arc<dyn ...> | streams() → StreamService \| undefined |
| append | 3 args | 3 args | one Record value | 3 args |
| Batch append | no | no | append_batch | no |
| Unknown stream | throws | raises | Err / None | throws |
| Handle close() | yes (AutoCloseable) | yes (context manager) | none (RAII) | yes |
| Native probe | nativeAvailable() | native_available() | compile-time cfg | none |
| Stats fields | 10 | 10 | 11 (adds last_export_error) | 10 |