Telemetry streaming
Telemetry streaming is the SDK’s high-rate, one-way egress path. Your code appends records to an embedded, persistent (or in-memory) buffer; a background export engine batches them, retries on failure, and checkpoints on acknowledgement to Kinesis or Kafka. The buffer keeps accepting records while the network or sink is down, so a cloud disconnect never blocks your hot loop or loses durably-buffered data.
All four languages bind the same shared Rust core (ggstreamlog): Rust links it directly, while
Java (FFM / Panama), Python (PyO3), and TypeScript (napi-rs) reach it through native bindings.
When to use it
Reach for streaming when you have high-rate, fire-and-forget telemetry (sensor readings, metrics, events) that must reach Kinesis or Kafka with at-least-once delivery and must survive intermittent connectivity. It is deliberately one-way: there is no request/reply and no server-to-device path. For command-and-control or low-rate messaging, use the Messaging guide instead.
Enable streaming
- Add a `streaming` section to your component config. Each entry under `streaming.streams[]` requires a `name` and a `sink`; `buffer`, `batch`, and `delivery` are optional and default to sensible values.

  ```json
  {
    "component": { "name": "com.example.Telemetry" },
    "streaming": {
      "streams": [
        {
          "name": "telemetry",
          "sink": { "type": "kinesis", "streamName": "edge-{ThingName}" },
          "buffer": {
            "type": "disk",
            "path": "/var/lib/ggcommons/streams/telemetry",
            "maxDiskBytes": 1073741824,
            "onFull": "block",
            "fsync": "perBatch"
          }
        }
      ]
    }
  }
  ```

  Template variables (`{ThingName}`, `{ComponentName}`) in `buffer.path`, the Kinesis `streamName`, and the Kafka `bootstrapServers` / `topic` are resolved before the stream opens. A `{"$secret": ...}` reference anywhere in the `streaming` section is resolved from the credentials vault first.
- Make the native core available for your language (see the table below).
- Get the stream service from the runtime and append records (next section).
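
As a rough illustration of the template step, the sketch below substitutes `{Name}` placeholders in every string of a stream entry. The helper `resolve_templates` and the sample variable values are hypothetical and not part of the SDK. The SDK resolves templates itself, only in the fields listed above, and its handling of unknown variables may differ from what is assumed here.

```python
import re

_PLACEHOLDER = re.compile(r"\{(\w+)\}")


def resolve_templates(value, variables):
    """Return a copy of `value` with {Name} placeholders replaced in every string."""
    if isinstance(value, str):
        # Assumption of this sketch: unknown placeholders are left untouched.
        return _PLACEHOLDER.sub(
            lambda m: variables.get(m.group(1), m.group(0)), value
        )
    if isinstance(value, dict):
        return {k: resolve_templates(v, variables) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_templates(v, variables) for v in value]
    return value  # numbers, booleans, and None pass through unchanged


# Hypothetical stream entry and variable values, for illustration only.
stream = {
    "name": "telemetry",
    "sink": {"type": "kinesis", "streamName": "edge-{ThingName}"},
    "buffer": {"type": "disk", "path": "/var/lib/ggcommons/streams/{ComponentName}"},
}
resolved = resolve_templates(
    stream, {"ThingName": "pump-7", "ComponentName": "com.example.Telemetry"}
)
```

The function builds new containers instead of mutating its input, so the original config stays available for logging or re-resolution.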
Native-core prerequisites
| Language | How the core is provided | Runtime requirement |
|---|---|---|
| Java | cdylib `ggstreamlog` loaded once per process (via `-Dggstreamlog.library.path`, `java.library.path`, or a bundled classpath resource) | Run with `--enable-native-access=ALL-UNNAMED` (the sink/log bridges use upcall stubs) |
| Python | PyO3 wheel `ggstreamlog_native`, imported lazily | Install the wheel; failure surfaces on first use as `GgStreamError(-1, ...)` |
| TypeScript | napi addon `@mbreissi/ggstreamlog-node`, `require`-d lazily on first open | Install the addon; a component that never streams never loads it |
| Rust | Linked directly at compile time via cargo features | Build with `streaming` plus a sink feature (below) |
For Rust, the `streaming` feature enables the module and accessor; real export also needs a sink
feature: `streaming-kinesis` and/or `streaming-kafka`. Without a sink feature, a stream still
buffers durably but never exports (see Buffer-only fallback).

```toml
[dependencies]
ggcommons = { version = "*", features = ["streaming", "streaming-kinesis"] }
```

Check the native core is available
Only Java exposes a capability probe on the public `StreamService`; Python’s lives in
`ggcommons.streaming.service`; TypeScript has no probe (loading is lazy and reported on first use);
Rust availability is compile-time, so a `cfg` check is the equivalent.

```java
if (StreamService.nativeAvailable()) {
    // safe to open streams; returns false instead of throwing if the cdylib is missing
}
```

```python
from ggcommons.streaming.service import native_available

if native_available():
    ...  # the ggstreamlog_native wheel imported cleanly
```

```rust
// Availability is compile-time: gate streaming code on the cargo feature.
#[cfg(feature = "streaming")]
{
    // streaming module + accessor are compiled in
}
```

```typescript
// No nativeAvailable() helper. The @mbreissi/ggstreamlog-node addon is required
// lazily on the first StreamService.open(); a GgStreamError is thrown if it is missing.
```

Get a stream and append records
In a component you get the service from the runtime accessor, fetch a `StreamHandle` by name, then
`append` / `flush` and optionally read stats. The runtime opens every configured stream for you and
closes them on shutdown.

```java
StreamService streams = gg.getStreams(); // null if there is no `streaming` config
if (streams != null) {
    StreamHandle h = streams.stream("telemetry"); // throws GgStreamException (ERR_UNKNOWN_STREAM) if absent
    h.append("pump-7", System.currentTimeMillis(),
            "reading-0".getBytes(StandardCharsets.UTF_8)); // partitionKey <= 65535 bytes
    h.flush(); // force the buffer durably to disk (does NOT wait for export)

    StreamStats s = streams.stats("telemetry");
    log.info("appended={} backlog={}", s.appendedTotal(), s.backlog());
}
```

```python
streams = gg.get_streams()  # None if there is no `streaming` config
if streams is not None:
    h = streams.stream("telemetry")  # raises GgStreamError (ERR_UNKNOWN_STREAM) if absent
    h.append("pump-7", int(time.time() * 1000), b"reading-0")
    h.flush()  # force the buffer durably to disk (does NOT wait for export)

    s = streams.stats("telemetry")
    log.info("appended=%s backlog=%s", s.appended_total, s.backlog)
```

```rust
#[cfg(feature = "streaming")]
{
    use ggcommons::streaming::StreamRecord;

    let streams = gg.streams(); // Arc<dyn StreamService>, always present
    let h = streams.stream("telemetry")?; // Err(GgError::Streaming) if not configured
    h.append(StreamRecord::new("pump-7", now_ms, b"reading-0".to_vec()))?;
    h.flush()?; // force the buffer durably to disk (does NOT wait for export)

    if let Some(s) = streams.stats("telemetry") { // None (not an error) if not configured
        tracing::info!(appended = s.appended_total, backlog = s.backlog);
    }
}
```

```typescript
const streams = gg.streams(); // StreamService | undefined when there is no `streaming` config
if (streams) {
  const h = streams.stream("telemetry"); // throws GgStreamError (ERR_UNKNOWN_STREAM) if absent
  h.append("pump-7", Date.now(), Buffer.from("reading-0")); // payload: Buffer | Uint8Array
  h.flush(); // force the buffer durably to disk (does NOT wait for export)

  const s = streams.stats("telemetry");
  console.log(`appended=${s.appendedTotal} backlog=${s.backlog}`);
}
```

Closing streams
When you use the runtime accessor, the SDK owns the service lifecycle and closes streams on shutdown.
If you open a `StreamService` yourself (for standalone testing or a custom buffer), close it to flush
all streams and stop the export engines. `close()` is idempotent.

```java
String cfg = """
    {"streams":[{"name":"telemetry","sink":{"type":"kinesis","streamName":"x"},
      "buffer":{"path":"/tmp/telemetry","maxDiskBytes":1073741824,"onFull":"block"}}]}
    """;
try (StreamService svc = StreamService.open(cfg);
     StreamHandle h = svc.stream("telemetry")) {
    h.append("pump-7", 1000L, "reading-0".getBytes(StandardCharsets.UTF_8));
    h.flush();
} // try-with-resources closes both
```

```python
cfg = json.dumps({"streams": [{
    "name": "telemetry",
    "sink": {"type": "kinesis", "streamName": "x"},
    "buffer": {"path": "/tmp/telemetry", "maxDiskBytes": 1073741824, "onFull": "block"},
}]})
with StreamService.open(cfg) as svc, svc.stream("telemetry") as h:
    h.append("pump-7", 1000, b"reading-0")
    h.flush()
# both context managers close on exit
```

```rust
// No close(): the StreamService and StreamHandle are dropped via RAII (Arc-backed).
// Build a service explicitly with DefaultStreamService::open(&config) when not using the accessor.
let svc = ggcommons::streaming::DefaultStreamService::open(&config)?;
let h = svc.stream("telemetry")?;
h.append(StreamRecord::new("pump-7", 1000, b"reading-0".to_vec()))?;
h.flush()?;
// dropping `svc` flushes and stops the export engines
```

```typescript
const cfg = JSON.stringify({
  streams: [{
    name: "telemetry",
    sink: { type: "kinesis", streamName: "x" },
    buffer: { path: "/tmp/telemetry", maxDiskBytes: 1073741824, onFull: "block" },
  }],
});
const svc = StreamService.open(cfg);
const h = svc.stream("telemetry");
h.append("pump-7", 1000, Buffer.from("reading-0"));
h.flush();
h.close();
svc.close();
```

A stream’s sink is tagged by `type`. Two sink types are declarable in component config: `kinesis`
and `kafka`. The same config is used by all four languages.
Kinesis
```json
{
  "name": "telemetry",
  "sink": {
    "type": "kinesis",
    "streamName": "edge-{ThingName}",
    "region": "us-east-1",
    "endpointUrl": "http://localhost:4566"
  }
}
```

| Field | Required | Notes |
|---|---|---|
| `streamName` | yes | Target Kinesis data stream; supports template variables. |
| `region` | no | AWS region; falls back to the ambient AWS region when omitted. |
| `endpointUrl` | no | Override the endpoint for LocalStack / floci / a VPC endpoint. |
Kafka
```json
{
  "name": "telemetry",
  "sink": {
    "type": "kafka",
    "bootstrapServers": "broker-1:9092,broker-2:9092",
    "topic": "edge.telemetry",
    "properties": { "compression.type": "lz4", "acks": "all" }
  }
}
```

| Field | Required | Notes |
|---|---|---|
| `bootstrapServers` | yes | `host:port[,host:port...]`; supports template variables. |
| `topic` | yes | Target Kafka topic; supports template variables. |
| `properties` | no | Open map of librdkafka producer properties passed through verbatim. |
The buffer
Each stream owns an embedded buffer that decouples your producer loop from the network. Configure it
under `buffer`.
Disk vs memory
- `disk` (default): a durable, segmented on-disk log. `path` is required. Survives process restarts and powers store-and-forward across cloud disconnects.
- `memory`: a non-durable in-RAM ring. `path` must be omitted. `maxDiskBytes` is reinterpreted as the in-RAM byte budget. Records are lost on restart, so use it only when durability is not needed.

| Field | Default | Meaning |
|---|---|---|
| `type` | `disk` | `disk` (durable) or `memory` (non-durable RAM ring). |
| `path` | (none) | Required for `disk`; must be omitted for `memory`. |
| `segmentBytes` | `67108864` (64 MiB) | On-disk segment size. |
| `maxDiskBytes` | `1073741824` (1 GiB) | Total buffer budget (on disk, or in RAM for `memory`). Must be >= `segmentBytes`. |
| `maxAgeSecs` | `null` | Records older than this become eligible for `dropOldest`. |
| `onFull` | `dropOldest` | Behavior when the budget is reached (below). |
| `fsync` | `perBatch` | Durability flush policy: `always`, `perBatch`, or `interval`. |
| `fsyncIntervalMs` | `1000` | Flush cadence when `fsync: interval`. |
| `maxBufferedRecords` | `10000` | In-memory ingest-queue bound; the actual backpressure point. |
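
The constraints in this table can be checked before a config ever reaches a device. The following is a minimal sketch, assuming only the rules and defaults stated above; `check_buffer` and `DEFAULTS` are hypothetical names, and the SDK performs its own validation (reported as `ERR_CONFIG`) that may be stricter.

```python
# Defaults copied from the table above; `path` has no default.
DEFAULTS = {
    "type": "disk",
    "segmentBytes": 64 * 1024 * 1024,
    "maxDiskBytes": 1024 * 1024 * 1024,
    "maxAgeSecs": None,
    "onFull": "dropOldest",
    "fsync": "perBatch",
    "fsyncIntervalMs": 1000,
    "maxBufferedRecords": 10000,
}


def check_buffer(buffer):
    """Apply the documented defaults and return (effective_config, problems)."""
    cfg = {**DEFAULTS, **buffer}
    problems = []
    if cfg["type"] not in ("disk", "memory"):
        problems.append("type must be 'disk' or 'memory'")
    if cfg["type"] == "disk" and not cfg.get("path"):
        problems.append("disk buffers require path")
    if cfg["type"] == "memory" and "path" in cfg:
        problems.append("memory buffers must omit path")
    if cfg["maxDiskBytes"] < cfg["segmentBytes"]:
        problems.append("maxDiskBytes must be >= segmentBytes")
    if cfg["onFull"] not in ("dropOldest", "block", "rejectNew"):
        problems.append("onFull must be dropOldest, block, or rejectNew")
    if cfg["fsync"] not in ("always", "perBatch", "interval"):
        problems.append("fsync must be always, perBatch, or interval")
    return cfg, problems


ok_cfg, ok_problems = check_buffer({"path": "/var/lib/ggcommons/streams/telemetry"})
_, bad_problems = check_buffer({"type": "memory", "path": "/tmp/x", "maxDiskBytes": 1024})
```

In the second call, two rules are violated: a `memory` buffer carries a `path`, and the budget is smaller than the default segment size.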
onFull semantics
The `onFull` policy decides what happens when the buffer budget is exhausted:
- `dropOldest` (default): never blocks the producer; drops the oldest records to stay within budget. Drops are metered (`droppedTotal`), never silent.
- `block`: the producer blocks in `append` until the export engine reclaims space. Lossless.
- `rejectNew`: `append` fails immediately with `ERR_FULL` / `BufferFull`.
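
To make the difference between the policies concrete, here is a toy in-memory model. It is a sketch under simplifying assumptions: `ToyBuffer` is hypothetical, it counts records where the real buffer budgets bytes, and it leaves out `block` because that policy needs a consumer thread to reclaim space.

```python
from collections import deque


class BufferFull(Exception):
    """Stand-in for the ERR_FULL / BufferFull failure described above."""


class ToyBuffer:
    """Record-count model of a bounded buffer (the real budget is in bytes)."""

    def __init__(self, max_records, on_full="dropOldest"):
        self.records = deque()
        self.max_records = max_records
        self.on_full = on_full
        self.dropped_total = 0  # mirrors the droppedTotal counter

    def append(self, record):
        if len(self.records) >= self.max_records:
            if self.on_full == "dropOldest":
                self.records.popleft()   # evict the oldest record
                self.dropped_total += 1  # drops are counted, never silent
            elif self.on_full == "rejectNew":
                raise BufferFull("buffer budget exhausted")
            else:
                raise NotImplementedError("'block' is not modeled in this sketch")
        self.records.append(record)


buf = ToyBuffer(max_records=2, on_full="dropOldest")
for reading in ("r0", "r1", "r2"):
    buf.append(reading)
# buf now holds r1 and r2; r0 was evicted and counted in dropped_total.
```

With `rejectNew`, the third `append` would raise `BufferFull` instead, leaving the caller to decide whether to retry, sample, or discard.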
Batch and delivery tuning
These optional sections tune how the background engine batches and retries. The defaults are production-ready; tune only when you have a reason.

```json
{
  "batch": {
    "maxRecords": 500,
    "maxBytes": 4194304,
    "maxLatencyMs": 1000,
    "compression": "none"
  },
  "delivery": {
    "maxRetries": -1,
    "backoffBaseMs": 50,
    "backoffMaxMs": 30000,
    "pollIntervalMs": 100
  }
}
```

| Section.field | Default | Meaning |
|---|---|---|
| `batch.maxRecords` | `500` | Max records per export batch. |
| `batch.maxBytes` | `4194304` (4 MiB) | Max bytes per export batch. |
| `batch.maxLatencyMs` | `1000` | Max time to wait before flushing a partial batch. |
| `batch.compression` | `none` | `none` or `zstd`. |
| `delivery.maxRetries` | `-1` | `-1` = retry forever. |
| `delivery.backoffBaseMs` | `50` | Initial retry backoff. |
| `delivery.backoffMaxMs` | `30000` | Backoff ceiling. |
| `delivery.pollIntervalMs` | `100` | Export poll cadence. |
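
To get a feel for how `backoffBaseMs` and `backoffMaxMs` interact, the sketch below computes a retry schedule. It assumes the delay doubles after each failed attempt and applies no jitter; this page does not state the growth curve, so treat the numbers as an illustration and `backoff_delays_ms` as a hypothetical helper.

```python
def backoff_delays_ms(attempts, base_ms=50, max_ms=30000):
    """Delay before each retry: doubles from base_ms, capped at max_ms."""
    return [min(base_ms * 2 ** n, max_ms) for n in range(attempts)]


# With the defaults, the ceiling is reached on the 11th retry (50 * 2**10 > 30000).
delays = backoff_delays_ms(12)
```

Under that assumption a sink outage settles into one attempt every 30 seconds, which is why the retry-forever default (`maxRetries: -1`) stays cheap during long disconnects.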
Delivery semantics
Streaming is at-least-once. The export engine advances the buffer checkpoint past a batch only when every record is acknowledged:
- All records acked → the checkpoint advances; the batch is durably committed as delivered.
- Partial ack → only the failed offsets are re-delivered.
- Retryable failure (or a sink callback that throws/raises) → the whole batch is re-delivered and the checkpoint is not advanced, so a transient outage cannot lose buffered data.
Records carry (partitionKey, timestampMs, payload); the engine assigns each one an internal log
offset. De-duplicate downstream on partitionKey plus that offset — there is no
producer-supplied sequence number.
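
A downstream consumer could apply that rule roughly as follows. This is a sketch only: the record shape, in particular an `offset` field arriving alongside `partitionKey`, is an assumption, since this page does not describe how the offset is exposed to consumers. The `seen` set also grows without bound here, which a real consumer would need to cap or persist.

```python
def deduplicate(records, seen=None):
    """Yield each record once, keyed on (partitionKey, offset)."""
    seen = set() if seen is None else seen
    for rec in records:
        key = (rec["partitionKey"], rec["offset"])
        if key in seen:
            continue  # a redelivered copy of something already processed
        seen.add(key)
        yield rec


# Hypothetical records: the first batch is partly redelivered after a retry.
batch = [
    {"partitionKey": "pump-7", "offset": 10, "payload": b"reading-0"},
    {"partitionKey": "pump-7", "offset": 11, "payload": b"reading-1"},
]
redelivered = batch + batch[:1] + [
    {"partitionKey": "pump-7", "offset": 12, "payload": b"reading-2"},
]
unique = list(deduplicate(redelivered))
```

Passing the same `seen` set across calls keeps the filter effective over successive batches.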
flush() forces the buffer durably to disk; it does not wait for export to the sink. To
confirm delivery, watch exportedTotal / ackedOffset in stats.
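
One way to watch for delivery is to poll the stats snapshot until `acked_offset` reaches a target. The helper below is a hypothetical sketch, not an SDK function: `wait_until_acked` takes any callable that returns an object with an `acked_offset` attribute (the Python field name for `ackedOffset`), and the demo uses a fake stats source in place of a real stream.

```python
import time
from types import SimpleNamespace


def wait_until_acked(get_stats, target_offset, timeout_s=30.0, poll_s=0.1):
    """Poll stats until acked_offset >= target_offset; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        if get_stats().acked_offset >= target_offset:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)


# Demo with a fake stats source standing in for streams.stats("telemetry").
_fake_offsets = iter([3, 5, 9])
delivered = wait_until_acked(
    lambda: SimpleNamespace(acked_offset=next(_fake_offsets)), 9, poll_s=0.0
)
```

In a component, the callable would typically be `lambda: streams.stats("telemetry")`. Which offset to wait for is up to the caller; `next_offset - 1` read after a `flush()` is a plausible choice, but that reading of the counters is an inference from the stats table and is not confirmed on this page.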
Buffer-only fallback
A stream with no usable sink still persists appends durably and reports stats, but nothing exports.
This happens in Rust when built without streaming-kinesis / streaming-kafka, and in Java / Python /
TypeScript when a callback stream is opened with no sink registered. It is useful for local capture
and tests.
Reading stats
`stats(name)` returns a snapshot of counters. Field names are camelCase in Java and TypeScript, and
snake_case in Python and Rust.

| Field | Meaning |
|---|---|
| `appendedTotal` | Records accepted into the buffer. |
| `exportedTotal` | Records sent to the sink. |
| `droppedTotal` | Records dropped (e.g. `dropOldest`). |
| `retriesTotal` | Export retry attempts. |
| `failedTotal` | Records that failed permanently. |
| `backlog` | Records buffered but not yet exported. |
| `diskBytes` | Bytes currently on disk. |
| `ackedOffset` | Highest acknowledged log offset. |
| `nextOffset` | Next offset to be assigned. |
| `oldestUnackedAgeMs` | Age of the oldest un-acked record. |

Rust’s Stats carries one extra field, last_export_error: Option<String>, which is currently always
None in the wrapper. Java, Python, and TypeScript expose exactly the ten counters above.
Error handling
The error type mirrors the core’s status codes across languages. Java exposes the codes as named
constants; Python and TypeScript carry a numeric code; Rust wraps a message string.
| Code | Name | When |
|---|---|---|
| 0 | `OK` | success |
| 1 | `ERR_CONFIG` | invalid streaming config / bad JSON |
| 2 | `ERR_IO` | durable-store I/O error |
| 3 | `ERR_CORRUPT` | corrupt on-disk segment or checkpoint |
| 4 | `ERR_FULL` | ingest queue full under `rejectNew` |
| 5 | `ERR_UNKNOWN_STREAM` | `stream()` / `stats()` for an unconfigured name |
| 6 | `ERR_SINK` | export sink error |
| 7 | `ERR_PANIC` | a Rust panic crossed the FFI boundary |
| 8 | `ERR_INVALID_ARG` | null / invalid native-ABI argument |
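
Because Python and TypeScript carry only the numeric code, application code may prefer named values over bare integers. The enum below is a hypothetical convenience built from the table above; `StreamErrorCode` and `describe` are not part of the SDK.

```python
from enum import IntEnum


class StreamErrorCode(IntEnum):
    """Status codes from the table above (names are the documented ones)."""

    OK = 0
    ERR_CONFIG = 1
    ERR_IO = 2
    ERR_CORRUPT = 3
    ERR_FULL = 4
    ERR_UNKNOWN_STREAM = 5
    ERR_SINK = 6
    ERR_PANIC = 7
    ERR_INVALID_ARG = 8


def describe(code):
    """Map a numeric code to its name; anything else gets a fallback label."""
    try:
        return StreamErrorCode(code).name
    except ValueError:
        # Covers -1 (native core missing) and any code not listed in the table.
        return f"UNKNOWN({code})"


label = describe(5)
```

Since `IntEnum` members compare equal to plain integers, a check such as `e.code == StreamErrorCode.ERR_FULL` works against the numeric `code` attribute without conversion.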
```java
try {
    StreamHandle h = streams.stream("telemetry");
    h.append("pump-7", System.currentTimeMillis(), payload);
} catch (GgStreamException e) {
    if (e.code() == GgStreamException.ERR_UNKNOWN_STREAM) {
        log.error("no such stream", e);
    } else if (e.code() == GgStreamException.ERR_FULL) {
        log.warn("buffer rejected the record (onFull: rejectNew)");
    }
}
```

```python
from ggcommons.streaming import GgStreamError

try:
    h = streams.stream("telemetry")
    h.append("pump-7", int(time.time() * 1000), payload)
except GgStreamError as e:
    if e.code == 5:  # ERR_UNKNOWN_STREAM
        log.error("no such stream: %s", e)
    elif e.code == 4:  # ERR_FULL
        log.warning("buffer rejected the record (onFull: rejectNew)")
    # code == -1 means the ggstreamlog_native wheel is missing / unparseable
```

```rust
// Errors are GgError::Streaming(String): a message wrapper with no numeric code.
match streams.stream("telemetry") {
    Ok(h) => {
        if let Err(e) = h.append(StreamRecord::new("pump-7", now_ms, payload)) {
            tracing::error!(error = %e, "append failed");
        }
    }
    Err(e) => tracing::error!(error = %e, "stream not configured"),
}
```

```typescript
import { GgStreamError } from "ggcommons";

try {
  const h = streams.stream("telemetry");
  h.append("pump-7", Date.now(), payload);
} catch (e) {
  if (e instanceof GgStreamError) {
    if (e.code === 5) console.error("no such stream"); // ERR_UNKNOWN_STREAM
    else if (e.code === 4) console.warn("buffer rejected"); // ERR_FULL
    // code === -1 means the native addon is missing / message unparseable
  }
}
```

Cross-language differences at a glance
| Aspect | Java | Python | Rust | TypeScript |
|---|---|---|---|---|
| Accessor | `getStreams()` → `null` | `get_streams()` → `None` | `streams()` → always `Arc<dyn ...>` | `streams()` → `StreamService \| undefined` |
| `append` | 3 args | 3 args | one `StreamRecord` value | 3 args |
| Batch append | none | none | `append_batch` (Rust-only) | none |
| Unknown stream | throws | raises | `Err` / `None` | throws |
| Handle `close()` | yes (`AutoCloseable`) | yes (context manager) | none (RAII) | yes |
| Native probe | `nativeAvailable()` | `native_available()` | compile-time `cfg` | none |
| Stats fields | 10 | 10 | 11 (`last_export_error`) | 10 |