Skip to content

Streaming API

API reference for the streaming subsystem: durable store-and-forward telemetry egress to Kinesis or Kafka. This page enumerates the public surface per language; for the task-oriented walkthrough see the Streaming guide.

Streaming is opt-in: it is active only when the component config has a streaming section, and in Rust only when the streaming cargo feature is enabled. All four languages bind the same shared Rust core (ggstreamlog): Rust links it directly, while Java (FFM / Panama), Python (PyO3), and TypeScript (napi-rs) reach it through a native binding.

How you get the service from the GGCommons runtime handle. The return shape encodes the opt-in contract.

Language Accessor Returns
Java StreamService getStreams() StreamService, or null if there is no streaming config
Python get_streams(self) StreamService, or None if there is no streaming config
Rust streams(&self) -> Arc<dyn streaming::StreamService> always an Arc (never None); stream() errors if not configured. Behind #[cfg(feature = "streaming")]
TypeScript streams(): StreamService | undefined StreamService, or undefined if there is no streaming config

Opens and owns every stream declared in the streaming section. In Java, Python, and TypeScript it is a class with a static open factory; in Rust it is a trait (StreamService) implemented by DefaultStreamService. Java, Python, and TypeScript instances are closeable; the Rust value is dropped via RAII.

com.mbreissi.ggcommons.streaming.StreamService implements AutoCloseable

static StreamService open(String configJson) // opens every stream; loads the native lib on first use
StreamHandle stream(String name) // throws GgStreamException (ERR_UNKNOWN_STREAM) if absent
StreamStats stats(String name) // throws GgStreamException (ERR_UNKNOWN_STREAM) if absent
String configJson() // the resolved config this service was opened with
static List<String> streamNames(String configJson) // stream names declared in a config, without opening
static void registerSink(SinkFunction sink) // BYO-sink for `callback` streams; call BEFORE open()
static void unregisterSink()
static boolean nativeAvailable() // capability probe; returns false instead of throwing
synchronized void close() // flush all + stop engines; idempotent

A handle to one configured stream. Append records, force the buffer to disk with flush(), and read the handle name. Java, Python, and TypeScript handles are closeable; the Rust handle is Clone / Arc-backed with no close().

com.mbreissi.ggcommons.streaming.StreamHandle implements AutoCloseable

String name();
void append(String partitionKey, long timestampMs, byte[] payload);
// partitionKey <= 65535 bytes (IllegalArgumentException otherwise);
// throws GgStreamException on buffer / IO / sink error
void flush(); // force the buffer durably to disk; does NOT wait for export
synchronized void close(); // idempotent; the handle stays usable for append/flush even after service.close()

Get the service from the runtime accessor, fetch a StreamHandle by name, append, then flush.

StreamService streams = gg.getStreams(); // null if there is no `streaming` config
if (streams != null) {
StreamHandle h = streams.stream("telemetry");
h.append("pump-7", System.currentTimeMillis(),
"reading-0".getBytes(StandardCharsets.UTF_8));
h.flush();
StreamStats s = streams.stats("telemetry");
log.info("appended={} backlog={}", s.appendedTotal(), s.backlog());
}

A snapshot of per-stream counters returned by stats(name). The same ten numeric fields exist in all four languages — camelCase in Java and TypeScript, snake_case in Python and Rust. Rust carries one additional field.

Field (camelCase / snake_case) Type Meaning
appendedTotal / appended_total long / int / u64 / number Records accepted into the buffer.
exportedTotal / exported_total Records sent to the sink.
droppedTotal / dropped_total Records dropped (e.g. by onFull: dropOldest).
retriesTotal / retries_total Export retry attempts.
failedTotal / failed_total Records that failed permanently.
backlog Records buffered but not yet exported.
diskBytes / disk_bytes Bytes currently on disk.
ackedOffset / acked_offset Highest acknowledged log offset.
nextOffset / next_offset Next log offset to be assigned.
oldestUnackedAgeMs / oldest_unacked_age_ms Age of the oldest un-acked record, in ms.
  • Java: StreamStats is a record with accessor methods (appendedTotal(), backlog(), …).
  • Python: StreamStats is a frozen @dataclass with snake_case attributes.
  • Rust: Stats exposes the ten u64 counters plus an 11th field last_export_error: Option<String> (currently always None in the wrapper). The prelude re-exports it as StreamStats.
  • TypeScript: StreamStats is an interface with the ten camelCase number fields.

A stream’s sink is tagged by type. Two sink types are declarable in component config — kinesis and kafka — and the same JSON is used by all four languages. See the Streaming guide for context.

