Anti-RDBMS: A list of distributed key-value stores
Perhaps you’re considering using a dedicated key-value or document store instead of a traditional relational database. Reasons for this might include:
- You’re suffering from Cloud-computing Mania.
- You need an excuse to ‘get your Erlang on’
- You heard CouchDB was cool.
- You hate MySQL, and although PostgreSQL is much better, it still doesn’t have decent replication. There’s no chance you’re buying Oracle licenses.
- Your data is stored and retrieved mainly by primary key, without complex joins.
- You have a non-trivial amount of data, and the thought of managing lots of RDBMS shards and replication failure scenarios gives you the fear.
Whatever your reasons, there are a lot of options to choose from. At Last.fm we do a lot of batch computation in Hadoop, then dump it out to other machines where it’s indexed and served up over HTTP and Thrift as an internal service (stuff like ‘most popular songs in London, UK this week’ etc). Presently we’re using a home-grown index format which points into large files containing lots of data spanning many keys, similar to the Haystack approach mentioned in this article about Facebook photo storage. It works, but rather than build our own replication and partitioning system on top of this, we are looking to potentially replace it with a distributed, resilient key-value store for the last three reasons listed above.
This article represents my notes and research to date on distributed key-value stores (and some other stuff) that might be suitable as RDBMS replacements under the right conditions. I’m expecting to try some of these out and investigate further in the coming months.
Glossary and Background Reading
- Distributed Hash Table (DHT) and algorithms such as Chord or Kademlia
- Amazon’s Dynamo Paper, and this ReadWriteWeb article about Dynamo which explains why such a system is invaluable
- Amazon’s SimpleDB Service, and some commentary
- Google’s BigTable paper
- The Paxos Algorithm - read this page in order to appreciate that knocking up a Paxos implementation isn’t something you’d want to do whilst hungover on a Saturday morning.
The Shortlist
Here is a list of projects that could potentially replace a group of relational database shards. Some of these are much more than key-value stores, and aren’t suitable for low-latency data serving, but are interesting nonetheless.
| Name | Language | Fault-tolerance | Persistence | Client Protocol | Data model | Docs | Community |
| Project Voldemort | Java | partitioned, replicated, read-repair | Pluggable: BerkeleyDB, MySQL | Java API | Structured / blob / text | A | LinkedIn, no |
| Ringo | Erlang | partitioned, replicated, immutable | Custom on-disk (append only log) | HTTP | blob | B | Nokia, no |
| Scalaris | Erlang | partitioned, replicated, paxos | In-memory only | Erlang, Java, HTTP | blob | B | OnScale, no |
| Kai | Erlang | partitioned, replicated? | On-disk Dets file | Memcached | blob | C | no |
| Dynomite | Erlang | partitioned, replicated | Pluggable: couch, dets | Custom ascii, Thrift | blob | D+ | Powerset, no |
| MemcacheDB | C | replication | BerkeleyDB | Memcached | blob | B | some |
| ThruDB | C++ | Replication | Pluggable: BerkeleyDB, Custom, MySQL, S3 | Thrift | Document oriented | C+ | Third rail, unsure |
| CouchDB | Erlang | Replication, partitioning? | Custom on-disk | HTTP, json | Document oriented (json) | A | Apache, yes |
| Cassandra | Java | Replication, partitioning | Custom on-disk | Thrift | Bigtable meets Dynamo | F | Facebook, no |
| HBase | Java | Replication, partitioning | Custom on-disk | Custom API, Thrift, Rest | Bigtable | A | Apache, yes |
| Hypertable | C++ | Replication, partitioning | Custom on-disk | Thrift, other | Bigtable | A | Zvents, Baidu, yes |
Why 5 of these aren’t suitable
What I’m really looking for is a low latency, replicated, distributed key-value store. Something that scales well as you feed it more machines, and doesn’t require much setup or maintenance - it should just work. The API should be that of a simple hashtable: set(key, val), get(key), delete(key). This would dispense with the hassle of managing a sharded / replicated database setup, and hopefully be capable of serving up data by primary key efficiently.
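To make that interface concrete, here is a minimal sketch in Python. It is purely illustrative: the class name `KVStore` is hypothetical, and it keeps everything in one process, whereas the whole point of the stores below is that they partition and replicate the data for you.

```python
from typing import Dict, Optional


class KVStore:
    """Hypothetical in-memory stand-in for the hashtable-style API."""

    def __init__(self) -> None:
        self._data: Dict[str, bytes] = {}

    def set(self, key: str, val: bytes) -> None:
        # Overwrites any existing value for the key.
        self._data[key] = val

    def get(self, key: str) -> Optional[bytes]:
        # Returns None when the key is unknown.
        return self._data.get(key)

    def delete(self, key: str) -> bool:
        # Returns True if something was actually removed.
        return self._data.pop(key, None) is not None


store = KVStore()
store.set("user:RJ", b"some serialized profile")
```

Anything richer than this (joins, secondary indexes, range scans) is outside what I'm asking for.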
Five of the projects on the list are far from being simple key-value stores, and as such don’t meet the requirements - but they are definitely worth a mention.
1) We’re already heavy users of Hadoop, and have been experimenting with HBase for a while. It’s much more than a KV store, but latency is too great to serve data to the website. We will probably use HBase internally for other stuff though - we already have stacks of data in HDFS.
2) Hypertable provides a similar feature set to HBase (both are inspired by Google’s Bigtable). They recently announced a new sponsor, Baidu - the biggest Chinese search engine. Definitely one to watch, but doesn’t fit the low-latency KV store bill either.
3) Cassandra sounded very promising when the source was released by Facebook last year. They use it for inbox search. It’s Bigtable-esque, but uses a DHT so doesn’t need a central server (one of the Cassandra developers previously worked at Amazon on Dynamo). Unfortunately it’s languished in relative obscurity since release, because Facebook never really seemed interested in it as an open-source project. From what I can tell there isn’t much in the way of documentation or a community around the project at present.
4) CouchDB is an interesting one - it’s a “distributed, fault-tolerant and schema-free document-oriented database accessible via a RESTful HTTP/JSON API”. Data is stored in ‘documents’, which are essentially key-value maps themselves, using the data types you see in JSON. Read the CouchDB Technical Overview if you are curious how the web’s trendiest document database works under the hood. This article on the Rules of Database App Aging goes some way to explaining why document-oriented databases make sense. CouchDB can do full text indexing of your documents, and lets you express views over your data in JavaScript. I could imagine using CouchDB to store lots of data on users: name, age, sex, address, IM name and lots of other fields, many of which could be null, and each site update adds or changes the available fields. In situations like that it quickly gets unwieldy adding and changing columns in a database, and updating versions of your application code to match. Although many people are using CouchDB in production, their FAQ points out they may still make backwards-incompatible changes to the storage format and API before version 1.0.
5) ThruDB is a document storage and indexing system made up of four components: a document storage service, indexing service, message queue and proxy. It uses Thrift for communication, and has a pluggable storage subsystem, including an Amazon S3 option. It’s designed to scale well horizontally, and might be a better option than CouchDB if you are running on EC2. I’ve heard a lot more about CouchDB than ThruDB recently, but it’s definitely worth a look if you need a document database. It’s not suitable for our needs for the same reasons as CouchDB.
Distributed key-value stores
The rest are much closer to being ‘simple’ key-value stores with low enough latency to be used for serving data used to build dynamic pages. Latency will be dependent on the environment, and whether or not the dataset fits in memory. If it does, I’d expect sub-10ms response time, and if not, it all depends on how much money you spent on spinning rust.
MemcacheDB is essentially just memcached that saves stuff to disk using a Berkeley database. As useful as this may be for some situations, it doesn’t deal with replication and partitioning (sharding), so it would still require a lot of work to make it scale horizontally and be tolerant of machine failure. Other memcached derivatives such as repcached go some way to addressing this by giving you the ability to replicate entire memcache servers (async master-slave setup), but without partitioning it’s still going to be a pain to manage.
Project Voldemort looks awesome. Go and read the rather splendid website, which explains how it works, and includes pretty diagrams and a good description of how consistent hashing is used in the Design section. (If consistent hashing butters your muffin, check out libketama - a consistent hashing library and the Erlang libketama driver). Project-Voldemort handles replication and partitioning of data, and appears to be well written and designed. It’s reassuring to read in the docs how easy it is to swap out and mock different components for testing. It’s non-trivial to add nodes to a running cluster, but according to the mailing-list this is being worked on. It sounds like this would fit the bill if we ran it with a Java load-balancer service (see their Physical Architecture Options diagram) that exposed a Thrift API so all our non-Java clients could use it.
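For anyone who hasn't met consistent hashing before, here is a minimal sketch of the idea in Python. This is not Voldemort's or libketama's implementation; the class name `HashRing`, the node names and the choice of MD5 with 100 points per node are all assumptions made for illustration.

```python
import bisect
import hashlib
from typing import Dict, List


def _hash(value: str) -> int:
    # First 8 bytes of MD5 as an integer position on the ring.
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Illustrative consistent-hash ring with virtual points per node."""

    def __init__(self, nodes: List[str], points_per_node: int = 100) -> None:
        self._ring: Dict[int, str] = {}
        for node in nodes:
            for i in range(points_per_node):
                self._ring[_hash(f"{node}#{i}")] = node
        self._sorted = sorted(self._ring)

    def node_for(self, key: str) -> str:
        # Walk clockwise to the first point after the key's hash,
        # wrapping around to the start of the ring if necessary.
        idx = bisect.bisect_right(self._sorted, _hash(key)) % len(self._sorted)
        return self._ring[self._sorted[idx]]


before = HashRing(["node-a", "node-b", "node-c"])
after = HashRing(["node-a", "node-b", "node-c", "node-d"])
keys = [f"user:{i}" for i in range(1000)]
moved = [k for k in keys if before.node_for(k) != after.node_for(k)]
```

The property that matters is that every key in `moved` now belongs to the new node, and the rest stay put. With naive `hash(key) % number_of_nodes` partitioning, adding a node would reshuffle most keys.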
Scalaris is probably the most face-meltingly awesome thing you could build in Erlang. CouchDB, Ejabberd and RabbitMQ are cool, but Scalaris packs by far the most impressive collection of sexy technologies. Scalaris is a key-value store - it uses a modified version of the Chord algorithm to form a DHT, and stores the keys in lexicographical order, so range queries are possible. Although I didn’t see this explicitly mentioned, this should open up all sorts of interesting options for batch processing - map-reduce for example. On top of the DHT they use an improved version of Paxos to guarantee ACID properties when dealing with multiple concurrent transactions. So it’s a key-value store, but it can guarantee the ACID properties and do proper distributed transactions over multiple keys.
Oh, and to demonstrate how you can scale a webservice based on such a system, the Scalaris folk implemented their own version of Wikipedia on Scalaris, loaded in the Wikipedia data, and benchmarked their setup to prove it can do more transactions/sec on equal hardware than the classic PHP/MySQL combo that Wikipedia use. Yikes.
From what I can tell, Scalaris is only memory-resident at the moment and doesn’t persist data to disk. This makes it entirely impractical to actually run a service like Wikipedia on Scalaris for real - but it sounds like they tackled the hard problems first, and persisting to disk should be a walk in the park after you rolled your own version of Chord and made Paxos your bitch. Take a look at this presentation about Scalaris from the Erlang Exchange conference: Scalaris presentation video.
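To show why keeping keys in lexicographical order makes range queries possible, here is a tiny single-process sketch in Python. It says nothing about how Scalaris actually implements this across a DHT; the class name `OrderedStore` and the key naming scheme are made up for the example.

```python
import bisect
from typing import Dict, List, Tuple


class OrderedStore:
    """Illustrative key-value store that keeps its keys sorted."""

    def __init__(self) -> None:
        self._keys: List[str] = []
        self._vals: Dict[str, str] = {}

    def set(self, key: str, val: str) -> None:
        if key not in self._vals:
            bisect.insort(self._keys, key)  # keep the key list sorted
        self._vals[key] = val

    def range(self, start: str, end: str) -> List[Tuple[str, str]]:
        # All pairs with start <= key < end, in key order.
        lo = bisect.bisect_left(self._keys, start)
        hi = bisect.bisect_left(self._keys, end)
        return [(k, self._vals[k]) for k in self._keys[lo:hi]]


store = OrderedStore()
store.set("artist:Radiohead", "1")
store.set("track:Creep", "2")
store.set("artist:Portishead", "3")
artists = store.range("artist:", "artist;")  # ';' sorts just after ':'
```

A plain hash-partitioned store scatters neighbouring keys across the ring, so the same query would mean asking every node.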
The remaining projects, Dynomite, Ringo and Kai, are all, more or less, trying to be Dynamo. Of the three, Ringo looks to be the most specialist - it makes a distinction between small (less than 4KB) and medium-size data items (<100MB). Medium sized items are stored in individual files, whereas small items are all stored in an append-log, the index of which is read into memory at startup. From what I can tell, Ringo can be used in conjunction with the Erlang map-reduce framework Nokia are working on called Disco.
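The append-log-plus-index idea is easy to sketch. The following Python is my own illustration, not Ringo's on-disk format: the record layout (two 4-byte lengths, then key and value) and the class name `AppendLog` are assumptions. Ringo's data is immutable, whereas in this sketch a later record for the same key simply wins.

```python
import os
import struct
import tempfile
from typing import Dict, Optional, Tuple

_HEADER = struct.Struct(">II")  # key length, value length (big-endian)


class AppendLog:
    """Illustrative append-only log with an in-memory offset index."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._index: Dict[bytes, Tuple[int, int]] = {}  # key -> (offset, length)
        open(path, "ab").close()  # create the file if it is missing
        self._load_index()

    def _load_index(self) -> None:
        # Scan the whole log once at startup, skipping over the values.
        with open(self._path, "rb") as f:
            while True:
                header = f.read(_HEADER.size)
                if len(header) < _HEADER.size:
                    break
                klen, vlen = _HEADER.unpack(header)
                key = f.read(klen)
                self._index[key] = (f.tell(), vlen)
                f.seek(vlen, os.SEEK_CUR)

    def put(self, key: bytes, value: bytes) -> None:
        with open(self._path, "ab") as f:
            f.seek(0, os.SEEK_END)
            f.write(_HEADER.pack(len(key), len(value)))
            f.write(key)
            offset = f.tell()  # where the value bytes start
            f.write(value)
        self._index[key] = (offset, len(value))

    def get(self, key: bytes) -> Optional[bytes]:
        entry = self._index.get(key)
        if entry is None:
            return None
        offset, vlen = entry
        with open(self._path, "rb") as f:
            f.seek(offset)
            return f.read(vlen)


# Usage: write two items, then rebuild the index from the file alone.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "items.log")
    log = AppendLog(path)
    log.put(b"small-1", b"hello")
    log.put(b"small-2", b"world")
    recovered = AppendLog(path).get(b"small-2")
```

Reads cost one seek because the index lives in memory, at the price of a full scan of the log at startup.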
I didn’t find out much about Kai other than it’s rather new, and some mentions in Japanese. You can choose either Erlang ets or dets as the storage system (memory or disk, respectively), and it uses the memcached protocol, so it will already have client libraries in many languages.
Dynomite doesn’t have great documentation, but it seems to be more capable than Kai, and is under active development. It has pluggable backends including the storage mechanism from CouchDB, so the 2GB file limit in dets won’t be an issue. Also I heard that Powerset are using it, so that’s encouraging.
Summary
Scalaris is fascinating, and I hope I can find the time to experiment more with it, but it needs to save stuff to disk before it’d be useful for the kind of things we might use it for at Last.fm.
I’m keeping an eye on Dynomite - hopefully more information will surface about what Powerset are doing with it, and how it performs at a large scale.
Based on my research so far, Project-Voldemort looks like the most suitable for our needs. I’d love to hear more about how it’s used at LinkedIn, and how many nodes they are running it on.
What else is there?
Here are some other related projects:
- Hazelcast - Java DHT/clustering library
- nmdb - a network database (dbm-style)
- Open Chord - Java DHT
If you know of anything I’ve missed off the list, or have any feedback/suggestions, please post a comment. I’m especially interested in hearing about people who’ve tested or are using KV-stores in lieu of relational databases.
UPDATE 1: Corrected table: memcachedb does replication, as per BerkeleyDB.
How we use IRC at Last.fm
Everyone that works at Last.fm is typically connected to our IRC server. We have different channels per team, as well as a company-wide channel, and a few channels dedicated to automated monitoring.
Sometimes it makes much more sense to discuss / ask questions on IRC instead of email, and it’s useful to be able to raise people who are not in the office. That said, the main reason I’m writing this post is to mention the dev-support bot we use: irccat.
IRCCat - Development support bot
The irccat bot joins all your channels, and waits for messages on a specified ip:port on your internal network. Anything you send to that port will be sent to IRC by the bot. IRCCat - as in, `cat` to IRC.
Using netcat, you can easily send events to irc from shell scripts:
$ echo "Something just happened" | nc -q0 somemachine 12345
That will send to the default channel only (first in the config file). You can direct messages to specific combinations of channels (#) or users (@) like so:
$ echo "#syschan Starting backup job" | nc -q0 somemachine 12345
$ echo "#musicteam,#legal,@alice New album uploaded: …" | nc -q0 somemachine 12345
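If you would rather send from a program than shell out to netcat, the same thing is only a few lines. This is a rough Python sketch based purely on the behaviour described above (one line of text to a TCP port, with an optional comma-separated list of #channels and @users in front); the function names are my own, not part of irccat.

```python
import socket
from typing import Sequence


def format_message(text: str, targets: Sequence[str] = ()) -> str:
    # Targets look like "#channel" or "@nick"; none means the default channel.
    prefix = ",".join(targets)
    return f"{prefix} {text}" if prefix else text


def irccat_send(host: str, port: int, text: str,
                targets: Sequence[str] = (), timeout: float = 5.0) -> None:
    # Open a connection, send one newline-terminated line, then hang up.
    line = format_message(text, targets) + "\n"
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(line.encode("utf-8"))
```

For example, `irccat_send("somemachine", 12345, "Starting backup job", ["#syschan"])` would be the equivalent of the second netcat command above.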
Some of the things we automatically send to appropriate IRC channels:
- SVN commits
- JIRA issue tracker updates
- Nagios alerts for monitored hosts and services
- Deployment notices to testing/staging/production
- Results of automated tests if something bad happens
- Links to pics from security camfeed when someone opens the office door out of hours
We also post messages from automated backup jobs etc, which helps correlate such events with any unusual load spikes or glitches in usually-smooth graphs.
In addition to providing a cat-to-irc conduit, irccat will also hand off commands to a script you can provide. We use this to expose lookup tools and some admin functions to our support staff and developers. The handler script we use is PHP, and has access to our core website libs. Typing “?pokereleasenode”, “?lookup user RJ” or “?uncache artist Radiohead” is faster than writing a throw-away script, more accessible to non-developers, less hassle than a web interface and creates a public log so people can see what’s going on.
The bot is written in Java, it’s easy to build and configure, all the deps are included:
Getting to know ejabberd and writing modules
I started poking around in the ejabberd source code to see what I could learn. I couldn’t find much in the way of high level documentation that talks about how the various bits of ejabberd talk to each other, so I’m starting to piece it together myself.
After compiling ejabberd I made a php script I could use with the external authentication system. Here’s a version that supports just two hardcoded users:
ejabberd.cfg:
{auth_method, external}.
{extauth_program, "/tmp/auth.php"}.
auth.php:
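#!/usr/bin/php
<?php
// NOTE: several lines of this listing were lost. Lines marked "reconstructed"
// are a best guess based on ejabberd's external auth protocol: each request is
// a 2-byte big-endian length followed by "method:user:server[:password]", and
// each reply is the 2-byte length 2 followed by a 2-byte result (1 = ok, 0 = no).
$users = array('user1' => 'password1', 'user2' => 'password2'); // reconstructed: two hardcoded users
$fh = fopen("php://stdin", 'r'); // reconstructed
if(!$fh){
    die("Cannot open STDIN\n"); // reconstructed
}

do{
    $lenBytes = fread($fh, 2); // reconstructed
    if($lenBytes === false || strlen($lenBytes) < 2) break; // reconstructed: stop on EOF
    $len = unpack('n', $lenBytes); // reconstructed
    $len = $len[1];
    if($len<1) continue;
    $msg = fread($fh, $len); // reconstructed
    $toks = explode(':', $msg, 4); // reconstructed; limit 4 so passwords may contain ':'
    $method = array_shift($toks); // reconstructed
    switch($method){
        case 'auth':
            list($username, $server, $password) = $toks; // reconstructed
            // strict comparison, so an unknown user never matches an empty password
            if(isset($users[$username]) && $users[$username] === $password){
                print pack("nn", 2, 1); // reconstructed: success
            }else{
                print pack("nn", 2, 0); // reconstructed: failure
            }
            break;

        case 'isuser':
            list($username, $server) = $toks; // reconstructed
            if(isset($users[$username])){ // reconstructed
                print pack("nn", 2, 1); // reconstructed
            }else{
                print pack("nn", 2, 0); // reconstructed
            }
            break;

        default:
            print pack("nn", 2, 0); // reconstructed: unknown method
    }
}while(true);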
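The only fiddly part of an external auth script is the framing. Here is a small Python sketch of it, based on my understanding of the protocol (a 2-byte big-endian length, then a colon-separated request; the reply is the length 2 followed by a 2-byte result). The function names are hypothetical and only illustrate the byte layout.

```python
import struct
from typing import List, Tuple


def decode_request(frame: bytes) -> Tuple[str, List[str]]:
    # The first two bytes give the payload length, big-endian.
    (length,) = struct.unpack(">H", frame[:2])
    payload = frame[2:2 + length].decode("utf-8")
    # method:user:server[:password]; the password may itself contain ':'.
    method, *args = payload.split(":", 3)
    return method, args


def encode_reply(ok: bool) -> bytes:
    # Length (always 2) followed by the result: 1 for success, 0 for failure.
    return struct.pack(">HH", 2, 1 if ok else 0)


payload = b"auth:alice:example.com:s3cret:x"
frame = struct.pack(">H", len(payload)) + payload
method, args = decode_request(frame)
```

Here `method` is `"auth"` and `args` holds the username, server and password, which is all the script needs to make its decision.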
I stripped down the ejabberd config to just load what I considered the bare essentials. Here is the modules section I’m testing with:
From ejabberd.cfg:
{modules,
 [
  {mod_caps, []},
  {mod_disco, []},
  {mod_roster, []},
  {mod_pubsub, [ % requires mod_caps
                {access_createnode, pubsub_createnode},
                {plugins, ["default", "pep"]}
               ]},
  {mod_mnesiaweb, []},
  {mod_thriftctl, []}
 ]}.
mod_disco deals with discovery, so clients can find out what the server supports. mod_roster deals with rosters (buddy lists etc) using mnesia. mod_pubsub is enabled because I want to use User Tune, an extension that lets you broadcast the name of the song you are playing to everyone in your roster. mod_caps provides XEP-0115 - an extension for broadcasting and dynamically discovering client, device, or generic entity capabilities. mod_caps is a requirement of mod_pubsub.
I’ve removed the module that allows users to register, although I made a few accounts first whilst testing. The last two modules, mod_mnesiaweb and mod_thriftctl are modules I wrote.
mod_mnesiaweb
To help figure out what’s going on inside of ejabberd, it’s useful to be able to easily browse the mnesia database. Yaws comes with an appmod that does this, called ymnesia. This ejabberd module will start yaws in embedded mode and run this appmod, enabling you to explore the mnesia database from a web browser.
Yaws observation: yaws didn’t appear to build ymnesia by default, so I edited the Makefile in src and added “ymnesia” to the module list. Also, if ./configure fails, the package you are probably missing is libpam0g-dev.
mod_mnesiaweb:
% Ejabberd module that runs yaws in embedded mode,
% and loads the ymnesia appmod for browsing mnesia.
-module(mod_mnesiaweb).
-author('rj@last.fm').

-include("/usr/local/lib/yaws/include/yaws.hrl").

-behaviour(gen_mod).
-export([start/2, stop/1]).

start(_Host, Opts) ->
    Port = gen_mod:get_opt(port, Opts, 8001),
    code:add_path("/usr/local/lib/yaws/ebin"),
    application:set_env(yaws, embedded, true),
    application:start(yaws),
    GC = yaws_config:make_default_gconf(false,"yawstest"),
    SC = #sconf{
        port = Port,
        servername = "ejabnesia",
        listen = {0,0,0,0},
        appmods = [{"showdb", ymnesia}],
        docroot = "wwwroot"
    },
    yaws_api:setconf(GC, [[SC]]),
    ok.

stop(_Host) ->
    application:stop(yaws),
    ok.
To compile it:
erlc -pa ${EJAB_SRC} -I ${EJAB_SRC} mod_mnesiaweb.erl
where EJAB_SRC is the ejabberd-2.X.X/src directory, after you’ve compiled from source (so the beams are there too).
Copy the resulting mod_mnesiaweb.beam to /var/lib/ejabberd/ebin so ejabberd finds it, and it should work. Hit up http://localhost:8001/showdb/ in your browser and you can explore the mnesia database.
Use the match syntax to filter tables. For example to find everyone in my roster, I use this in the input box next to roster:
{roster,{"RJ",'_', {'_','_',[]}}, '_','_','_','_','_','_','_','_'}
Not pretty, but it gets the job done. You can just view the entire table, copy a record then replace fields with '_' to build queries.
mod_thriftctl
Next up I wanted to try the Erlang Thrift bindings (written by the folks at Amie St.), and expose some useful functionality for controlling the server.
If you aren’t familiar with Thrift, I recommend reading about it first. In a nutshell, you write your API using an IDL (a .thrift file) and the thrift compiler creates client libraries, and server code in various different languages. It’s an RPC mechanism, and useful in a mixed environment.
mod_thriftctl.thrift:
#!/usr/local/bin/thrift -php -erl
struct JabberUser {
    1: string name,
    2: string server
}
service Ejabthrift {
    /* add ruser to roster of luser, and vice versa. also routes presence to users if online */
    void add_friend( 1: JabberUser luser,
                     2: JabberUser ruser
                   ),
    /* remove ruser from luser's roster */
    void remove_friend( 1: JabberUser luser, 2: JabberUser ruser ),
    /* make it look like fromuser sent a message to touser */
    void spoof_message( 1: JabberUser fromuser, 2: JabberUser touser, 3: string message, 4: string subject ),
    /* .. or a chat message */
    void spoof_chat( 1: JabberUser fromuser, 2: JabberUser touser, 3: string message, 4: string thread ),
    /* sends PEP usertune message, see http://xmpp.org/extensions/xep-0118.html */
    void publish_np ( 1: JabberUser fromuser, 2: string artist, 3: string album, 4: string track, 5: i32 tracklength, 6: i32 tracknum )
}
Run that .thrift file, and you get gen-php and gen-erl directories, with php client code, and erlang files needed to build a server.
Here’s the ejabberd module, which starts a thrift server:
mod_thriftctl:
%
% A module to control ejabberd with a thrift interface.
%
-module(mod_thriftctl).
-author('rj@last.fm').

% ejabberd headers:
-include("ejabberd.hrl").
-include("mod_roster.hrl").
-include("jlib.hrl").

% thrift server headers:
-include("thrift.hrl").
-include("transport/tSocket.hrl").
-include("protocol/tBinaryProtocol.hrl").
-include("server/tErlServer.hrl").
-include("transport/tErlAcceptor.hrl").

% we are an ejabberd module:
-behaviour(gen_mod).
-export([start/2, stop/1]).

% our thrift service:
-include("ejabthrift_thrift.hrl").
-include("mod_thriftctl_types.hrl").
-export([ add_friend/2, remove_friend/2,
          spoof_message/4, spoof_chat/4,
          publish_np/6
        ]).

% convert thrift Jabberuser into ejabberd jid
ju2jid(Jabberuser) when is_record(Jabberuser, jabberUser) ->
    #jid{ user=Jabberuser#jabberUser.name, server=Jabberuser#jabberUser.server, resource="",
          luser=Jabberuser#jabberUser.name, lserver=Jabberuser#jabberUser.server, lresource=""
        }.

spoof_message( FromU, ToU, Msg, Subject ) ->
    F = ju2jid(FromU),
    T = ju2jid(ToU),
    XmlBody = {xmlelement, "message",
               [
                {"from", jlib:jid_to_string(F)},
                {"to", jlib:jid_to_string(T)}
               ],
               [
                {xmlelement, "subject", [], [{xmlcdata, Subject}]},
                {xmlelement, "body", [], [{xmlcdata, Msg}]}
               ]
              },
    ejabberd_router:route(F, T, XmlBody).

spoof_chat( FromU, ToU, Msg, Thread ) ->
    F = ju2jid(FromU),
    T = ju2jid(ToU),
    XmlBody = {xmlelement, "message",
               [{"type", "chat"},
                {"from", jlib:jid_to_string(F)},
                {"to", jlib:jid_to_string(T)}
               ],
               [
                {xmlelement, "thread", [], [{xmlcdata, Thread}]},
                {xmlelement, "body", [], [{xmlcdata, Msg}]}
               ]
              },
    ejabberd_router:route(F, T, XmlBody).

publish_np( FromU, ArtistS, AlbumS, TrackS, LengthI, TrackNumI ) ->
    From = ju2jid(FromU),
    % The usertune message must contain binaries, not strings or ints
    FromStr = jlib:jid_to_string(From),
    Artist = list_to_binary(ArtistS),
    Album = list_to_binary(AlbumS),
    Track = list_to_binary(TrackS),
    Length = list_to_binary(io_lib:format("~w",[LengthI])),
    TrackNum = list_to_binary(io_lib:format("~w",[TrackNumI])),
    Xml = {xmlelement,"iq",
           [{"from", FromStr},
            {"type","set"},
            {"id","pub1"}],
           [{xmlcdata,<<"\n ">>},
            {xmlelement,"pubsub",
             [{"xmlns","http://jabber.org/protocol/pubsub"}],
             [{xmlcdata,<<"\n ">>},
              {xmlelement,"publish",
               [{"node","http://jabber.org/protocol/tune"}],
               [{xmlcdata,<<"\n ">>},
                {xmlelement,"item",[],
                 [{xmlcdata,<<"\n ">>},
                  {xmlelement,"tune",
                   [{"xmlns","http://jabber.org/protocol/tune"}],
                   [{xmlcdata,<<"\n ">>},
                    {xmlelement,"artist",[],
                     [{xmlcdata, Artist}]},
                    {xmlcdata,<<"\n ">>},
                    {xmlelement,"length",[],[{xmlcdata, Length}]},
                    {xmlcdata,<<"\n ">>},
                    {xmlelement,"source",[],
                     [{xmlcdata, Album}]},
                    {xmlcdata,<<"\n ">>},
                    {xmlelement,"title",[],
                     [{xmlcdata, Track}]},
                    {xmlcdata,<<"\n ">>},
                    {xmlelement,"track",[],[{xmlcdata, TrackNum}]},
                    {xmlcdata,<<"\n ">>}]},
                  {xmlcdata,<<"\n ">>}]},
                {xmlcdata,<<"\n ">>}]},
              {xmlcdata,<<"\n ">>}]},
            {xmlcdata,<<"\n">>}]},
    % PEP means you act as a pubsub node yourself,
    % so it's addressed to yourself and is broadcast to your friends automatically:
    ejabberd_router:route(From, From, Xml),
    ok.

% adds bi-directional friend relationship immediately for both users.
add_friend( #jabberUser{name=LU, server=LS},
            #jabberUser{name=RU, server=RS}) ->
    AskMessage = "",
    Group = "",
    Subtype = both,
    subscribe(LU, LS, RU, RS, RU, Group, Subtype, AskMessage),
    subscribe(RU, RS, LU, LS, LU, Group, Subtype, AskMessage),
    % the roster record wants the atom, but the XML attribute needs a string
    route_rosteritem(LU, LS, RU, RS, RU, Group, atom_to_list(Subtype)),
    route_rosteritem(RU, RS, LU, LS, LU, Group, atom_to_list(Subtype)),
    ok.

remove_friend( #jabberUser{name=LU, server=LS}, #jabberUser{name=RU, server=RS} ) ->
    unsubscribe(LU, LS, RU, RS),
    unsubscribe(RU, RS, LU, LS),
    route_rosteritem(LU, LS, RU, RS, "", "", "remove"),
    route_rosteritem(RU, RS, LU, LS, "", "", "remove"),
    ok.

unsubscribe(LocalUser, LocalServer, RemoteUser, RemoteServer) ->
    Key = {{LocalUser,LocalServer,{RemoteUser,RemoteServer,[]}},
           {LocalUser,LocalServer}},
    mnesia:transaction(fun() -> mnesia:delete(roster, Key, write) end).

% Subscription is the string used for the "subscription" XML attribute
route_rosteritem(LocalUser, LocalServer, RemoteUser, RemoteServer, Nick, Group, Subscription) ->
    LJID = jlib:make_jid(LocalUser, LocalServer, ""),
    RJID = jlib:make_jid(RemoteUser, RemoteServer, ""),
    ToS = jlib:jid_to_string(LJID),
    ItemJIDS = jlib:jid_to_string(RJID),
    GroupXML = {xmlelement, "group", [], [{xmlcdata, Group}]},
    Item = {xmlelement, "item",
            [{"jid", ItemJIDS},
             {"name", Nick},
             {"subscription", Subscription}],
            [GroupXML]},
    Query = {xmlelement, "query", [{"xmlns", ?NS_ROSTER}], [Item]},
    Packet = {xmlelement, "iq", [{"type", "set"}, {"to", ToS}], [Query]},
    ejabberd_router:route(LJID, LJID, Packet).

subscribe(LocalUser, LocalServer, RemoteUser, RemoteServer, Nick, Group, Subscription, Xattrs) ->
    R = #roster{usj = {LocalUser,LocalServer,{RemoteUser,RemoteServer,[]}},
                us = {LocalUser,LocalServer},
                jid = {RemoteUser,RemoteServer,[]},
                name = Nick,
                subscription = Subscription, % none, to=you see him, from=he sees you, both
                ask = none, % out=send request, in=somebody requests you, none
                groups = [Group],
                askmessage = Xattrs, % example: [{"category","conference"}]
                xs = []
               },
    mnesia:transaction(fun() -> mnesia:write(R) end).

start(Host, Opts) ->
    ?INFO("mod_ejabthrift start().",[]),
    %% get options
    Port = gen_mod:get_opt(port, Opts, 9000),

    spawn(fun()-> thrift:start() end),
    ?INFO("mod_ejabthrift thrift:start().",[]),

    Handler = ?MODULE,
    Processor = ejabthrift_thrift,

    TF = tBufferedTransportFactory:new(),
    PF = tBinaryProtocolFactory:new(),

    ServerTransport = tErlAcceptor,
    ServerFlavor = tErlServer,

    Server = oop:start_new(ServerFlavor, [Port, Handler, Processor, ServerTransport, TF, PF]),

    case ?R0(Server, effectful_serve) of
        ok ->
            ?INFO("mod_ejabthrift: Thrift server (~s) listening on port ~w",[Host, Port]),
            % put Server into process dictionary (needed for clean stop)
            put(thrift_server_reference, Server),
            ok;
        Error ->
            ?ERROR_MSG("mod_ejabthrift: Error starting thrift server: ~w", [Error]),
            Error
    end.

stop(_Host) ->
    ?C0(get(thrift_server_reference), stop),
    ok.
To build, first build the gen-erl code:
erlc -pa ${EJAB_SRC} -I ${EJAB_SRC} -I ${ERL_THRIFT}/include -I ./gen-erl -o ./gen-erl ./gen-erl/*.erl
Where ERL_THRIFT is the lib/erl directory from the amiethrift code, git://repo.or.cz/amiethrift.git
Then compile the module:
erlc -pa ${EJAB_SRC} -I ${EJAB_SRC} -I ${ERL_THRIFT}/include -I ./gen-erl *.erl
To install, copy all the beam files to the ejabberd ebin dir:
sudo cp *.beam gen-erl/*.beam /var/lib/ejabberd/ebin/
This is inspired by mod_xmlrpc, which is in ejabberd-modules. As you can see from the start function, that’s what it takes to start a thrift server. It’s now trivial to call into ejabberd from other languages. For example, if you started listening to a song using a flash player on the website, a php webservice could make a user tune announcement on your behalf, or spoof messages from you boasting how much you love listening to Paris Hilton.
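For reference, the User Tune payload that publish_np wraps in an iq/pubsub/publish/item envelope is small. Here is a Python sketch that builds just the inner tune element with the same five children as the Erlang above. The function name is hypothetical, and this only illustrates the XML shape; it doesn't talk to ejabberd.

```python
import xml.etree.ElementTree as ET

TUNE_NS = "http://jabber.org/protocol/tune"


def tune_payload(artist: str, album: str, title: str,
                 length_secs: int, track_num: int) -> str:
    # Same child elements, in the same order, as publish_np emits.
    tune = ET.Element("tune", {"xmlns": TUNE_NS})
    fields = (("artist", artist), ("length", length_secs),
              ("source", album), ("title", title), ("track", track_num))
    for tag, value in fields:
        ET.SubElement(tune, tag).text = str(value)
    return ET.tostring(tune, encoding="unicode")


payload = tune_payload("Radiohead", "Pablo Honey", "Creep", 238, 2)
parsed = ET.fromstring(payload)
```

Note that the album goes in the `source` element and the track number in `track`, which is easy to get the wrong way round.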
If anyone knows where I can read about the ejabberd architecture / design, so I don’t have to piece it all together myself, please let me know.
ssh hack: connect directly to machine via a firewall box
UPDATED 23/03/2009: added “-q0” option to clean up netcat after session terminates, and left another useful ssh tip in the comments.
It’s common to have to ssh to a firewall / gateway machine, then ssh to the machine you want to work on within a server network.
Typically you’d do this from your local machine:
$ ssh firewall.example.com
Password:
$ ssh my-private-host
I finally got bored of doing this, and created the following file, /usr/bin/sssh
#!/bin/bash
ssh -oproxycommand="ssh -q firewall.example.com nc -q0 %h %p" "$@"
Now I can use the sssh command to connect to hosts using the firewall machine as a proxy. Like most good hacks, this uses netcat.
Eg:
$ sssh 10.1.2.3
Will connect me directly to a machine on the server network, via the firewall box. Seeing as it passes all parameters to ssh (the "$@" bit, quoted so that arguments containing spaces survive) you can do port forwards and X-forwarding as usual too:
$ sssh -L 5432:localhost:5432 my-vm
This lets me tunnel the port for a PostgreSQL running on my development vm (my-vm) in a single command. I have all my keys installed, so no passwords needed - I estimate this will save me about 60 seconds every day.
A Million-user Comet Application with Mochiweb, Part 3
Part 1 and Part 2 in this series showed how to build a comet application using mochiweb, and how to route messages to connected users. We managed to squeeze application memory down to 8KB per connection. We did ye olde c10k test, and observed what happened with 10,000 connected users. We made graphs. It was fun, but now it’s time to make good on the claims made in the title, and turn it up to 1 million connections.
This post covers the following:
- Add a pubsub-like subscription database using Mnesia
- Generate a realistic friends dataset for a million users
- Tune mnesia and bulk load in our friends data
- Open a million connections from one machine
- Benchmark with 1 Million connected users
- Libevent + C for connection handling
- Final thoughts
One of the challenging parts of this test was actually being able to open 1M connections from a single test machine. Writing a server to accept 1M connections is easier than actually creating 1M connections to test it with, so a fair amount of this article is about the techniques used to open 1M connections from a single machine.
Getting our pubsub on
In Part 2 we used the router to send messages to specific users. This is fine for a chat/IM system, but there are sexier things we could do instead. Before we launch into a large-scale test, let’s add one more module - a subscription database. We want the application to store who your friends are, so it can push you all events generated by people on your friends list.
My intention is to use this for Last.fm so I can get a realtime feed of songs my friends are currently listening to. It could equally apply to other events generated on social networks. Flickr photo uploads, Facebook newsfeed items, Twitter messages etc. FriendFeed even have a realtime API in beta, so this kind of thing is definitely topical. (Although I’ve not heard of anyone except Facebook using Erlang for this kind of thing).
Implementing the subscription-manager
We’re implementing a general subscription manager, but we’ll be subscribing people to everyone on their friends list automatically - so you could also think of this as a friends database for now.
The subsmanager API:
- add_subscriptions([{Subscriber, Subscribee},...])
- remove_subscriptions([{Subscriber, Subscribee},...])
- get_subscribers(User)
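Before the real Erlang, here is the intended behaviour of those three calls as a throwaway Python sketch. It is only a model of the semantics (a set of subscribers per subscribee), not a translation of the module; the class name `SubsManager` and the usernames are made up.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple

Subscription = Tuple[str, str]  # (subscriber, subscribee)


class SubsManager:
    """Illustrative in-memory model of the subsmanager API."""

    def __init__(self) -> None:
        # Indexed by subscribee, because that is how we look things up.
        self._by_subscribee: Dict[str, Set[str]] = defaultdict(set)

    def add_subscriptions(self, subs: Iterable[Subscription]) -> None:
        for subscriber, subscribee in subs:
            self._by_subscribee[subscribee].add(subscriber)

    def remove_subscriptions(self, subs: Iterable[Subscription]) -> None:
        for subscriber, subscribee in subs:
            self._by_subscribee[subscribee].discard(subscriber)

    def get_subscribers(self, user: str) -> List[str]:
        # Everyone who should be pushed events generated by `user`.
        return sorted(self._by_subscribee.get(user, ()))


subs = SubsManager()
subs.add_subscriptions([("alice", "rj"), ("bob", "rj"), ("rj", "alice")])
subs.remove_subscriptions([("bob", "rj")])
listeners = subs.get_subscribers("rj")
```

When "rj" plays a song, the router only needs `get_subscribers("rj")` to know whom to push the event to, which is why the lookup is keyed on the subscribee.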
subsmanager.erl
-module(subsmanager).
-behaviour(gen_server).
-include("/usr/local/lib/erlang/lib/stdlib-1.15.4/include/qlc.hrl").
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([add_subscriptions/1,
         remove_subscriptions/1,
         get_subscribers/1,
         first_run/0,
         stop/0,
         start_link/0]).
-record(subscription, {subscriber, subscribee}).
-record(state, {}). % state is all in mnesia
-define(SERVER, global:whereis_name(?MODULE)).

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

stop() ->
    gen_server:call(?SERVER, {stop}).

add_subscriptions(SubsList) ->
    gen_server:call(?SERVER, {add_subscriptions, SubsList}, infinity).

remove_subscriptions(SubsList) ->
    gen_server:call(?SERVER, {remove_subscriptions, SubsList}, infinity).

get_subscribers(User) ->
    gen_server:call(?SERVER, {get_subscribers, User}).

%%

init([]) ->
    ok = mnesia:start(),
    io:format("Waiting on mnesia tables..\n",[]),
    mnesia:wait_for_tables([subscription], 30000),
    Info = mnesia:table_info(subscription, all),
    io:format("OK. Subscription table info: \n~w\n\n",[Info]),
    {ok, #state{}}.

handle_call({stop}, _From, State) ->
    % reply to the caller and stop with reason normal, so stop/0 returns ok
    {stop, normal, ok, State};

handle_call({add_subscriptions, SubsList}, _From, State) ->
    % Transactionally is slower:
    % F = fun() ->
    %         [ ok = mnesia:write(S) || S <- SubsList ]
    %     end,
    % mnesia:transaction(F),
    [ mnesia:dirty_write(S) || S <- SubsList ],
    {reply, ok, State};

handle_call({remove_subscriptions, SubsList}, _From, State) ->
    F = fun() ->
            [ ok = mnesia:delete_object(S) || S <- SubsList ]
        end,
    mnesia:transaction(F),
    {reply, ok, State};

handle_call({get_subscribers, User}, From, State) ->
    % do the lookup in a separate process so the gen_server isn't blocked
    F = fun() ->
            Subs = mnesia:dirty_match_object(#subscription{subscriber='_', subscribee=User}),
            Users = [Dude || #subscription{subscriber=Dude, subscribee=_} <- Subs],
            gen_server:reply(From, Users)
        end,
    spawn(F),
    {noreply, State}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Msg, State) -> {noreply, State}.

terminate(_Reason, _State) ->
    mnesia:stop(),
    ok.

code_change(_OldVersion, State, _Extra) ->
    % macros are not expanded inside strings, so pass ?MODULE as an argument
    io:format("Reloading code for ~w\n",[?MODULE]),
    {ok, State}.

%%

first_run() ->
    mnesia:create_schema([node()]),
    ok = mnesia:start(),
    Ret = mnesia:create_table(subscription,
                              [
                               {disc_copies, [node()]},
                               {attributes, record_info(fields, subscription)},
                               {index, [subscribee]}, %index subscribee too
                               {type, bag}
                              ]),
    Ret.
Noteworthy points:
- I’ve included qlc.hrl, needed for mnesia queries using list comprehensions, using an absolute path because it wasn’t being found otherwise. That can’t be best practice; the usual approach is -include_lib("stdlib/include/qlc.hrl"), which resolves the path relative to the installed stdlib.
- get_subscribers spawns another process and delegates the job of replying to that process, using gen_server:reply. This means the gen_server loop won’t block on that call if we throw lots of lookups at it and mnesia slows down.
- rr("subsmanager.erl"). in the example below allows you to use record definitions in the erl shell. Putting your record definitions into a records.hrl file and including that in your modules is considered better style. I inlined it for brevity.
Now to test it. first_run() creates the mnesia schema, so it’s important to run that first. Another potential gotcha with mnesia is that (by default) the database can only be accessed by the node that created it, so give the erl shell a name, and stick with it.
$ mkdir /var/mnesia_data
$ erl -boot start_sasl -mnesia dir '"/var/mnesia_data"' -sname subsman
(subsman@localhost)1> c(subsmanager).
{ok,subsmanager}
(subsman@localhost)2> subsmanager:first_run().
...
{atomic,ok}
(subsman@localhost)3> subsmanager:start_link().
Waiting on mnesia tables..
OK. Subscription table info:
[{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,'_','_'}},{{index,3},57378}]
{ok,<0.105.0>}
(subsman@localhost)4> rr("subsmanager.erl").
[state,subscription]
(subsman@localhost)5> subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj} ]).
ok
(subsman@localhost)6> subsmanager:add_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)7> subsmanager:get_subscribers(rj).
[bob,alice]
(subsman@localhost)8> subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)9> subsmanager:get_subscribers(rj).
[alice]
(subsman@localhost)10> subsmanager:get_subscribers(charlie).
[]
We’ll use integer Ids to represent users for the benchmark - but for this test I used atoms (rj, alice, bob) and assumed that alice and bob are both on rj’s friends list. It’s nice that mnesia (and ets/dets) doesn’t care what values you use - any Erlang term is valid. This means it’s a simple upgrade to support multiple types of resource. You could start using {user, 123} or {photo, 789} to represent different things people might subscribe to, without changing anything in the subsmanager module.
Modifying the router to use subscriptions
Instead of addressing messages to specific users, ie router:send(123, "Hello user 123"), we’ll mark messages with a subject - that is, the person who generated the message (who played the song, who uploaded the photo etc) - and have the router deliver the message to every user who has subscribed to the subject user. In other words, the API will work like this: router:send(123, "Hello everyone subscribed to user 123")
Updated router.erl:
-module(router).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-export([send/2, login/2, logout/1]).

-define(SERVER, global:whereis_name(?MODULE)).

% will hold bidirectional mapping between id <-> pid
-record(state, {pid2id, id2pid}).

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

% sends Msg to anyone subscribed to Id
send(Id, Msg) ->
    gen_server:call(?SERVER, {send, Id, Msg}).

login(Id, Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {login, Id, Pid}).

logout(Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {logout, Pid}).

%%

init([]) ->
    % set this so we can catch death of logged in pids:
    process_flag(trap_exit, true),
    % use ets for routing tables
    {ok, #state{
            pid2id = ets:new(?MODULE, [bag]),
            id2pid = ets:new(?MODULE, [bag])
           }
    }.

handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
    ets:insert(State#state.pid2id, {Pid, Id}),
    ets:insert(State#state.id2pid, {Id, Pid}),
    link(Pid), % tell us if they exit, so we can log them out
    %io:format("~w logged in as ~w\n",[Pid, Id]),
    {reply, ok, State};

handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
    unlink(Pid),
    PidRows = ets:lookup(State#state.pid2id, Pid),
    case PidRows of
        [] ->
            ok;
        _ ->
            IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples
            ets:delete(State#state.pid2id, Pid), % delete all pid->id entries
            [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ] % and all id->pid
    end,
    %io:format("pid ~w logged out\n",[Pid]),
    {reply, ok, State};

handle_call({send, Id, Msg}, From, State) ->
    F = fun() ->
            % get users who are subscribed to Id:
            Users = subsmanager:get_subscribers(Id),
            io:format("Subscribers of ~w = ~w\n",[Id, Users]),
            % get pids of anyone logged in from Users list:
            Pids0 = lists:map(
                fun(U)->
                    [ P || { _I, P } <- ets:lookup(State#state.id2pid, U) ]
                end,
                [ Id | Users ] % we are always subscribed to ourselves
            ),
            Pids = lists:flatten(Pids0),
            io:format("Pids: ~w\n", [Pids]),
            % send Msg to them all
            M = {router_msg, Msg},
            [ Pid ! M || Pid <- Pids ],
            % respond with how many users saw the message
            gen_server:reply(From, {ok, length(Pids)})
        end,
    spawn(F),
    {noreply, State}.

% handle death and cleanup of logged in processes
handle_info(Info, State) ->
    case Info of
        {'EXIT', Pid, _Why} ->
            handle_call({logout, Pid}, blah, State);
        Wtf ->
            io:format("Caught unhandled message: ~w\n", [Wtf])
    end,
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
And here’s a quick test that doesn’t require mochiweb - I’ve used atoms instead of user ids, and omitted some output for clarity:
(subsman@localhost)1> c(subsmanager), c(router), rr("subsmanager.erl").
(subsman@localhost)2> subsmanager:start_link().
(subsman@localhost)3> router:start_link().
(subsman@localhost)4> Subs = [#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob, subscribee=rj}].
[#subscription{subscriber = alice,subscribee = rj},
#subscription{subscriber = bob,subscribee = rj}]
(subsman@localhost)5> subsmanager:add_subscriptions(Subs).
ok
(subsman@localhost)6> router:send(rj, "RJ did something").
Subscribers of rj = [bob,alice]
Pids: []
{ok,0}
(subsman@localhost)7> router:login(alice, self()).
ok
(subsman@localhost)8> router:send(rj, "RJ did something").
Subscribers of rj = [bob,alice]
Pids: [<0.46.0>]
{ok,1}
(subsman@localhost)9> receive {router_msg, M} -> io:format("~s\n",[M]) end.
RJ did something
ok
This shows how alice can receive a message when the subject is someone she is subscribed to (rj), even though the message wasn’t sent directly to alice. The output shows that the router identified possible targets as [alice,bob] but only delivered the message to one person, alice, because bob was not logged in.
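The delivery rule is easy to lose among the gen_server plumbing, so here is a minimal Python model of it. This is only a sketch: the class names SubsManager and Router and the list-based inboxes are invented for illustration, and nothing here talks to mnesia or ets. It replays the shell session above: a message about rj goes to every subscriber of rj who is currently logged in.

```python
from collections import defaultdict


class SubsManager:
    """Toy in-memory stand-in for subsmanager (the real one stores this in mnesia)."""

    def __init__(self):
        self.subscribers = defaultdict(set)  # subscribee -> {subscriber, ...}

    def add_subscriptions(self, pairs):
        for subscriber, subscribee in pairs:
            self.subscribers[subscribee].add(subscriber)

    def remove_subscriptions(self, pairs):
        for subscriber, subscribee in pairs:
            self.subscribers[subscribee].discard(subscriber)

    def get_subscribers(self, user):
        return sorted(self.subscribers.get(user, ()))


class Router:
    """Toy router: delivers a message to every logged-in subscriber of its subject."""

    def __init__(self, subs):
        self.subs = subs
        self.inboxes = defaultdict(list)  # user id -> one inbox per login

    def login(self, user):
        inbox = []
        self.inboxes[user].append(inbox)
        return inbox

    def send(self, subject, msg):
        # the subject is always treated as subscribed to themselves
        targets = [subject] + self.subs.get_subscribers(subject)
        delivered = 0
        for user in targets:
            for inbox in self.inboxes.get(user, ()):
                inbox.append(msg)
                delivered += 1
        return delivered


subs = SubsManager()
subs.add_subscriptions([("alice", "rj"), ("bob", "rj")])
router = Router(subs)
first = router.send("rj", "RJ did something")   # nobody is logged in yet
alice_inbox = router.login("alice")
second = router.send("rj", "RJ did something")  # only alice is online
print(first, second, alice_inbox)
```

As in the Erlang version, messages for users who are not logged in are simply dropped rather than queued.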
Generating a typical social-network friends dataset
We could generate lots of friend relationships at random, but that’s not particularly realistic. Social networks tend to exhibit a power law distribution. Social networks usually have a few super-popular users (some Twitter users have over 100,000 followers) and plenty of people with just a handful of friends. The Last.fm friends data is typical - it fits a Barabási–Albert graph model, so that’s what I’ll use.
To generate the dataset I’m using the python module from the excellent igraph library:
fakefriends.py:
import igraph

# 1M users, each new user attaching to 15 existing ones
g = igraph.Graph.Barabasi(1000000, 15, directed=False)
print("Edges: " + str(g.ecount()) + " Vertices: " + str(g.vcount()))
g.write_edgelist("fakefriends.txt")
This will generate fakefriends.txt with 2 user ids per line, space separated. These are the friend relationships we’ll load into our subsmanager. User ids range from 1 to a million.
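If you don't have igraph to hand, the idea behind the Barabási–Albert model fits in a few lines of plain Python. The sketch below is not igraph's implementation, just an illustration of preferential attachment under my own assumptions (a small seed clique, a fixed random seed, and the function names are all invented). Each new node links to m existing nodes chosen with probability proportional to their current degree, which is what produces a few very popular users and a long tail of users with only a handful of friends.

```python
import random


def barabasi_albert_edges(n, m, seed=0):
    """Return undirected edges of an n-node graph grown by preferential attachment."""
    rng = random.Random(seed)
    edges = []
    # 'endpoints' lists every node once per edge it touches, so a uniform
    # pick from it is a pick proportional to degree.
    endpoints = []
    # seed graph: a clique on nodes 0..m
    for a in range(m + 1):
        for b in range(a + 1, m + 1):
            edges.append((a, b))
            endpoints += [a, b]
    for new in range(m + 1, n):
        targets = set()
        while len(targets) < m:
            targets.add(rng.choice(endpoints))
        for t in sorted(targets):
            edges.append((new, t))
            endpoints += [new, t]
    return edges


def degrees(edges):
    """Map each node to the number of edges touching it."""
    deg = {}
    for a, b in edges:
        deg[a] = deg.get(a, 0) + 1
        deg[b] = deg.get(b, 0) + 1
    return deg


edges = barabasi_albert_edges(2000, 3)
deg = degrees(edges)
ranked = sorted(deg.values())
print("edges:", len(edges),
      "median degree:", ranked[len(ranked) // 2],
      "max degree:", ranked[-1])
```

The median degree stays close to m while the oldest nodes end up with many times that, which is the skew the friends data needs to exhibit.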
Bulk loading friends data into mnesia
This small module will read the fakefriends.txt file and create a list of subscription records.
readfriends.erl - to read the fakefriends.txt and create subscription records:
-module(readfriends).
-export([load/1]).
-record(subscription, {subscriber, subscribee}).

load(Filename) ->
    for_each_line_in_file(Filename,
        fun(Line, Acc) ->
            [As, Bs] = string:tokens(string:strip(Line, right, $\n), " "),
            {A, _} = string:to_integer(As),
            {B, _} = string:to_integer(Bs),
            [ #subscription{subscriber=A, subscribee=B} | Acc ]
        end, [read], []).

% via: http://www.trapexit.org/Reading_Lines_from_a_File
for_each_line_in_file(Name, Proc, Mode, Accum0) ->
    {ok, Device} = file:open(Name, Mode),
    for_each_line(Device, Proc, Accum0).

for_each_line(Device, Proc, Accum) ->
    case io:get_line(Device, "") of
        eof -> file:close(Device), Accum;
        Line -> NewAccum = Proc(Line, Accum),
                for_each_line(Device, Proc, NewAccum)
    end.
Now in the subsmanager shell, you can read from the text file and add the subscriptions:
$ erl -name router@minifeeds4.gs2 +K true +A 128 -setcookie secretcookie -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40
erl> c(readfriends), c(subsmanager).
erl> subsmanager:first_run().
erl> subsmanager:start_link().
erl> subsmanager:add_subscriptions( readfriends:load("fakefriends.txt") ).
Note the additional mnesia parameters - these are to avoid the ** WARNING ** Mnesia is overloaded messages you would (probably) otherwise see. Refer to my previous post: On bulk loading data into Mnesia for alternative ways to load in lots of data. The best solution seems to be (as pointed out in the comments, thanks Jacob!) to set those options. The Mnesia reference manual contains many other settings under Configuration Parameters, and is worth a look.
Turning it up to 1 Million
Creating a million tcp connections from one host is non-trivial. I’ve a feeling that people who do this regularly have small clusters dedicated to simulating lots of client connections, probably running a real tool like Tsung. Even with the tuning from Part 1 to increase kernel tcp memory, increase the file descriptor ulimits and set the local port range to the maximum, we will still hit a hard limit on ephemeral ports. When making a tcp connection, the client end is allocated (or you can specify) a port from the range in /proc/sys/net/ipv4/ip_local_port_range. It doesn’t matter if you specify it manually, or use an ephemeral port, you’re still going to run out. In Part 1, we set the range to "1024 65535" - meaning there are 65535-1024 = 64511 unprivileged ports available. Some of them will be used by other processes, but we’ll never get over 64511 client connections, because we’ll run out of ports.
The local port range is assigned per-IP, so if we make our outgoing connections specifically from a range of different local IP addresses, we’ll be able to open more than 64511 outgoing connections in total.
So let’s bring up 17 new IP addresses, with the intention of making 62,000 connections from each - giving us a total of 1,054,000 connections. Safely over the 2^20 (1,048,576) mark:
$ for i in `seq 1 17`; do sudo ifconfig eth0:$i 10.0.0.$i up ; done
If you run ifconfig now you should see your virtual interfaces: eth0:1, eth0:2 … eth0:17, each with a different IP address. Obviously you should choose a sensible part of whatever address space you are using.
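The arithmetic behind "17 addresses, 62,000 connections each" can be checked in a few lines of Python. This is just a worked calculation; it assumes "1M" means 2^20 = 1,048,576 and uses the 10.0.0.x addresses from the example above. It also produces the (address, first id, last id) ranges that a load-testing client can use as its config.

```python
import math

PORT_LOW, PORT_HIGH = 1024, 65535   # ip_local_port_range from Part 1
TARGET = 2 ** 20                    # "1M" connections, taken as 1,048,576
PER_IP = 62000                      # connections we plan to open per local IP

ports_per_ip = PORT_HIGH - PORT_LOW        # the 64511 figure used in the text
ips_needed = math.ceil(TARGET / PER_IP)    # local addresses needed to reach TARGET
total = ips_needed * PER_IP                # connections we will actually attempt

# one (ip, first_id, last_id) range per virtual interface
ranges = [("10.0.0.%d" % i, (i - 1) * PER_IP + 1, i * PER_IP)
          for i in range(1, ips_needed + 1)]

print(ports_per_ip, ips_needed, total)
for r in ranges[:2] + ranges[-1:]:
    print(r)
```

Keeping PER_IP a little below ports_per_ip leaves some headroom for ports already in use by other processes on the box.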
All that remains now is to modify the floodtest tool from Part 1 to specify the local IP it should connect from… Unfortunately the erlang http client doesn’t let you specify the source IP. Neither does ibrowse, the alternative http client library. Damn.
<crazy idea>
At this point I considered another option: bringing up 17 pairs of IPs - one on the server and one on the client - each pair in their own isolated /30 subnet. I think that if I then made the client connect to any given server IP, it would force the local address to be the other half of the pair on that subnet, because only one of the local IPs would actually be able to reach the server IP. In theory, this would mean declaring the local source IP on the client machine would not be necessary (although the range of server IPs would need to be specified). I don’t know if this would really work - it sounded plausible at the time. In the end I decided it was too perverted and didn’t try it.
</crazy idea>
I also poked around in OTP’s http_transport code and considered adding support for specifying the local IP. It’s not really a feature you usually need in an HTTP client though, and it would certainly have been more work.
gen_tcp lets you specify the source address, so I ended up writing a rather crude client using gen_tcp specifically for this test:
floodtest2.erl
-module(floodtest2).
-compile(export_all).
-define(SERVERADDR, "10.1.2.3"). % where mochiweb is running
-define(SERVERPORT, 8000).

% Generate the config in bash like so (choose some available address space):
% EACH=62000; for i in `seq 1 17`; do echo "{{10,0,0,$i}, $((($i-1)*$EACH+1)), $(($i*$EACH))}, "; done

run(Interval) ->
    Config = [
        {{10,0,0,1}, 1, 62000},
        {{10,0,0,2}, 62001, 124000},
        {{10,0,0,3}, 124001, 186000},
        {{10,0,0,4}, 186001, 248000},
        {{10,0,0,5}, 248001, 310000},
        {{10,0,0,6}, 310001, 372000},
        {{10,0,0,7}, 372001, 434000},
        {{10,0,0,8}, 434001, 496000},
        {{10,0,0,9}, 496001, 558000},
        {{10,0,0,10}, 558001, 620000},
        {{10,0,0,11}, 620001, 682000},
        {{10,0,0,12}, 682001, 744000},
        {{10,0,0,13}, 744001, 806000},
        {{10,0,0,14}, 806001, 868000},
        {{10,0,0,15}, 868001, 930000},
        {{10,0,0,16}, 930001, 992000},
        {{10,0,0,17}, 992001, 1054000}],
    start(Config, Interval).

start(Config, Interval) ->
    Monitor = monitor(),
    % receive..after needs a whole number of milliseconds
    AdjustedInterval = erlang:max(1, Interval div length(Config)),
    [ spawn(?MODULE, start, [Lower, Upper, Ip, AdjustedInterval, Monitor])
      || {Ip, Lower, Upper} <- Config ],
    ok.

start(LowerID, UpperID, _, _, _) when LowerID > UpperID -> done;
start(LowerID, UpperID, LocalIP, Interval, Monitor) ->
    Path = "/test/" ++ integer_to_list(LowerID),
    spawn(?MODULE, connect, [?SERVERADDR, ?SERVERPORT, LocalIP, Path, Monitor]),
    receive after Interval -> start(LowerID + 1, UpperID, LocalIP, Interval, Monitor) end.

connect(ServerAddr, ServerPort, ClientIP, Path, Monitor) ->
    % {ip, ClientIP} sets the local (source) address for the connection
    Opts = [binary, {packet, 0}, {ip, ClientIP}, {reuseaddr, true}, {active, false}],
    {ok, Sock} = gen_tcp:connect(ServerAddr, ServerPort, Opts),
    Monitor ! open,
    ReqL = io_lib:format("GET ~s HTTP/1.1\r\nHost: ~s\r\n\r\n", [Path, ServerAddr]),
    Req = list_to_binary(ReqL),
    ok = gen_tcp:send(Sock, [Req]),
    do_recv(Sock, Monitor),
    (catch gen_tcp:close(Sock)),
    ok.

do_recv(Sock, Monitor)->
    case gen_tcp:recv(Sock, 0) of
        {ok, B} ->
            Monitor ! chunk,
            Monitor ! {bytes, size(B)},
            io:format("Recvd ~s\n", [ binary_to_list(B)]),
            io:format("Recvd ~w bytes\n", [size(B)]),
            do_recv(Sock, Monitor);
        {error, closed} ->
            Monitor ! closed,
            closed;
        Other ->
            Monitor ! closed,
            io:format("Other:~w\n",[Other])
    end.

% Monitor process receives stats and reports how much data we received etc:
monitor() ->
    Pid = spawn(?MODULE, monitor0, [{0,0,0,0}]),
    timer:send_interval(10000, Pid, report),
    Pid.

monitor0({Open, Closed, Chunks, Bytes}=S) ->
    receive
        report ->
            io:format("{Open, Closed, Chunks, Bytes} = ~w\n",[S]),
            monitor0(S); % keep looping after each report
        open -> monitor0({Open + 1, Closed, Chunks, Bytes});
        closed -> monitor0({Open, Closed + 1, Chunks, Bytes});
        chunk -> monitor0({Open, Closed, Chunks + 1, Bytes});
        {bytes, B} -> monitor0({Open, Closed, Chunks, Bytes + B})
    end.
As an initial test I was connecting to the mochiweb app from Part 1 - it simply sends one message to every client every 10 seconds.
erl> c(floodtest2), floodtest2:run(20).
This quickly ate all my memory.
Turns out opening lots of connections with gen_tcp like that eats a lot of ram. I think it’d need ~36GB to make it work without any additional tuning. I’m not interested in trying to optimise my quick-hack erlang http client (in the real world, this would be 1M actual web browsers), and the only machine I could get my hands on that has more than 32GB of RAM is one of our production databases, and I can’t find a good excuse to take Last.fm offline whilst I test this :) Additionally, it seems like it still only managed to open around 64,500 ports. Hmm.
At this point I decided to break out the trusty libevent, which I was pleased to discover has an HTTP API. Newer versions also have an evhttp_connection_set_local_address function in the http API. This sounds promising.
Here’s the http client in C using libevent:
#include <sys/types.h>
#include <sys/time.h>
#include <sys/queue.h>
#include <stdlib.h>
#include <err.h>
#include <event.h>
#include <evhttp.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <time.h>
#include <pthread.h>

#define BUFSIZE 4096
#define NUMCONNS 62000
#define SERVERADDR "10.103.1.43"
#define SERVERPORT 8000
#define SLEEP_MS 10

char buf[BUFSIZE];

int bytes_recvd = 0;
int chunks_recvd = 0;
int closed = 0;
int connected = 0;

// called per chunk received
void chunkcb(struct evhttp_request * req, void * arg)
{
    int s = evbuffer_remove( req->input_buffer, buf, BUFSIZE );
    //printf("Read %d bytes: %s\n", s, buf);
    bytes_recvd += s;
    chunks_recvd++;
    // report progress every 10,000 chunks
    if(connected >= NUMCONNS && chunks_recvd%10000==0)
        printf("Chunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed);
}

// gets called when request completes
void reqcb(struct evhttp_request * req, void * arg)
{
    closed++;
}

int main(int argc, char **argv)
{
    event_init();
    struct evhttp_connection *evhttp_connection;
    struct evhttp_request *evhttp_request;
    char addr[16];
    char path[32]; // eg: "/test/123"
    int i,octet;
    for(octet=1; octet<=17; octet++){
        // local (source) address for this batch of connections
        sprintf(addr, "10.224.0.%d", octet);
        for(i=1;i<=NUMCONNS;i++) {
            evhttp_connection = evhttp_connection_new(SERVERADDR, SERVERPORT);
            evhttp_connection_set_local_address(evhttp_connection, addr);
            evhttp_connection_set_timeout(evhttp_connection, 864000); // 10 day timeout
            evhttp_request = evhttp_request_new(reqcb, NULL);
            evhttp_request->chunk_cb = chunkcb;
            sprintf(path, "/test/%d", ++connected);
            evhttp_make_request( evhttp_connection, evhttp_request, EVHTTP_REQ_GET, path );
            // let libevent process pending events without blocking
            event_loop( EVLOOP_NONBLOCK );
            if( connected % 200 == 0 )
                usleep(SLEEP_MS*1000);
        }
    }
    event_dispatch();
    return 0;
}
Most parameters are hardcoded as #define’s so you configure it by editing the source and recompiling.
Compile and run:
$ gcc -o httpclient httpclient.c -levent
$ ./httpclient
This still failed to open more than 64,500 ports, although it used less RAM doing it.
It turns out that although I was specifying the local addresses, the ephemeral port allocation somewhere in the kernel or tcp stack didn’t care, and still ran out after 2^16. So in order to open more than 64,500 connections, you need to specify the local address and local port yourself, and manage them accordingly.
Unfortunately the libevent HTTP API doesn’t have an option to specify the local port. I patched libevent to add a suitable function:
void evhttp_connection_set_local_port(struct evhttp_connection *evcon, u_short port);
This was a surprisingly pleasant experience; libevent seems well written, and the documentation is pretty decent too.
With my modified libevent installed, I was able to add the following under the set_local_address line in the above code:
evhttp_connection_set_local_port(evhttp_connection, 1024+i);
With that in place, multiple connections from different addresses were able to use the same local port number, specific to the local address. I recompiled the client and let it run for a bit to confirm it would break the 2^16 barrier.
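Underneath, the trick is nothing more than calling bind() with an explicit address and port before connect(). Here is a minimal Python sketch of that idea using plain sockets. It is not the libevent patch: the helper names connect_from and free_port are made up for this example, and it runs against a throwaway listener on 127.0.0.1 so it can be tried on any machine.

```python
import socket


def connect_from(local_ip, local_port, server_addr):
    """Open a TCP connection whose source address and port we choose ourselves."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((local_ip, local_port))  # explicit source address *and* port
    s.connect(server_addr)
    return s


def free_port(ip):
    """Ask the kernel for a currently unused port (helper for this demo only)."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind((ip, 0))
        return probe.getsockname()[1]
    finally:
        probe.close()


def demo():
    # throwaway listener standing in for the comet server
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    port = free_port("127.0.0.1")
    client = connect_from("127.0.0.1", port, server.getsockname())
    conn, peer = server.accept()
    result = (client.getsockname(), peer, port)
    conn.close()
    client.close()
    server.close()
    return result


local, peer, chosen_port = demo()
print("client bound to", local, "and the server saw", peer)
```

In a real load generator you would walk the port number up from 1024 for each local address in turn, which is what the 1024+i in the C client does.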
Netstat confirms it:
# netstat -n | awk '/^tcp/ {t[$NF]++}END{for(state in t){print state, t[state]}}'
TIME_WAIT 8
ESTABLISHED 118222
This shows how many ports are open in various states. We’re finally able to open more than 2^16 connections, phew.
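For anyone who finds awk hard to read, the same tally can be written in Python. This is a sketch that works on lines of netstat -n output passed in as a list; the sample lines below are invented for illustration and are not real output from the test machine.

```python
from collections import Counter


def count_tcp_states(netstat_lines):
    """Count TCP sockets per state, like the awk one-liner above."""
    states = Counter()
    for line in netstat_lines:
        fields = line.split()
        if fields and fields[0].startswith("tcp"):
            states[fields[-1]] += 1  # last column is the connection state
    return dict(states)


# made-up sample in the shape of `netstat -n` output
sample = [
    "Proto Recv-Q Send-Q Local Address           Foreign Address         State",
    "tcp        0      0 10.0.0.1:1025           10.1.2.3:8000           ESTABLISHED",
    "tcp        0      0 10.0.0.2:1025           10.1.2.3:8000           ESTABLISHED",
    "tcp        0      0 10.0.0.1:1026           10.1.2.3:8000           TIME_WAIT",
    "udp        0      0 0.0.0.0:68              0.0.0.0:*",
]
counts = count_tcp_states(sample)
print(counts)
```

Note the first two sample rows: the same local port (1025) appears under two different local addresses, which is exactly the situation the patched client creates.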
Now we have a tool capable of opening a million http connections from a single box. It seems to consume around 2KB per connection, plus whatever the kernel needs. It’s time to use it for the “million connected user” test against our mochiweb comet server.
C1024K Test - 1 million comet connections
For this test I used 4 different servers of varying specs. These specs may be overpowered for the experiment, but they were available and waiting to go into production, and this made a good burn-in test. All four servers are on the same gigabit LAN, with up to 3 switches and a router in the middle somewhere.
The 1 million test I ran is similar to the 10k test from parts 1 and 2, the main difference being the modified client, now written in C using libevent, and that I’m running in a proper distributed-erlang setup with more than one machine.
On server 1 - Quad-core 2GHz CPU, 16GB of RAM
- Start subsmanager
- Load in the friends data
- Start the router
On server 2 - Dual Quad-core 2.8GHz CPU, 32GB of RAM
- Start mochiweb app
On server 3 - Quad-core 2GHz CPU, 16GB of RAM
- Create 17 virtual IPs as above
- Install patched libevent
- Run client:
./httpclient to create 100 connections per second, up to 1M
On server 4 - Dual-core 2GHz, 2GB RAM
- Run msggen program, to send lots of messages to the router
I measured the memory usage of mochiweb during the ramp-up to a million connections, and for the rest of the day:
The httpclient has a built in delay of 10ms between connections, so it took nearly 3 hours to open a million connections. The resident memory used by the mochiweb process with 1M open connections was around 25GB. Here’s the server this was running on as seen by Ganglia, which measures CPU, network and memory usage and produces nice graphs:
You can see it needs around 38GB and has started to swap. I suspect the difference is mostly consumed by the kernel to keep those connections open. The uplift at the end is when I started sending messages.
Messages were generated using 1,000 processes, with an average time between messages of 60ms per process, giving around 16,666 messages per second overall:
erl> [ spawn( fun()->msggen:start(1000000, 10+random:uniform(100), 1000000) end) || I <- lists:seq(1,1000) ].
The machine (server-4) generating messages looked like this on Ganglia:
That’s 10 MB per second of messages it’s pumping out - 16,666 messages a second. Typically these messages would come from a message bus, app servers, or part of an existing infrastructure.
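As a sanity check on those figures, here is the back-of-envelope calculation in Python. It assumes every process sends at the average 60ms interval and takes the 10 MB/s read off the graph at face value, so treat the per-message size as a rough estimate that includes protocol overhead rather than a measurement.

```python
PROCESSES = 1000
MEAN_INTERVAL_S = 0.060        # ~60ms between messages, per process
THROUGHPUT_BYTES = 10 * 10**6  # ~10 MB/s leaving the message generator

msgs_per_sec = PROCESSES / MEAN_INTERVAL_S
bytes_per_msg = THROUGHPUT_BYTES / msgs_per_sec

print(int(msgs_per_sec), "msgs/sec,",
      round(bytes_per_msg), "bytes per message on the wire")
```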
When I started sending messages, the load on server 1 (hosting subsmanager and router) stayed below 1, and CPU utilization increased from 0 to 5%.
CPU on server 2 (hosting mochiweb app, with 1M connections) increased more dramatically:
Naturally as processes have to leave their hibernate state to handle messages, memory usage will increase slightly. Having all connections open with no messages is a best-case for memory usage - unsurprisingly, actually doing stuff requires more memory.
So where does this leave us? To be on the safe side, the mochiweb machine would need 40GB of RAM to hold open 1M active comet connections. Under load, up to 30GB of the memory would be used by the mochiweb app, and the remaining 10GB by the kernel. In other words, you need to allow 40KB per connection.
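The per-connection figure falls straight out of the totals. The Python sketch below redoes the division and wraps it in a small estimator; the function name ram_needed_gb is made up, decimal units are used (1 GB = 10^9 bytes), and it assumes memory scales linearly with the number of connections, which this test did not verify at other sizes.

```python
CONNECTIONS = 1000000
GB = 10**9
KB = 10**3

app_bytes = 30 * GB     # mochiweb app under load
kernel_bytes = 10 * GB  # roughly what the kernel needs for the sockets
total_bytes = app_bytes + kernel_bytes

per_conn_kb = total_bytes / CONNECTIONS / KB
app_per_conn_kb = app_bytes / CONNECTIONS / KB
kernel_per_conn_kb = kernel_bytes / CONNECTIONS / KB


def ram_needed_gb(connections, kb_per_conn=per_conn_kb):
    """Rough RAM estimate, assuming memory grows linearly with connections."""
    return connections * kb_per_conn * KB / GB


print(per_conn_kb, "KB per connection in total")
print(ram_needed_gb(250000), "GB estimated for 250k connections")
```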
During various tests with lots of connections, I ended up making some additional changes to my sysctl.conf. This was part trial-and-error; I don’t really know enough about the internals to make especially informed decisions about which values to change. My policy was to wait for things to break, check /var/log/kern.log and see what mysterious error was reported, then increase stuff that sounded sensible after a spot of googling. Here are the settings in place during the above test:
net.core.rmem_max = 33554432
net.core.wmem_max = 33554432
net.ipv4.tcp_rmem = 4096 16384 33554432
net.ipv4.tcp_wmem = 4096 16384 33554432
net.ipv4.tcp_mem = 786432 1048576 26777216
net.ipv4.tcp_max_tw_buckets = 360000
net.core.netdev_max_backlog = 2500
vm.min_free_kbytes = 65536
vm.swappiness = 0
net.ipv4.ip_local_port_range = 1024 65535
I would like to learn more about Linux tcp tuning so I can make a more informed decision about these settings. These are almost certainly not optimal, but at least they were enough to get to 1M connections. These changes, along with the fact this is running on a 64bit Erlang VM, and thus has a wordsize of 8 bytes instead of 4, might explain why the memory usage is much higher than I observed during the C10k test of part 2.
An Erlang C-Node using Libevent
After dabbling with the HTTP api for libevent, it seemed entirely sensible to try the 1M connection test against a libevent HTTPd written in C so we have a basis for comparison.
I’m guessing that enabling kernel poll means the erlang VM is able to use epoll (or similar), but even so there’s clearly some overhead involved which we might be able to mitigate by delegating the connection handling to a C program using libevent. I want to reuse most of the Erlang code so far, so let’s do the bare minimum in C - just the connection handling and HTTP stuff.
Libevent has an asynchronous HTTP API, which makes implementing http servers trivial - well, trivial for C, but still less trivial than mochiweb IMO ;) I’d also been looking for an excuse to try the Erlang C interface, so the following program combines the two. It’s a comet http server in C using libevent which identifies users using an integer Id (like our mochiweb app), and also acts as an Erlang C-Node.
It connects to a designated erlang node, listens for messages like {123, <<"Hello user 123">>} then dispatches "Hello user 123" to user 123, if connected. Messages for users that are not connected are discarded, just like previous examples.
httpdcnode.c
-
#include <sys/types.h>
-
#include <sys/time.h>
-
#include <sys/queue.h>
-
#include <stdlib.h>
-
#include <err.h>
-
#include <event.h>
-
#include <evhttp.h>
-
#include <stdio.h>
-
#include <sys/socket.h>
-
#include <netinet/in.h>
-
-
#include "erl_interface.h"
-
#include "ei.h"
-
-
#include <pthread.h>
-
-
#define BUFSIZE 1024
-
#define MAXUSERS (17*65536) // C1024K
-
-
// List of current http requests by uid:
-
struct evhttp_request * clients[MAXUSERS+1];
-
// Memory to store uids passed to the cleanup callback:
-
int slots[MAXUSERS+1];
-
-
// called when user disconnects
-
void cleanup(struct evhttp_connection *evcon, void *arg)
-
{
-
int *uidp = (int *) arg;
-
fprintf(stderr, "disconnected uid %d\n", *uidp);
-
clients[*uidp] = NULL;
-
}
-
-
// handles http connections, sets them up for chunked transfer,
-
// extracts the user id and registers in the global connection table,
-
// also sends a welcome chunk.
-
void request_handler(struct evhttp_request *req, void *arg)
-
{
-
struct evbuffer *buf;
-
buf = evbuffer_new();
-
if (buf == NULL){
-
err(1, "failed to create response buffer");
-
}
-
-
evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8");
-
-
int uid = -1;
-
if(strncmp(evhttp_request_uri(req), "/test/", 6) == 0){
-
uid = atoi( 6+evhttp_request_uri(req) );
-
}
-
-
if(uid <= 0){
-
evbuffer_add_printf(buf, "User id not found, try /test/123 instead");
-
evhttp_send_reply(req, HTTP_NOTFOUND, "Not Found", buf);
-
evbuffer_free(buf);
-
return;
-
}
-
-
if(uid > MAXUSERS){
-
evbuffer_add_printf(buf, "Max uid allowed is %d", MAXUSERS);
-
evhttp_send_reply(req, HTTP_SERVUNAVAIL, "We ran out of numbers", buf);
-
evbuffer_free(buf);
-
return;
-
}
-
-
evhttp_send_reply_start(req, HTTP_OK, "OK");
-
// Send welcome chunk:
-
evbuffer_add_printf(buf, "Welcome, Url: ‘%s’ Id: %d\n", evhttp_request_uri(req), uid);
-
evhttp_send_reply_chunk(req, buf);
-
evbuffer_free(buf);
-
-
// put reference into global uid->connection table:
-
clients[uid] = req;
-
// set close callback
-
evhttp_connection_set_closecb( req->evcon, cleanup, &slots[uid] );
-
}
-
-
-
// runs in a thread - the erlang c-node stuff
// expects msgs like {uid, msg} and sends a 'msg' chunk to uid if connected
// (pthread start routines take a void * argument and return void *)
void *cnode_run(void *arg)
{
    int fd;                      /* fd to Erlang node */
    int got;                     /* Result of receive */
    unsigned char buf[BUFSIZE];  /* Buffer for incoming message */
    ErlMessage emsg;             /* Incoming message */

    ETERM *uid, *msg;

    erl_init(NULL, 0);

    if (erl_connect_init(1, "secretcookie", 0) == -1)
        erl_err_quit("erl_connect_init");

    if ((fd = erl_connect("httpdmaster@localhost")) < 0)
        erl_err_quit("erl_connect");

    fprintf(stderr, "Connected to httpdmaster@localhost\n\r");

    struct evbuffer *evbuf;

    while (1) {
        got = erl_receive_msg(fd, buf, BUFSIZE, &emsg);
        if (got == ERL_TICK) {
            continue;
        } else if (got == ERL_ERROR) {
            fprintf(stderr, "ERL_ERROR from erl_receive_msg.\n");
            break;
        } else {
            if (emsg.type == ERL_REG_SEND) {
                // get uid and body data from eg: {123, <<"Hello">>}
                uid = erl_element(1, emsg.msg);
                msg = erl_element(2, emsg.msg);
                int userid = ERL_INT_VALUE(uid);
                char *body = (char *) ERL_BIN_PTR(msg);
                int body_len = ERL_BIN_SIZE(msg);
                // Is this userid within the table and connected?
                if(userid >= 0 && userid <= MAXUSERS && clients[userid]){
                    fprintf(stderr, "Sending %d bytes to uid %d\n", body_len, userid);
                    evbuf = evbuffer_new();
                    evbuffer_add(evbuf, (const void*)body, (size_t) body_len);
                    evhttp_send_reply_chunk(clients[userid], evbuf);
                    evbuffer_free(evbuf);
                }else{
                    fprintf(stderr, "Discarding %d bytes to uid %d - user not connected\n",
                            body_len, userid);
                    // noop
                }
                erl_free_term(emsg.msg);
                erl_free_term(uid);
                erl_free_term(msg);
            }
        }
    }
    // if we got here, erlang connection died.
    // this thread is supposed to run forever
    // TODO - gracefully handle failure / reconnect / etc
    pthread_exit(0);
}

int main(int argc, char **argv)
{
    // Launch the thread that runs the cnode:
    pthread_t helper;
    pthread_create(&helper, NULL, cnode_run, NULL);

    int i;
    for(i=0;i<=MAXUSERS;i++) slots[i]=i;
    // Launch libevent httpd:
    struct evhttp *httpd;
    event_init();
    httpd = evhttp_start("0.0.0.0", 8000);
    evhttp_set_gencb(httpd, request_handler, NULL);
    event_dispatch();
    // Not reached, event_dispatch() shouldn't return
    evhttp_free(httpd);
    return 0;
}
The maximum number of users is #defined and, like the mochiweb server, it listens on port 8000 and expects users to connect with a path like so: /test/<userid>. Also hardcoded are the name of the erlang node it will connect to in order to receive messages, httpdmaster@localhost, and the erlang cookie, “secretcookie”. Change these accordingly.
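If you want to tighten up how the uid is pulled out of the /test/<userid> path, something along these lines might do. This is only a sketch, not the parsing the listing above actually uses: `parse_uid` and `EXAMPLE_MAXUSERS` are hypothetical names, the limit is a stand-in for the real MAXUSERS, and I'm assuming uid 0 is acceptable and that anything after the number (a query string, say) should be rejected.

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the MAXUSERS #define in the real server. */
#define EXAMPLE_MAXUSERS 1000000

/* Returns the uid from a path of the form /test/<userid>,
 * or -1 if the path is malformed or the uid is out of range. */
int parse_uid(const char *uri)
{
    static const char prefix[] = "/test/";
    const size_t plen = sizeof(prefix) - 1;
    char *end;
    long v;

    if (uri == NULL || strncmp(uri, prefix, plen) != 0)
        return -1;
    /* Require a leading digit: rejects empty ids, signs and whitespace. */
    if (uri[plen] < '0' || uri[plen] > '9')
        return -1;

    errno = 0;
    v = strtol(uri + plen, &end, 10);
    if (errno != 0 || *end != '\0')   /* overflow or trailing junk */
        return -1;
    if (v > EXAMPLE_MAXUSERS)         /* same upper bound check as the handler */
        return -1;
    return (int) v;
}
```

A handler could call `parse_uid(evhttp_request_uri(req))` and send an error reply on -1 before touching the connection table.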
Run the erlang node it will connect to first:
$ erl -setcookie secretcookie -sname httpdmaster@localhost
Compile and run like so:
$ gcc -o httpdcnode httpdcnode.c -lerl_interface -lei -levent -lpthread
$ ./httpdcnode
In the erlang shell, check you can see the hidden c-node:
erl> nodes(hidden).
[c1@localhost]
Now connect in your browser to http://localhost:8000/test/123. You should see the welcome message.
Now back to the erlang shell - send a message to the C node:
erl> {any, c1@localhost} ! {123, <<"Hello Libevent World">>}.
Note that we don’t have a Pid to use, so we use the alternate representation of {procname, node}. We use ‘any’ as the process name, which is ignored by the C-node.
Now you’re able to deliver comet messages via Erlang, but all the http connections are managed by a libevent C program which acts as an Erlang node.
After removing the debug print statements, I connected 1M clients to the httpdcnode server using the same client as above. The machine showed a total of just under 10GB of memory used, and the resident memory of the server process was stable at under 2GB.
So big savings compared to mochiweb when handling lots of connections - the resident memory per connection for the server process with libevent is just under 2KB. With everything connected, the server machine claims:
Mem: 32968672k total, 9636488k used, 23332184k free, 180k buffers
So the kernel/tcp stack is consuming an additional 8KB per connection, which seems a little high, but I have no basis for comparison.
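To make the arithmetic explicit, here is a small sketch that reproduces the per-connection figures. The "used" number comes straight from the Mem: line above; the connection count (exactly 1,000,000) and the server's resident size (taken as a full 2GB, the upper bound) are my assumptions, and the function names are made up for illustration.

```c
#include <assert.h>
#include <stdio.h>

/* Average memory per connection in KB, given a total in KB. */
double kb_per_connection(long total_kb, long connections)
{
    assert(connections > 0);
    return (double) total_kb / (double) connections;
}

/* Prints the breakdown for the figures quoted in the post. */
void print_breakdown(void)
{
    const long connections = 1000000;          /* assumed: exactly 1M clients */
    const long used_kb     = 9636488;          /* "used" from the Mem: line */
    const long server_kb   = 2L * 1024 * 1024; /* assumed: RSS at its 2GB upper bound */

    printf("total per connection:  %.2f KB\n",
           kb_per_connection(used_kb, connections));
    printf("server per connection: %.2f KB\n",
           kb_per_connection(server_kb, connections));
    /* Everything not resident in the server process is attributed
     * to the kernel/tcp stack, as the post does. */
    printf("kernel per connection: %.2f KB\n",
           kb_per_connection(used_kb - server_kb, connections));
}
```

Calling `print_breakdown()` from a `main` gives roughly 7.5KB per connection outside the server process under these assumptions, which is where the figure of about 8KB comes from.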
This libevent-cnode server needs a bit more work. It doesn’t sensibly handle multiple connections from the same user yet, and there’s no locking, so a race condition exists if you disconnect just when a message is about to be dispatched.
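One way the locking could look is a mutex around the uid->connection table, so the disconnect callback can't clear an entry while the c-node thread is halfway through delivering to it. This is a standalone sketch, not code from the server: the table holds plain `void *` instead of `struct evhttp_request *`, and `TABLE_SIZE`, `table_set`, `table_clear`, `table_with` and `count_delivery` are all hypothetical names. It only protects the table; whether it is safe to call into libevent from a second thread at all is a separate question this doesn't answer.

```c
#include <pthread.h>
#include <stddef.h>

#define TABLE_SIZE 1024   /* hypothetical; stands in for MAXUSERS+1 */

static void *table[TABLE_SIZE];
static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;

typedef void (*deliver_fn)(void *conn, void *ctx);

static int in_range(int uid) { return uid >= 0 && uid < TABLE_SIZE; }

/* Register a connection (what the request handler would do). */
int table_set(int uid, void *conn)
{
    if (!in_range(uid)) return -1;
    pthread_mutex_lock(&table_lock);
    table[uid] = conn;
    pthread_mutex_unlock(&table_lock);
    return 0;
}

/* Forget a connection (what the close callback would do). */
int table_clear(int uid)
{
    return table_set(uid, NULL);
}

/* Run fn on the connection for uid while holding the lock, so the
 * entry cannot be cleared mid-delivery.
 * Returns 1 if delivered, 0 if not connected, -1 if uid is out of range. */
int table_with(int uid, deliver_fn fn, void *ctx)
{
    int delivered = 0;
    if (!in_range(uid)) return -1;
    pthread_mutex_lock(&table_lock);
    if (table[uid] != NULL) {
        fn(table[uid], ctx);
        delivered = 1;
    }
    pthread_mutex_unlock(&table_lock);
    return delivered;
}

/* Example callback: just counts deliveries via an int passed as ctx. */
void count_delivery(void *conn, void *ctx)
{
    (void) conn;
    ++*(int *) ctx;
}
```

In the server, the callback passed to `table_with` would build the evbuffer and send the chunk. Holding one global lock during delivery is the simplest choice and serializes all sends, which may or may not matter at a million connections.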
Even so, I think this could be generalized in a way that would allow you to use Erlang for all the interesting stuff, and have a C+libevent process act as a dumb connection-pool. With a bit more wrapper code and callbacks into Erlang, you’d hardly need to know this was going on - the C program could be run as a driver or a C-node, and an Erlang wrapper could give you a decent API built on top of libevent (see this post for an example Erlang C driver). I would like to experiment further with this.
Final Thoughts
I have enough data now to judge how much hardware would be needed if we deploy a large scale comet system for Last.fm. Even a worst case of 40KB per connection isn’t unreasonable - memory is pretty cheap at the moment, and 40GB to support a million users is affordable. 10GB is even better. I will finish up the app I’m building and deploy it somewhere people can try it out. Along the way I’ll tidy up the erlang memcached client I’m using and release that (from jungerl, with modifications for consistent hashing and some bug fixes), and some other things. Stay tuned :)




