Categories
Uncategorized

How to copy messages to different RabbitMQ node

Copy messages to different RabbitMQ node using federation plugin.

Preliminary information

I will copy messages between reindeer and raccoon RabbitMQ nodes.

Install RabbitMQ server on every node

Install RabbitMQ message broker on every node.

$ sudo apt update
$ sudo apt install gnupg2 apt-transport-https curl
$ curl https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc | sudo apt-key add -
$ echo "deb https://dl.bintray.com/rabbitmq/debian stretch main"              | sudo tee    /etc/apt/sources.list.d/rabbitmq.list
$ echo "deb https://dl.bintray.com/rabbitmq-erlang/debian buster erlang-22.x" | sudo tee -a /etc/apt/sources.list.d/rabbitmq.list
$ apt update
$ apt install rabbitmq-server

Use rabbitmq-diagnostics to display RabbitMQ version.

$ sudo rabbitmq-diagnostics server_version
Asking node rabbit@buster for its RabbitMQ version...
3.8.0

Enable management plugin.

$ sudo rabbitmq-plugins enable rabbitmq_management

Define admin user with password password.

.

$ sudo rabbitmqctl add_user admin password
$ sudo rabbitmqctl set_user_tags admin administrator
$ sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

Delete guest user.

$ sudo rabbitmqctl delete_user guest

Get rabbitmqadmin utility.

$ curl -o rabbitmqadmin http://127.0.0.1:15672/cli/rabbitmqadmin

Ensure that executable bit is set.

$ chmod +x rabbitmqadmin

Setup the dedicated vhost and user

Setup dedicated vhost and user on both nodes.

Create federation_example virtualhost and ensure that admin can manage it.

$ sudo rabbitmqctl add_vhost federation_example
$ sudo rabbitmqctl set_permissions -p federation_example admin ".*" ".*" ".*"

Declare federation_example user with password password, grant permissions to federation_example virtual host.

$ sudo rabbitmqctl add_user federation_example password
$ sudo rabbitmqctl set_permissions federation_example --vhost federation_example ".*" ".*" ".*"
Notice, federation_example is not a management user.

Setup exchanges

Setup exchanges, queues and bindings on both nodes.

Declare exchange.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare exchange name=directives type=fanout

Declare queue.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare queue name=application-directives

Declare bindings.

$ ./rabbitmqadmin --username admin --password password --vhost federation_example declare binding source=directives destination=application-directives

Setup federation plugin on the first RabbitMQ node

Setup federation plugin on the first RabbitMQ node – reindeer.

Enable federation plugin.

$ sudo rabbitmq-plugins enable rabbitmq_federation

Define upstream RabbitMQ server.

$ sudo rabbitmqctl set_parameter federation-upstream raccoon-upstream --vhost federation_example '{"uri":"amqp://federation_example:password@raccoon"}'

Define upstream set.

$ sudo rabbitmqctl set_parameter  --vhost federation_example federation-upstream-set raccoon '[{"upstream":"raccoon-upstream"}]'

Define federation policy.

$ sudo rabbitmqctl set_policy federate-directives --vhost federation_example directives '{"federation-upstream-set":"raccoon"}' --priority 1 --apply-to exchanges

Display defined parameters.

$ sudo rabbitmqctl list_parameters -p federation_example
Listing runtime parameters for vhost "federation_example" ...
component       name    value
federation-upstream     raccoon-upstream        {"uri":"amqp://federation_example:password@raccoon"}
federation-upstream-set raccoon [{"upstream":"raccoon-upstream"}]

Check federation status.

$ sudo rabbitmqctl federation_status
Listing federation links on node rabbit@reindeer...
[#{error => <<>>,exchange => <<"directives">>,id => <<"2d543d9d">>,
   last_changed => <<"2019-10-22 21:41:08">>,
   local_connection => <<"<rabbit@reindeer.3.2854.0>">>,queue => <<>>,
   status => running,type => exchange,upstream => <<"raccoon-upstream">>,
   upstream_exchange => <<"directives">>,upstream_queue => <<>>,
   uri => <<"amqp://raccoon">>,vhost => <<"federation_example">>}]

Setup federation plugin on the second RabbitMQ node

Setup federation plugin on the second RabbitMQ node – raccoon.

Enable federation plugin.

$ sudo rabbitmq-plugins enable rabbitmq_federation

Define upstream RabbitMQ server.

$ sudo rabbitmqctl set_parameter federation-upstream reindeer-upstream --vhost federation_example '{"uri":"amqp://federation_example:password@reindeer"}'

Define upstream set.

$ sudo rabbitmqctl set_parameter  --vhost federation_example federation-upstream-set reindeer '[{"upstream":"reindeer-upstream"}]'

Define federation policy.

$ sudo rabbitmqctl set_policy federate-directives --vhost federation_example directives '{"federation-upstream-set":"reindeer"}' --priority 1 --apply-to exchanges

Display defined parameters.