{
"type": "kinesis",
"streamName": "edge-{ThingName}",
"region": "us-east-1",
"endpointUrl": "http://localhost:4566"
}
Field Required Notes
streamName yes Target Kinesis data stream; supports template variables ({ThingName}, {ComponentName}).
region no AWS region; falls back to the ambient AWS region when omitted (nullable).
endpointUrl no Override the endpoint for LocalStack / floci / a VPC endpoint (nullable).
{
"type": "kafka",
"bootstrapServers": "broker-1:9092,broker-2:9092",
"topic": "edge.telemetry",
"properties": { "compression.type": "lz4", "acks": "all" }
}
Field Required Notes
bootstrapServers yes host:port[,host:port...]; supports template variables.
topic yes Target Kafka topic; supports template variables.
properties no Open map of librdkafka producer properties, passed through verbatim.

For the full buffer, batch, and delivery config (with defaults and validation), see the Streaming guide and the Configuration reference.

The error type mirrors the core’s status codes across languages. Java exposes the codes as named constants on GgStreamException; Python and TypeScript carry a numeric code; Rust wraps a message string with no numeric code.

Language Type Code accessor
Java GgStreamException extends RuntimeException int code() + constants (ERR_CONFIG, …)
Python GgStreamError(Exception) e.code (int)
Rust GgError::Streaming(String) (wraps core ggstreamlog::GgStreamError) none — message string only
TypeScript GgStreamError extends Error readonly code: number
Code Name When
0 OK success
1 ERR_CONFIG invalid streaming config / bad JSON
2 ERR_IO durable-store I/O error
3 ERR_CORRUPT corrupt on-disk segment or checkpoint
4 ERR_FULL ingest queue full under onFull: rejectNew
5 ERR_UNKNOWN_STREAM stream() / stats() for an unconfigured name
6 ERR_SINK export sink error
7 ERR_PANIC a Rust panic crossed the native (FFI) boundary
8 ERR_INVALID_ARG null / invalid native-ABI argument

Java exposes constants 08 on GgStreamException. Python and TypeScript derive the code from the native message and use a synthetic -1 when the native lib is missing or the message is unparseable. ERR_PANIC (7) and ERR_INVALID_ARG (8) originate in the native layer.

try {
StreamHandle h = streams.stream("telemetry");
h.append("pump-7", System.currentTimeMillis(), payload);
} catch (GgStreamException e) {
if (e.code() == GgStreamException.ERR_UNKNOWN_STREAM) {
log.error("no such stream", e);
} else if (e.code() == GgStreamException.ERR_FULL) {
log.warn("buffer rejected the record (onFull: rejectNew)");
}
}

The streaming engine lives in the shared ggstreamlog native core. How it is provided, and how you probe its availability, differs per language.

Language How the core is provided Probe
Java cdylib ggstreamlog loaded once per process (-Dggstreamlog.library.path, then java.library.path, then a bundled classpath resource). Run with --enable-native-access=ALL-UNNAMED. StreamService.nativeAvailable() (static; returns false instead of throwing)
Python PyO3 wheel ggstreamlog_native, imported lazily; failure surfaces on first use as GgStreamError(-1, ...). ggcommons.streaming.service.native_available() (not re-exported from streaming/__init__.py)
Rust Linked directly — availability is compile-time via cargo features. #[cfg(feature = "streaming")]
TypeScript napi addon @mbreissi/ggstreamlog-node, require-d lazily on the first StreamService.open. none — loading is lazy and reported on first use

For Rust, the streaming feature enables the module and accessor; real export also needs a sink featurestreaming-kinesis and/or streaming-kafka. Without a sink feature, a stream still buffers durably but never exports.

Cargo.toml
[dependencies]
ggcommons = { version = "*", features = ["streaming", "streaming-kinesis"] }

The probe call in each language:

if (StreamService.nativeAvailable()) {
// safe to open streams; returns false instead of throwing if the cdylib is missing
}
Aspect Java Python Rust TypeScript
Accessor getStreams()null get_streams()None streams() → always Arc<dyn ...> streams()StreamService | undefined
append 3 args 3 args one Record value 3 args
Batch append append_batch (Rust-only)
Unknown stream throws throws Err / None throws
Handle close() yes (AutoCloseable) yes (context manager) none (RAII) yes
Native probe nativeAvailable() native_available() compile-time cfg none
Stats fields 10 10 11 (last_export_error) 10
BYO-sink registerSink (static) open_with_callback open_with registerSinkCallback