Messaging API

This is the API reference for the messaging subsystem: every public type and method signature, per language. For the conceptual walkthrough (transports, the wire envelope, concurrency caps, the broker config rules), read the Messaging guide first — this page is the exhaustive surface.

All signatures are verified against source. Java is canonical; Python, Rust, and TypeScript mirror it. The same publish/subscribe, request/reply, and message envelope work identically across all four languages, so components written in any language interoperate on the same topics.

How a component reaches the messaging service from its GGCommons handle.

Returns a MessagingClient instance (instance methods).

MessagingClient getMessaging(); // GGCommons.getMessaging()

The primary type carrying all publish/subscribe/request/reply methods. Plain methods target the local broker (or Greengrass IPC); the IoT Core mirror methods (*ToIotCore / *ToIoTCore / *FromIoTCore) target AWS IoT Core directly. Of these, only the publish and subscribe variants take a QoS.

MessagingClient: instance methods. Built internally via MessagingClientBuilder.create(parsedCommandLine).withReceiveOwnMessages(boolean).build() (receiveOwnMessages defaults to true); you normally obtain it from gg.getMessaging(). The IoT Core method casing is intentionally inconsistent: publishToIotCore, publishToIotCoreRaw, and getNativeIotCoreClient use “Iot”; the other IoT Core methods use “IoT”.

public static final int DEFAULT_MAX_MESSAGES = 10_000;
void publish(String topic, Message msg);
void publishToIotCore(String topic, Message msg, QOS qos); // note "Iot"
void publishRaw(String topic, JsonObject metricObject);
void publishToIotCoreRaw(String topic, JsonObject metricObject, QOS qos); // note "Iot"
void subscribe(String topicFilter, BiConsumer<String,Message> callback);
void subscribe(String topicFilter, BiConsumer<String,Message> callback, int maxConcurrency);
void subscribe(String topicFilter, BiConsumer<String,Message> callback, int maxConcurrency, int maxMessages);
void subscribeToIoTCore(String topicFilter, BiConsumer<String,Message> callback, QOS qos);
void subscribeToIoTCore(String topicFilter, BiConsumer<String,Message> callback, QOS qos, int maxConcurrency);
void subscribeToIoTCore(String topicFilter, BiConsumer<String,Message> callback, QOS qos, int maxConcurrency, int maxMessages);
ReplyFuture request(String topic, Message request);
ReplyFuture requestFromIoTCore(String topic, Message request);
void cancelRequest(ReplyFuture replyFuture);
void cancelRequestFromIoTCore(ReplyFuture replyFuture);
void reply(Message request, Message reply);
void replyToIoTCore(Message request, Message reply);
void unsubscribe(String topicFilter);
void unsubscribeFromIoTCore(String topicFilter);
static boolean topicMatchesFilter(String topicFilter, String topic);
boolean connected();
void close();
Object getNativeLocalClient();
Object getNativeIotCoreClient();

The 3-arg subscribe overload’s int is maxConcurrency, NOT maxMessages (the 4-arg overload adds maxMessages last).

  • maxConcurrency: maximum concurrent handler invocations for the subscription. The handling of a value <= 0 differs by language: Java/Python treat it as uncapped (the semaphore is skipped), while Rust/TS clamp it up to 1.
  • maxMessages: bounded in-memory queue depth per subscription; on overflow, new messages are dropped with a warning (the subscription never blocks). Default 10000 in Java/Python (DEFAULT_MAX_MESSAGES), 32 in TypeScript, caller-supplied in Rust.
  • connected() reports the local broker link only (an IoT Core outage does not flip it); it backs /readyz, never /livez.
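
The two limits in the list above can be illustrated with a minimal sketch built only on java.util.concurrent. It is not the library's implementation: the class SubscriptionInboxSketch and all of its method names are hypothetical, and it only assumes the behavior stated above (a non-blocking enqueue that drops on overflow, and a semaphore that is skipped when maxConcurrency <= 0, as described for Java/Python).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical stand-in for one subscription's inbound buffer (not a GGCommons class).
class SubscriptionInboxSketch {
    private final BlockingQueue<String> queue;
    private final Semaphore permits; // null means "uncapped"
    private int dropped = 0;

    SubscriptionInboxSketch(int maxConcurrency, int maxMessages) {
        this.queue = new ArrayBlockingQueue<>(maxMessages);
        // Assumed Java/Python behavior: a value <= 0 skips the semaphore entirely.
        this.permits = maxConcurrency > 0 ? new Semaphore(maxConcurrency) : null;
    }

    // Non-blocking enqueue: returns false and counts a drop when the queue is full.
    synchronized boolean offer(String payload) {
        boolean accepted = queue.offer(payload);
        if (!accepted) {
            dropped++;
            System.err.println("WARN: subscription queue full, dropping message");
        }
        return accepted;
    }

    // Runs the handler on the next queued message, holding a permit when capped.
    boolean dispatchNext(Consumer<String> handler) {
        String next = queue.poll();
        if (next == null) {
            return false;
        }
        if (permits != null) {
            permits.acquireUninterruptibly();
        }
        try {
            handler.accept(next);
        } finally {
            if (permits != null) {
                permits.release();
            }
        }
        return true;
    }

    boolean isCapped() { return permits != null; }
    synchronized int droppedCount() { return dropped; }
    int queued() { return queue.size(); }

    public static void main(String[] args) {
        SubscriptionInboxSketch inbox = new SubscriptionInboxSketch(1, 2);
        inbox.offer("a");
        inbox.offer("b");
        inbox.offer("c"); // queue depth is 2, so this one is dropped
        while (inbox.dispatchNext(m -> System.out.println("handled " + m))) {
            // drain the queue
        }
        System.out.println("dropped=" + inbox.droppedCount());
    }
}
```

In a real client the dispatch would run on worker threads, which is where the permit count matters; the synchronous loop here only keeps the sketch deterministic.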

The on-wire shape is identical across all four languages: { "header": {...}, "tags": {...}, "body": <any> }, or { "raw": <value> } for a raw message. Header keys are snake_case; reply_to and an empty thing are omitted. A received payload with no header/tags/body (or non-JSON bytes) is delivered as a raw message; non-JSON bytes become a raw string.
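
As a rough illustration of that wire shape, the sketch below assembles the envelope as plain maps. It uses only java.util, not the library's Message or Gson types; EnvelopeSketch and its methods are hypothetical names, and the header key order simply follows the toDict() comment in the MessageHeader listing on this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the documented wire shape with plain maps.
class EnvelopeSketch {

    // Header keys are snake_case; reply_to is emitted only when set.
    static Map<String, Object> header(String name, String version, long timestamp,
                                      String uuid, String correlationId, String replyTo) {
        Map<String, Object> h = new LinkedHashMap<>();
        h.put("name", name);
        h.put("version", version);
        h.put("timestamp", timestamp);
        h.put("uuid", uuid);
        h.put("correlation_id", correlationId);
        if (replyTo != null) {
            h.put("reply_to", replyTo);
        }
        return h;
    }

    // Tags are flattened; "thing" is added only for a non-empty Thing name.
    static Map<String, Object> tags(String thingName, Map<String, String> extra) {
        Map<String, Object> t = new LinkedHashMap<String, Object>(extra);
        if (thingName != null && !thingName.isEmpty()) {
            t.put("thing", thingName);
        }
        return t;
    }

    // Full envelope: { "header": {...}, "tags": {...}, "body": <any> }
    static Map<String, Object> envelope(Map<String, Object> header,
                                        Map<String, Object> tags, Object body) {
        Map<String, Object> e = new LinkedHashMap<>();
        e.put("header", header);
        e.put("tags", tags);
        e.put("body", body);
        return e;
    }

    // Raw message: { "raw": <value> }
    static Map<String, Object> raw(Object value) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("raw", value);
        return r;
    }

    public static void main(String[] args) {
        Map<String, Object> h = header("Hello", "1.0", System.currentTimeMillis(),
                "example-uuid", "example-correlation-id", null);
        Map<String, Object> t = tags("my-thing", Map.of("site", "lab"));
        System.out.println(envelope(h, t, Map.of("temp", 21)));
        System.out.println(raw("not json"));
    }
}
```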

Message (package-private fields MessageHeader header; MessageTags tags; Object body; Object raw;).

JsonObject toDict();
String getCorrelationId(); // lazily generates a UUID if null
MessageHeader getHeader();
MessageTags getTags();
void injectTag(String key, String value);
Object getBody();
Object getRaw();
String makeRequest(); // also: makeRequest(String replyTo)
void setCorrelationId(String id);
// Deprecated static factories — prefer MessageBuilder:
static Message buildFromConfig(...); // @Deprecated
static Message build(Object contents); // @Deprecated

A non-JsonElement payload (Map/POJO/List) passed to the builder is converted via the Gson reflective tree, so withPayload(Map) works.

The header carries metadata; tags carries routing/source metadata (the Thing name is flattened to a "thing" key, emitted only when set). The reply-topic prefix is the literal string ggcommons/reply- (trailing -, not /) in all four languages.

class MessageHeader {
    String name; String version; long timestamp;
    String correlationId; String uuid; String replyTo;
    static final String REPLY_MESSAGE_TOPIC_PREFIX = "ggcommons/reply-";
    JsonObject toDict(); // emits name, version, timestamp, uuid, correlation_id, [reply_to]
}
class MessageTags {
    String thingName; JsonObject tags;
    JsonObject toDict(); // flattens tags; adds "thing" only when non-null
    static MessageTags fromConfig(ConfigManager config);
    void injectTag(String key, String value);
}

Construct messages with the builder, not raw constructors — it stamps uuid, correlation_id, and timestamp for you.

static MessageBuilder create(String name, String version);
static Message fromObject(Object msgContents); // classifies dict→envelope vs raw
MessageBuilder withCorrelationId(String id);
MessageBuilder withPayload(Object payload); // JsonObject / Map / POJO / List
MessageBuilder withConfig(ConfigManager config); // REQUIRED before build()
Message build(); // throws IllegalStateException if withConfig() not called

A string payload that parses as JSON is stored as the parsed object; otherwise it is stored as a raw string. The companion MessageHeaderBuilder.create(name, version) offers .withCorrelationId/.withTimestamp/.withUuid/.withReplyTo and .build().

request returns a future that resolves when the matching reply arrives. The responder’s reply(request, reply) copies the request’s correlation_id onto the reply before publishing it to the request’s reply_to topic. The future type differs per language.

ReplyFuture extends CompletableFuture<Message> — use the standard future API (get(), orTimeout(), thenAccept()). It exposes a public String replyTopic; field.

public class ReplyFuture extends CompletableFuture<Message> {
    public String replyTopic;
}
// usage:
ReplyFuture future = messaging.request("svc/op", request);
Message reply = future.get(5, TimeUnit.SECONDS);
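
The correlation step behind request/reply can be sketched with the standard library alone. This is an assumption-laden illustration, not the client's actual bookkeeping: CorrelationSketch, register, onReply, and cancel are hypothetical names, and message bodies are reduced to strings. The only behavior taken from this page is that a reply carries the request's correlation_id and that the matching future resolves when it arrives.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical requester-side table of in-flight requests keyed by correlation id.
class CorrelationSketch {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called when a request is sent: remember a future under its correlation id.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Called for each inbound reply: complete the matching future, if any.
    boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(body);
    }

    // Called to abandon a request: forget the entry and cancel its future.
    boolean cancel(String correlationId) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.cancel(false);
    }

    int inFlight() { return pending.size(); }

    public static void main(String[] args) throws Exception {
        CorrelationSketch table = new CorrelationSketch();
        CompletableFuture<String> future = table.register("c-1");
        table.onReply("c-1", "pong"); // the responder echoed the correlation id
        System.out.println(future.get(5, TimeUnit.SECONDS));
    }
}
```

A reply whose correlation id is unknown (already completed, cancelled, or never sent) is simply ignored in this sketch.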

There are two QoS levels, at-most-once and at-least-once, which the MQTT provider maps to MQTT QoS 0 and 1 respectively. Java and Python reuse the AWS SDK enum; Rust and TypeScript define their own. Destination exists in Rust/TS as a Local / IotCore selector used by the lower-level provider; Java/Python do not expose an equivalent enum (they have the per-destination method pairs instead).

QOS is the AWS SDK type software.amazon.awssdk.aws.greengrass.model.QOS, values AT_MOST_ONCE / AT_LEAST_ONCE. There is no separate Destination enum — use the plain vs *ToIotCore method pairs.

import software.amazon.awssdk.aws.greengrass.model.QOS;
QOS.AT_MOST_ONCE; // mapped to MQTT QoS 0
QOS.AT_LEAST_ONCE; // mapped to MQTT QoS 1
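
The mapping to MQTT levels can be pictured with a small stand-alone sketch. It deliberately defines its own hypothetical Qos enum rather than importing the AWS SDK type, so it compiles without the SDK; the provider's real mapping code is not shown on this page and may be structured differently.

```java
// Hypothetical mirror of the two QoS values and their MQTT levels.
class QosMappingSketch {
    enum Qos { AT_MOST_ONCE, AT_LEAST_ONCE }

    // Translate the abstract delivery guarantee into the MQTT QoS number.
    static int toMqttLevel(Qos qos) {
        switch (qos) {
            case AT_MOST_ONCE:
                return 0;
            case AT_LEAST_ONCE:
                return 1;
            default:
                throw new IllegalArgumentException("unknown QoS: " + qos);
        }
    }

    public static void main(String[] args) {
        for (Qos qos : Qos.values()) {
            System.out.println(qos + " maps to MQTT QoS " + toMqttLevel(qos));
        }
    }
}
```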

Below the client/service sits a transport MessagingProvider. Rust and TypeScript expose a substitutable seam — the MessagingService trait / IMessagingService interface plus a swappable provider — so you can inject a fake transport in tests. Java and Python have concrete clients only (no service interface, no DI); the two providers are internal.

No service interface or DI. The abstract base MessagingProvider has two concrete subclasses — GreengrassMessagingProvider (IPC) and StandaloneMessagingProvider (dual-MQTT). There is no single-client MqttProvider class. Test against the concrete MessagingClient (point it at a local broker; reset its process-global state between tests).

With the MQTT transport, the broker config is a JSON messaging section with a local block and an optional iotCore block. The wire shape is identical across the four languages (each maps it to its own config type — Java MessagingConfiguration, Python messaging_config.py dataclasses, Rust config.rs, TS config.ts).

{
  "messaging": {
    "local": {
      "host": "localhost",
      "port": 8883,
      "clientId": "my-component",
      "credentials": {
        "caPath": "/certs/ca.crt",
        "certPath": "/certs/client.crt",
        "keyPath": "/certs/client.key",
        "username": "optional-user",
        "password": "optional-pass"
      }
    },
    "iotCore": {
      "endpoint": "xxxx-ats.iot.us-east-1.amazonaws.com",
      "port": 8883,
      "clientId": "my-component",
      "credentials": {
        "caPath": "/certs/AmazonRootCA1.pem",
        "certPath": "/certs/device.pem.crt",
        "keyPath": "/certs/private.pem.key"
      }
    }
  }
}

Field reference (verified, identical across all four languages):

  • local: host (string), port (int), clientId (string), optional credentials.
  • iotCore: endpoint (string), port (int), clientId (string), required credentials. The whole iotCore block is optional: omit it for a local-only (“single-broker”) deployment.
  • credentials: caPath, certPath, keyPath, and optional username / password.

Rules:

  • Local-broker TLS is keyed on caPath presence. caPath only ⇒ server-only TLS; caPath + certPath + keyPath ⇒ mutual TLS; a certPath/keyPath without caPath stays plaintext. Username/password auth is independent of TLS.
  • IoT Core requires full mutual-TLS credentials (caPath + certPath + keyPath) or it refuses to connect.
  • host is an opaque string — a Kubernetes Service DNS name works unchanged.
  • local.type is optional and ignored (defaults to "mqtt" where read).
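
The TLS rules above reduce to a small decision function, sketched here with hypothetical names (TlsModeSketch, localMode, iotCoreCredentialsComplete). Two points are assumptions rather than documented behavior: an empty string is treated the same as a missing path, and a caPath with only one of certPath/keyPath is treated as server-only TLS, since the rules do not cover that half-configured case.

```java
// Hypothetical decision helper for the documented TLS rules (not library code).
class TlsModeSketch {
    enum TlsMode { PLAINTEXT, SERVER_ONLY, MUTUAL }

    // Assumption: a null or empty path counts as "not configured".
    static boolean present(String path) {
        return path != null && !path.isEmpty();
    }

    // Local broker: TLS is keyed on caPath presence.
    static TlsMode localMode(String caPath, String certPath, String keyPath) {
        if (!present(caPath)) {
            return TlsMode.PLAINTEXT; // certPath/keyPath without caPath stays plaintext
        }
        if (present(certPath) && present(keyPath)) {
            return TlsMode.MUTUAL;
        }
        return TlsMode.SERVER_ONLY; // assumption for a partial client certificate pair
    }

    // IoT Core: all three paths are required before a connection is attempted.
    static boolean iotCoreCredentialsComplete(String caPath, String certPath, String keyPath) {
        return present(caPath) && present(certPath) && present(keyPath);
    }

    public static void main(String[] args) {
        System.out.println(localMode("/certs/ca.crt", null, null));
        System.out.println(localMode("/certs/ca.crt", "/certs/client.crt", "/certs/client.key"));
        System.out.println(localMode(null, "/certs/client.crt", "/certs/client.key"));
    }
}
```

Username/password authentication is left out on purpose, since the rules state it is independent of the TLS mode.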

A minimal publish, subscribe, and request/reply round trip in each language.

MessagingClient messaging = gg.getMessaging();
// subscribe: 3rd arg is maxConcurrency
messaging.subscribe("itest/pubsub", (topic, m) -> {
    System.out.println(m.getHeader().getName() + " -> " + m.getBody());
}, 1);
// publish
Message msg = MessageBuilder.create("Hello", "1.0")
    .withPayload(payload) // JsonObject / Map / POJO
    .withConfig(configManager) // REQUIRED in Java
    .build();
messaging.publish("itest/pubsub", msg);
// request/reply: ReplyFuture extends CompletableFuture<Message>
ReplyFuture future = messaging.request("itest/request", request);
Message reply = future.get(5, TimeUnit.SECONDS);
// responder side:
messaging.reply(request, replyMsg); // copies correlation id