Messaging API
This is the API reference for the messaging subsystem: every public type and method signature, per language. For the conceptual walkthrough (transports, the wire envelope, concurrency caps, the broker config rules), read the Messaging guide first — this page is the exhaustive surface.
All signatures are verified against source. Java is canonical; Python, Rust, and TypeScript mirror it. The same publish/subscribe, request/reply, and message envelope work identically across all four languages, so components written in any language interoperate on the same topics.
Accessor
Section titled “Accessor”How a component reaches the messaging service from its GGCommons handle.
Returns a MessagingClient instance (instance methods).
MessagingClient getMessaging(); // GGCommons.getMessaging()Returns the MessagingClient class — its methods are @staticmethod, called on the class.
def get_messaging() -> MessagingClient: ... # GGCommons.get_messaging()Returns Result<Arc<dyn MessagingService>> — errors if no transport was wired.
fn messaging(&self) -> Result<Arc<dyn MessagingService>>; // GgCommons::messaging()Returns an IMessagingService — throws if no transport was wired.
messaging(): IMessagingService; // GgCommons.messaging()The messaging client / service
Section titled “The messaging client / service”The primary type carrying all publish/subscribe/request/reply methods. Plain methods target the
local broker (or Greengrass IPC); the *ToIotCore / *FromIoTCore mirror methods target AWS
IoT Core directly and take a QoS.
MessagingClient — instance methods. Built internally via
MessagingClientBuilder.create(parsedCommandLine).withReceiveOwnMessages(boolean).build()
(receiveOwnMessages defaults to true); you normally obtain it from gg.getMessaging(). The IoT-Core method casing is intentionally
inconsistent: publishToIotCore / publishToIotCoreRaw use “Iot”, the rest use “IoT”.
public static final int DEFAULT_MAX_MESSAGES = 10_000;
void publish(String topic, Message msg);void publishToIotCore(String topic, Message msg, QOS qos); // note "Iot"void publishRaw(String topic, JsonObject metricObject);void publishToIotCoreRaw(String topic, JsonObject metricObject, QOS qos); // note "Iot"
void subscribe(String topicFilter, BiConsumer<String,Message> callback);void subscribe(String topicFilter, BiConsumer<String,Message> callback, int maxConcurrency);void subscribe(String topicFilter, BiConsumer<String,Message> callback, int maxConcurrency, int maxMessages);void subscribeToIoTCore(String topicFilter, BiConsumer<String,Message> callback, QOS qos);void subscribeToIoTCore(String topicFilter, BiConsumer<String,Message> callback, QOS qos, int maxConcurrency);void subscribeToIoTCore(String topicFilter, BiConsumer<String,Message> callback, QOS qos, int maxConcurrency, int maxMessages);
ReplyFuture request(String topic, Message request);ReplyFuture requestFromIoTCore(String topic, Message request);void cancelRequest(ReplyFuture replyFuture);void cancelRequestFromIoTCore(ReplyFuture replyFuture);void reply(Message request, Message reply);void replyToIoTCore(Message request, Message reply);
void unsubscribe(String topicFilter);void unsubscribeFromIoTCore(String topicFilter);
static boolean topicMatchesFilter(String topicFilter, String topic);boolean connected();void close();Object getNativeLocalClient();Object getNativeIotCoreClient();The 3-arg subscribe overload’s int is maxConcurrency, NOT maxMessages (the 4-arg overload
adds maxMessages last).
MessagingClient — every method is a @staticmethod (call on the class returned by
get_messaging()). receive_own_messages defaults to False here (Java’s default is True).
def init(args, standalone_config_path=None, receive_own_messages=False) -> MessagingProvider: ...def shutdown() -> None: ... # idempotentdef get_messaging_provider(): ...def connected() -> bool: ... # never raises
def publish(topic: str, msg: Message) -> None: ...def publish_raw(topic: str, msg: dict) -> None: ...def publish_to_iot_core(topic: str, msg: Message, qos: str) -> None: ...def publish_to_iot_core_raw(topic: str, msg: dict, qos: str) -> None: ...
def subscribe(topic, callback, max_concurrency=None, max_messages=None) -> None: ...def subscribe_to_iot_core(topic, callback, qos, max_concurrency=None, max_messages=None) -> None: ...
def unsubscribe(topic) -> None: ...def unsubscribe_from_iot_core(topic) -> None: ...
def request(topic, msg) -> Iou: ...def request_from_iot_core(topic, msg) -> Iou: ...def cancel_request(iou) -> Iou: ...def cancel_request_from_iot_core(iou) -> Iou: ...def reply(request, reply) -> None: ...def reply_to_iot_core(request, reply) -> None: ...
def topic_matches_sub(sub, topic) -> bool: ...def get_native_client(): ... # -> {'local': ..., 'iot_core': ...}The 3rd positional arg of subscribe is max_concurrency (matching Java); max_messages is
4th. There is a single get_native_client() (returning a {'local', 'iot_core'} dict), not
separate local/IoT-Core getters.
The user-facing seam is the MessagingService trait, obtained as Arc<dyn MessagingService>. It is
an async_trait; everything is async except cancel_request* and connected.
#[async_trait]pub trait MessagingService: Send + Sync { async fn publish(&self, topic: &str, msg: &Message) -> Result<()>; async fn publish_to_iot_core(&self, topic: &str, msg: &Message, qos: Qos) -> Result<()>; async fn publish_raw(&self, topic: &str, payload: &Value) -> Result<()>; async fn publish_to_iot_core_raw(&self, topic: &str, payload: &Value, qos: Qos) -> Result<()>;
// NOTE: max_messages THEN max_concurrency (opposite order from Java/Python) async fn subscribe(&self, filter: &str, handler: Arc<dyn MessageHandler>, max_messages: usize, max_concurrency: usize) -> Result<()>; async fn subscribe_to_iot_core(&self, filter: &str, handler: Arc<dyn MessageHandler>, qos: Qos, max_messages: usize, max_concurrency: usize) -> Result<()>;
async fn unsubscribe(&self, filter: &str) -> Result<()>; async fn unsubscribe_from_iot_core(&self, filter: &str) -> Result<()>;
async fn request(&self, topic: &str, msg: Message) -> Result<ReplyFuture>; async fn request_from_iot_core(&self, topic: &str, msg: Message) -> Result<ReplyFuture>; async fn reply(&self, request: &Message, reply: Message) -> Result<()>; async fn reply_to_iot_core(&self, request: &Message, reply: Message) -> Result<()>;
fn cancel_request(&self, reply_future: ReplyFuture); fn cancel_request_from_iot_core(&self, reply_future: ReplyFuture);
fn connected(&self) -> bool;}subscribe takes max_messages first, then max_concurrency — the opposite order from
Java/Python. The default implementation is DefaultMessagingService::new(provider: Arc<dyn MessagingProvider>) -> Self.
A subscription handler must be an Arc<dyn MessageHandler>. Wrap a closure with the
message_handler helper — a bare closure is not accepted:
pub trait MessageHandler: Send + Sync { async fn handle(&self, topic: String, message: Message);}
pub fn message_handler<F, Fut>(f: F) -> Arc<dyn MessageHandler>where /* F: Fn(String, Message) -> Fut, Fut: Future<Output = ()> */;
// usage:message_handler(|topic, msg| async move { /* ... */ })A free function topic_matches(filter, topic) -> bool is exported from the module.
The user-facing seam is the IMessagingService interface, implemented by DefaultMessagingService.
Construct it with new DefaultMessagingService(provider: MessagingProvider).
interface IMessagingService { publish(topic: string, msg: Message): Promise<void>; publishToIotCore(topic: string, msg: Message, qos?: Qos): Promise<void>; publishRaw(topic: string, payload: unknown): Promise<void>; publishToIotCoreRaw(topic: string, payload: unknown, qos?: Qos): Promise<void>;
// NOTE: maxMessages THEN maxConcurrency; defaults 32 and 1 subscribe(filter: string, handler: MessageHandler, maxMessages?: number, maxConcurrency?: number): Promise<void>; subscribeToIotCore(filter: string, handler: MessageHandler, qos?: Qos, maxMessages?: number, maxConcurrency?: number): Promise<void>;
unsubscribe(filter: string): Promise<void>; unsubscribeFromIotCore(filter: string): Promise<void>;
request(topic: string, msg: Message, timeoutMs?: number): ReplyFuture; // synchronous return requestFromIotCore(topic: string, msg: Message, timeoutMs?: number): ReplyFuture; reply(request: Message, reply: Message): Promise<void>; replyToIotCore(request: Message, reply: Message): Promise<void>;
cancelRequest(reply: ReplyFuture): void; cancelRequestFromIotCore(reply: ReplyFuture): void;
connected(): boolean;}
type MessageHandler = (topic: string, message: Message) => void | Promise<void>;subscribe takes maxMessages first, then maxConcurrency (defaults 32 and 1).
request returns the ReplyFuture synchronously and takes the timeout as an argument
(timeoutMs default 0 = no timeout). subscribeToIotCore’s qos defaults to Qos.AtLeastOnce;
the Java/Python IoT-Core methods require an explicit QoS.
Concurrency and queue semantics
Section titled “Concurrency and queue semantics”maxConcurrency— maximum concurrent handler invocations for the subscription. A value<= 0is uncapped (Java/Python skip the semaphore; Rust/TS clamp up to1).maxMessages— bounded in-memory queue depth per subscription; on overflow, new messages are dropped with a warning (the subscription never blocks). Default10000in Java/Python (DEFAULT_MAX_MESSAGES),32in TypeScript, caller-supplied in Rust.connected()reports the local broker link only (an IoT Core outage does not flip it); it backs/readyz, never/livez.
The Message envelope
Section titled “The Message envelope”The on-wire shape is identical across all four languages: { "header": {...}, "tags": {...}, "body": <any> },
or { "raw": <value> } for a raw message. Header keys are snake_case; reply_to and an empty
thing are omitted. A received payload with no header/tags/body (or non-JSON bytes) is
delivered as a raw message; non-JSON bytes become a raw string.
Message (package-private fields MessageHeader header; MessageTags tags; Object body; Object raw;).
JsonObject toDict();String getCorrelationId(); // lazily generates a UUID if nullMessageHeader getHeader();MessageTags getTags();void injectTag(String key, String value);Object getBody();Object getRaw();String makeRequest(); // also: makeRequest(String replyTo)void setCorrelationId(String id);
// Deprecated static factories — prefer MessageBuilder:static Message buildFromConfig(...); // @Deprecatedstatic Message build(Object contents); // @DeprecatedA non-JsonElement payload (Map/POJO/List) passed to the builder is converted via the Gson
reflective tree, so withPayload(Map) works.
Message is a @dataclass(header=None, tags=None, body=None, raw=None).
def to_dict(self) -> dict: ...def __str__(self) -> str: ...def dumps(self) -> str: ...def get_correlation_id(self): ...def get_header(self) -> MessageHeader: ...def get_tags(self) -> MessageTags: ...def get_body(self): ...def get_raw(self): ...def get_source(self): ... # alias of get_tagsdef get_payload(self): ... # alias of get_bodydef inject_tag(self, key, value): ...def make_request(self): ...def set_correlation_id(self, id): ...
@staticmethoddef from_object(msg_contents) -> "Message": ...pub struct Message { pub header: MessageHeader, pub tags: MessageTags, pub body: Value, pub raw: Option<Value>,}
impl Message { pub fn raw(value: Value) -> Self; pub fn is_raw(&self) -> bool; pub fn get_raw(&self) -> Option<&Value>; pub fn to_vec(&self) -> Result<Vec<u8>>; pub fn from_slice(bytes: &[u8]) -> Result<Message>; // non-JSON → raw string pub fn correlation_id(&self) -> &str;}Custom Serialize/Deserialize: a raw message serializes as {"raw": ..}, otherwise as
{"header", "tags", "body"}.
class Message { header: MessageHeader; tags: MessageTags; body: unknown;
static envelope(header: MessageHeader, tags: MessageTags, body: unknown): Message; static raw(value: unknown): Message; static fromObject(value: unknown): Message; static fromWire(data: Buffer | string): Message; // non-JSON → raw string
isRaw(): boolean; getRaw(): unknown; getBody(): unknown; getCorrelationId(): string; getReplyTo(): string | undefined; toObject(): unknown; toJSON(): string;}MessageHeader and MessageTags
Section titled “MessageHeader and MessageTags”The header carries metadata; tags carries routing/source metadata (the Thing name is flattened to
a "thing" key, emitted only when set). The reply-topic prefix is the literal string
ggcommons/reply- (trailing -, not /) in all four languages.
class MessageHeader { String name; String version; long timestamp; String correlationId; String uuid; String replyTo; static final String REPLY_MESSAGE_TOPIC_PREFIX = "ggcommons/reply-"; JsonObject toDict(); // emits name, version, timestamp, uuid, correlation_id, [reply_to]}
class MessageTags { String thingName; JsonObject tags; JsonObject toDict(); // flattens tags; adds "thing" only when non-null static MessageTags fromConfig(ConfigManager config); void injectTag(String key, String value);}@dataclassclass MessageHeader: name: str version: str correlation_id: Optional[str] = None timestamp: Optional[int] = None uuid: Optional[str] = None reply_to: Optional[str] = None REPLY_MESSAGE_TOPIC_PREFIX = "ggcommons/reply-" # __post_init__ fills timestamp/correlation_id/uuid def to_dict(self) -> dict: ... # snake_case: name, version, timestamp, uuid, correlation_id, [reply_to]
@dataclassclass MessageTags: thing_name: Optional[str] = None tags: Optional[dict] = None def to_dict(self) -> dict: ... # adds "thing" only when thing_name setpub struct MessageHeader { pub name: String, pub version: String, pub timestamp: i64, pub correlation_id: String, pub uuid: String, pub reply_to: Option<String>, // serde skips when None}
pub struct MessageTags { pub thing_name: String, // serde rename "thing", skipped if empty pub extra: BTreeMap<String, Value>, // serde flatten}interface MessageHeader { name: string; version: string; timestamp: number; correlation_id: string; uuid: string; reply_to?: string;}
type MessageTags = Record<string, unknown>;MessageBuilder
Section titled “MessageBuilder”Construct messages with the builder, not raw constructors — it stamps uuid, correlation_id, and
timestamp for you.
static MessageBuilder create(String name, String version);static Message fromObject(Object msgContents); // classifies dict→envelope vs raw
MessageBuilder withCorrelationId(String id);MessageBuilder withPayload(Object payload); // JsonObject / Map / POJO / ListMessageBuilder withConfig(ConfigManager config); // REQUIRED before build()Message build(); // throws IllegalStateException if withConfig() not calledA string payload that parses as JSON is stored as the parsed object; otherwise it is stored as a raw
string. The companion MessageHeaderBuilder.create(name, version) offers
.withCorrelationId/.withTimestamp/.withUuid/.withReplyTo and .build().
@staticmethoddef create(name, version) -> MessageBuilder: ...@staticmethoddef from_object(msg_contents) -> Message: ...
def with_correlation_id(self, id): ...def with_payload(self, body): ...def with_config(self, config): ...def with_tags(self, tags): ... # alternative to with_configdef with_uuid(self, uuid): ...def with_timestamp(self, ts): ...def with_reply_to(self, topic): ...def build(self) -> Message: ... # raises ValueError if BOTH config and tags are NoneDifferent method names — new (not create), payload (not with_payload), from_config
(not with_config):
impl MessageBuilder { pub fn new(name: &str, version: &str) -> Self; // stamps uuid/correlation_id/timestamp pub fn payload(self, body: Value) -> Self; pub fn correlation_id(self, id: &str) -> Self; pub fn reply_to(self, topic: &str) -> Self; pub fn thing_name(self, thing: &str) -> Self; pub fn tag(self, key: &str, value: Value) -> Self; pub fn from_config(self, config: &Config) -> Self; pub fn build(self) -> Message; // never requires config}class MessageBuilder { static create(name: string, version: string): MessageBuilder; withPayload(body: unknown): this; withTags(tags: MessageTags): this; withTag(key: string, value: unknown): this; withThingName(thing: string): this; withConfig(config: { thingName: string; parsed: { tags: MessageTags } }): this; // optional withCorrelationId(id: string): this; withReplyTo(topic: string): this; build(): Message; // never requires config}Request/reply future
Section titled “Request/reply future”request returns a future that resolves when the matching reply arrives. The responder’s
reply(request, reply) copies the request’s correlation_id onto the reply before publishing it to
the request’s reply_to topic. The future type differs per language.
ReplyFuture extends CompletableFuture<Message> — use the standard future API (get(),
orTimeout(), thenAccept()). It exposes a public String replyTopic; field.
public class ReplyFuture extends CompletableFuture<Message> { public String replyTopic;}
// usage:ReplyFuture future = messaging.request("svc/op", request);Message reply = future.get(5, TimeUnit.SECONDS);request returns an Iou. Its get(timeout) returns a (done, result) tuple — not the reply
directly. timeout < 0 blocks forever; on timeout it returns (False, self).
class Iou: def get(self, timeout=-1) -> Tuple[bool, Any]: ... # (done, result) def set_result(self, result) -> None: ... def done(self) -> bool: ... def get_user_data(self): ...
# usage:iou = messaging.request("svc/op", request)done, reply = iou.get(timeout=5)request() returns Result<ReplyFuture>; await the call to get the future, then await the
future (wrap in tokio::time::timeout for a deadline). Dropping the ReplyFuture unsubscribes the
ephemeral reply topic.
// ReplyFuture: impl Future<Output = Result<Message>>let fut = messaging.request("svc/op", request).await?;let reply = tokio::time::timeout(Duration::from_secs(5), fut).await??;ReplyFuture implements PromiseLike<Message> — await it directly. The timeout is the third
argument to request (timeoutMs, default 0 = no timeout). It also exposes .cancel().
interface ReplyFuture extends PromiseLike<Message> { cancel(): void;}
// usage:const reply = await messaging.request("svc/op", req, 5000);Qos and Destination enums
Section titled “Qos and Destination enums”Two QoS levels — at-most-once and at-least-once — which the MQTT provider maps to QoS 0 and 1.
Java and Python reuse the AWS SDK enum; Rust and TypeScript define their own. Destination
exists in Rust/TS as a Local / IotCore selector used by the lower-level provider; Java/Python do
not expose an equivalent enum (they have the per-destination method pairs instead).
QOS is the AWS SDK type software.amazon.awssdk.aws.greengrass.model.QOS, values
AT_MOST_ONCE / AT_LEAST_ONCE. There is no separate Destination enum — use the plain vs
*ToIotCore method pairs.
import software.amazon.awssdk.aws.greengrass.model.QOS;
QOS.AT_MOST_ONCE; // mapped to MQTT QoS 0QOS.AT_LEAST_ONCE; // mapped to MQTT QoS 1QOS is the AWS SDK type from awsiot.greengrasscoreipc.model import QOS. No separate
Destination enum — use the plain vs *_to_iot_core method pairs.
from awsiot.greengrasscoreipc.model import QOS
QOS.AT_MOST_ONCE # mapped to MQTT QoS 0QOS.AT_LEAST_ONCE # mapped to MQTT QoS 1Qos is a Rust-native enum. Destination is used only by the lower-level MessagingProvider trait.
pub enum Qos { AtMostOnce, AtLeastOnce }pub enum Destination { Local, IotCore }Qos and Destination are TS-native string enums.
enum Qos { AtMostOnce = "atMostOnce", AtLeastOnce = "atLeastOnce" }enum Destination { Local = "local", IotCore = "iotcore" }The provider / interface seam
Section titled “The provider / interface seam”Below the client/service sits a transport MessagingProvider. Rust and TypeScript expose a
substitutable seam — the MessagingService trait / IMessagingService interface plus a swappable
provider — so you can inject a fake transport in tests. Java and Python have concrete clients
only (no service interface, no DI); the two providers are internal.
No service interface or DI. The abstract base MessagingProvider has two concrete subclasses —
GreengrassMessagingProvider (IPC) and StandaloneMessagingProvider (dual-MQTT). There is no
single-client MqttProvider class. Test against the concrete MessagingClient (point it at a local
broker; reset its process-global state between tests).
No service interface or DI. The MessagingProvider ABC (DEFAULT_MAX_MESSAGES = 10000) has two
concrete subclasses — GreengrassIpcProvider and StandaloneProvider. There is no
MqttProvider. Test against the concrete static MessagingClient.
MessagingService (above) is the user-facing trait. Below it, the lower-level MessagingProvider
trait carries raw transport:
#[async_trait]pub trait MessagingProvider: Send + Sync { async fn publish(&self, topic: &str, payload: Vec<u8>, dest: Destination, qos: Qos) -> Result<()>; async fn subscribe(&self, filter: &str, dest: Destination, qos: Qos, max_messages: usize) -> Result<Subscription>; async fn unsubscribe(&self, filter: &str, dest: Destination) -> Result<()>; fn connected(&self) -> bool;}
// Inject a fake provider in tests:let svc: Arc<dyn MessagingService> = Arc::new(DefaultMessagingService::new(fake_provider));subscribe returns a Subscription — a bounded, polled queue of (topic, payload) messages
(max_messages cap), not a handler callback.
IMessagingService (above) is the user-facing interface. Below it, the MessagingProvider interface
carries raw bytes:
interface MessagingProvider { publishBytes(topic: string, payload: Buffer, dest: Destination, qos: Qos): Promise<void>; subscribeRaw( filter: string, dest: Destination, qos: Qos, onMessage: (topic: string, payload: Buffer) => void, ): Promise<RawSubscription>; connected(): boolean; disconnect(): Promise<void>;}
// Inject a fake provider in tests:const svc: IMessagingService = new DefaultMessagingService(fakeProvider);Messaging (broker) config types
Section titled “Messaging (broker) config types”With the MQTT transport, the broker config is a JSON messaging section with a local block and an
optional iotCore block. The wire shape is identical across the four languages (each maps it to its
own config type — Java MessagingConfiguration, Python messaging_config.py dataclasses, Rust
config.rs, TS config.ts).
{ "messaging": { "local": { "host": "localhost", "port": 8883, "clientId": "my-component", "credentials": { "caPath": "/certs/ca.crt", "certPath": "/certs/client.crt", "keyPath": "/certs/client.key", "username": "optional-user", "password": "optional-pass" } }, "iotCore": { "endpoint": "xxxx-ats.iot.us-east-1.amazonaws.com", "port": 8883, "clientId": "my-component", "credentials": { "caPath": "/certs/AmazonRootCA1.pem", "certPath": "/certs/device.pem.crt", "keyPath": "/certs/private.pem.key" } } }}Field reference (verified, identical across all four languages):
local—host(string),port(int),clientId(string), optionalcredentials.iotCore—endpoint(string),port(int),clientId(string), requiredcredentials. The wholeiotCoreblock is optional: omit it for a local-only (“single-broker”) deployment.credentials—caPath,certPath,keyPath, and optionalusername/password.
Rules:
- Local-broker TLS is keyed on
caPathpresence.caPathonly ⇒ server-only TLS;caPath+certPath+keyPath⇒ mutual TLS; acertPath/keyPathwithoutcaPathstays plaintext. Username/password auth is independent of TLS. - IoT Core requires full mutual-TLS credentials (
caPath+certPath+keyPath) or it refuses to connect. hostis an opaque string — a KubernetesServiceDNS name works unchanged.local.typeis optional and ignored (defaults to"mqtt"where read).
End-to-end example
Section titled “End-to-end example”A minimal publish, subscribe, and request/reply round trip in each language.
MessagingClient messaging = gg.getMessaging();
// subscribe — 3rd arg is maxConcurrencymessaging.subscribe("itest/pubsub", (topic, m) -> { System.out.println(m.getHeader().getName() + " -> " + m.getBody());}, 1);
// publishMessage msg = MessageBuilder.create("Hello", "1.0") .withPayload(payload) // JsonObject / Map / POJO .withConfig(configManager) // REQUIRED in Java .build();messaging.publish("itest/pubsub", msg);
// request/reply — ReplyFuture extends CompletableFuture<Message>ReplyFuture future = messaging.request("itest/request", request);Message reply = future.get(5, TimeUnit.SECONDS);// responder side:messaging.reply(request, replyMsg); // copies correlation idmessaging = gg.get_messaging() # the MessagingClient class (static methods)
def handler(topic, m): print(m.get_header().name, "->", m.get_body())
messaging.subscribe("evt/topic", handler, 1) # 3rd arg = max_concurrency
msg = MessageBuilder.create("Hello", "1.0").with_payload({"n": 1}).with_config(config).build()messaging.publish("evt/topic", msg)
iou = messaging.request("svc/op", request) # returns an Ioudone, reply = iou.get(timeout=5) # get() returns a (done, result) TUPLE# responder side:messaging.reply(request, reply_msg) # copies correlation iduse std::time::Duration;use serde_json::json;use ggcommons::messaging::message_handler;
let messaging = gg.messaging()?; // Arc<dyn MessagingService>
messaging.subscribe( "itest/pubsub", message_handler(|topic, msg| async move { // MUST wrap the closure tracing::info!(%topic, name = %msg.header.name, "received"); }), 32, // max_messages 1, // max_concurrency).await?;
let msg = MessageBuilder::new("Hello", "1.0") // new(), not create() .payload(json!({ "n": 1 })) .thing_name("test-thing") .build();messaging.publish("itest/pubsub", &msg).await?;
// request/reply: request() -> Result<ReplyFuture>, then await the futurelet fut = messaging.request("svc/op", request).await?;let reply = tokio::time::timeout(Duration::from_secs(5), fut).await??;// responder side:responder.reply(&request, reply_msg).await?; // copies correlation idconst messaging = gg.messaging(); // IMessagingService
await messaging.subscribe("evt/topic", (_t, m) => { console.log(m.getBody());}, 32, 1); // maxMessages, maxConcurrency
const msg = MessageBuilder.create("evt", "1.0.0").withPayload({ n: 1 }).build();await messaging.publish("evt/topic", msg);
// request/reply: request() returns a ReplyFuture you await; 3rd arg = timeoutMsconst reply = await messaging.request("rpc/echo", req, 1000);console.log(reply.getBody());// responder side:await messaging.reply(req, replyMsg); // copies correlation id