$ sudo rabbitmqctl list_parameters -p federation_example
Listing runtime parameters for vhost "federation_example" ...
component       name    value
federation-upstream     reindeer-upstream       {"uri":"amqp://federation_example:password@reindeer"}
federation-upstream-set reindeer        [{"upstream":"reindeer-upstream"}]

Check federation status.

$ sudo rabbitmqctl federation_status
Listing federation links on node rabbit@raccoon...
[#{error => <<>>,exchange => <<"directives">>,id => <<"58c6f033">>,
   last_changed => <<"2019-10-22 21:39:46">>,
   local_connection => <<"<rabbit@raccoon.3.2882.0>">>,queue => <<>>,
   status => running,type => exchange,upstream => <<"reindeer-upstream">>,
   upstream_exchange => <<"directives">>,upstream_queue => <<>>,
   uri => <<"amqp://reindeer">>,vhost => <<"federation_example">>}]

Publish messages and inspect queues

Publish message on the first RabbitMQ node – reindeer.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example publish exchange=directives routing_key= payload="sample application directive"

Inspect queues on the first RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example list queues vhost name node messages
+--------------------+------------------------------------------+-----------------+----------+
|       vhost        |                   name                   |      node       | messages |
+--------------------+------------------------------------------+-----------------+----------+
| federation_example | application-directives                   | rabbit@reindeer | 1        |
| federation_example | federation: directives -> rabbit@raccoon | rabbit@reindeer | 0        |
+--------------------+------------------------------------------+-----------------+----------+
Notice, using exchange to federate messages in this example will create additional queue to consume these.

Inspect queues on the second RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example list queues vhost name node messages
+--------------------+-------------------------------------------+----------------+----------+
|       vhost        |                   name                    |      node      | messages |
+--------------------+-------------------------------------------+----------------+----------+
| federation_example | application-directives                    | rabbit@raccoon | 1        |
| federation_example | federation: directives -> rabbit@reindeer | rabbit@raccoon | 0        |
+--------------------+-------------------------------------------+----------------+----------+

Publish messages on the second RabbitMQ node – raccoon.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example publish exchange=directives routing_key= payload="sample application directive"

Inspect queues on the first RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example list queues vhost name node messages
+--------------------+------------------------------------------+-----------------+----------+
|       vhost        |                   name                   |      node       | messages |
+--------------------+------------------------------------------+-----------------+----------+
| federation_example | application-directives                   | rabbit@reindeer | 2        |
| federation_example | federation: directives -> rabbit@raccoon | rabbit@reindeer | 0        |
+--------------------+------------------------------------------+-----------------+----------+

Inspect queues on the second RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example list queues vhost name node messages
+--------------------+-------------------------------------------+----------------+----------+
|       vhost        |                   name                    |      node      | messages |
+--------------------+-------------------------------------------+----------------+----------+
| federation_example | application-directives                    | rabbit@raccoon | 2        |
| federation_example | federation: directives -> rabbit@reindeer | rabbit@raccoon | 0        |
+--------------------+-------------------------------------------+----------------+----------+

Consume a message.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example get  queue=application-directives ackmode=ack_requeue_false
+-------------+------------+---------------+------------------------------+---------------+------------------+------------+-------------+
| routing_key |  exchange  | message_count |           payload            | payload_bytes | payload_encoding | properties | redelivered |
+-------------+------------+---------------+------------------------------+---------------+------------------+------------+-------------+
|             | directives | 1             | sample application directive | 28            | string           |            | True        |
+-------------+------------+---------------+------------------------------+---------------+------------------+------------+-------------+

Inspect queues on the first RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host reindeer --vhost federation_example list queues vhost name node messages
+--------------------+------------------------------------------+-----------------+----------+
|       vhost        |                   name                   |      node       | messages |
+--------------------+------------------------------------------+-----------------+----------+
| federation_example | application-directives                   | rabbit@reindeer | 1        |
| federation_example | federation: directives -> rabbit@raccoon | rabbit@reindeer | 0        |
+--------------------+------------------------------------------+-----------------+----------+

Inspect queues on the second RabbitMQ node.

$ ./rabbitmqadmin --username admin --password password --host raccoon --vhost federation_example list queues vhost name node messages
+--------------------+-------------------------------------------+----------------+----------+
|       vhost        |                   name                    |      node      | messages |
+--------------------+-------------------------------------------+----------------+----------+
| federation_example | application-directives                    | rabbit@raccoon | 2        |
| federation_example | federation: directives -> rabbit@reindeer | rabbit@raccoon | 0        |
+--------------------+-------------------------------------------+----------------+----------+

Configuration

$ sudo apt install jq

Export upstream configuration.

