Messaging
The messaging subsystem gives your component one interface over two transports: Greengrass IPC
(--transport IPC) on a Nucleus-managed device, or a dual-MQTT provider (--transport MQTT)
that connects to a local broker and, optionally, AWS IoT Core. The same publish/subscribe,
request/reply, and message envelope work identically across Java, Python, Rust, and TypeScript, so
components written in any language interoperate on the same topics.
Connections and subscriptions block until confirmed — when publish or subscribe returns, the
broker has acknowledged it.
Reaching the messaging service
Section titled “Reaching the messaging service”Every component reaches messaging through its GGCommons handle. The accessor name and return shape
differ slightly per language.
getMessaging() returns a MessagingClient instance — its methods are instance methods.
MessagingClient messaging = gg.getMessaging();get_messaging() returns the MessagingClient class — its methods are @staticmethod, so you
call them on the returned class.
messaging = gg.get_messaging()messaging() returns a Result<Arc<dyn MessagingService>> — it errors if no transport was wired.
let messaging = gg.messaging()?; // Arc<dyn MessagingService>messaging() returns an IMessagingService — it throws if no transport was wired.
const messaging = gg.messaging(); // IMessagingServiceThe message envelope
Section titled “The message envelope”Every message GGCommons sends on the wire is a JSON object with three parts. The shape and key names are identical across all four languages so they interoperate.
{ "header": { "name": "TemperatureReading", "version": "1.0", "timestamp": 1719500000000, "uuid": "5f6c...", "correlation_id": "a1b2...", "reply_to": "ggcommons/reply-..." }, "tags": { "thing": "lab-5950x", "site": "lab" }, "body": { "celsius": 22.4 }}header— message metadata. Keys are snake_case on the wire (correlation_id,reply_to,uuid).reply_tois omitted when there is no reply topic;correlation_id/uuid/timestampare filled in automatically if you do not set them.tags— routing/source metadata. The Thing name is flattened to a"thing"key, and is only emitted when set; any custom tags you add appear alongside it.body— your payload. Any JSON value.
A raw message has no envelope — it serializes as { "raw": <value> } instead. When a component
receives a payload that is not an envelope (no header/tags/body), or non-JSON bytes, it is
delivered to your handler as a raw message (non-JSON bytes become a raw string). See
Raw messages below.
Building a message
Section titled “Building a message”Construct messages with MessageBuilder — not the raw constructors. The builder stamps the
uuid, correlation_id, and timestamp for you.
Message msg = MessageBuilder.create("TemperatureReading", "1.0") .withPayload(payload) // JsonObject, Map, POJO, or List .withConfig(configManager) // REQUIRED in Java .build();msg = ( MessageBuilder.create("TemperatureReading", "1.0") .with_payload({"celsius": 22.4}) .with_config(config) # or .with_tags({...}) .build())Rust uses different method names: new (not create), payload (not with_payload), and
from_config (not with_config).
use serde_json::json;
let msg = MessageBuilder::new("TemperatureReading", "1.0") .payload(json!({ "celsius": 22.4 })) .from_config(&config) // or .thing_name("lab-5950x") .build();const msg = MessageBuilder.create("TemperatureReading", "1.0") .withPayload({ celsius: 22.4 }) .withConfig({ thingName, parsed: { tags } }) // optional in TS .build();Publish and subscribe
Section titled “Publish and subscribe”publish sends an envelope to a topic. subscribe registers a handler that receives the
topic and the decoded Message for each match. Topic filters use the usual MQTT wildcards
(+ single level, # multi level).
// subscribe — 3rd arg is maxConcurrency. The callback is a BiConsumer<String,Message>.messaging.subscribe("sensors/temp", (topic, m) -> { System.out.println(m.getHeader().getName() + " -> " + m.getBody());}, 1);
// publishmessaging.publish("sensors/temp", msg);def handler(topic, m): print(m.get_header().name, "->", m.get_body())
# subscribe — 3rd arg is max_concurrencymessaging.subscribe("sensors/temp", handler, 1)
# publishmessaging.publish("sensors/temp", msg)The handler closure must be wrapped with message_handler(...) — a bare closure is not accepted.
The two trailing ints are (max_messages, max_concurrency).
use ggcommons::messaging::message_handler;
messaging.subscribe( "sensors/temp", message_handler(|topic, msg| async move { tracing::info!(%topic, name = %msg.header.name, "received"); }), 32, // max_messages 1, // max_concurrency).await?;
messaging.publish("sensors/temp", &msg).await?;The two trailing ints are (maxMessages, maxConcurrency); they default to 32 and 1.
await messaging.subscribe("sensors/temp", (topic, m) => { console.log(m.header.name, "->", m.getBody());}, 32, 1);
await messaging.publish("sensors/temp", msg);Raw publish and subscribe
Section titled “Raw publish and subscribe”Use the *Raw publish methods to send an unwrapped JSON value (for example to interoperate with a
non-GGCommons producer). On the receiving side, you do not call a separate “subscribe raw” —
your normal subscribe handler receives a raw Message whenever the incoming payload is not an
envelope, and you detect it with the raw accessors.
// publish a raw JSON object (no envelope)JsonObject metric = new JsonObject();metric.addProperty("value", 42);messaging.publishRaw("metrics/raw", metric);
// handle a possibly-raw messagemessaging.subscribe("metrics/raw", (topic, m) -> { if (m.getRaw() != null) { System.out.println("raw: " + m.getRaw()); } else { System.out.println("envelope body: " + m.getBody()); }}, 1);# publish a raw dict (no envelope)messaging.publish_raw("metrics/raw", {"value": 42})
# handle a possibly-raw messagedef handler(topic, m): if m.get_raw() is not None: print("raw:", m.get_raw()) else: print("envelope body:", m.get_body())
messaging.subscribe("metrics/raw", handler, 1)use serde_json::json;
// publish a raw JSON value (no envelope)messaging.publish_raw("metrics/raw", &json!({ "value": 42 })).await?;
// handle a possibly-raw messagemessaging.subscribe( "metrics/raw", message_handler(|_topic, msg| async move { match msg.get_raw() { Some(v) => tracing::info!(?v, "raw"), None => tracing::info!(body = ?msg.body, "envelope"), } }), 32, 1,).await?;// publish a raw value (no envelope)await messaging.publishRaw("metrics/raw", { value: 42 });
// handle a possibly-raw messageawait messaging.subscribe("metrics/raw", (_topic, m) => { if (m.isRaw()) { console.log("raw:", m.getRaw()); } else { console.log("envelope body:", m.getBody()); }}, 32, 1);Request/reply with correlation
Section titled “Request/reply with correlation”request publishes a message to a service topic, subscribes to a private reply topic, and gives you
a future that completes when the reply arrives. On the responder side, reply(request, reply)
copies the request’s correlation_id onto the reply and publishes it to the request’s reply_to
topic — so the requester’s future resolves to the right answer.
The future type differs per language: Java extends CompletableFuture<Message>, Python returns an
Iou whose get() yields a (done, result) tuple, Rust returns a Result<ReplyFuture> you
await, and TypeScript returns a ReplyFuture (PromiseLike) you await with the timeout passed as
an argument.
ReplyFuture extends CompletableFuture<Message>, so use the standard future API.
ReplyFuture future = messaging.request("svc/op", request);Message reply = future.get(5, TimeUnit.SECONDS);
// responder side:messaging.reply(request, replyMsg); // copies correlation id automaticallyrequest returns an Iou. Its get(timeout) returns a (done, result) tuple — unpack it.
iou = messaging.request("svc/op", request)done, reply = iou.get(timeout=5) # NOTE: get() returns a (done, result) tupleif done: print(reply.get_body())
# responder side:messaging.reply(request, reply_msg) # copies correlation id automaticallyrequest() returns Result<ReplyFuture> — await the call to get the future, then await the
future (wrap it in tokio::time::timeout for a deadline). Dropping the ReplyFuture unsubscribes
the ephemeral reply topic.
use std::time::Duration;
let fut = messaging.request("svc/op", request).await?;let reply = tokio::time::timeout(Duration::from_secs(5), fut).await??;
// responder side:responder.reply(&request, reply_msg).await?; // copies correlation id automaticallyrequest(topic, msg, timeoutMs?) returns a ReplyFuture you await; the timeout is the third
argument (default 0 = no timeout).
const reply = await messaging.request("svc/op", req, 5000); // 3rd arg = timeoutMsconsole.log(reply.getBody());
// responder side:await messaging.reply(req, replyMsg); // copies correlation id automaticallyTo abandon an in-flight request before it completes, call cancelRequest / cancel_request (Rust/TS
also expose .cancel() on the future), which tears down the reply subscription.
Per-subscription concurrency caps
Section titled “Per-subscription concurrency caps”Each subscription has two independent bounds:
maxConcurrency— the maximum number of handler invocations that run at once for that subscription. A value<= 0means uncapped (Java/Python skip the semaphore; Rust/TS clamp the value up to1). Use a low cap to serialize a handler that touches shared state; raise it to process bursts in parallel.maxMessages— the bounded in-memory queue depth per subscription. When the queue is full, new messages are dropped with a warning (the subscription is never blocked). The default is10000in Java/Python (DEFAULT_MAX_MESSAGES),32in TypeScript, and is caller-supplied in Rust.
Remember the argument-order divergence (see the warning above): Java/Python
take maxConcurrency first, Rust/TypeScript take maxMessages first.
Destination: local vs IoT Core, and QoS
Section titled “Destination: local vs IoT Core, and QoS”With the dual-MQTT transport, the plain methods (publish, subscribe, request, reply) target
the local broker. Mirror methods target AWS IoT Core directly and take a QoS:
| Local | IoT Core |
|---|---|
publish / publishRaw |
publishToIotCore / publishToIotCoreRaw |
subscribe |
subscribeToIoTCore |
request |
requestFromIoTCore |
reply |
replyToIoTCore |
unsubscribe |
unsubscribeFromIoTCore |
QoS has two levels — at-most-once and at-least-once — which the MQTT provider maps to QoS 0 and
1 respectively. The enum origin differs: Java and Python reuse the AWS SDK QOS enum
(AT_MOST_ONCE / AT_LEAST_ONCE), while Rust and TypeScript define their own Qos enum
(AtMostOnce / AtLeastOnce).
import software.amazon.awssdk.aws.greengrass.model.QOS;
messaging.publishToIotCore("cloud/telemetry", msg, QOS.AT_LEAST_ONCE);messaging.subscribeToIoTCore("cloud/cmd", (topic, m) -> handle(m), QOS.AT_LEAST_ONCE, 1);from awsiot.greengrasscoreipc.model import QOS
messaging.publish_to_iot_core("cloud/telemetry", msg, QOS.AT_LEAST_ONCE)messaging.subscribe_to_iot_core("cloud/cmd", handler, QOS.AT_LEAST_ONCE, 1)use ggcommons::messaging::Qos;
messaging.publish_to_iot_core("cloud/telemetry", &msg, Qos::AtLeastOnce).await?;messaging.subscribe_to_iot_core( "cloud/cmd", message_handler(|_t, m| async move { handle(m).await }), Qos::AtLeastOnce, 32, 1,).await?;import { Qos } from "ggcommons";
await messaging.publishToIotCore("cloud/telemetry", msg, Qos.AtLeastOnce);// subscribeToIotCore's qos arg defaults to Qos.AtLeastOnceawait messaging.subscribeToIotCore("cloud/cmd", (_t, m) => handle(m), Qos.AtLeastOnce, 32, 1);The dual-MQTT standalone config
Section titled “The dual-MQTT standalone config”On the HOST platform (and for local testing) the MQTT transport reads a JSON messaging section
that describes the brokers. Supply it either as the positional argument to
--transport MQTT <messaging_config.json> or from the active config source (-c).
{ "messaging": { "local": { "host": "localhost", "port": 8883, "clientId": "my-component", "credentials": { "caPath": "/certs/ca.crt", "certPath": "/certs/client.crt", "keyPath": "/certs/client.key" } }, "iotCore": { "endpoint": "xxxx-ats.iot.us-east-1.amazonaws.com", "port": 8883, "clientId": "my-component", "credentials": { "caPath": "/certs/AmazonRootCA1.pem", "certPath": "/certs/device.pem.crt", "keyPath": "/certs/private.pem.key" } } }}Key rules (all verified, identical across the four languages):
iotCoreis optional. Omit it for a local-only (“single-broker”) deployment — the IoT Core mirror methods are simply unused.- Local-broker TLS is keyed on
caPathpresence. If the localcredentials.caPathis set, the connection uses TLS; if it is absent, the connection is plaintext.caPathonly ⇒ server-only TLS.caPath+certPath+keyPath⇒ mutual TLS.- A client
certPath/keyPathwithout acaPathstays plaintext (the cert is ignored). - Username/password auth is independent of TLS for the local broker.
- IoT Core requires full mutual-TLS credentials (
caPath+certPath+keyPath) or it refuses to connect. hostis an opaque string — a KubernetesServiceDNS name works unchanged.local.typeis optional and ignored (it defaults to"mqtt"where read).
Testing: the service-interface seam
Section titled “Testing: the service-interface seam”Rust and TypeScript expose a substitutable seam so you can inject a fake transport in tests
without a broker. Java and Python do not — they have concrete clients only, so you test against
the real MessagingClient (point it at a local EMQX broker, or mock at the provider boundary).
No service interface or DI. Test against the concrete MessagingClient — typically against a local
broker. Because MessagingClient uses process-global state, reset/close it between tests so state
does not leak.
// no substitutable seam — exercise the real client against a local brokerMessagingClient messaging = gg.getMessaging();No service interface or DI. Test against the concrete static MessagingClient (against a local
broker), and reset its process-global state between tests.
# no substitutable seam — exercise the real client against a local brokermessaging = gg.get_messaging()The user-facing type is the MessagingService trait, injected as Arc<dyn MessagingService>. In a
test, construct DefaultMessagingService::new(provider) with a fake MessagingProvider, or supply
your own Arc<dyn MessagingService>.
use std::sync::Arc;use ggcommons::messaging::{DefaultMessagingService, MessagingService};
let svc: Arc<dyn MessagingService> = Arc::new(DefaultMessagingService::new(fake_provider));svc.publish("test/topic", &msg).await?;The user-facing type is the IMessagingService interface. In a test, construct
new DefaultMessagingService(provider) with a fake MessagingProvider, or pass any object
implementing IMessagingService.
import { DefaultMessagingService } from "ggcommons";
const svc: IMessagingService = new DefaultMessagingService(fakeProvider);await svc.publish("test/topic", msg);