Circumventing AMQP with the RabbitMQ Erlang client

January 26, 2010

Categories: erlang, rabbitmq

AMQP (Advanced Message Queuing Protocol) is a standard binary wire protocol supported by many message queuing systems, and RabbitMQ is no different. Clients written in Ruby, Python, C and many other languages communicate with RabbitMQ brokers via this standard format. However, when developing in a homogeneous Erlang environment, it would be ideal to avoid serializing and deserializing data and sending packets through the TCP stack. This can be accomplished through message passing between client nodes and broker nodes in a connected Erlang grid.

Setup

I'm using RabbitMQ Server 1.7.1 and the RabbitMQ Erlang client 1.7.0:

http://www.rabbitmq.com/releases/rabbitmq-server/v1.7.1/rabbitmq-server-1.7.1.tar.gz
http://hg.rabbitmq.com/rabbitmq-erlang-client/archive/825eceaca84c.tar.gz

After unpacking, I built and installed the server and client in /usr/local/lib/erlang/lib/rabbitmq_server-1.7.1 and /usr/local/lib/erlang/lib/rabbitmq_erlang_client-1.7.0, respectively.

** Note that the lib directories are named with underscores rather than dashes (rabbitmq_server, not rabbitmq-server). This is important because when the Erlang runtime looks for include files, it parses the directory names expecting a dash only as the delimiter between the app name and the version.
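One way to check that the naming worked is to ask the code server where it finds an application. The sketch below is a minimal helper, not part of RabbitMQ; the module name libdir_check and the function describe/1 are hypothetical. It relies only on code:lib_dir/1, which is also what -include_lib uses to resolve the first component of an include path.

```erlang
-module(libdir_check).
-export([describe/1]).

%% Ask the code server for the lib directory of an application.
%% code:lib_dir/1 returns {error, bad_name} when no directory
%% matching the application name was found in the code path.
describe(App) when is_atom(App) ->
    case code:lib_dir(App) of
        {error, bad_name} ->
            {not_found, App};
        Dir ->
            %% Return only the last path component, which on a standard
            %% install has the form "appname-version".
            {found, filename:basename(Dir)}
    end.
```

Calling libdir_check:describe(rabbitmq_server) from the Erlang shell should report a found directory if the server was installed as described above, and {not_found, rabbitmq_server} if the directory name could not be matched to the application name.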

Start the server

jvorreuter$ sudo /usr/local/lib/erlang/rabbitmq_server-1.7.1/scripts/rabbitmq-server

+---+   +---+
|   |   |   |
|   |   |   |
|   |   |   |
|   +---+   +-------+
|                   |
| RabbitMQ  +---+   |
|           |   |   |
|   v1.7.1  +---+   |
|                   |
+-------------------+
AMQP 8-0
Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
Licensed under the MPL.  See http://www.rabbitmq.com/

node          : rabbit@jacob-vorreuters-macbook-pro-3
app descriptor: /usr/local/lib/erlang/rabbitmq_server-1.7.1/scripts/../ebin/rabbit.app
home dir      : /Users/jvorreuter
cookie hash   : 9WGq9u8L8U1CCLtGpMyzrQ==
log           : /var/log/rabbitmq/rabbit.log
sasl log      : /var/log/rabbitmq/rabbit-sasl.log
database dir  : /var/lib/rabbitmq/mnesia/rabbit

starting database             ...done
starting core processes       ...done
starting recovery             ...done
starting persister            ...done
starting guid generator       ...done
starting builtin applications ...done
starting TCP listeners        ...done
starting SSL listeners        ...done

broker running

The test module

Our test module is going to be responsible for initializing a single queue and declaring and binding an exchange to that queue. Next we will spawn one or many publisher processes that will put things in the queue. Then we can spawn one or many subscriber processes that will pull things out of the queue. The interaction between publisher/subscriber and broker will only involve direct node-to-node message passing. There will be no use of AMQP.

The setup_queue/0 function first connects to other accessible nodes on the network using the net_adm:world/0 function. In this test we're assuming that the only other running Erlang node is the rabbit server, which is why we are able to fetch the rabbit node name by taking the head of the nodes() list.

To initialize a connection with the rabbit server, we make an RPC call to amqp_connection:start_direct/0 on the rabbit node. This returns the remote pid of the connection gen_server started on the rabbit server. We pass that remote connection pid to amqp_connection:open_channel/1 and get back the remote pid of the channel gen_server, which is also started on the rabbit server. Now that we have an open channel, we can declare the queue and the exchange and bind them.

It is important to close channels and connections when they are no longer being used, to prevent unnecessary processes from staying alive on the rabbit node.
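The connection steps described above can be sketched roughly as follows. This is not the original test module: the module name rabbit_direct_sketch and the helper pick_broker/1 are hypothetical, and the return shapes of amqp_connection:start_direct/0 and amqp_connection:open_channel/1 are assumed from the description above (each returns a pid). The queue and exchange declarations and the cleanup calls are left out because they depend on the client's record definitions.

```erlang
-module(rabbit_direct_sketch).
-export([pick_broker/1, open_remote_channel/0]).

%% Pick the broker from the list of connected nodes. Like the text,
%% this assumes the first connected node is the rabbit server, but it
%% returns an error instead of crashing when no node is connected.
pick_broker([]) -> {error, no_nodes};
pick_broker([Node | _]) -> {ok, Node}.

%% Open a connection and a channel on the rabbit node.
%% Requires the RabbitMQ Erlang client on the code path, and a
%% .hosts.erlang file so that net_adm:world/0 knows which hosts to scan.
open_remote_channel() ->
    _ = net_adm:world(),
    case pick_broker(nodes()) of
        {ok, RabbitNode} ->
            %% Assumption: start_direct/0 returns the connection pid.
            case rpc:call(RabbitNode, amqp_connection, start_direct, []) of
                {badrpc, Reason} ->
                    {error, Reason};
                Connection ->
                    %% Assumption: open_channel/1 returns the channel pid.
                    Channel = amqp_connection:open_channel(Connection),
                    {ok, Connection, Channel}
            end;
        {error, _} = Error ->
            Error
    end.
```

Splitting out pick_broker/1 keeps the "head of nodes()" assumption in one place, so it is easy to replace with an explicit node name once more than one Erlang node is running.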

Run the test

jvorreuter$ erl -sname rabbit_test

Erlang R13B01 (erts-5.7.2) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]
Eshell V5.7.2  (abort with ^G)

(rabbit_test@...)1> c(rabbit_test).
{ok,rabbit_test}
(rabbit_test@...)2> rabbit_test:setup_queue().      <--- setup the queue
ok
(rabbit_test@...)3> rabbit_test:length().           <--- queue is empty
0
(rabbit_test@...)4> rabbit_test:start_pub().        <--- start a publisher
{ok,<0.50.0>}
(rabbit_test@...)5> rabbit_test:length().           <--- the queue length begins to grow
244
(rabbit_test@...)6> rabbit_test:length().
556
(rabbit_test@...)7> rabbit_test:start_sub().        <--- start a subscriber
{ok,<0.54.0>}
(rabbit_test@...)8> rabbit_test:length().           <--- the queue length stabilizes
1095
(rabbit_test@...)9> rabbit_test:length().
1095
(rabbit_test@...)10> rabbit_test:start_sub().       <--- start another subscriber
{ok,<0.58.0>}
(rabbit_test@...)11> rabbit_test:length().   
713
(rabbit_test@...)12> rabbit_test:length().
364
(rabbit_test@...)13> rabbit_test:length().          <--- the queue length drops to zero
0

Scaling

Taking advantage of Erlang's distributed nature, we could easily start a grid of RabbitMQ servers and write a simple connection pool module to allow client nodes to reuse remote connections to queue/dequeue items.
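As a rough illustration of what such a pool might look like, here is a minimal round-robin sketch built on gen_server. The module name conn_pool_sketch and its functions are hypothetical, and the pool treats connections as opaque terms; it assumes the caller has already opened them on the broker nodes and does not monitor or reopen them.

```erlang
-module(conn_pool_sketch).
-behaviour(gen_server).

-export([start_link/1, next/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Conns is a non-empty list of already-opened connection handles.
start_link(Conns) when is_list(Conns), Conns =/= [] ->
    gen_server:start_link(?MODULE, Conns, []).

%% Return the next connection in round-robin order.
next(Pool) ->
    gen_server:call(Pool, next).

init(Conns) ->
    {ok, Conns}.

%% Hand out the head of the list and rotate it to the back.
handle_call(next, _From, [Conn | Rest]) ->
    {reply, Conn, Rest ++ [Conn]}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```

A client would start the pool once with the list of remote connection pids and call conn_pool_sketch:next/1 before each publish or fetch. A real pool would also need to detect dead connections, which this sketch leaves out.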
