RabbitMQ

Wraps the lapin crate for convenient sending and receiving of RabbitMQ messages. Supports sending and receiving in batches.

RabbitMQ versions

This component works with (and is tested against) the most recent versions of both RabbitMQ 3 and RabbitMQ 4.

AMQP 0-9-1

This component (as well as the underlying lapin crate) targets the AMQP 0-9-1 protocol, as implemented by the popular broker RabbitMQ. Despite the similarity in name and lineage, AMQP 1.0 is a completely different protocol and is out of scope for this component, even though RabbitMQ has supported it natively since version 4.0. Likewise, RabbitMQ Streams are not part of either AMQP-named protocol and are also not covered by this component.

Some understanding of the AMQP 0-9-1 protocol and its implementation by RabbitMQ is required to use this component. The following parts of the RabbitMQ documentation are excellent sources of knowledge:

Quickstart

  1. Enable the rabbitmq feature.
cargo add strut --features rabbitmq
  2. Configure an egress (outgoing message route) and an ingress (incoming message route).
config/rabbitmq.yaml
rabbitmq:
  egress:
    demo_egress:
      routing_key: demo.key

  ingress:
    demo_ingress:
      queue: demo.key
  3. Retrieve the publisher (for the egress) and the subscriber (for the ingress) from the RabbitMQ facade and use them:
use strut::rabbitmq::*;
use strut::RabbitMq;

#[strut::main]
async fn main() {
    // Make publisher and subscriber
    let publisher: Publisher = RabbitMq::publisher("demo_egress");
    let subscriber: StringSubscriber = RabbitMq::string_subscriber("demo_ingress");

    // Ensure the queue exists before we start sending
    subscriber.declare().await;

    // Send message
    publisher.publish("Demo message").await;

    // Receive message
    let envelope: Envelope<String> = subscriber.receive().await;

    assert_eq!(envelope.payload(), "Demo message");

    // Ack message
    envelope.complete().await;
}
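
The quickstart works because the egress routing key and the ingress queue name are identical (demo.key). In AMQP 0-9-1, the default exchange delivers a message to the queue whose name equals the routing key. The following is a minimal in-memory sketch of that rule, assuming the egress falls back to the default exchange when no exchange is configured (which the matching names suggest, but the text above does not state). `ToyBroker` and its methods are hypothetical illustration names and are not part of strut or lapin.

```rust
use std::collections::{HashMap, VecDeque};

/// Toy in-memory model of a broker's default exchange.
/// Illustration only: not part of strut or lapin.
#[derive(Default)]
pub struct ToyBroker {
    queues: HashMap<String, VecDeque<String>>,
}

impl ToyBroker {
    /// Declares a queue; declaring an existing queue is a no-op.
    pub fn declare_queue(&mut self, name: &str) {
        self.queues.entry(name.to_string()).or_default();
    }

    /// Publishes via the default exchange: the message lands in the queue
    /// whose name equals the routing key. Returns `false` when no such
    /// queue exists, in which case the message is dropped.
    pub fn publish_default(&mut self, routing_key: &str, payload: &str) -> bool {
        match self.queues.get_mut(routing_key) {
            Some(queue) => {
                queue.push_back(payload.to_string());
                true
            }
            None => false,
        }
    }

    /// Takes the oldest message from the named queue, if there is one.
    pub fn receive(&mut self, queue: &str) -> Option<String> {
        self.queues.get_mut(queue)?.pop_front()
    }
}
```

In this model, publishing to demo.key before the queue is declared loses the message, which is why the quickstart declares the queue before publishing.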

Cargo features

⛳︎ Feature rabbitmq

This is the main gateway feature of this component.

⛳︎ Feature rabbitmq-json

Enables encoding/decoding messages to/from JSON using the serde_json crate.

Component structure

☑︎ Configuration

The RabbitMQ component allows configuring any number of RabbitMQ handles (sets of credentials). Most applications, however, would need just one. The default, unnamed handle is used implicitly by the RabbitMq facade. All other RabbitMQ handles are named and may be retrieved from AppConfig.

The component also allows configuring any number of RabbitMQ egresses and ingresses. Their corresponding Publishers and Subscribers can be retrieved from the component facade.

A full configuration is structured as such:

config/rabbitmq.yaml
rabbitmq:
  host: localhost # default RabbitMQ handle
  port: 5672
  user: guest
  password: guest
  vhost: /

  extra: # additional named RabbitMQ handles
    named_handle:
      host: localhost
      port: 3372
      user: admin
      password: admin
      vhost: /custom

  egress:
    named_publisher: "routing.key" # publisher definition goes here

  ingress:
    named_subscriber: "queue.name" # subscriber definition goes here

Both a publisher and a subscriber require at least one configuration value: a routing key and a queue name, respectively. More detailed configuration examples for both are given below.

Egress (publisher) config

An egress is an outgoing (away from the application) route for sending messages to RabbitMQ. The most basic form of an egress definition is simply a routing key to send the message to, as shown below.

Egress doesn’t declare exchange and queue

An egress and its Publisher merely target the configured RabbitMQ exchange and queue by name. They do not declare (create) them on the broker.

Declarations are performed on the receiving side, by the Subscriber.

config/rabbitmq.yaml
rabbitmq:
  egress:
    named_egress: demo.routing.key

A full egress definition is structured as shown below:

config/rabbitmq.yaml
rabbitmq:
  egress:
    named_egress:
      exchange: amq.topic
      routing_key: demo.routing.key
      confirmation: routed # transmitted (default), accepted, routed
      force_durable: false
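
The three confirmation values read like increasingly strict delivery guarantees. The sketch below shows one plausible interpretation, based only on the value names and on standard AMQP 0-9-1 mechanisms (publisher confirms and returned unroutable messages); the exact semantics in this component are an assumption here. The `Confirmation` enum, the `Outcome` struct and both functions are hypothetical and are not the component's API.

```rust
/// Hypothetical mirror of the `confirmation` config values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Confirmation {
    Transmitted,
    Accepted,
    Routed,
}

/// What a publisher could observe for a single message (hypothetical).
pub struct Outcome {
    /// The frame was written to the network connection.
    pub written_to_socket: bool,
    /// The broker acknowledged the message (publisher confirm).
    pub broker_acked: bool,
    /// The broker returned the message as unroutable.
    pub returned: bool,
}

/// Parses a config value such as "routed"; unknown values yield `None`.
pub fn parse_confirmation(value: &str) -> Option<Confirmation> {
    match value {
        "transmitted" => Some(Confirmation::Transmitted),
        "accepted" => Some(Confirmation::Accepted),
        "routed" => Some(Confirmation::Routed),
        _ => None,
    }
}

/// Assumed rule: each level implies all weaker levels.
pub fn satisfies(level: Confirmation, outcome: &Outcome) -> bool {
    let transmitted = outcome.written_to_socket;
    let accepted = transmitted && outcome.broker_acked;
    let routed = accepted && !outcome.returned;
    match level {
        Confirmation::Transmitted => transmitted,
        Confirmation::Accepted => accepted,
        Confirmation::Routed => routed,
    }
}
```

Under this reading, a message that the broker acknowledges but returns as unroutable would satisfy accepted but not routed.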

Ingress (subscriber) config

An ingress is an incoming (toward the application) route for receiving messages from RabbitMQ. The most basic form of an ingress definition is simply a queue name to create and subscribe to, as shown below.

Ingress declares exchange and queue

Ingress and its Subscriber are responsible for declaring the RabbitMQ exchange and queue, as well as creating bindings between them. Consequently, ingress configuration is the place for configuring objects on the broker.

config/rabbitmq.yaml
rabbitmq:
  ingress:
    named_ingress: demo.queue.name

A full ingress definition is structured as shown below:

config/rabbitmq.yaml
rabbitmq:
  ingress:
    named_ingress:
      exchange:
        name: demo.exchange.name
        kind: direct # direct (default), fanout, headers, topic, hash_key, hash_id
        durable: true
        auto_delete: false
      queue: demo.queue.name
      durable: false
      exclusive: false
      auto_delete: false
      batch_size: 1
      batch_timeout: 250ms
      prefetch_count: ~
      acking_behavior: manual # manual (default), auto
      gibberish_behavior: complete # complete (default), backwash, abandon
      binding_keys:
        - demo.binding.key.1
        - demo.binding.key.2
      # binding_headers: # commented out because direct exchange cannot have headers
      #   header_a: 12
      #   header_b: true
      headers_behavior: all # all (default), any
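
How the configured binding_keys are used depends on the exchange kind. The sketch below illustrates the standard AMQP 0-9-1 matching rules as applied by the broker: a direct exchange requires an exact match, a fanout exchange ignores keys, and a topic exchange treats `*` as exactly one dot-separated word and `#` as zero or more words. The `ExchangeKind` enum and `key_matches` function are hypothetical names for illustration, not part of this component; the headers, hash_key and hash_id kinds are left out.

```rust
/// Exchange kinds covered by this sketch (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExchangeKind {
    Direct,
    Fanout,
    Topic,
}

/// Returns true if a message published with `routing_key` would reach a
/// queue bound to an exchange of the given kind with `binding_key`.
pub fn key_matches(kind: ExchangeKind, binding_key: &str, routing_key: &str) -> bool {
    match kind {
        // Direct: the keys must be identical.
        ExchangeKind::Direct => binding_key == routing_key,
        // Fanout: every bound queue receives every message.
        ExchangeKind::Fanout => true,
        // Topic: compare word by word, honoring `*` and `#`.
        ExchangeKind::Topic => {
            let pattern: Vec<&str> = binding_key.split('.').collect();
            let words: Vec<&str> = routing_key.split('.').collect();
            topic_matches(&pattern, &words)
        }
    }
}

/// Recursive word-by-word matcher for topic patterns.
fn topic_matches(pattern: &[&str], words: &[&str]) -> bool {
    match pattern.split_first() {
        // Pattern exhausted: match only if no words remain.
        None => words.is_empty(),
        // `#` may consume zero or more words.
        Some((&"#", rest)) => {
            (0..=words.len()).any(|skip| topic_matches(rest, &words[skip..]))
        }
        // `*` consumes exactly one word.
        Some((&"*", rest)) => !words.is_empty() && topic_matches(rest, &words[1..]),
        // A literal word must equal the next word of the routing key.
        Some((literal, rest)) => {
            words.first() == Some(literal) && topic_matches(rest, &words[1..])
        }
    }
}
```

With the direct exchange from the configuration above, only messages published with the routing key demo.binding.key.1 or demo.binding.key.2 would reach the queue.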

∅ Startup

This component does not include any startup logic.

☑︎ Facade

Use the RabbitMq facade to retrieve the pre-configured Publishers and Subscribers.

☑︎ Spindown

The component automatically attempts to close all open connections to RabbitMQ during spindown.