Messaging

The messaging subsystem gives your component one interface over two transports: Greengrass IPC (--transport IPC) on a Nucleus-managed device, or a dual-MQTT provider (--transport MQTT) that connects to a local broker and, optionally, AWS IoT Core. Publish/subscribe, request/reply, and the message envelope behave identically across Java, Python, Rust, and TypeScript, so components written in any of the four languages interoperate on the same topics.

Connections and subscriptions block until confirmed — when publish or subscribe returns, the broker has acknowledged it.

Every component reaches messaging through its GGCommons handle. The accessor name and return shape differ slightly per language.

getMessaging() returns a MessagingClient instance — its methods are instance methods.

MessagingClient messaging = gg.getMessaging();

Every message GGCommons sends on the wire is a JSON object with three parts. The shape and key names are identical across all four languages so they interoperate.

{
  "header": {
    "name": "TemperatureReading",
    "version": "1.0",
    "timestamp": 1719500000000,
    "uuid": "5f6c...",
    "correlation_id": "a1b2...",
    "reply_to": "ggcommons/reply-..."
  },
  "tags": { "thing": "lab-5950x", "site": "lab" },
  "body": { "celsius": 22.4 }
}
  • header — message metadata. Keys are snake_case on the wire (correlation_id, reply_to, uuid). reply_to is omitted when there is no reply topic; correlation_id/uuid/timestamp are filled in automatically if you do not set them.
  • tags — routing/source metadata. The Thing name is flattened to a "thing" key, and is only emitted when set; any custom tags you add appear alongside it.
  • body — your payload. Any JSON value.

A raw message has no envelope — it serializes as { "raw": <value> } instead. When a component receives a payload that is not an envelope (no header/tags/body), or non-JSON bytes, it is delivered to your handler as a raw message (non-JSON bytes become a raw string). See Raw messages below.
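The receive-side rule above can be sketched as a small classifier. This is a minimal sketch, not the GGCommons implementation: `PayloadClassifier` and `classify` are hypothetical names, the payload is assumed to be already parsed (a `Map` for a JSON object, any other value otherwise, `null` for non-JSON bytes), and treating a payload as an envelope only when `header` and `body` are both present is an assumption, since the exact detection rule is not spelled out here.

```java
import java.util.Map;

// Hypothetical helper, not part of GGCommons: classifies an already-parsed payload.
final class PayloadClassifier {
    enum Kind { ENVELOPE, RAW }

    // 'parsed' is the decoded JSON value, or null when the bytes were not valid JSON.
    // Assumption: an envelope is a JSON object with an object-valued "header" and a "body" key.
    static Kind classify(Object parsed) {
        if (!(parsed instanceof Map)) {
            return Kind.RAW; // scalars, arrays, strings, and non-JSON input are raw
        }
        Map<?, ?> obj = (Map<?, ?>) parsed;
        boolean hasHeader = obj.get("header") instanceof Map;
        boolean hasBody = obj.containsKey("body");
        return (hasHeader && hasBody) ? Kind.ENVELOPE : Kind.RAW;
    }

    public static void main(String[] args) {
        Map<String, Object> envelope = Map.of(
                "header", Map.of("name", "TemperatureReading", "version", "1.0"),
                "tags", Map.of("thing", "lab-5950x"),
                "body", Map.of("celsius", 22.4));
        System.out.println(classify(envelope));            // ENVELOPE
        System.out.println(classify(Map.of("value", 42))); // RAW
        System.out.println(classify(null));                // RAW
    }
}
```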

Construct messages with MessageBuilder — not the raw constructors. The builder stamps the uuid, correlation_id, and timestamp for you.

Message msg = MessageBuilder.create("TemperatureReading", "1.0")
    .withPayload(payload)       // JsonObject, Map, POJO, or List
    .withConfig(configManager)  // REQUIRED in Java
    .build();

publish sends an envelope to a topic. subscribe registers a handler that receives the topic and the decoded Message for each match. Topic filters use the usual MQTT wildcards (+ single level, # multi level).

// subscribe: the 3rd argument is maxConcurrency. The callback is a BiConsumer<String, Message>.
messaging.subscribe("sensors/temp", (topic, m) -> {
    System.out.println(m.getHeader().getName() + " -> " + m.getBody());
}, 1);

// publish
messaging.publish("sensors/temp", msg);
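To make the wildcard rules concrete, here is a minimal sketch of MQTT topic-filter matching. `TopicFilter` is a hypothetical helper written for illustration and is not a GGCommons API; in practice the broker does this matching. The sketch ignores the special handling of topics that start with `$`.

```java
// Hypothetical helper, not a GGCommons API: checks an MQTT topic against a filter.
final class TopicFilter {
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1); // -1 keeps empty trailing levels
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent and everything below
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch ('+' matches exactly one level)
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("sensors/+/temp", "sensors/room1/temp")); // true
        System.out.println(matches("sensors/#", "sensors/room1/temp"));      // true
        System.out.println(matches("sensors/+", "sensors/room1/temp"));      // false
    }
}
```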

Use the *Raw publish methods to send an unwrapped JSON value (for example to interoperate with a non-GGCommons producer). On the receiving side, you do not call a separate “subscribe raw” — your normal subscribe handler receives a raw Message whenever the incoming payload is not an envelope, and you detect it with the raw accessors.

// publish a raw JSON object (no envelope)
JsonObject metric = new JsonObject();
metric.addProperty("value", 42);
messaging.publishRaw("metrics/raw", metric);

// handle a possibly-raw message
messaging.subscribe("metrics/raw", (topic, m) -> {
    if (m.getRaw() != null) {
        System.out.println("raw: " + m.getRaw());
    } else {
        System.out.println("envelope body: " + m.getBody());
    }
}, 1);

request publishes a message to a service topic, subscribes to a private reply topic, and gives you a future that completes when the reply arrives. On the responder side, reply(request, reply) copies the request’s correlation_id onto the reply and publishes it to the request’s reply_to topic — so the requester’s future resolves to the right answer.

The future type differs per language: Java returns a ReplyFuture that extends CompletableFuture<Message>, Python returns an Iou whose get() yields a (done, result) tuple, Rust returns a Result<ReplyFuture> that you await, and TypeScript returns a ReplyFuture (a PromiseLike) that you await, with the timeout passed as an argument.

ReplyFuture extends CompletableFuture<Message>, so use the standard future API.

ReplyFuture future = messaging.request("svc/op", request);
Message reply = future.get(5, TimeUnit.SECONDS);
// responder side:
messaging.reply(request, replyMsg); // copies correlation id automatically
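The correlation mechanism behind request/reply can be illustrated with plain JDK types. This is a minimal sketch under stated assumptions, not the GGCommons implementation: `PendingRequests` and its methods are hypothetical names, and message bodies are reduced to strings. It shows why copying the request's correlation id onto the reply is enough for the requester to resolve the right future.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of correlation-id matching; not the GGCommons implementation.
final class PendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Requester side: track a future under a fresh correlation id and return that id.
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    // Reply-topic handler: complete the matching future; unknown ids are ignored.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(body);
    }

    // Abandon a request so that a late reply no longer resolves anything.
    void cancel(String correlationId) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.cancel(false);
        }
    }

    public static void main(String[] args) throws Exception {
        PendingRequests requests = new PendingRequests();
        CompletableFuture<String> future = new CompletableFuture<>();
        String id = requests.register(future);
        requests.onReply(id, "pong"); // simulated responder echoing the correlation id
        System.out.println(future.get(5, TimeUnit.SECONDS)); // pong
    }
}
```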

To abandon an in-flight request before it completes, call cancelRequest / cancel_request (Rust/TS also expose .cancel() on the future), which tears down the reply subscription.

Each subscription has two independent bounds:

  • maxConcurrency: the maximum number of handler invocations that run at once for that subscription. A value <= 0 is handled differently per language: Java and Python treat it as uncapped (they skip the semaphore), while Rust and TypeScript clamp it up to 1. Use a low cap to serialize a handler that touches shared state; raise it to process bursts in parallel.
  • maxMessages: the bounded in-memory queue depth per subscription. When the queue is full, new messages are dropped with a warning (the subscription is never blocked). The default is 10000 in Java/Python (DEFAULT_MAX_MESSAGES) and 32 in TypeScript; in Rust it is caller-supplied.

Note the argument-order divergence: Java and Python take maxConcurrency first, while Rust and TypeScript take maxMessages first.
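The interaction of the two bounds can be modeled with a bounded queue and a semaphore. The following is a minimal sketch, not the GGCommons implementation: `BoundedSubscription` and its methods are hypothetical, messages are plain strings, and the `<= 0` handling follows the Java/Python behavior described above (no semaphore, so uncapped).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of one subscription's two bounds; not the GGCommons implementation.
final class BoundedSubscription {
    private final BlockingQueue<String> queue; // maxMessages: queue depth (must be >= 1 here)
    private final Semaphore permits;           // maxConcurrency: null means uncapped
    private final AtomicInteger dropped = new AtomicInteger();

    BoundedSubscription(int maxConcurrency, int maxMessages) {
        this.queue = new ArrayBlockingQueue<>(maxMessages);
        this.permits = maxConcurrency > 0 ? new Semaphore(maxConcurrency) : null;
    }

    // Transport side: never blocks. A full queue drops the message and counts it.
    boolean offer(String message) {
        boolean accepted = queue.offer(message);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Worker side: takes one queued message and runs the handler under the concurrency cap.
    boolean dispatchOne(Consumer<String> handler) {
        String message = queue.poll();
        if (message == null) {
            return false; // nothing queued
        }
        if (permits != null) {
            permits.acquireUninterruptibly();
        }
        try {
            handler.accept(message);
        } finally {
            if (permits != null) {
                permits.release();
            }
        }
        return true;
    }

    int dropped() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedSubscription sub = new BoundedSubscription(1, 2);
        sub.offer("a");
        sub.offer("b");
        sub.offer("c"); // queue is full, so this one is dropped
        System.out.println(sub.dropped()); // 1
        sub.dispatchOne(m -> System.out.println("handled " + m)); // handled a
    }
}
```

In a real client, several worker threads would call something like `dispatchOne` concurrently; the semaphore is what keeps at most maxConcurrency handlers running at once.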

With the dual-MQTT transport, the plain methods (publish, subscribe, request, reply) target the local broker. Mirror methods target AWS IoT Core directly and take a QoS:

Local                   IoT Core
publish / publishRaw    publishToIotCore / publishToIotCoreRaw
subscribe               subscribeToIoTCore
request                 requestFromIoTCore
reply                   replyToIoTCore
unsubscribe             unsubscribeFromIoTCore

QoS has two levels — at-most-once and at-least-once — which the MQTT provider maps to QoS 0 and 1 respectively. The enum origin differs: Java and Python reuse the AWS SDK QOS enum (AT_MOST_ONCE / AT_LEAST_ONCE), while Rust and TypeScript define their own Qos enum (AtMostOnce / AtLeastOnce).

import software.amazon.awssdk.aws.greengrass.model.QOS;
messaging.publishToIotCore("cloud/telemetry", msg, QOS.AT_LEAST_ONCE);
messaging.subscribeToIoTCore("cloud/cmd", (topic, m) -> handle(m), QOS.AT_LEAST_ONCE, 1);

On the HOST platform (and for local testing) the MQTT transport reads a JSON messaging section that describes the brokers. Supply it either as the positional argument to --transport MQTT <messaging_config.json> or from the active config source (-c).

{
  "messaging": {
    "local": {
      "host": "localhost",
      "port": 8883,
      "clientId": "my-component",
      "credentials": {
        "caPath": "/certs/ca.crt",
        "certPath": "/certs/client.crt",
        "keyPath": "/certs/client.key"
      }
    },
    "iotCore": {
      "endpoint": "xxxx-ats.iot.us-east-1.amazonaws.com",
      "port": 8883,
      "clientId": "my-component",
      "credentials": {
        "caPath": "/certs/AmazonRootCA1.pem",
        "certPath": "/certs/device.pem.crt",
        "keyPath": "/certs/private.pem.key"
      }
    }
  }
}

Key rules (all verified, identical across the four languages):

  • iotCore is optional. Omit it for a local-only (“single-broker”) deployment — the IoT Core mirror methods are simply unused.
  • Local-broker TLS is keyed on caPath presence. If the local credentials.caPath is set, the connection uses TLS; if it is absent, the connection is plaintext.
    • caPath only ⇒ server-only TLS.
    • caPath + certPath + keyPath ⇒ mutual TLS.
    • A client certPath/keyPath without a caPath stays plaintext (the cert is ignored).
    • Username/password auth is independent of TLS for the local broker.
  • IoT Core requires full mutual-TLS credentials (caPath + certPath + keyPath) or it refuses to connect.
  • host is an opaque string — a Kubernetes Service DNS name works unchanged.
  • local.type is optional and ignored (it defaults to "mqtt" where read).
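The credential rules above amount to a small decision function. This is a minimal sketch, not a GGCommons API: `LocalTlsMode` and its methods are hypothetical names. Two behaviors are assumptions that the rules do not state: an empty string is treated the same as an absent path, and a caPath with only one of certPath/keyPath falls back to server-only TLS.

```java
// Hypothetical helper mirroring the local-broker TLS rules; not a GGCommons API.
final class LocalTlsMode {
    enum Mode { PLAINTEXT, SERVER_TLS, MUTUAL_TLS }

    // Local broker: TLS is keyed on caPath; a client cert without a CA is ignored.
    static Mode resolve(String caPath, String certPath, String keyPath) {
        if (isAbsent(caPath)) {
            return Mode.PLAINTEXT;
        }
        if (!isAbsent(certPath) && !isAbsent(keyPath)) {
            return Mode.MUTUAL_TLS;
        }
        // Assumption: an incomplete client cert/key pair is treated as server-only TLS.
        return Mode.SERVER_TLS;
    }

    // IoT Core: all three paths are required, otherwise the connection is refused.
    static boolean iotCoreCredentialsComplete(String caPath, String certPath, String keyPath) {
        return !isAbsent(caPath) && !isAbsent(certPath) && !isAbsent(keyPath);
    }

    // Assumption: null and blank strings both count as "not set".
    private static boolean isAbsent(String path) {
        return path == null || path.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, null, null));                                       // PLAINTEXT
        System.out.println(resolve("/certs/ca.crt", null, null));                            // SERVER_TLS
        System.out.println(resolve("/certs/ca.crt", "/certs/client.crt", "/certs/client.key")); // MUTUAL_TLS
        System.out.println(resolve(null, "/certs/client.crt", "/certs/client.key"));         // PLAINTEXT
    }
}
```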

Rust and TypeScript expose a substitutable seam so you can inject a fake transport in tests without a broker. Java and Python do not — they have concrete clients only, so you test against the real MessagingClient (point it at a local EMQX broker, or mock at the provider boundary).

Java has no service interface or dependency-injection seam. Test against the concrete MessagingClient, typically against a local broker. Because MessagingClient uses process-global state, reset or close it between tests so state does not leak.

// no substitutable seam — exercise the real client against a local broker
MessagingClient messaging = gg.getMessaging();