Streaming API
API reference for the streaming subsystem: durable store-and-forward telemetry egress to Kinesis or Kafka. This page enumerates the public surface per language; for the task-oriented walkthrough see the Streaming guide.
Streaming is opt-in: it is active only when the component config has a streaming section, and in
Rust only when the streaming cargo feature is enabled. All four languages bind the same shared Rust
core (ggstreamlog): Rust links it directly, while Java (FFM / Panama), Python (PyO3), and
TypeScript (napi-rs) reach it through a native binding.
Accessor
Section titled “Accessor”How you get the service from the GGCommons runtime handle. The return shape encodes the opt-in
contract.
| Language | Accessor | Returns |
|---|---|---|
| Java | StreamService getStreams() |
StreamService, or null if there is no streaming config |
| Python | get_streams(self) |
StreamService, or None if there is no streaming config |
| Rust | streams(&self) -> Arc<dyn streaming::StreamService> |
always an Arc (never None); stream() errors if not configured. Behind #[cfg(feature = "streaming")] |
| TypeScript | streams(): StreamService | undefined |
StreamService, or undefined if there is no streaming config |
StreamService
Section titled “StreamService”Opens and owns every stream declared in the streaming section. In Java, Python, and TypeScript it is
a class with a static open factory; in Rust it is a trait (StreamService) implemented by
DefaultStreamService. Java, Python, and TypeScript instances are closeable; the Rust value is
dropped via RAII.
com.mbreissi.ggcommons.streaming.StreamService implements AutoCloseable
static StreamService open(String configJson) // opens every stream; loads the native lib on first useStreamHandle stream(String name) // throws GgStreamException (ERR_UNKNOWN_STREAM) if absentStreamStats stats(String name) // throws GgStreamException (ERR_UNKNOWN_STREAM) if absentString configJson() // the resolved config this service was opened withstatic List<String> streamNames(String configJson) // stream names declared in a config, without openingstatic void registerSink(SinkFunction sink) // BYO-sink for `callback` streams; call BEFORE open()static void unregisterSink()static boolean nativeAvailable() // capability probe; returns false instead of throwingsynchronized void close() // flush all + stop engines; idempotentggcommons.streaming.StreamService (context-manager)
@staticmethoddef open(config_json: str) -> "StreamService": ...
@staticmethoddef open_with_callback(config_json: str, callback: SinkCallback) -> "StreamService": """Bind `callback` as the sink for every {"type": "callback"} stream."""
def stream(self, name: str) -> StreamHandle: ... # raises GgStreamError (ERR_UNKNOWN_STREAM) if absentdef stats(self, name: str) -> StreamStats: ... # raises GgStreamError (ERR_UNKNOWN_STREAM) if absent
@staticmethoddef stream_names(config_json: str) -> List[str]: ...
def close(self) -> None: ...def __enter__(self) -> "StreamService": ... # `with StreamService.open(cfg) as svc:`def __exit__(self, *exc) -> None: ...ggcommons::streaming — feature-gated behind the off-by-default streaming cargo feature.
// The substitutable seam:pub trait StreamService: Send + Sync { fn stream(&self, name: &str) -> Result<StreamHandle>; // Err(GgError::Streaming) if not configured fn stream_names(&self) -> Vec<String>; fn stats(&self, name: &str) -> Option<Stats>; // None (not an error) if not configured}
// The default implementation:pub struct DefaultStreamService { /* ... */ }impl DefaultStreamService { pub fn open(config: &Config) -> Result<Self>; pub fn open_with(config: &Config, sink_factory: &SinkFactory) -> Result<Self>; // custom / test sink}// No close(): the service is dropped via RAII (flushes + stops the export engines on drop).There is no nativeAvailable() — availability is compile-time (see Native core).
ggcommons.StreamService
static open(configJson: string): StreamService;stream(name: string): StreamHandle; // throws GgStreamError (ERR_UNKNOWN_STREAM) if absentstats(name: string): StreamStats; // throws GgStreamError (ERR_UNKNOWN_STREAM) if absentstatic streamNames(configJson: string): string[];close(): void; // flush all + stop enginesStreamHandle
Section titled “StreamHandle”A handle to one configured stream. Append records, force the buffer to disk with flush(), and read
the handle name. Java, Python, and TypeScript handles are closeable; the Rust handle is Clone /
Arc-backed with no close().
com.mbreissi.ggcommons.streaming.StreamHandle implements AutoCloseable
String name();void append(String partitionKey, long timestampMs, byte[] payload); // partitionKey <= 65535 bytes (IllegalArgumentException otherwise); // throws GgStreamException on buffer / IO / sink errorvoid flush(); // force the buffer durably to disk; does NOT wait for exportsynchronized void close(); // idempotent; the handle stays usable for append/flush even after service.close()ggcommons.streaming.StreamHandle (context-manager)
def append(self, partition_key: str, timestamp_ms: int, payload: bytes) -> None: ...def flush(self) -> None: ... # force the buffer durably to disk; does NOT wait for exportdef close(self) -> None: ...def __enter__(self) -> "StreamHandle": ...def __exit__(self, *exc) -> None: ...ggcommons::streaming::StreamHandle — #[derive(Clone)], Arc-backed.
pub fn append(&self, rec: Record) -> Result<()>; // takes one Record value, not 3 argspub fn append_batch(&self, recs: &[Record]) -> Result<()>; // Rust-only — no batch append elsewherepub fn flush(&self) -> Result<()>; // force the buffer durably to diskpub fn name(&self) -> &str;// No close(): dropped via RAII.
// Record is re-exported as StreamRecord:pub struct Record { /* partition_key: String, timestamp_ms: u64, payload: Vec<u8> */ }impl Record { pub fn new( partition_key: impl Into<String>, timestamp_ms: u64, payload: impl Into<Vec<u8>>, ) -> Record;}ggcommons.StreamHandle
readonly name: string;append(partitionKey: string, timestampMs: number, payload: Buffer | Uint8Array): void;flush(): void; // force the buffer durably to disk; does NOT wait for exportclose(): void;Append example
Section titled “Append example”Get the service from the runtime accessor, fetch a StreamHandle by name, append, then flush.
StreamService streams = gg.getStreams(); // null if there is no `streaming` configif (streams != null) { StreamHandle h = streams.stream("telemetry"); h.append("pump-7", System.currentTimeMillis(), "reading-0".getBytes(StandardCharsets.UTF_8)); h.flush();
StreamStats s = streams.stats("telemetry"); log.info("appended={} backlog={}", s.appendedTotal(), s.backlog());}streams = gg.get_streams() # None if there is no `streaming` configif streams is not None: h = streams.stream("telemetry") h.append("pump-7", int(time.time() * 1000), b"reading-0") h.flush()
s = streams.stats("telemetry") log.info("appended=%s backlog=%s", s.appended_total, s.backlog)#[cfg(feature = "streaming")]{ use ggcommons::streaming::StreamRecord;
let streams = gg.streams(); // Arc<dyn StreamService> — always present let h = streams.stream("telemetry")?; // Err(GgError::Streaming) if not configured h.append(StreamRecord::new("pump-7", now_ms, b"reading-0".to_vec()))?; h.flush()?;
if let Some(s) = streams.stats("telemetry") { // None (not an error) if not configured tracing::info!(appended = s.appended_total, backlog = s.backlog); }}const streams = gg.streams(); // StreamService | undefined when there is no `streaming` configif (streams) { const h = streams.stream("telemetry"); h.append("pump-7", Date.now(), Buffer.from("reading-0")); h.flush();
const s = streams.stats("telemetry"); console.log(`appended=${s.appendedTotal} backlog=${s.backlog}`);}StreamStats
Section titled “StreamStats”A snapshot of per-stream counters returned by stats(name). The same ten numeric fields exist in all
four languages — camelCase in Java and TypeScript, snake_case in Python and Rust. Rust carries one
additional field.
| Field (camelCase / snake_case) | Type | Meaning |
|---|---|---|
appendedTotal / appended_total |
long / int / u64 / number |
Records accepted into the buffer. |
exportedTotal / exported_total |
“ | Records sent to the sink. |
droppedTotal / dropped_total |
“ | Records dropped (e.g. by onFull: dropOldest). |
retriesTotal / retries_total |
“ | Export retry attempts. |
failedTotal / failed_total |
“ | Records that failed permanently. |
backlog |
“ | Records buffered but not yet exported. |
diskBytes / disk_bytes |
“ | Bytes currently on disk. |
ackedOffset / acked_offset |
“ | Highest acknowledged log offset. |
nextOffset / next_offset |
“ | Next log offset to be assigned. |
oldestUnackedAgeMs / oldest_unacked_age_ms |
“ | Age of the oldest un-acked record, in ms. |
- Java:
StreamStatsis arecordwith accessor methods (appendedTotal(),backlog(), …). - Python:
StreamStatsis a frozen@dataclasswith snake_case attributes. - Rust:
Statsexposes the tenu64counters plus an 11th fieldlast_export_error: Option<String>(currently alwaysNonein the wrapper). The prelude re-exports it asStreamStats. - TypeScript:
StreamStatsis aninterfacewith the ten camelCasenumberfields.
Sink config types
Section titled “Sink config types”A stream’s sink is tagged by type. Two sink types are declarable in component config — kinesis
and kafka — and the same JSON is used by all four languages. See the
Streaming guide for context.
kinesis
Section titled “kinesis”{ "type": "kinesis", "streamName": "edge-{ThingName}", "region": "us-east-1", "endpointUrl": "http://localhost:4566"}| Field | Required | Notes |
|---|---|---|
streamName |
yes | Target Kinesis data stream; supports template variables ({ThingName}, {ComponentName}). |
region |
no | AWS region; falls back to the ambient AWS region when omitted (nullable). |
endpointUrl |
no | Override the endpoint for LocalStack / floci / a VPC endpoint (nullable). |
{ "type": "kafka", "bootstrapServers": "broker-1:9092,broker-2:9092", "topic": "edge.telemetry", "properties": { "compression.type": "lz4", "acks": "all" }}| Field | Required | Notes |
|---|---|---|
bootstrapServers |
yes | host:port[,host:port...]; supports template variables. |
topic |
yes | Target Kafka topic; supports template variables. |
properties |
no | Open map of librdkafka producer properties, passed through verbatim. |
For the full buffer, batch, and delivery config (with defaults and validation), see the
Streaming guide and the
Configuration reference.
Error type and codes
Section titled “Error type and codes”The error type mirrors the core’s status codes across languages. Java exposes the codes as named
constants on GgStreamException; Python and TypeScript carry a numeric code; Rust wraps a message
string with no numeric code.
| Language | Type | Code accessor |
|---|---|---|
| Java | GgStreamException extends RuntimeException |
int code() + constants (ERR_CONFIG, …) |
| Python | GgStreamError(Exception) |
e.code (int) |
| Rust | GgError::Streaming(String) (wraps core ggstreamlog::GgStreamError) |
none — message string only |
| TypeScript | GgStreamError extends Error |
readonly code: number |
| Code | Name | When |
|---|---|---|
| 0 | OK |
success |
| 1 | ERR_CONFIG |
invalid streaming config / bad JSON |
| 2 | ERR_IO |
durable-store I/O error |
| 3 | ERR_CORRUPT |
corrupt on-disk segment or checkpoint |
| 4 | ERR_FULL |
ingest queue full under onFull: rejectNew |
| 5 | ERR_UNKNOWN_STREAM |
stream() / stats() for an unconfigured name |
| 6 | ERR_SINK |
export sink error |
| 7 | ERR_PANIC |
a Rust panic crossed the native (FFI) boundary |
| 8 | ERR_INVALID_ARG |
null / invalid native-ABI argument |
Java exposes constants 0–8 on GgStreamException. Python and TypeScript derive the code from the
native message and use a synthetic -1 when the native lib is missing or the message is unparseable.
ERR_PANIC (7) and ERR_INVALID_ARG (8) originate in the native layer.
try { StreamHandle h = streams.stream("telemetry"); h.append("pump-7", System.currentTimeMillis(), payload);} catch (GgStreamException e) { if (e.code() == GgStreamException.ERR_UNKNOWN_STREAM) { log.error("no such stream", e); } else if (e.code() == GgStreamException.ERR_FULL) { log.warn("buffer rejected the record (onFull: rejectNew)"); }}from ggcommons.streaming import GgStreamError
try: h = streams.stream("telemetry") h.append("pump-7", int(time.time() * 1000), payload)except GgStreamError as e: if e.code == 5: # ERR_UNKNOWN_STREAM log.error("no such stream: %s", e) elif e.code == 4: # ERR_FULL log.warning("buffer rejected the record (onFull: rejectNew)") # code == -1 means the ggstreamlog_native wheel is missing / unparseable// Errors are GgError::Streaming(String) — a message wrapper with no numeric code.match streams.stream("telemetry") { Ok(h) => { if let Err(e) = h.append(StreamRecord::new("pump-7", now_ms, payload)) { tracing::error!(error = %e, "append failed"); } } Err(e) => tracing::error!(error = %e, "stream not configured"),}import { GgStreamError } from "ggcommons";
try { const h = streams.stream("telemetry"); h.append("pump-7", Date.now(), payload);} catch (e) { if (e instanceof GgStreamError) { if (e.code === 5) console.error("no such stream"); // ERR_UNKNOWN_STREAM else if (e.code === 4) console.warn("buffer rejected"); // ERR_FULL // code === -1 means the native addon is missing / message unparseable }}Native core
Section titled “Native core”The streaming engine lives in the shared ggstreamlog native core. How it is provided, and how you
probe its availability, differs per language.
| Language | How the core is provided | Probe |
|---|---|---|
| Java | cdylib ggstreamlog loaded once per process (-Dggstreamlog.library.path, then java.library.path, then a bundled classpath resource). Run with --enable-native-access=ALL-UNNAMED. |
StreamService.nativeAvailable() (static; returns false instead of throwing) |
| Python | PyO3 wheel ggstreamlog_native, imported lazily; failure surfaces on first use as GgStreamError(-1, ...). |
ggcommons.streaming.service.native_available() (not re-exported from streaming/__init__.py) |
| Rust | Linked directly — availability is compile-time via cargo features. | #[cfg(feature = "streaming")] |
| TypeScript | napi addon @mbreissi/ggstreamlog-node, require-d lazily on the first StreamService.open. |
none — loading is lazy and reported on first use |
For Rust, the streaming feature enables the module and accessor; real export also needs a sink
feature — streaming-kinesis and/or streaming-kafka. Without a sink feature, a stream still
buffers durably but never exports.
[dependencies]ggcommons = { version = "*", features = ["streaming", "streaming-kinesis"] }The probe call in each language:
if (StreamService.nativeAvailable()) { // safe to open streams; returns false instead of throwing if the cdylib is missing}from ggcommons.streaming.service import native_available
if native_available(): ... # the ggstreamlog_native wheel imported cleanly// Availability is compile-time: gate streaming code on the cargo feature.#[cfg(feature = "streaming")]{ // the streaming module + the streams() accessor are compiled in}// No nativeAvailable() helper. The @mbreissi/ggstreamlog-node addon is require-d// lazily on the first StreamService.open(); a GgStreamError is thrown if it is missing.Cross-language differences at a glance
Section titled “Cross-language differences at a glance”| Aspect | Java | Python | Rust | TypeScript |
|---|---|---|---|---|
| Accessor | getStreams() → null |
get_streams() → None |
streams() → always Arc<dyn ...> |
streams() → StreamService | undefined |
append |
3 args | 3 args | one Record value |
3 args |
| Batch append | — | — | append_batch (Rust-only) |
— |
| Unknown stream | throws | throws | Err / None |
throws |
Handle close() |
yes (AutoCloseable) |
yes (context manager) | none (RAII) | yes |
| Native probe | nativeAvailable() |
native_available() |
compile-time cfg |
none |
| Stats fields | 10 | 10 | 11 (last_export_error) |
10 |
| BYO-sink | registerSink (static) |
open_with_callback |
open_with |
registerSinkCallback |