Distribute messages to different RabbitMQ node using federation plugin.
Preliminary information
I will distribute messages from RabbitMQ node reindeer
to raccoon
.
Install RabbitMQ server on every node
Install RabbitMQ message broker on every node.
$ sudo apt update $ sudo apt install gnupg2 apt-transport-https curl $ curl https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add - $ echo "deb https://dl.bintray.com/rabbitmq/debian stretch main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list $ echo "deb https://dl.bintray.com/rabbitmq-erlang/debian buster erlang-22.x" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list $ apt update $ apt install rabbitmq-server
Use rabbitmq-diagnostics
to display RabbitMQ version.
$ sudo rabbitmq-diagnostics server_version Asking node rabbit@buster for its RabbitMQ version... 3.8.0
Enable management plugin.
$ sudo rabbitmq-plugins enable rabbitmq_management
Define admin
user with password
password.
$ sudo rabbitmqctl add_user admin password $ sudo rabbitmqctl set_user_tags admin administrator $ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
Delete guest user.
$ sudo rabbitmqctl delete_user guest
Get rabbitmqadmin
utility.
$ curl -o rabbitmqadmin http://127.0.0.1:15672/cli/rabbitmqadmin
Ensure that executable bit is set.
$ chmod +x rabbitmqadmin
Setup the dedicated vhost and user
Setup dedicated vhost and user on both nodes.
Create federation_example
virtualhost and ensure that admin can manage it.
$ sudo rabbitmqctl add_vhost federation_example
$ sudo rabbitmqctl set_permissions -p federation_example admin ".*" ".*" ".*"
Declare federation_example
user with password
password, grant permissions to federation_example
virtual host.
$ sudo rabbitmqctl add_user federation_example password
$ sudo rabbitmqctl set_permissions federation_example --vhost federation_example ".*" ".*" ".*"
federation_example
is not a management user.Setup exchanges
Setup exchanges, queues and bindings on both nodes.
Declare exchange.
$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare exchange name=directives type=fanout
Declare queue.
$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare queue name=application-directives
Declare bindings.
$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare binding source=directives destination=application-directives
Setup federation plugin on the downstream RabbitMQ node
Setup federation plugin on the downstream RabbitMQ node – raccoon
.
Enable federation plugin.
$ sudo rabbitmq-plugins enable rabbitmq_federation
Define upstream RabbitMQ server.
$ sudo rabbitmqctl set_parameter federation-upstream reindeer-upstream --vhost federation_example '{"uri":"amqp://federation_example:password@reindeer","prefetch-count": 1}'
Define upstream set.
$ sudo rabbitmqctl set_parameter --vhost federation_example federation-upstream-set reindeer '[{"upstream":"reindeer-upstream"}]'
Define federation policy.
$ sudo rabbitmqctl set_policy federate-directives --vhost federation_example application-directives '{"federation-upstream-set":"reindeer"}' --priority 1 --apply-to queues
Display defined parameters.
$ sudo rabbitmqctl list_parameters -p federation_example Listing runtime parameters for vhost "federation_example" ... component name value federation-upstream reindeer-upstream {"prefetch-count":1,"uri":"amqp://federation_example:password@reindeer"} federation-upstream-set reindeer [{"upstream":"reindeer-upstream"}]
Declare queue on downstream server.
$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare queue name=application-directives
Check federation status.
$ sudo rabbitmqctl federation_status Listing federation links on node rabbit@raccoon... [#{consumer_tag => <<"federation-link-reindeer-upstream">>,error => <<>>, exchange => <<>>,id => <<"c57e258c">>, last_changed => <<"2019-10-24 22:12:37">>, local_connection => <<"<rabbit@raccoon.1.1746.0>">>, queue => <<"application-directives">>,status => running,type => queue, upstream => <<"reindeer-upstream">>,upstream_exchange => <<>>, upstream_queue => <<"application-directives">>, uri => <<"amqp://reindeer">>,vhost => <<"federation_example">>}]
Create consumer
Use Python to create simple consumer.
#!/usr/bin/env python3 # Consume messages from RabbitMQ server # import parser for command-line options import argparse # import a pure-Python implementation of the AMQP 0-9-1 import pika # import time import time # define callback def consumer_callback(channel, method, properties, body): if type(properties.headers) is dict and "x-received-from" in properties.headers: print("Received message %r from %r" % (body.decode(), properties.headers["x-received-from"][0]["uri"])) else: print("Received message %r " % body.decode()) # sleep for 1 second to visualize things time.sleep(1) # ack this message channel.basic_ack(delivery_tag=method.delivery_tag) # define and parse command-line options parser = argparse.ArgumentParser(description='Check connection to RabbitMQ server') parser.add_argument('--server', required=True, help='Define RabbitMQ server') parser.add_argument('--virtual_host', default='/', help='Define virtual host') parser.add_argument('--port', type=int, default=5672, help='Define port (default: %(default)s)') parser.add_argument('--username', default='guest', help='Define username (default: %(default)s)') parser.add_argument('--password', default='guest', help='Define password (default: %(default)s)') args = vars(parser.parse_args()) # set amqp credentials credentials = pika.PlainCredentials(args['username'], args['password']) # set amqp connection parameters parameters = pika.ConnectionParameters(host=args['server'], port=args['port'], virtual_host=args['virtual_host'], credentials=credentials) # try to establish connection and consume messages try: connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='application-directives', on_message_callback=consumer_callback) channel.start_consuming() connection.close() except KeyboardInterrupt: channel.stop_consuming() connection.close()
Publish messages and inspect queues
Start consumer that will connect to the upstream node.
$ python3 client.py --server reindeer --virtual_host federation_example --username federation_example --password password
Start consumer that will connect to the downstream node.
$ python3 client.py --server raccoon --virtual_host federation_example --username federation_example --password password
Publish fifty messages on the upstream node.
$ for i in $(seq 1 50); do ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example publish exchange=directives routing_key= payload="Sample application directive $i"; done
Messages consumed from the upstream node.
Received message 'Sample application directive 1' Received message 'Sample application directive 4' Received message 'Sample application directive 7' Received message 'Sample application directive 8' Received message 'Sample application directive 11' Received message 'Sample application directive 12' Received message 'Sample application directive 15' Received message 'Sample application directive 16' Received message 'Sample application directive 19' Received message 'Sample application directive 20' Received message 'Sample application directive 23' Received message 'Sample application directive 24' Received message 'Sample application directive 27' Received message 'Sample application directive 28' Received message 'Sample application directive 31' Received message 'Sample application directive 32' Received message 'Sample application directive 35' Received message 'Sample application directive 36' Received message 'Sample application directive 38' Received message 'Sample application directive 41' Received message 'Sample application directive 42' Received message 'Sample application directive 45' Received message 'Sample application directive 46' Received message 'Sample application directive 49' Received message 'Sample application directive 50'
Messages consumed from the downstream node.
Received message 'Sample application directive 2' from 'amqp://reindeer' Received message 'Sample application directive 3' from 'amqp://reindeer' Received message 'Sample application directive 5' from 'amqp://reindeer' Received message 'Sample application directive 6' from 'amqp://reindeer' Received message 'Sample application directive 9' from 'amqp://reindeer' Received message 'Sample application directive 10' from 'amqp://reindeer' Received message 'Sample application directive 13' from 'amqp://reindeer' Received message 'Sample application directive 14' from 'amqp://reindeer' Received message 'Sample application directive 17' from 'amqp://reindeer' Received message 'Sample application directive 18' from 'amqp://reindeer' Received message 'Sample application directive 21' from 'amqp://reindeer' Received message 'Sample application directive 22' from 'amqp://reindeer' Received message 'Sample application directive 25' from 'amqp://reindeer' Received message 'Sample application directive 26' from 'amqp://reindeer' Received message 'Sample application directive 29' from 'amqp://reindeer' Received message 'Sample application directive 30' from 'amqp://reindeer' Received message 'Sample application directive 33' from 'amqp://reindeer' Received message 'Sample application directive 34' from 'amqp://reindeer' Received message 'Sample application directive 37' from 'amqp://reindeer' Received message 'Sample application directive 39' from 'amqp://reindeer' Received message 'Sample application directive 40' from 'amqp://reindeer' Received message 'Sample application directive 43' from 'amqp://reindeer' Received message 'Sample application directive 44' from 'amqp://reindeer' Received message 'Sample application directive 47' from 'amqp://reindeer' Received message 'Sample application directive 48' from 'amqp://reindeer'
Configuration
$ sudo apt install jq
Export upstream configuration.
$ ./rabbitmqadmin --username admin --password password --host reindeer export config-reindeer.json
$ cat config-reindeer.json | jq . { "rabbit_version": "3.8.0", "users": [ { "name": "admin", "password_hash": "Nsl1tZOOHNq2jT3y6vSjTLKW1ttuhxQxoUtRzPaPFkTH+13P", "hashing_algorithm": "rabbit_password_hashing_sha256", "tags": "administrator" }, { "name": "federation_example", "password_hash": "DVLe5INRSZIKNAa0Ap48UqjEZjjhW0TrcpBIdxv+mFis9+WO", "hashing_algorithm": "rabbit_password_hashing_sha256", "tags": "" } ], "vhosts": [ { "name": "/" }, { "name": "federation_example" } ], "permissions": [ { "user": "federation_example", "vhost": "federation_example", "configure": ".*", "write": ".*", "read": ".*" }, { "user": "admin", "vhost": "/", "configure": ".*", "write": ".*", "read": ".*" }, { "user": "admin", "vhost": "federation_example", "configure": ".*", "write": ".*", "read": ".*" } ], "topic_permissions": [], "parameters": [], "global_parameters": [ { "name": "cluster_name", "value": "rabbit@reindeer" } ], "policies": [], "queues": [ { "name": "application-directives", "vhost": "federation_example", "durable": true, "auto_delete": false, "arguments": {} } ], "exchanges": [ { "name": "directives", "vhost": "federation_example", "type": "fanout", "durable": true, "auto_delete": false, "internal": false, "arguments": {} } ], "bindings": [ { "source": "directives", "vhost": "federation_example", "destination": "application-directives", "destination_type": "queue", "routing_key": "", "arguments": {} } ] }
$ ./rabbitmqadmin --username admin --password password --host raccoon export config-raccoon.json
$ cat config-raccoon.json | jq . { "rabbit_version": "3.8.0", "users": [ { "name": "admin", "password_hash": "tmFTX7O65LjfY9zouR1cZXkMBSBFDD7+6MH6vfMSH9jwx9QR", "hashing_algorithm": "rabbit_password_hashing_sha256", "tags": "administrator" }, { "name": "federation_example", "password_hash": "58v85kIDTU5e6voT6OLceCp82tvrMIztTgvky7mQpEXuj9ao", "hashing_algorithm": "rabbit_password_hashing_sha256", "tags": "" } ], "vhosts": [ { "name": "/" }, { "name": "federation_example" } ], "permissions": [ { "user": "federation_example", "vhost": "federation_example", "configure": ".*", "write": ".*", "read": ".*" }, { "user": "admin", "vhost": "/", "configure": ".*", "write": ".*", "read": ".*" }, { "user": "admin", "vhost": "federation_example", "configure": ".*", "write": ".*", "read": ".*" } ], "topic_permissions": [], "parameters": [ { "value": { "prefetch-count": 1, "uri": "amqp://federation_example:password@reindeer" }, "vhost": "federation_example", "component": "federation-upstream", "name": "reindeer-upstream" }, { "value": [ { "upstream": "reindeer-upstream" } ], "vhost": "federation_example", "component": "federation-upstream-set", "name": "reindeer" } ], "global_parameters": [ { "name": "cluster_name", "value": "rabbit@raccoon" } ], "policies": [ { "vhost": "federation_example", "name": "federate-directives", "pattern": "application-directives", "apply-to": "queues", "definition": { "federation-upstream-set": "reindeer" }, "priority": 1 } ], "queues": [ { "name": "application-directives", "vhost": "federation_example", "durable": true, "auto_delete": false, "arguments": {} } ], "exchanges": [], "bindings": [] }
Additional notes
Enable rabbitmq_federation_management
to define parameters and inspect plugin status using web-gui.
$ sudo rabbitmq-plugins enable rabbitmq_federation_management Enabling plugins on node rabbit@reindeer: rabbitmq_federation_management The following plugins have been configured: rabbitmq_federation rabbitmq_federation_management rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@reindeer... The following plugins have been enabled: rabbitmq_federation_management started 1 plugins.