Creating a Router and Dealer messaging system using ZeroMQ, Node.js and PHP

-

Introduction

When building distributed systems, you usually need more than simple one-to-one communication (a single client and a single server).

The router and dealer communication pattern allows many components to speak to each other, and also allows multiple components (dealers) to speak to a single component, e.g. a router.

Node.js and ZeroMQ allow us to asynchronously collect and respond to the dealers' messages.

Example

Let's imagine that you have a social platform that requires authentication. This authentication doesn't just occur at a single point of entry (login); it also occurs on the client side during HTML5 WebSocket bi-directional communication at a regular heartbeat, and perhaps at regular intervals in a mobile app.

Having to write a script that directly accesses the database and performs the authentication tasks in every language, on each platform or point of call, is time-consuming and complicated, and it requires skills in multiple programming languages.

Luckily, instead of the above, ZeroMQ opens up the opportunity to create a central system (with a router) and multiple dealers (one at each point of authentication), so the real authentication logic and database access only have to be written in one place, in the most appropriate language.

The Router (Node.js)

exports.init = function(zeromq) {
  AuthManager = exports.AuthManager = function() {
    var self = this;
    this.zeromq = zeromq;
    this.ports = {
      ‘auth’: ‘tcp://127.0.0.1:12345
    }
    this.socket = this.zeromq.socket(‘router’);
    this.socket.identity = ‘AuthManager’;
    this.socket.bind(this.ports['auth'], function(err) {
      if(err){
        // handle error
      } else {
        console.log(‘AuthManager: router bound!);
        self.socket.on(‘message’, function(envelope, obj) {
          obj = JSON.parse(obj);
          if(obj.type != undefined && obj.value != undefined) {
            switch(obj.type) {
              case ‘login’ :
                response = self.login(obj.value, function(result) {
                  result = JSON.stringify(result);
                  self.socket.send([ envelope, result ]);
                });
                break;
              case ‘logout’ :
                response = self.logout(obj.value, function(result) {
                  result = JSON.stringify(result);
                  self.socket.send([ envelope, result ]);
                });
                break;
              default:
                response = JSON.stringify({ error: true, message: ‘invalid.type’ });
                self.socket.send([ envelope, response ]);
            }
          } else {
            response = JSON.stringify({ error: true, message: ‘invalid.params’ });
            self.socket.send([ envelope, response ]);
          }
        });
      }
    });
  }
  AuthManager.prototype.login = function(obj, callback) {
    if(obj.email != undefined && obj.password != undefined) {
      callback(user); // access database etc and success
    } else {
      callback({ error: true, message: ‘login.credentials.invalid’ });
    }
  }  
  AuthManager.prototype.logout = function(obj, callback) {
    callback({ }); // access database etc and log user out
  }
  return new AuthManager();
}

The Dealer (Node.js)

Message Manager:

exports.init = function(zeromq) {
  MessageManager = exports.MessageManager = function() {
    this.zeromq = zeromq;
    this.ports = { 
      ‘auth’: ‘tcp://127.0.0.1:12345
    };
    this.socket = this.zeromq.socket(‘dealer’);
    this.socket.connect(this.ports['auth']);
    console.log(‘MessageManager: dealer bound!);
  }
  MessageManager.prototype.send = function(obj, callback) {
    if(typeof(obj) == ‘object’) {
      if(obj.type != undefined && obj.value != undefined) {
        var message = JSON.stringify(obj);
        this.socket.send(message);
        this.socket.once(‘message’, function(data) {
          var result = JSON.parse(data);
          callback(result);
        });
      }
    }
  }
  return new MessageManager();
}

Tip: Notice 'this.socket.once': it attaches the reply handler for a single message only, so a new permanent listener is not added each time send() is called. It's easy to just use this.socket.on() instead, but that would stack up another JavaScript listener on every call.

Usage:

// Example Express application that authenticates through the dealer.
var express = require('express');
var app = express();
var zeromq = require('zmq');
var MessageModule = require(process.cwd() + '/modules/MessageManager');
var MessageManager = MessageModule.init(zeromq);

// GET /login sends a login request and redirects when the result has no error.
app.get('/login', function(req, res) {
  var request = {
    type: 'login',
    value: { username: 'Jordizle', password: 'Developer' }
  };
  MessageManager.send(request, function(result) {
    if (!result.error) {
      // redirect() ends the response itself, so no separate res.end() is needed.
      res.redirect('http://localhost/');
    } else {
      res.send({ error: true, message: result.message });
    }
  });
});

The Dealer (PHP)

Message Manager:

<?php 
/**
 * Dealer-side client for the authentication router.
 *
 * Opens a ZeroMQ DEALER socket, connects it to the router endpoint and sends
 * JSON-encoded requests through it.
 */
class MessageManager {
  /** @var ZMQContext ZeroMQ context the socket is created from. */
  public $zeromq;
  /** @var array Endpoint addresses keyed by service name. */
  public $ports = array();
  /** @var ZMQSocket Dealer socket connected to the 'auth' endpoint. */
  public $socket;

  public function __construct() {
    $this->zeromq = new ZMQContext();
    $this->ports['auth'] = "tcp://127.0.0.1:12345";
    $this->socket = $this->zeromq->getSocket(ZMQ::SOCKET_DEALER);
    $this->socket->connect($this->ports['auth']);
  }

  /**
   * Sends a request and returns the router's reply.
   *
   * recv() is called without flags, so this presumably blocks until the
   * router answers - confirm against the ZMQ extension documentation.
   *
   * @param object $obj Request exposing `type` and `value` properties.
   * @return string JSON reply from the router, or a JSON error object
   *                ('invalid.params') when $obj lacks `type` or `value`.
   */
  public function send($obj) {
    if(!isset($obj->type) || !isset($obj->value)) {
      // Return the same error shape the router uses instead of an implicit
      // null, so callers can always json_decode() the result.
      return json_encode(array('error' => true, 'message' => 'invalid.params'));
    }
    $message = json_encode($obj);
    return $this->socket->send($message)->recv();
  }
}
?>

Usage:

<?php 
/**
 * Request envelope sent to the router: a request type plus its payload.
 */
class Message
{
  /** @var mixed Request type, e.g. 'login'. */
  public $type;

  /** @var mixed Payload that accompanies the request type. */
  public $value;

  /**
   * @param mixed $type  Stored as-is in $this->type.
   * @param mixed $value Stored as-is in $this->value.
   */
  public function __construct($type, $value)
  {
    $this->value = $value;
    $this->type = $type;
  }
}
/**
 * Payload of a login request: the credentials to authenticate with.
 */
class Value {
  /**
   * @var mixed The original default value was lost from the listing
   *            (`public $presence =;`); null is used - TODO confirm.
   */
  public $presence = null;
  /** @var mixed Declared explicitly rather than created dynamically. */
  public $username;
  /** @var mixed Declared explicitly rather than created dynamically. */
  public $password;

  /**
   * @param mixed $username Stored only when it is not loosely equal to null.
   * @param mixed $password Stored only when it is not loosely equal to null.
   */
  public function __construct($username, $password) {
    if($username != null) {
      $this->username = $username;
    }
    if($password != null) {
      $this->password = $password;
    }
  }
}
// Build a login request, send it through the dealer and decode the JSON reply.
$Value = new Value('Jordizle', 'Developer');
$Message = new Message('login', $Value);
$MessageManager = new MessageManager();
$result = json_decode($MessageManager->send($Message));
?>

Your comments.