Site icon Adam N England

Clustering Web Sockets with Socket.IO and Express 3

Node.js gets a lot of well-deserved press for its impressive performance. The event loop can handle pretty impressive loads with a single process. However, most servers have multiple processors, and I, for one, would like to take advantage of them. Node’s cluster api can help.

While cluster is a core api in node.js, I’d like to incorporate it with Express 3 and Socket.io.

Final source code available on github

The node cluster docs gives us the following example.

cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
if cluster.isMaster
  i = 0
  while i < numCPUs     cluster.fork()     i++   cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  http.createServer((req, res) ->
    res.writeHead 200
    res.end "hello world\n"
  ).listen 8000

The code compiles and runs, but I have not confirmation that things are actually working. I’d like to add a little logging to confirm that we actually have multiple workers going. Lets add these lines right before the ‘exit’ listener.

cluster.on 'fork', (worker) ->
    console.log 'forked worker ' + worker.id

On my machine, we get this output:

coffee server
forked worker 1
forked worker 2
forked worker 3
forked worker 4
forked worker 5
forked worker 6
forked worker 7
forked worker 8

So far, so good. Lets add express to the mix.

cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
if cluster.isMaster
  i = 0
  while i < numCPUs     cluster.fork()     i++   cluster.on 'fork', (worker) ->
    console.log 'forked worker ' + worker.process.pid
  cluster.on "listening", (worker, address) ->
    console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
  cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  app = require("express")()
  server = require("http").createServer(app)
  server.listen 8000
  app.get "/", (req, res) ->
    console.log 'request handled by worker with pid ' + process.pid
    res.writeHead 200
    res.end "hello world\n"

At this point, I’d like to throw a few http requests against the setup to ensure that I’m really utilizing all my processors.Running (curl -XGET “http://localhost:8000”) 6 times makes the node process go:

request handled by worker with pid 85229
request handled by worker with pid 85231
request handled by worker with pid 85231
request handled by worker with pid 85231
request handled by worker with pid 85227
request handled by worker with pid 85229

Alright, last step is getting socket.io involved. Just a couple extra lines for the socket, however, we’ll need to add a basic index.html file to actually make the socket calls.

cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
if cluster.isMaster
  i = 0
  while i < numCPUs     cluster.fork()     i++   cluster.on 'fork', (worker) ->
    console.log 'forked worker ' + worker.process.pid
  cluster.on "listening", (worker, address) ->
    console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
  cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  app = require("express")()
  server = require("http").createServer(app)
  io = require("socket.io").listen(server)
  server.listen 8000
  app.get "/", (req, res) ->
    res.sendfile(__dirname + '/index.html');
  io.sockets.on "connection", (socket) ->
    console.log 'socket call handled by worker with pid ' + process.pid
    socket.emit "news",
      hello: "world"
<script class="hiddenSpellError" type="text/javascript">// <![CDATA[
  src</span>="/socket.io/socket.io.js">
  // ]]></script><script type="text/javascript">// <![CDATA[

  // ]]></script>
   var socket = io.connect('http://localhost');
   socket.on('news', function (data) {
   console.log(data);
   socket.emit('my other event', { my: 'data' });
   });
  // ]]></script>

When I run this code, problems start to appear. Specifically, the following message shows up in my output

warn – client not handshaken client should reconnect

Not surprisingly, we have issues with sockets appearing disconnected. Socket.io defaults to storing its open sockets in an in-memory store. As a result, sockets in other processes have no access to the information. We can easily fix the problem by using the redis store for socket.io. The docs we need are here.

With the redis store in place, it looks like this:

cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
RedisStore = require("socket.io/lib/stores/redis")
redis = require("socket.io/node_modules/redis")
pub = redis.createClient()
sub = redis.createClient()
client = redis.createClient()
if cluster.isMaster
  i = 0
  while i < numCPUs     cluster.fork()     i++   cluster.on 'fork', (worker) ->
    console.log 'forked worker ' + worker.process.pid
  cluster.on "listening", (worker, address) ->
    console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
  cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  app = require("express")()
  server = require("http").createServer(app)
  io = require("socket.io").listen(server)
  io.set "store", new RedisStore(
    redisPub: pub
    redisSub: sub
    redisClient: client
  )
  server.listen 8000
  app.get "/", (req, res) ->
    res.sendfile(__dirname + '/index.html');
  io.sockets.on "connection", (socket) ->
    console.log 'socket call handled by worker with pid ' + process.pid
    socket.emit "news",
      hello: "world"
Exit mobile version