RabbitMQ
Wraps the lapin crate for convenient sending and receiving of RabbitMQ messages.
Supports sending and receiving in batches.
This component works with (and is tested against) the most recent versions of both RabbitMQ 3 and RabbitMQ 4.
This component (as well as the underlying lapin crate) targets the AMQP 0-9-1 protocol, as implemented by the popular broker RabbitMQ.
Despite the similarity in name and lineage, the AMQP 1.0 protocol is completely different from AMQP 0-9-1 and is out of scope for this component, even though RabbitMQ has supported it natively since version 4.0.
On the same note, RabbitMQ Streams are not part of either of the AMQP-named protocols, and are also not covered by this component.
Some level of understanding of the AMQP 0-9-1 protocol and its implementation by RabbitMQ is required to use this component. The following parts of the RabbitMQ documentation are excellent sources of knowledge:
Quickstart
- Enable the rabbitmq feature.
    cargo add strut --features rabbitmq
- Configure an egress (outgoing message route) and an ingress (incoming message route).
  YAML:
    rabbitmq:
      egress:
        demo_egress:
          routing_key: demo.key
      ingress:
        demo_ingress:
          queue: demo.key

  TOML:
    [rabbitmq.egress.demo_egress]
    routing_key = "demo.key"

    [rabbitmq.ingress.demo_ingress]
    queue = "demo.key"
- Retrieve the publisher (for the egress) and the subscriber (for the ingress) from the RabbitMq facade and use them:
    use strut::rabbitmq::*;
    use strut::RabbitMq;

    #[strut::main]
    async fn main() {
        // Make publisher and subscriber
        let publisher: Publisher = RabbitMq::publisher("demo_egress");
        let subscriber: StringSubscriber = RabbitMq::string_subscriber("demo_ingress");

        // Ensure the queue exists before we start sending
        subscriber.declare().await;

        // Send message
        publisher.publish("Demo message").await;

        // Receive message
        let envelope: Envelope<String> = subscriber.receive().await;
        assert_eq!(envelope.payload(), "Demo message");

        // Ack message
        envelope.complete().await;
    }
Cargo features
⛳︎ Feature rabbitmq
This is the main gateway feature of this component.
⛳︎ Feature rabbitmq-json
Enables encoding and decoding messages to and from JSON using the serde_json crate.
Component structure
☑︎ Configuration
The RabbitMQ component allows configuring any number of RabbitMQ handles (sets of credentials).
Most applications, however, would need just one.
The default, unnamed handle is used implicitly by the RabbitMq facade.
All other RabbitMQ handles are named and may be retrieved from AppConfig.
The component also allows configuring any number of RabbitMQ egresses and ingresses.
Their corresponding Publishers and Subscribers can be retrieved from the component facade.
A full configuration is structured as follows:
YAML:
    rabbitmq:
      host: localhost # default RabbitMQ handle
      port: 5672
      user: guest
      password: guest
      vhost: /
      extra: # additional named RabbitMQ handles
        named_handle:
          host: localhost
          port: 3372
          user: admin
          password: admin
          vhost: /custom
      egress:
        named_publisher: "routing.key" # publisher definition goes here
      ingress:
        named_subscriber: "queue.name" # subscriber definition goes here

TOML:
    [rabbitmq]
    host = "localhost" # default RabbitMQ handle
    port = 5672
    user = "guest"
    password = "guest"
    vhost = "/"

    [rabbitmq.extra.named_handle] # additional named RabbitMQ handles
    host = "localhost"
    port = 3372
    user = "admin"
    password = "admin"
    vhost = "/custom"

    [rabbitmq.egress]
    named_publisher = "routing.key" # publisher definition goes here

    [rabbitmq.ingress]
    named_subscriber = "queue.name" # subscriber definition goes here
Both a publisher and a subscriber require at least one configuration value: a routing key and a queue name, respectively. More detailed configuration examples for both are given below.
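The handle fields shown above (host, port, user, password, vhost) correspond to the parts of an AMQP 0-9-1 connection URI, which is what lapin accepts when connecting. The following is a minimal sketch of that mapping, not the component's actual code: the `Handle` struct and the `percent_encode` and `amqp_uri` functions are hypothetical names introduced only for illustration. The detail worth noticing is that the vhost is percent-encoded, so the default vhost `/` becomes `%2F` in the URI.

```rust
/// Hypothetical mirror of the fields of one configured RabbitMQ handle.
struct Handle<'a> {
    host: &'a str,
    port: u16,
    user: &'a str,
    password: &'a str,
    vhost: &'a str,
}

/// Percent-encodes every byte except the RFC 3986 unreserved characters.
fn percent_encode(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(byte as char);
            }
            _ => out.push_str(&format!("%{:02X}", byte)),
        }
    }
    out
}

/// Builds an `amqp://` URI from the handle fields (illustrative only).
fn amqp_uri(handle: &Handle<'_>) -> String {
    format!(
        "amqp://{}:{}@{}:{}/{}",
        percent_encode(handle.user),
        percent_encode(handle.password),
        handle.host,
        handle.port,
        percent_encode(handle.vhost),
    )
}
```

With the default handle from the configuration above, `amqp_uri` returns `amqp://guest:guest@localhost:5672/%2F`. For the named handle it returns `amqp://admin:admin@localhost:3372/%2Fcustom`.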
Egress (publisher) config
An egress is an outgoing (away from the application) route for sending messages to RabbitMQ. The most basic form of an egress definition is simply a routing key to send the message to, as shown below.
An egress and its Publisher merely target the configured RabbitMQ exchange and queue by name.
They do not declare (create) them on the broker.
Declarations are performed on the receiving side, by the Subscriber.
YAML:
    rabbitmq:
      egress:
        named_egress: demo.routing.key

TOML:
    [rabbitmq.egress]
    named_egress = "demo.routing.key"
A full egress definition is structured as shown below:
YAML:
    rabbitmq:
      egress:
        named_egress:
          exchange: amq.topic
          routing_key: demo.routing.key
          confirmation: routed # transmitted (default), accepted, routed
          force_durable: false

TOML:
    [rabbitmq.egress.named_egress]
    exchange = "amq.topic"
    routing_key = "demo.routing.key"
    confirmation = "routed" # transmitted (default), accepted, routed
    force_durable = false
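The relationship between the short and the full egress forms can be pictured as a normalization step: the short form supplies only the routing key, and every other field falls back to a default. The sketch below is a hypothetical model, not the component's real types. `EgressDefinition`, `ResolvedEgress` and `resolve` are invented names, and two of the defaults are assumptions: that the short form targets the AMQP default exchange (the empty string) and that `force_durable` defaults to `false`. Only the `transmitted` default for `confirmation` comes from the configuration comments above.

```rust
/// Confirmation levels named in the egress configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Confirmation {
    Transmitted,
    Accepted,
    Routed,
}

/// Hypothetical model of the two ways an egress may be written in config.
#[derive(Debug, Clone, PartialEq, Eq)]
enum EgressDefinition {
    /// Short form: just a routing key.
    Short(String),
    /// Full form: every field spelled out.
    Full {
        exchange: String,
        routing_key: String,
        confirmation: Confirmation,
        force_durable: bool,
    },
}

/// Hypothetical fully resolved egress.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ResolvedEgress {
    exchange: String,
    routing_key: String,
    confirmation: Confirmation,
    force_durable: bool,
}

/// Expands either form into a fully specified egress.
fn resolve(definition: EgressDefinition) -> ResolvedEgress {
    match definition {
        EgressDefinition::Short(routing_key) => ResolvedEgress {
            exchange: String::new(), // ASSUMPTION: AMQP default exchange
            routing_key,
            confirmation: Confirmation::Transmitted, // documented default
            force_durable: false, // ASSUMPTION: default not stated in the docs
        },
        EgressDefinition::Full { exchange, routing_key, confirmation, force_durable } => {
            ResolvedEgress { exchange, routing_key, confirmation, force_durable }
        }
    }
}
```

Under these assumptions, `resolve(EgressDefinition::Short("demo.routing.key".into()))` yields an egress that publishes to the default exchange with `Confirmation::Transmitted`.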
Ingress (subscriber) config
An ingress is an incoming (toward the application) route for receiving messages from RabbitMQ. The most basic form of an ingress definition is simply a queue name to create and subscribe to, as shown below.
An ingress and its Subscriber are responsible for declaring the RabbitMQ exchange and queue, as well as creating bindings between them.
Consequently, ingress configuration is the place for configuring objects on the broker.
YAML:
    rabbitmq:
      ingress:
        named_ingress: demo.queue.name

TOML:
    [rabbitmq.ingress]
    named_ingress = "demo.queue.name"
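Bindings, which the ingress creates on the broker, decide which published messages reach the queue. On a direct exchange a message is delivered when its routing key equals a binding key exactly. On a topic exchange the binding key is a pattern of dot-separated words in which `*` stands for exactly one word and `#` for zero or more words. The sketch below models the topic rule in plain Rust to make the semantics concrete. It runs nowhere in this component (the broker does the matching), and the function names are illustrative.

```rust
/// Returns true if `routing_key` matches the topic-exchange `binding_key`.
fn topic_matches(binding_key: &str, routing_key: &str) -> bool {
    let pattern: Vec<&str> = binding_key.split('.').collect();
    let words: Vec<&str> = routing_key.split('.').collect();
    matches_from(&pattern, &words)
}

/// Recursive matcher over the remaining pattern words and routing-key words.
fn matches_from(pattern: &[&str], words: &[&str]) -> bool {
    let Some((head, rest)) = pattern.split_first() else {
        // Pattern exhausted: match only if the routing key is exhausted too.
        return words.is_empty();
    };
    match *head {
        // `#` may consume zero or more words: try every possible split.
        "#" => (0..=words.len()).any(|skip| matches_from(rest, &words[skip..])),
        // `*` consumes exactly one word.
        "*" => !words.is_empty() && matches_from(rest, &words[1..]),
        // A literal word must match exactly.
        literal => words.first() == Some(&literal) && matches_from(rest, &words[1..]),
    }
}
```

For example, the binding key `demo.*.key` matches the routing key `demo.binding.key` but not `demo.key`, while `demo.#` matches both of them as well as plain `demo`.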
A full ingress definition is structured as shown below:
YAML:
    rabbitmq:
      ingress:
        named_ingress:
          exchange:
            name: demo.exchange.name
            kind: direct # direct (default), fanout, headers, topic, hash_key, hash_id
            durable: true
            auto_delete: false
          queue: demo.queue.name
          durable: false
          exclusive: false
          auto_delete: false
          batch_size: 1
          batch_timeout: 250ms
          prefetch_count: ~
          acking_behavior: manual # manual (default), auto
          gibberish_behavior: complete # complete (default), backwash, abandon
          binding_keys:
            - demo.binding.key.1
            - demo.binding.key.2
          # binding_headers: # commented out because direct exchange cannot have headers
          #   header_a: 12
          #   header_b: true
          headers_behavior: all # all (default), any

TOML:
    [rabbitmq.ingress.named_ingress.exchange]
    name = "demo.exchange.name"
    kind = "direct" # direct (default), fanout, headers, topic, hash_key, hash_id
    durable = true
    auto_delete = false

    [rabbitmq.ingress.named_ingress]
    queue = "demo.queue.name"
    durable = false
    exclusive = false
    auto_delete = false
    batch_size = 1
    batch_timeout = "250ms"
    # prefetch_count = 100 # TOML doesn’t support null values, so we have to omit the value if we don’t want any prefetch
    acking_behavior = "manual" # manual (default), auto
    gibberish_behavior = "complete" # complete (default), backwash, abandon
    binding_keys = [
        "demo.binding.key.1",
        "demo.binding.key.2",
    ]
    # binding_headers = { header_a = 12, header_b = true } # commented out because direct exchange cannot have headers
    headers_behavior = "all" # all (default), any
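The `batch_size` and `batch_timeout` options above suggest a familiar batching rule: release a batch once it holds `batch_size` messages or once `batch_timeout` has elapsed, whichever comes first. The documentation does not spell out the exact semantics, so the sketch below shows only one plausible reading. It is synchronous and uses a standard-library channel in place of a RabbitMQ consumer, whereas the real Subscriber is asynchronous. `collect_batch` is a hypothetical helper, not part of the component.

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

/// Collects up to `batch_size` messages from `source`, giving up once
/// `batch_timeout` has elapsed since the call started (ASSUMED semantics).
fn collect_batch<T>(source: &Receiver<T>, batch_size: usize, batch_timeout: Duration) -> Vec<T> {
    let deadline = Instant::now() + batch_timeout;
    let mut batch = Vec::with_capacity(batch_size);
    while batch.len() < batch_size {
        // Wait only for whatever time is left until the deadline.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match source.recv_timeout(remaining) {
            Ok(message) => batch.push(message),
            // Timed out or the sender is gone: return what we have so far.
            Err(_) => break,
        }
    }
    batch
}
```

Under this reading, `batch_size = 1` returns each message as soon as it arrives, and a larger batch size trades latency (bounded by `batch_timeout`) for fewer, larger deliveries.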
∅ Startup
This component does not include any startup logic.
☑︎ Facade
Use the RabbitMq facade to retrieve the pre-configured Publishers and Subscribers.
☑︎ Spindown
The component automatically attempts to close all open connections to RabbitMQ during spindown.