Clustering Web Sockets with Socket.IO and Express 3
Node.js gets a lot of well-deserved press for its impressive performance. The event loop can handle pretty impressive loads with a single process. However, most servers have multiple processors, and I, for one, would like to take advantage of them. Node’s cluster api can help.
While cluster is a core api in node.js, I’d like to incorporate it with Express 3 and Socket.io.
Final source code available on github
The node cluster docs gives us the following example.
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
if cluster.isMaster
i = 0
while i < numCPUs cluster.fork() i++ cluster.on "exit", (worker, code, signal) ->
console.log "worker " + worker.process.pid + " died"
else
http.createServer((req, res) ->
res.writeHead 200
res.end "hello world\n"
).listen 8000
The code compiles and runs, but I have not confirmation that things are actually working. I’d like to add a little logging to confirm that we actually have multiple workers going. Lets add these lines right before the ‘exit’ listener.
cluster.on 'fork', (worker) ->
console.log 'forked worker ' + worker.id
On my machine, we get this output:
coffee server
forked worker 1
forked worker 2
forked worker 3
forked worker 4
forked worker 5
forked worker 6
forked worker 7
forked worker 8
So far, so good. Lets add express to the mix.
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
if cluster.isMaster
i = 0
while i < numCPUs cluster.fork() i++ cluster.on 'fork', (worker) ->
console.log 'forked worker ' + worker.process.pid
cluster.on "listening", (worker, address) ->
console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
cluster.on "exit", (worker, code, signal) ->
console.log "worker " + worker.process.pid + " died"
else
app = require("express")()
server = require("http").createServer(app)
server.listen 8000
app.get "/", (req, res) ->
console.log 'request handled by worker with pid ' + process.pid
res.writeHead 200
res.end "hello world\n"
At this point, I’d like to throw a few http requests against the setup to ensure that I’m really utilizing all my processors.Running (curl -XGET “http://localhost:8000”) 6 times makes the node process go:
request handled by worker with pid 85229
request handled by worker with pid 85231
request handled by worker with pid 85231
request handled by worker with pid 85231
request handled by worker with pid 85227
request handled by worker with pid 85229
Alright, last step is getting socket.io involved. Just a couple extra lines for the socket, however, we’ll need to add a basic index.html file to actually make the socket calls.
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
if cluster.isMaster
i = 0
while i < numCPUs cluster.fork() i++ cluster.on 'fork', (worker) ->
console.log 'forked worker ' + worker.process.pid
cluster.on "listening", (worker, address) ->
console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
cluster.on "exit", (worker, code, signal) ->
console.log "worker " + worker.process.pid + " died"
else
app = require("express")()
server = require("http").createServer(app)
io = require("socket.io").listen(server)
server.listen 8000
app.get "/", (req, res) ->
res.sendfile(__dirname + '/index.html');
io.sockets.on "connection", (socket) ->
console.log 'socket call handled by worker with pid ' + process.pid
socket.emit "news",
hello: "world"
<script class="hiddenSpellError" type="text/javascript">// <![CDATA[
src</span>="/socket.io/socket.io.js">
// ]]></script><script type="text/javascript">// <![CDATA[
// ]]></script>
var socket = io.connect('http://localhost');
socket.on('news', function (data) {
console.log(data);
socket.emit('my other event', { my: 'data' });
});
// ]]></script>
When I run this code, problems start to appear. Specifically, the following message shows up in my output
warn – client not handshaken client should reconnect
Not surprisingly, we have issues with sockets appearing disconnected. Socket.io defaults to storing its open sockets in an in-memory store. As a result, sockets in other processes have no access to the information. We can easily fix the problem by using the redis store for socket.io. The docs we need are here.
With the redis store in place, it looks like this:
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
RedisStore = require("socket.io/lib/stores/redis")
redis = require("socket.io/node_modules/redis")
pub = redis.createClient()
sub = redis.createClient()
client = redis.createClient()
if cluster.isMaster
i = 0
while i < numCPUs cluster.fork() i++ cluster.on 'fork', (worker) ->
console.log 'forked worker ' + worker.process.pid
cluster.on "listening", (worker, address) ->
console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
cluster.on "exit", (worker, code, signal) ->
console.log "worker " + worker.process.pid + " died"
else
app = require("express")()
server = require("http").createServer(app)
io = require("socket.io").listen(server)
io.set "store", new RedisStore(
redisPub: pub
redisSub: sub
redisClient: client
)
server.listen 8000
app.get "/", (req, res) ->
res.sendfile(__dirname + '/index.html');
io.sockets.on "connection", (socket) ->
console.log 'socket call handled by worker with pid ' + process.pid
socket.emit "news",
hello: "world"