$ ./rabbitmqadmin --username admin --password password --host reindeer export config-reindeer.json
$ cat config-reindeer.json | jq .
{
  "rabbit_version": "3.8.0",
  "users": [
    {
      "name": "admin",
      "password_hash": "e/fHqe5PFXC/m68fcBa8W612ko9Jg5Tlo6L6cjiN/jGqXbXA",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "administrator"
    },
    {
      "name": "federation_example",
      "password_hash": "p+bEErHIP5+hdD4Vw66GK+8EsR1VkVCg4hdNfm4Sj03k55pa",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": ""
    }
  ],
  "vhosts": [
    {
      "name": "/"
    },
    {
      "name": "federation_example"
    }
  ],
  "permissions": [
    {
      "user": "federation_example",
      "vhost": "federation_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "admin",
      "vhost": "federation_example",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "topic_permissions": [],
  "parameters": [
    {
      "value": {
        "uri": "amqp://federation_example:password@raccoon"
      },
      "vhost": "federation_example",
      "component": "federation-upstream",
      "name": "raccoon-upstream"
    },
    {
      "value": [
        {
          "upstream": "raccoon-upstream"
        }
      ],
      "vhost": "federation_example",
      "component": "federation-upstream-set",
      "name": "raccoon"
    }
  ],
  "global_parameters": [
    {
      "name": "cluster_name",
      "value": "rabbit@reindeer"
    }
  ],
  "policies": [
    {
      "vhost": "federation_example",
      "name": "federate-directives",
      "pattern": "directives",
      "apply-to": "exchanges",
      "definition": {
        "federation-upstream-set": "raccoon"
      },
      "priority": 1
    }
  ],
  "queues": [
    {
      "name": "application-directives",
      "vhost": "federation_example",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "directives",
      "vhost": "federation_example",
      "type": "fanout",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "directives",
      "vhost": "federation_example",
      "destination": "application-directives",
      "destination_type": "queue",
      "routing_key": "",
      "arguments": {}
    }
  ]
}

Clustering

Define multiple upstream addresses when you connect to a cluster.

$ sudo rabbitmqctl set_parameter federation-upstream cluster-upstream --vhost federation_example '{"uri":["amqp://federation_example:password@fox","amqp://federation_example:password@falcon","amqp://federation_example:password@ferret"]}'
Setting runtime parameter "cluster-upstream" for component "federation-upstream" to "{"uri":["amqp://federation_example:password@fox","amqp://federation_example:password@falcon","amqp://federation_example:password@ferret"]}" in vhost "federation_example" ...

Remember to also mirror ^federation queues, so the downstream can safely reconnect to other cluster nodes.

$ sudo rabbitmqctl set_policy -p federation_example mirror-directives "^application-directives$" '{"ha-mode": "all", "ha-sync-mode":"automatic"}'
Setting policy "mirror-directives" for pattern "^application-directives" to "{"ha-mode": "all", "ha-sync-mode":"automatic"}" with priority "0" for vhost "federation_example" ...
$ sudo rabbitmqctl set_policy -p federation_example mirror-federation "^federation" '{"ha-mode": "all", "ha-sync-mode":"automatic"}'
Setting policy "mirror-federation" for pattern "^federation" to "{"ha-mode": "all", "ha-sync-mode":"automatic"}" with priority "0" for vhost "federation_example" ...

You will get the following error when you forget to mirror dynamically created queues used for federation purposes.

Error detail:
{server_initiated_close,404,
                        <<"NOT_FOUND - home node 'rabbit@falcon' of durable queue 'federation: directives -> rabbit@bear' in vhost 'federation_example' is down or inaccessible">>}

You can restart federation link in case of trouble.

$ sudo rabbitmqctl eval 'rabbit_federation_status:status().'
[[{exchange,<<"directives">>},
  {upstream_exchange,<<"directives">>},
  {type,exchange},
  {vhost,<<"federation_example">>},
  {upstream,<<"reindeer-upstream">>},
  {id,<<"58c6f033">>},
  {status,running},
  {local_connection,<<"<rabbit@bear.1.9902.0>">>},
  {uri,<<"amqp://fox">>},
  {timestamp,{{2019,10,23},{22,8,39}}}]]
$ sudo rabbitmqctl restart_federation_link 58c6f033
Restarting federation link 58c6f033 on node rabbit@bear
$ sudo rabbitmqctl eval 'rabbit_federation_status:status().'
[[{exchange,<<"directives">>},
  {upstream_exchange,<<"directives">>},
  {type,exchange},
  {vhost,<<"federation_example">>},
  {upstream,<<"reindeer-upstream">>},
  {id,<<"58c6f033">>},
  {status,running},
  {local_connection,<<"<rabbit@bear.1.10002.0>">>},
  {uri,<<"amqp://falcon">>},
  {timestamp,{{2019,10,23},{22,8,56}}}]]

Additional notes

Enable rabbitmq_federation_management to define parameters and inspect plugin status using web-gui.

$ sudo rabbitmq-plugins enable rabbitmq_federation_management
Enabling plugins on node rabbit@reindeer:
rabbitmq_federation_management
The following plugins have been configured:
  rabbitmq_federation
  rabbitmq_federation_management
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@reindeer...
The following plugins have been enabled:
  rabbitmq_federation_management
started 1 plugins.

References

Federation Plugin