
Rewriting Playdar: C++ to Erlang, massive savings

I’ve heard many anecdotes and claims about how many lines of code are saved when you write in Erlang instead of [C++/other language]. I’m happy to report that I now have first-hand experience and some data to share.

I initially wrote Playdar in C++ (using Boost and Asio libraries), starting back in February this year. I was fortunate to be working with some experienced developers who helped me come to terms with C++. There were three of us hacking on it regularly up until a few months ago, and despite being relatively new to C++, I’ll say that we ended up with a well designed and robust codebase, all things considered.

On Feeling Smug

I’ll admit I felt rather smug making it all work in C++ with Boost and ASIO. Getting it to build on all three platforms and dynamically load extensions (DLLs etc) at runtime in a cross-platform way was also quite satisfying (I had plenty of help with that side of things). I learned a lot about C++, Boost, ASIO and CMake. But, as the codebase grew, I began to seriously question my decision to use C++.

My initial reasons for choosing C++ were twofold:

  • Distribution – shipping the Erlang VM didn’t sound like fun
  • Taglib – *the* library to read metadata from audio files (mp3, m4a, ogg etc) is C++

It turns out Playdar is naturally a good fit for Erlang – it does lots in parallel, and lots of stuff it does is asynchronous and event based. Even with all the stuff you get with Boost, multithreaded stuff in C++ is inelegant, to put it kindly.

SLOCed and Loaded

Anyway, a couple of weeks ago I sat down to re-implement Playdar from scratch in Erlang. I thrashed out the guts of it in a couple of days, and by the end of the week I had almost reached feature parity with the C++ codebase. There’s still a bit of C++ left: the code that interfaces with taglib.

Using the SLOCCount tool (SLOC = source lines of code), I counted the lines of code in the equivalent modules of both codebases. Here are the results:

Module              Erlang version (SLOC)   C++ version (SLOC)   Savings
Core Daemon         1,100                   4,491                75%
Library + Scanner   197 + 167 (C++)         1,355                73%
LAN Resolver        105                     427                  75%
P2P                 463                     1,762                74%
TOTAL               2,032                   8,035                75%


75% fewer lines of code using Erlang than C++ to implement the same thing – not too shabby :)
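Each savings figure is 1 - (Erlang lines / C++ lines), with the 167 lines of C++ taglib glue counted against the Erlang version. A quick Python check of the table's arithmetic, printing one decimal place so you can see how each row rounds:

```python
# SLOC per module: (Erlang-side lines, C++ lines), copied from the table.
# The Library + Scanner row counts the 167 lines of remaining C++ taglib
# glue on the Erlang side.
SLOC = {
    "Core Daemon": (1100, 4491),
    "Library + Scanner": (197 + 167, 1355),
    "LAN Resolver": (105, 427),
    "P2P": (463, 1762),
}


def savings_pct(new_lines, old_lines):
    """Percentage of lines saved going from old_lines to new_lines."""
    return 100 * (1 - new_lines / old_lines)


def totals(rows):
    """Column totals: (total Erlang-side lines, total C++ lines)."""
    new_total = sum(new for new, _ in rows.values())
    old_total = sum(old for _, old in rows.values())
    return new_total, old_total


if __name__ == "__main__":
    for module, (new, old) in SLOC.items():
        print(f"{module:<18} {new:>6} {old:>6} {savings_pct(new, old):5.1f}%")
    new, old = totals(SLOC)
    print(f"{'TOTAL':<18} {new:>6} {old:>6} {savings_pct(new, old):5.1f}%")
```

It reports 74.7% overall, with individual modules between about 73.1% and 75.5%.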

The second time around, writing in Erlang, I knew exactly what I was building, so it’s unfair to compare development time of the two codebases. But given how fast I can type, I reckon I saved a good few hours of just pounding the keyboard to input the code (and countless hours of debugging: Erlang tends to work first time, really). Well, I’m not sure if “saved” is the right word, considering it was working in C++ already, but it’s my time to waste :)

If you count the third party code bundled with both codebases (excluding boost/asio!) then the erlang codebase saves a whopping 92%. I’m more interested in the savings in code I had to write, however.

Memory and CPU Usage

I’ve done some preliminary comparisons between the two projects: when it comes to CPU and memory usage, they are pretty similar. The Erlang codebase uses slightly more memory than the C++ one at the moment, but I’m convinced I can get that down to at least as low as the C++ project was. I picked up a few optimization tricks from my three-part Million-user comet experiment in Erlang earlier this year. I’ll post more about this if I learn any new tricks.

One thing I’ve realised about the Erlang codebase is that I’ve used processes to encapsulate state (active queries, specifically) where I didn’t really need to. It seemed sensible at the time, but it’s probably just a waste of memory. I’m going to change it to spawn processes to get the work done (i.e. a process that runs the query), but not just to maintain state.

Distribution to the desktop

C++

You just have to make sure that you build everything and ship any DLLs, along with checks in the installer for the system libraries needed (runtime DLLs). Oh, and make sure you don’t change the plugin binary interface in the main app, or new plugins will crash and burn when you load them. Add a check for that. Oh, and be careful about compiling taglib and stuff with MinGW and the rest with VC++, or things might mysteriously crash. Also, I heard a horror story about allocating memory in plugin code but deallocating it in the main app when the plugin was compiled against a different stdlib than the main app. This is all par for the course, and the experienced C++ developers I asked for help had no trouble making it work. Size of installable package: 2.5MB

Erlang

Compiling, and building/loading plugins in the Erlang codebase is straightforward on all platforms, as is often the way with VMs. I was against shipping the Erlang VM originally because I figured it would be a lot of hassle and increase the download size substantially. Packaging an Erlang app for the desktop involves taking the installed VM directory structure and stripping out all the docs, source and parts of the Erlang stdlib we don’t use, then packaging it along with the compiled Playdar code. CouchDB does something like this too, and RabbitMQ ships the Erlang VM without stripping unneeded libs. We’ll work on packaging some more (for all platforms), but to date Max has crafted a package that contains the necessary bits of the Erlang VM, a sexy Prefpane to start/stop the daemon on OS X, and the compiled Playdar code all weighing in under 10MB.
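The stripping step is mechanical enough to script. A minimal Python sketch, assuming the standard OTP layout where applications live in lib/<app>-<version> and that doc, man, src and examples directories are not needed at runtime; which applications to keep is up to you, and the function name is hypothetical rather than part of Playdar's actual packaging:

```python
import os
import shutil

# Directory names assumed safe to drop at runtime: documentation, man
# pages, sources and examples.
DROP_DIRS = {"doc", "man", "src", "examples"}


def strip_erlang_install(src_root, dst_root, keep_apps):
    """Copy the Erlang install at src_root to dst_root (which must not yet
    exist), keeping only the OTP applications named in keep_apps and
    skipping documentation and source directories everywhere."""
    lib_dir = os.path.abspath(os.path.join(src_root, "lib"))

    def ignore(dirpath, names):
        skipped = {name for name in names if name in DROP_DIRS}
        if os.path.abspath(dirpath) == lib_dir:
            # OTP applications live in lib/<app>-<version>, e.g. stdlib-1.15.4
            skipped |= {name for name in names
                        if name.rsplit("-", 1)[0] not in keep_apps}
        return skipped

    shutil.copytree(src_root, dst_root, ignore=ignore)
```

For example, strip_erlang_install("/usr/local/lib/erlang", "build/erlang", {"kernel", "stdlib", "mnesia"}); that application list is only a placeholder.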

We’ll put together a Windows installer soon that’ll probably be around the same size. A 10MB download isn’t so bad nowadays, and I expect we can optimize the packaging process some more. Linux users will get a package that depends on the Erlang VM from their package manager.
Seems like shipping Erlang apps to the desktop isn’t so hard after all.

tl;dr

Someone rewrote a C++ app in Erlang: 75% fewer lines of code for the same functionality.

You should read this blog post about Playdar, by Paul Lamere, and take a look at the Playdar website.

C++ codebase (deprecated)
Erlang codebase

Playdar is the future, and the future is written in Erlang :)


Wednesday, October 21st, 2009 playdar, programming

A Million-user Comet Application with Mochiweb, Part 3

Part 1 and Part 2 in this series showed how to build a comet application using mochiweb, and how to route messages to connected users. We managed to squeeze application memory down to 8KB per connection. We did ye olde c10k test, and observed what happened with 10,000 connected users. We made graphs. It was fun, but now it’s time to make good on the claims made in the title, and turn it up to 1 million connections.

This post covers the following:

  • Add a pubsub-like subscription database using Mnesia
  • Generate a realistic friends dataset for a million users
  • Tune mnesia and bulk load in our friends data
  • Opening a million connections from one machine
  • Benchmark with 1 Million connected users
  • Libevent + C for connection handling
  • Final thoughts

One of the challenging parts of this test was actually being able to open 1M connections from a single test machine. Writing a server to accept 1M connections is easier than actually creating 1M connections to test it with, so a fair amount of this article is about the techniques used to open 1M connections from a single machine.

Getting our pubsub on

In Part 2 we used the router to send messages to specific users. This is fine for a chat/IM system, but there are sexier things we could do instead. Before we launch into a large-scale test, let’s add one more module: a subscription database. We want the application to store who your friends are, so it can push you all events generated by people on your friends list.

My intention is to use this for Last.fm so I can get a realtime feed of songs my friends are currently listening to. It could equally apply to other events generated on social networks: Flickr photo uploads, Facebook newsfeed items, Twitter messages etc. FriendFeed even has a realtime API in beta, so this kind of thing is definitely topical (although I’ve not heard of anyone except Facebook using Erlang for this kind of thing).

Implementing the subscription-manager

We’re implementing a general subscription manager, but we’ll be subscribing people to everyone on their friends list automatically – so you could also think of this as a friends database for now.

The subsmanager API:

  • add_subscriptions([{Subscriber, Subscribee},...])
  • remove_subscriptions([{Subscriber, Subscribee},...])
  • get_subscribers(User)
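To pin down the contract before the Mnesia version: subscriptions form a bag of (subscriber, subscribee) pairs, and get_subscribers(User) is a lookup on the subscribee column, which is why the table created in first_run/0 gets a secondary index on subscribee. A minimal in-memory Python model of those three calls (the class is hypothetical and ignores persistence and concurrency):

```python
from collections import defaultdict


class SubsManager:
    """In-memory model of the subsmanager API: a bag of
    (subscriber, subscribee) pairs, indexed by subscribee."""

    def __init__(self):
        # subscribee -> set of subscribers. Like a Mnesia bag table,
        # adding the same pair twice stores it only once.
        self._by_subscribee = defaultdict(set)

    def add_subscriptions(self, pairs):
        for subscriber, subscribee in pairs:
            self._by_subscribee[subscribee].add(subscriber)

    def remove_subscriptions(self, pairs):
        for subscriber, subscribee in pairs:
            self._by_subscribee.get(subscribee, set()).discard(subscriber)

    def get_subscribers(self, user):
        # Sorted only to make output deterministic; the Erlang version
        # makes no ordering promise.
        return sorted(self._by_subscribee.get(user, ()))


subs = SubsManager()
subs.add_subscriptions([("alice", "rj"), ("bob", "rj")])
subs.remove_subscriptions([("bob", "rj")])
print(subs.get_subscribers("rj"))       # ['alice']
print(subs.get_subscribers("charlie"))  # []
```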

subsmanager.erl

-module(subsmanager).
-behaviour(gen_server).
-include("/usr/local/lib/erlang/lib/stdlib-1.15.4/include/qlc.hrl").
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([add_subscriptions/1,
         remove_subscriptions/1,
         get_subscribers/1,
         first_run/0,
         stop/0,
         start_link/0]).
-record(subscription, {subscriber, subscribee}).
-record(state, {}). % state is all in mnesia
-define(SERVER, global:whereis_name(?MODULE)).

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

stop() ->
    gen_server:call(?SERVER, {stop}).

add_subscriptions(SubsList) ->
    gen_server:call(?SERVER, {add_subscriptions, SubsList}, infinity).

remove_subscriptions(SubsList) ->
    gen_server:call(?SERVER, {remove_subscriptions, SubsList}, infinity).

get_subscribers(User) ->
    gen_server:call(?SERVER, {get_subscribers, User}).

%%

init([]) ->
    ok = mnesia:start(),
    io:format("Waiting on mnesia tables..\n",[]),
    mnesia:wait_for_tables([subscription], 30000),
    Info = mnesia:table_info(subscription, all),
    io:format("OK. Subscription table info: \n~w\n\n",[Info]),
    {ok, #state{}}.

handle_call({stop}, _From, State) ->
    % reply ok to the caller, then shut down with a normal exit reason
    {stop, normal, ok, State};

handle_call({add_subscriptions, SubsList}, _From, State) ->
    % Transactionally is slower:
    % F = fun() ->
    %         [ ok = mnesia:write(S) || S <- SubsList ]
    %     end,
    % mnesia:transaction(F),
    [ mnesia:dirty_write(S) || S <- SubsList ],
    {reply, ok, State};

handle_call({remove_subscriptions, SubsList}, _From, State) ->
    F = fun() ->
        [ ok = mnesia:delete_object(S) || S <- SubsList ]
    end,
    mnesia:transaction(F),
    {reply, ok, State};

handle_call({get_subscribers, User}, From, State) ->
    % do the lookup in a separate process, which replies directly to the caller
    F = fun() ->
        Subs = mnesia:dirty_match_object(#subscription{subscriber='_', subscribee=User}),
        Users = [Dude || #subscription{subscriber=Dude, subscribee=_} <- Subs],
        gen_server:reply(From, Users)
    end,
    spawn(F),
    {noreply, State}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Msg, State) -> {noreply, State}.

terminate(_Reason, _State) ->
    mnesia:stop(),
    ok.

code_change(_OldVersion, State, _Extra) ->
    % macros are not expanded inside strings, so pass ?MODULE as an argument
    io:format("Reloading code for ~w\n", [?MODULE]),
    {ok, State}.

%%

first_run() ->
    mnesia:create_schema([node()]),
    ok = mnesia:start(),
    Ret = mnesia:create_table(subscription,
    [
     {disc_copies, [node()]},
     {attributes, record_info(fields, subscription)},
     {index, [subscribee]}, %index subscribee too
     {type, bag}
    ]),
    Ret.



Noteworthy points:

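  • I’ve included qlc.hrl, needed for mnesia queries using list comprehensions, using an absolute path. That can’t be best practice, but it wasn’t finding it otherwise. The portable form is -include_lib("stdlib/include/qlc.hrl"), which locates the header via the installed stdlib application. (This module doesn’t actually run any qlc queries yet: lookups use mnesia:dirty_match_object.)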
  • get_subscribers spawns another process and delegates the job of replying to that process, using gen_server:reply. This means the gen_server loop won’t block on that call if we throw lots of lookups at it and mnesia slows down.
  • rr("subsmanager.erl"). in the example below allows you to use record definitions in the erl shell. Putting your record definitions into a records.hrl file and including that in your modules is considered better style. I inlined it for brevity.

Now to test it. first_run() creates the mnesia schema, so it’s important to run that first. Another potential gotcha with mnesia is that (by default) the database can only be accessed by the node that created it, so give the erl shell a name, and stick with it.

$ mkdir /var/mnesia_data
$ erl -boot start_sasl -mnesia dir '"/var/mnesia_data"' -sname subsman
(subsman@localhost)1> c(subsmanager).
{ok,subsmanager}
(subsman@localhost)2> subsmanager:first_run().
...
{atomic,ok}
(subsman@localhost)3> subsmanager:start_link().
Waiting on mnesia tables..
OK. Subscription table info:
[{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,'_','_'}},{{index,3},57378}]


{ok,<0.105.0>}
(subsman@localhost)4> rr("subsmanager.erl").
[state,subscription]
(subsman@localhost)5> subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj} ]).
ok
(subsman@localhost)6> subsmanager:add_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)7> subsmanager:get_subscribers(rj).
[bob,alice]
(subsman@localhost)8> subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]).
ok
(subsman@localhost)9> subsmanager:get_subscribers(rj).
[alice]
(subsman@localhost)10> subsmanager:get_subscribers(charlie).
[]

We’ll use integer Ids to represent users for the benchmark – but for this test I used atoms (rj, alice, bob) and assumed that alice and bob are both on rj’s friends list. It’s nice that mnesia (and ets/dets) doesn’t care what values you use – any Erlang term is valid. This means it’s a simple upgrade to support multiple types of resource. You could start using {user, 123} or {photo, 789} to represent different things people might subscribe to, without changing anything in the subsmanager module.

Modifying the router to use subscriptions

Instead of addressing messages to specific users, ie router:send(123, "Hello user 123"), we’ll mark messages with a subject – that is, the person who generated the message (who played the song, who uploaded the photo etc) – and have the router deliver the message to every user who has subscribed to the subject user. In other words, the API will work like this: router:send(123, "Hello everyone subscribed to user 123")
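In other words, the router resolves the subject's subscribers, adds the subject itself (users always see their own events), and delivers to every process currently logged in as one of those users; users who aren't logged in are simply skipped. A minimal Python model of that rule, assuming a dict of lists in place of the ETS id2pid bag (one user may have several connections) and a callback in place of Pid ! Msg; the names are illustrative, not part of the router module:

```python
from collections import defaultdict


def route(subject, msg, get_subscribers, id2conns, deliver):
    """Deliver msg to every logged-in connection of the subject and of
    everyone subscribed to the subject. Returns how many connections
    received it, like the {ok, N} reply from router:send/2."""
    targets = [subject] + list(get_subscribers(subject))
    # Users with no entry in id2conns are not logged in and get nothing.
    conns = [c for user in targets for c in id2conns.get(user, ())]
    for conn in conns:
        deliver(conn, ("router_msg", msg))
    return len(conns)


# Mirrors the shell session later in the post: alice is logged in on one
# connection, bob and rj are not logged in at all.
id2conns = defaultdict(list)
id2conns["alice"].append("conn-1")
inbox = []
n = route("rj", "RJ did something",
          lambda user: ["bob", "alice"] if user == "rj" else [],
          id2conns, lambda conn, m: inbox.append((conn, m)))
print(n, inbox)  # 1 [('conn-1', ('router_msg', 'RJ did something'))]
```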

Updated router.erl:

-module(router).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-export([send/2, login/2, logout/1]).

-define(SERVER, global:whereis_name(?MODULE)).

% will hold bidirectional mapping between id <-> pid
-record(state, {pid2id, id2pid}).

start_link() ->
    gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

% sends Msg to anyone subscribed to Id
send(Id, Msg) ->
    gen_server:call(?SERVER, {send, Id, Msg}).

login(Id, Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {login, Id, Pid}).

logout(Pid) when is_pid(Pid) ->
    gen_server:call(?SERVER, {logout, Pid}).

%%

init([]) ->
    % set this so we can catch death of logged in pids:
    process_flag(trap_exit, true),
    % use ets for routing tables
    {ok, #state{
                pid2id = ets:new(?MODULE, [bag]),
                id2pid = ets:new(?MODULE, [bag])
               }
    }.

handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
    ets:insert(State#state.pid2id, {Pid, Id}),
    ets:insert(State#state.id2pid, {Id, Pid}),
    link(Pid), % tell us if they exit, so we can log them out
    %io:format("~w logged in as ~w\n",[Pid, Id]),
    {reply, ok, State};

handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
    unlink(Pid),
    PidRows = ets:lookup(State#state.pid2id, Pid),
    case PidRows of
        [] ->
            ok;
        _ ->
            IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples
            ets:delete(State#state.pid2id, Pid),   % delete all pid->id entries
            [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ] % and all id->pid
    end,
    %io:format("pid ~w logged out\n",[Pid]),
    {reply, ok, State};

handle_call({send, Id, Msg}, From, State) ->
    F = fun() ->
        % get users who are subscribed to Id:
        Users = subsmanager:get_subscribers(Id),
        io:format("Subscribers of ~w = ~w\n",[Id, Users]),
        % get pids of anyone logged in from Users list:
        Pids0 = lists:map(
            fun(U)->
                [ P || { _I, P } <- ets:lookup(State#state.id2pid, U) ]
            end,
            [ Id | Users ] % we are always subscribed to ourselves
        ),
        Pids = lists:flatten(Pids0),
        io:format("Pids: ~w\n", [Pids]),
        % send Msg to them all
        M = {router_msg, Msg},
        [ Pid ! M || Pid <- Pids ],
        % respond with how many users saw the message
        gen_server:reply(From, {ok, length(Pids)})
    end,
    spawn(F),
    {noreply, State}.

% handle death and cleanup of logged in processes
handle_info(Info, State) ->
    case Info of
        {'EXIT', Pid, _Why} ->
            % reuse the logout clause; it ignores the From argument
            handle_call({logout, Pid}, undefined, State);
        Wtf ->
            io:format("Caught unhandled message: ~w\n", [Wtf])
    end,
    {noreply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
terminate(_Reason, _State) ->
    ok.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


And here’s a quick test that doesn’t require mochiweb – I’ve used atoms instead of user ids, and omitted some output for clarity:

(subsman@localhost)1> c(subsmanager), c(router), rr("subsmanager.erl").
(subsman@localhost)2> subsmanager:start_link().
(subsman@localhost)3> router:start_link().
(subsman@localhost)4> Subs = [#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob, subscribee=rj}].
[#subscription{subscriber = alice,subscribee = rj},
#subscription{subscriber = bob,subscribee = rj}]
(subsman@localhost)5> subsmanager:add_subscriptions(Subs).
ok
(subsman@localhost)6> router:send(rj, "RJ did something").
Subscribers of rj = [bob,alice]
Pids: []
{ok,0}
(subsman@localhost)7> router:login(alice, self()).
ok
(subsman@localhost)8> router:send(rj, "RJ did something").
Subscribers of rj = [bob,alice]
Pids: [<0.46.0>]
{ok,1}
(subsman@localhost)9> receive {router_msg, M} -> io:format("~s\n",[M]) end.
RJ did something
ok

This shows how alice can receive a message when the subject is someone she is subscribed to (rj), even though the message wasn’t sent directly to alice. The output shows that the router identified possible targets as [bob,alice] (plus rj, since everyone is subscribed to themselves) but only delivered the message to one person, alice, because neither bob nor rj was logged in.

Generating a typical social-network friends dataset

We could generate lots of friend relationships at random, but that’s not particularly realistic. Social networks tend to exhibit a power-law distribution: a few super-popular users (some Twitter users have over 100,000 followers) and plenty of people with just a handful of friends. The Last.fm friends data is typical – it fits a Barabási–Albert graph model, so that’s what I’ll use.
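Preferential attachment is what produces that skew: nodes join one at a time, and each new node links to m existing nodes picked with probability proportional to their current degree, so early, well-connected users keep getting richer. Here is a minimal pure-Python sketch of the idea; the clique seed graph and the repeated-node sampling pool are my simplifications, not igraph's implementation, which is what you would actually use at a million nodes:

```python
import random


def barabasi_albert(n, m, seed=None):
    """Return an undirected edge list for n nodes (ids 0..n-1) grown by
    preferential attachment, each new node adding m edges."""
    if not 1 <= m < n:
        raise ValueError("need 1 <= m < n")
    rng = random.Random(seed)
    # Seed graph: a clique on the first m + 1 nodes, so every node starts
    # with degree m and there are always at least m candidate targets.
    edges = [(a, b) for a in range(m + 1) for b in range(a)]
    # Each node appears in `pool` once per incident edge, so a uniform
    # draw from the pool picks a node with probability proportional to
    # its current degree.
    pool = [v for edge in edges for v in edge]
    for new in range(m + 1, n):
        targets = set()
        while len(targets) < m:  # m distinct existing nodes
            targets.add(rng.choice(pool))
        for t in targets:
            edges.append((new, t))
            pool.extend((new, t))
    return edges
```

Every node ends up with degree at least m, while the oldest nodes accumulate far more, which is the long tail the real friends data shows.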

To generate the dataset I’m using the python module from the excellent igraph library:

fakefriends.py:

import igraph
# 1,000,000 vertices; each new vertex attaches with 15 edges (Barabási–Albert)
g = igraph.Graph.Barabasi(1000000, 15, directed=False)
print("Edges: " + str(g.ecount()) + " Vertices: " + str(g.vcount()))
g.write_edgelist("fakefriends.txt")



This writes fakefriends.txt with two user ids per line, space separated. These are the friend relationships we’ll load into our subsmanager. igraph numbers vertices from zero, so user ids range from 0 to 999,999.
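Before bulk loading roughly 15 million rows, it may be worth checking that the file really has the expected skew: a handful of very high-degree users and a low median. A minimal Python sketch that reads the edge list as undirected edges (the function name is mine, not part of the post's tooling):

```python
from collections import Counter


def degree_summary(lines):
    """Summarise an edge list with one "A B" pair per line, treating each
    edge as undirected. Returns (edges, nodes, max_degree, median_degree);
    the median is the upper median when the node count is even."""
    degree = Counter()
    edges = 0
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline
        a, b = line.split()
        degree[a] += 1
        degree[b] += 1
        edges += 1
    if not degree:
        return 0, 0, 0, 0
    ranked = sorted(degree.values())
    return edges, len(degree), ranked[-1], ranked[len(ranked) // 2]


print(degree_summary(["0 1", "0 2", "0 3", "1 2"]))  # (4, 4, 3, 2)
```

To run it over the real file: with open("fakefriends.txt") as f: print(degree_summary(f)).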

Bulk loading friends data into mnesia

This small module reads fakefriends.txt and builds a list of subscription records, one per line.

readfriends.erl:

-module(readfriends).
-export([load/1]).
-record(subscription, {subscriber, subscribee}).

load(Filename) ->
    for_each_line_in_file(Filename,
        fun(Line, Acc) ->
            [As, Bs] = string:tokens(string:strip(Line, right, $\n), " "),
            {A, _} = string:to_integer(As),
            {B, _} = string:to_integer(Bs),
            [ #subscription{subscriber=A, subscribee=B} | Acc ]
        end, [read], []).

% via: http://www.trapexit.org/Reading_Lines_from_a_File
for_each_line_in_file(Name, Proc, Mode, Accum0) ->
    {ok, Device} = file:open(Name, Mode),
    for_each_line(Device, Proc, Accum0).

for_each_line(Device, Proc, Accum) ->
    case io:get_line(Device, "") of
        eof  -> file:close(Device), Accum;
        Line -> NewAccum = Proc(Line, Accum),
                    for_each_line(Device, Proc, NewAccum)
    end.



Now in the subsmanager shell, you can read from the text file and add the subscriptions:

$ erl -name router@minifeeds4.gs2 +K true +A 128 -setcookie secretcookie -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40
erl> c(readfriends), c(subsmanager).
erl> subsmanager:first_run().
erl> subsmanager:start_link().
erl> subsmanager:add_subscriptions( readfriends:load("fakefriends.txt") ).

Note the additional mnesia parameters – these are to avoid the ** WARNING ** Mnesia is overloaded messages you would (probably) otherwise see. Refer to my previous post: On bulk loading data into Mnesia for alternative ways to load in lots of data. The best solution seems to be (as pointed out in the comments, thanks Jacob!) to set those options. The Mnesia reference manual contains many other settings under Configuration Parameters, and is worth a look.

Turning it up to 1 Million

Creating a million TCP connections from one host is non-trivial. I’ve a feeling that people who do this regularly have small clusters dedicated to simulating lots of client connections, probably running a real tool like Tsung. Even with the tuning from Part 1 to increase kernel TCP memory, increase the file descriptor ulimits and set the local port range to the maximum, we will still hit a hard limit on ephemeral ports. When making a TCP connection, the client end is allocated (or you can specify) a port from the range in /proc/sys/net/ipv4/ip_local_port_range. It doesn’t matter whether you specify it manually or use an ephemeral port; you’re still going to run out. In Part 1, we set the range to “1024 65535”, and since both ends are inclusive, that means there are 65535 - 1024 + 1 = 64,512 unprivileged ports available. Some of them will be used by other processes, but we’ll never get over 64,512 client connections from one local IP to the same server address and port, because we’ll run out of ports.
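Both bounds in ip_local_port_range are inclusive, so the number of usable ports is high - low + 1. A small Python helper for that count, assuming the usual Linux format of two whitespace-separated integers in the /proc file:

```python
def ephemeral_port_count(range_text):
    """Number of ports in an ip_local_port_range value such as
    "1024 65535" (both bounds inclusive)."""
    low, high = map(int, range_text.split())
    return high - low + 1


def read_local_port_range(path="/proc/sys/net/ipv4/ip_local_port_range"):
    """Port count for the range currently configured (Linux only)."""
    with open(path) as f:
        return ephemeral_port_count(f.read())


print(ephemeral_port_count("1024 65535"))  # 64512
```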

The local port range is assigned per IP, so if we make our outgoing connections from a range of different local IP addresses, we’ll be able to open more than 64,512 outgoing connections in total.

So let’s bring up 17 new IP addresses, with the intention of making 62,000 connections from each, giving us a total of 1,054,000 connections: just over the 2^20 (1,048,576) mark:

$ for i in `seq 1 17`; do sudo ifconfig eth0:$i 10.0.0.$i up ; done

If you run ifconfig now you should see your virtual interfaces: eth0:1, eth0:2 … eth0:17, each with a different IP address. Obviously you should choose a sensible part of whatever address space you are using.
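Splitting the user ids across the source addresses is the same arithmetic as the bash one-liner in the floodtest2.erl comments: 1,000,000 / 62,000 rounds up to 17 IPs, and 17 full blocks of 62,000 give 1,054,000 ids. A Python sketch of that split, assuming the 10.0.0.x addresses from the ifconfig loop (the function name is hypothetical):

```python
import math


def flood_config(n_ips, per_ip, prefix="10.0.0."):
    """One (source_ip, first_id, last_id) tuple per local IP, giving each
    IP a contiguous, inclusive block of per_ip user ids starting at 1."""
    return [(f"{prefix}{i}", (i - 1) * per_ip + 1, i * per_ip)
            for i in range(1, n_ips + 1)]


PER_IP = 62_000                              # below the 64,512 ports per IP
ips_needed = math.ceil(1_000_000 / PER_IP)   # 17
config = flood_config(ips_needed, PER_IP)
print(config[0], config[-1])  # ('10.0.0.1', 1, 62000) ('10.0.0.17', 992001, 1054000)
assert config[-1][2] >= 2 ** 20  # 1,054,000 >= 1,048,576
```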

All that remains now is to modify the floodtest tool from Part 1 to specify the local IP it should connect from… Unfortunately the erlang http client doesn’t let you specify the source IP. Neither does ibrowse, the alternative http client library. Damn.

<crazy idea>
At this point I considered another option: bringing up 17 pairs of IPs – one on the server and one on the client – each pair in their own isolated /30 subnet. I think that if I then made the client connect to any given server IP, it would force the local address to be the other half of the pair on that subnet, because only one of the local IPs would actually be able to reach the server IP. In theory, this would mean declaring the local source IP on the client machine would not be necessary (although the range of server IPs would need to be specified). I don’t know if this would really work – it sounded plausible at the time. In the end I decided it was too perverted and didn’t try it.
</crazy idea>

I also poked around in OTP’s http_transport code and considered adding support for specifying the local IP. It’s not really a feature you usually need in an HTTP client though, and it would certainly have been more work.

gen_tcp lets you specify the source address, so I ended up writing a rather crude client using gen_tcp specifically for this test:

floodtest2.erl

-module(floodtest2).
-compile(export_all).
-define(SERVERADDR, "10.1.2.3"). % where mochiweb is running
-define(SERVERPORT, 8000).

% Generate the config in bash like so (choose some available address space):
% EACH=62000; for i in `seq 1 17`; do echo "{{10,0,0,$i}, $((($i-1)*$EACH+1)), $(($i*$EACH))}, "; done

run(Interval) ->
        Config = [
{{10,0,0,1}, 1, 62000},
{{10,0,0,2}, 62001, 124000},
{{10,0,0,3}, 124001, 186000},
{{10,0,0,4}, 186001, 248000},
{{10,0,0,5}, 248001, 310000},
{{10,0,0,6}, 310001, 372000},
{{10,0,0,7}, 372001, 434000},
{{10,0,0,8}, 434001, 496000},
{{10,0,0,9}, 496001, 558000},
{{10,0,0,10}, 558001, 620000},
{{10,0,0,11}, 620001, 682000},
{{10,0,0,12}, 682001, 744000},
{{10,0,0,13}, 744001, 806000},
{{10,0,0,14}, 806001, 868000},
{{10,0,0,15}, 868001, 930000},
{{10,0,0,16}, 930001, 992000},
{{10,0,0,17}, 992001, 1054000}],
        start(Config, Interval).

start(Config, Interval) ->
        Monitor = monitor(),
        % receive..after needs an integer timeout, so round the per-IP delay
        AdjustedInterval = round(Interval / length(Config)),
        [ spawn(fun() -> start(Lower, Upper, Ip, AdjustedInterval, Monitor) end)
          || {Ip, Lower, Upper}  <- Config ],
        ok.

% opens one connection per id in LowerID..UpperID (inclusive) from LocalIP
start(LowerID, UpperID, _, _, _) when LowerID > UpperID -> done;
start(LowerID, UpperID, LocalIP, Interval, Monitor) ->
        Path = "/test/" ++ integer_to_list(LowerID),
        spawn(fun() -> connect(?SERVERADDR, ?SERVERPORT, LocalIP, Path, Monitor) end),
        receive after Interval -> start(LowerID + 1, UpperID, LocalIP, Interval, Monitor) end.

connect(ServerAddr, ServerPort, ClientIP, Path, Monitor) ->
        % {ip, ClientIP} binds the local end of the socket to that source address
        Opts = [binary, {packet, 0}, {ip, ClientIP}, {reuseaddr, true}, {active, false}],
        {ok, Sock} = gen_tcp:connect(ServerAddr, ServerPort, Opts),
        Monitor ! open,
        ReqL = io_lib:format("GET ~s HTTP/1.1\r\nHost: ~s\r\n\r\n", [Path, ServerAddr]),
        Req = list_to_binary(ReqL),
        ok = gen_tcp:send(Sock, Req),
        do_recv(Sock, Monitor),
        (catch gen_tcp:close(Sock)),
        ok.

do_recv(Sock, Monitor)->
        case gen_tcp:recv(Sock, 0) of
                {ok, B} ->
                        Monitor ! chunk,
                        Monitor ! {bytes, size(B)},
                        io:format("Recvd ~s\n", [ binary_to_list(B)]),
                        io:format("Recvd ~w bytes\n", [size(B)]),
                        do_recv(Sock, Monitor);
                {error, closed} ->
                        Monitor ! closed,
                        closed;
                Other ->
                        Monitor ! closed,
                        io:format("Other:~w\n",[Other])
        end.

% Monitor process receives stats and reports how much data we received etc:
monitor() ->
        Pid = spawn(?MODULE, monitor0, [{0,0,0,0}]),
        timer:send_interval(10000, Pid, report),
        Pid.

monitor0({Open, Closed, Chunks, Bytes}=S) ->
        receive
                report  -> io:format("{Open, Closed, Chunks, Bytes} = ~w\n",[S]),
                           monitor0(S); % keep running after each report
                open    -> monitor0({Open + 1, Closed, Chunks, Bytes});
                closed  -> monitor0({Open, Closed + 1, Chunks, Bytes});
                chunk   -> monitor0({Open, Closed, Chunks + 1, Bytes});
                {bytes, B} -> monitor0({Open, Closed, Chunks, Bytes + B})
        end.



As an initial test I was connecting to the mochiweb app from Part 1 – it simply sends one message to every client every 10 seconds.

erl> c(floodtest2), floodtest2:run(20).

This quickly ate all my memory.

Turns out opening lots of connections with gen_tcp like that eats a lot of ram. I think it’d need ~36GB to make it work without any additional tuning. I’m not interested in trying to optimise my quick-hack erlang http client (in the real world, this would be 1M actual web browsers), and the only machine I could get my hands on that has more than 32GB of RAM is one of our production databases, and I can’t find a good excuse to take Last.fm offline whilst I test this :) Additionally, it seems like it still only managed to open around 64,500 ports. Hmm.

At this point I decided to break out the trusty libevent, which I was pleased to discover has an HTTP API. Newer versions also have an evhttp_connection_set_local_address function in the HTTP API. This sounds promising.

Here’s the http client in C using libevent:

#include <sys/types.h>
#include <sys/time.h>
#include <sys/queue.h>
#include <stdlib.h>
#include <err.h>
#include <event.h>
#include <evhttp.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <time.h>

#define BUFSIZE 4096
#define NUMCONNS 62000
#define SERVERADDR "10.103.1.43"
#define SERVERPORT 8000
#define SLEEP_MS 10

char buf[BUFSIZE];

int bytes_recvd = 0;
int chunks_recvd = 0;
int closed = 0;
int connected = 0;

// called per chunk received
void chunkcb(struct evhttp_request * req, void * arg)
{
    int s = evbuffer_remove( req->input_buffer, buf, BUFSIZE );
    //printf("Read %d bytes: %s\n", s, buf);
    bytes_recvd += s;
    chunks_recvd++;
    if(connected >= NUMCONNS && chunks_recvd%10000==0)
        printf(">Chunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed);
}

// gets called when request completes
void reqcb(struct evhttp_request * req, void * arg)
{
    closed++;
}

int main(int argc, char **argv)
{
    event_init();
    struct evhttp_connection *conn;
    struct evhttp_request *req;
    char addr[16];  // local source IP, eg: "10.0.0.17"
    char path[32];  // eg: "/test/123"
    int i,octet;
    for(octet=1; octet<=17; octet++){
        // must match the addresses brought up with ifconfig
        snprintf(addr, sizeof addr, "10.0.0.%d", octet);
        for(i=1;i<=NUMCONNS;i++) {
            conn = evhttp_connection_new(SERVERADDR, SERVERPORT);
            // bind the client end of this connection to the chosen local IP
            evhttp_connection_set_local_address(conn, addr);
            evhttp_connection_set_timeout(conn, 864000); // 10 day timeout
            req = evhttp_request_new(reqcb, NULL);
            req->chunk_cb = chunkcb;
            snprintf(path, sizeof path, "/test/%d", ++connected);
            if(i%100==0)  printf("Req: %s\t->\t%s\n", addr, path);
            evhttp_make_request( conn, req, EVHTTP_REQ_GET, path );
            event_loop( EVLOOP_NONBLOCK );
            if( connected % 200 == 0 )
                printf("\nChunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed);
            usleep(SLEEP_MS*1000);
        }
    }
    event_dispatch();
    return 0;
}



Most parameters are hardcoded as #defines, so you configure it by editing the source and recompiling.

Compile and run:
$ gcc -o httpclient httpclient.c -levent
$ ./httpclient

This still failed to open more than 64,500 ports, although it used less RAM doing it.

It turns out that although I was specifying the local addresses, the ephemeral port allocation somewhere in the kernel or TCP stack didn’t care, and still ran out after 2^16 ports. So in order to open more than 64,500 connections, you need to specify the local address and local port yourself, and manage them accordingly.
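At the plain-sockets level, choosing the local address and port yourself just means calling bind() before connect(). The sketch below uses POSIX sockets rather than libevent, to show the mechanism in isolation; the helper name connect_from is hypothetical, and setting SO_REUSEADDR is an assumption that makes re-runs easier when old connections linger in TIME_WAIT.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: open a TCP connection to server_ip:server_port from
 * an explicitly chosen local_ip:local_port. Returns the socket fd, or -1. */
int connect_from(const char *local_ip, unsigned short local_port,
                 const char *server_ip, unsigned short server_port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    int one = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one);

    struct sockaddr_in local;
    memset(&local, 0, sizeof local);
    local.sin_family = AF_INET;
    local.sin_port = htons(local_port);
    /* bind() before connect() pins the source address and (for a non-zero
     * local_port) the source port, so the kernel's ephemeral port
     * allocator is not consulted */
    if (inet_pton(AF_INET, local_ip, &local.sin_addr) != 1 ||
        bind(fd, (struct sockaddr *)&local, sizeof local) != 0) {
        close(fd);
        return -1;
    }

    struct sockaddr_in remote;
    memset(&remote, 0, sizeof remote);
    remote.sin_family = AF_INET;
    remote.sin_port = htons(server_port);
    if (inet_pton(AF_INET, server_ip, &remote.sin_addr) != 1 ||
        connect(fd, (struct sockaddr *)&remote, sizeof remote) != 0) {
        close(fd);
        return -1;
    }
    return fd;
}
```

Because a bound port only has to be unique for a given local address, the same local_port can be used once on each of the 17 virtual IPs, which is what the client relies on to get past the ~64,500 limit.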

Unfortunately the libevent HTTP API doesn’t have an option to specify the local port. I patched libevent to add a suitable function:

    void evhttp_connection_set_local_port(struct evhttp_connection *evcon, u_short port);

This was a surprisingly pleasant experience; libevent seems well written, and the documentation is pretty decent too.

With my modified libevent installed, I was able to add the following under the set_local_address line in the above code:
evhttp_connection_set_local_port(evhttp_connection, 1024+i);

With that in place, multiple connections from different local addresses were able to use the same local port number, since a port only needs to be unique for a given local address. I recompiled the client and let it run for a bit to confirm it would break the 2^16 barrier.
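The bookkeeping behind “manage them accordingly” can be as simple as deriving both values from a connection counter. This sketch mirrors the loops in the client above (17 local IPs, 10.224.0.1 to 10.224.0.17, NUMCONNS connections each, local port 1024+i with i counting from 1); the function names are hypothetical.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define NUMCONNS 62000   /* connections per local IP, as in httpclient.c */
#define NUMADDRS 17      /* virtual IPs 10.224.0.1 .. 10.224.0.17 */

/* Total connections this scheme can open: one per (local IP, local port). */
long max_connections(void)
{
    return (long)NUMADDRS * NUMCONNS;
}

/* Map a 0-based connection number n to the local address and port used for
 * it, matching the nested loops in httpclient.c. Returns 0, or -1 if n is
 * outside the available range. */
int local_endpoint(long n, char *addr, size_t addrlen, unsigned short *port)
{
    if (n < 0 || n >= max_connections())
        return -1;
    int octet = (int)(n / NUMCONNS) + 1;   /* outer loop: octet = 1..17 */
    int i = (int)(n % NUMCONNS) + 1;       /* inner loop: i = 1..NUMCONNS */
    snprintf(addr, addrlen, "10.224.0.%d", octet);
    *port = (unsigned short)(1024 + i);
    return 0;
}
```

With 62,000 ports on each of 17 addresses this gives 1,054,000 connection slots, just over the million needed, and local ports stay in the range 1025 to 63024.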

Netstat confirms it:
# netstat -n | awk '/^tcp/ {t[$NF]++}END{for(state in t){print state, t[state]}}'
TIME_WAIT 8
ESTABLISHED 118222

This shows how many ports are open in various states. We’re finally able to open more than 2^16 connections, phew.
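On Linux, the same per-state tally can be taken without netstat by reading /proc/net/tcp, whose fourth column (“st”) is the connection state as a hex code: 01 is ESTABLISHED, 06 is TIME_WAIT, 0A is LISTEN. A minimal sketch, assuming the standard /proc/net/tcp layout and IPv4 only (IPv6 sockets are listed separately in /proc/net/tcp6); count_tcp_states is a hypothetical name, and it parses text you pass in, so it works equally on a file you have read into memory or on a captured sample.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define TCP_STATE_ESTABLISHED 0x01
#define TCP_STATE_TIME_WAIT   0x06
#define TCP_STATE_LISTEN      0x0A

/* Tally connection states from the text of /proc/net/tcp.
 * counts[] must have 16 entries; counts[s] is incremented for state s.
 * Returns the number of socket lines parsed (the header line is skipped). */
int count_tcp_states(const char *text, long counts[16])
{
    int parsed = 0;
    while (*text) {
        const char *nl = strchr(text, '\n');
        size_t len = nl ? (size_t)(nl - text) : strlen(text);
        char line[256];
        size_t n = len < sizeof line - 1 ? len : sizeof line - 1;
        memcpy(line, text, n);
        line[n] = '\0';

        unsigned int state;
        /* "  sl: local_address rem_address st ..." -> 4th field is st;
         * the header line fails at %d and is ignored */
        if (sscanf(line, " %*d: %*s %*s %x", &state) == 1 && state < 16) {
            counts[state]++;
            parsed++;
        }
        text += len;
        if (*text == '\n')
            text++;
    }
    return parsed;
}
```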

Now we have a tool capable of opening a million HTTP connections from a single box. It seems to consume around 2KB per connection, plus whatever the kernel needs. It’s time to use it for the “million connected user” test against our mochiweb comet server.

C1024K Test – 1 million comet connections

For this test I used 4 different servers of varying specs. These specs may be overpowered for the experiment, but they were available and waiting to go into production, and this made a good burn-in test. All four servers are on the same gigabit LAN, with up to 3 switches and a router in the middle somewhere.

The 1 million test I ran is similar to the 10k test from parts 1 and 2, the main difference being the modified client, now written in C using libevent, and that I’m running in a proper distributed-erlang setup with more than one machine.

On server 1 – Quad-core 2GHz CPU, 16GB of RAM

  • Start subsmanager
  • Load in the friends data
  • Start the router

On server 2 – Dual Quad-core 2.8GHz CPU, 32GB of RAM

  • Start mochiweb app

On server 3 – Quad-core 2GHz CPU, 16GB of RAM

  • Create 17 virtual IPs as above
  • Install patched libevent
  • Run client: ./httpclient to create 100 connections per second, up to 1M

On server 4 – Dual-core 2GHz, 2GB RAM

  • Run msggen program, to send lots of messages to the router

I measured the memory usage of mochiweb during the ramp-up to a million connections, and for the rest of the day:

The httpclient has a built-in delay of 10ms between connections, so it took nearly 3 hours to open a million connections. The resident memory used by the mochiweb process with 1M open connections was around 25GB. Here’s the server this was running on as seen by Ganglia, which measures CPU, network and memory usage and produces nice graphs:
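A quick sanity check of those two figures, using only numbers from this post (the 10ms sleep dominates; the time spent in event_loop() between connections is ignored):

```latex
t_{\text{ramp}} \approx 10^{6} \times 10\,\text{ms} = 10^{4}\,\text{s} \approx 2.8\,\text{h},
\qquad
\frac{25\,\text{GB}}{10^{6}\ \text{connections}} \approx 25\,\text{KB per connection}
```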

You can see it needs around 38GB and has started to swap. I suspect the difference is mostly consumed by the kernel to keep those connections open. The uplift at the end is when I started sending messages.

Messages were generated using 1,000 processes, with an average time between messages of 60ms per process, giving around 16,666 messages per second overall:

erl> [ spawn( fun()->msggen:start(1000000, 10+random:uniform(100), 1000000) end) || I <- lists:seq(1,1000) ].

The machine (server-4) generating messages looked like this on Ganglia:

That’s 10 MB per second of messages it’s pumping out – 16,666 messages a second. Typically these messages would come from a message bus, app servers, or part of an existing infrastructure.

When I started sending messages, the load on server 1 (hosting subsmanager and router) stayed below 1, and CPU utilization increased from 0 to 5%.

CPU on server 2 (hosting mochiweb app, with 1M connections) increased more dramatically:

Naturally as processes have to leave their hibernate state to handle messages, memory usage will increase slightly. Having all connections open with no messages is a best-case for memory usage – unsurprisingly, actually doing stuff requires more memory.

So where does this leave us? To be on the safe side, the mochiweb machine would need 40GB of RAM to hold open 1M active comet connections. Under load, up to 30GB of the memory would be used by the mochiweb app, and the remaining 10GB by the kernel. In other words, you need to allow 40KB per connection.

During various tests with lots of connections, I ended up making some additional changes to my sysctl.conf. This was partly trial-and-error; I don’t really know enough about the internals to make especially informed decisions about which values to change. My policy was to wait for things to break, check /var/log/kern.log to see what mysterious error was reported, then increase whatever sounded sensible after a spot of googling. Here are the settings in place during the above test:

net.core.rmem_max = 33554432
net.core.wmem_max = 33554432
net.ipv4.tcp_rmem = 4096 16384 33554432
net.ipv4.tcp_wmem = 4096 16384 33554432
net.ipv4.tcp_mem = 786432 1048576 26777216
net.ipv4.tcp_max_tw_buckets = 360000
net.core.netdev_max_backlog = 2500
vm.min_free_kbytes = 65536
vm.swappiness = 0
net.ipv4.ip_local_port_range = 1024 65535
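One detail worth knowing when reading these values: net.ipv4.tcp_mem is counted in memory pages, whereas tcp_rmem, tcp_wmem, rmem_max and wmem_max are in bytes. The three tcp_mem numbers are system-wide thresholds (no pressure below the first, memory-pressure mode above the second, hard limit at the third), while the tcp_rmem/tcp_wmem triples are per-socket min/default/max buffer sizes. Assuming 4 KiB pages (the usual size on x86-64), the settings above work out as:

```latex
\begin{aligned}
\texttt{tcp\_mem}:&\quad 786432 \times 4\,\text{KiB} = 3\,\text{GiB},\quad
1048576 \times 4\,\text{KiB} = 4\,\text{GiB},\quad
26777216 \times 4\,\text{KiB} \approx 102\,\text{GiB} \\
\texttt{tcp\_rmem},\ \texttt{tcp\_wmem}:&\quad 4096\,\text{B} = 4\,\text{KiB},\quad
16384\,\text{B} = 16\,\text{KiB},\quad
33554432\,\text{B} = 32\,\text{MiB}
\end{aligned}
```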

I would like to learn more about Linux TCP tuning so I can make a more informed decision about these settings. These are almost certainly not optimal, but at least they were enough to get to 1M connections. These changes, along with the fact that this is running on a 64-bit Erlang VM, and thus has a word size of 8 bytes instead of 4, might explain why the memory usage is much higher than I observed during the C10k test of part 2.

An Erlang C-Node using Libevent

After dabbling with the HTTP API for libevent, it seemed entirely sensible to try the 1M connection test against a libevent HTTPd written in C, so we have a basis for comparison.

I’m guessing that enabling kernel poll means the Erlang VM is able to use epoll (or similar), but even so there’s clearly some overhead involved, which we might be able to mitigate by delegating the connection handling to a C program using libevent. I want to reuse most of the Erlang code so far, so let’s do the bare minimum in C – just the connection handling and HTTP stuff.

Libevent has an asynchronous HTTP API, which makes implementing http servers trivial – well, trivial for C, but still less trivial than mochiweb IMO ;) I’d also been looking for an excuse to try the Erlang C interface, so the following program combines the two. It’s a comet http server in C using libevent which identifies users using an integer Id (like our mochiweb app), and also acts as an Erlang C-Node.

It connects to a designated Erlang node, listens for messages like {123, <<"Hello user 123">>} and dispatches “Hello user 123” to user 123, if connected. Messages for users that are not connected are discarded, just like previous examples.

httpdcnode.c

    #include <sys/types.h>
    #include <sys/time.h>
    #include <sys/queue.h>
    #include <stdlib.h>
    #include <string.h>
    #include <err.h>
    #include <event.h>
    #include <evhttp.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <netinet/in.h>

    #include "erl_interface.h"
    #include "ei.h"

    #include <pthread.h>

    #define BUFSIZE 1024
    #define MAXUSERS (17*65536) // C1024K

    // List of current http requests by uid:
    struct evhttp_request *clients[MAXUSERS+1];
    // Memory to store uids passed to the cleanup callback:
    int slots[MAXUSERS+1];

    // called when user disconnects
    void cleanup(struct evhttp_connection *evcon, void *arg)
    {
        int *uidp = (int *) arg;
        fprintf(stderr, "disconnected uid %d\n", *uidp);
        clients[*uidp] = NULL;
    }

    // handles http connections, sets them up for chunked transfer,
    // extracts the user id and registers in the global connection table,
    // also sends a welcome chunk.
    void request_handler(struct evhttp_request *req, void *arg)
    {
        struct evbuffer *buf;
        buf = evbuffer_new();
        if (buf == NULL) {
            err(1, "failed to create response buffer");
        }

        evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8");

        int uid = -1;
        if (strncmp(evhttp_request_uri(req), "/test/", 6) == 0) {
            uid = atoi(6 + evhttp_request_uri(req));
        }

        if (uid <= 0) {
            evbuffer_add_printf(buf, "User id not found, try /test/123 instead");
            evhttp_send_reply(req, HTTP_NOTFOUND, "Not Found", buf);
            evbuffer_free(buf);
            return;
        }

        if (uid > MAXUSERS) {
            evbuffer_add_printf(buf, "Max uid allowed is %d", MAXUSERS);
            evhttp_send_reply(req, HTTP_SERVUNAVAIL, "We ran out of numbers", buf);
            evbuffer_free(buf);
            return;
        }

        evhttp_send_reply_start(req, HTTP_OK, "OK");
        // Send welcome chunk:
        evbuffer_add_printf(buf, "Welcome, Url: '%s' Id: %d\n", evhttp_request_uri(req), uid);
        evhttp_send_reply_chunk(req, buf);
        evbuffer_free(buf);

        // put reference into global uid->connection table:
        clients[uid] = req;
        // set close callback
        evhttp_connection_set_closecb(req->evcon, cleanup, &slots[uid]);
    }

    // runs in a thread - the erlang c-node stuff
    // expects msgs like {uid, msg} and sends a 'msg' chunk to uid if connected
    void *cnode_run(void *arg)
    {
        int fd;                                  /* fd to Erlang node */
        int got;                                 /* Result of receive */
        unsigned char buf[BUFSIZE];              /* Buffer for incoming message */
        ErlMessage emsg;                         /* Incoming message */

        ETERM *uid, *msg;

        erl_init(NULL, 0);

        if (erl_connect_init(1, "secretcookie", 0) == -1)
            erl_err_quit("erl_connect_init");

        if ((fd = erl_connect("httpdmaster@localhost")) < 0)
            erl_err_quit("erl_connect");

        fprintf(stderr, "Connected to httpdmaster@localhost\n\r");

        struct evbuffer *evbuf;

        while (1) {
            got = erl_receive_msg(fd, buf, BUFSIZE, &emsg);
            if (got == ERL_TICK) {
                continue;
            } else if (got == ERL_ERROR) {
                fprintf(stderr, "ERL_ERROR from erl_receive_msg.\n");
                break;
            } else if (emsg.type == ERL_REG_SEND) {
                // get uid and body data from eg: {123, <<"Hello">>}
                uid = erl_element(1, emsg.msg);
                msg = erl_element(2, emsg.msg);
                if (uid && msg && ERL_IS_INTEGER(uid) && ERL_IS_BINARY(msg)) {
                    int userid = ERL_INT_VALUE(uid);
                    char *body = (char *) ERL_BIN_PTR(msg);
                    int body_len = ERL_BIN_SIZE(msg);
                    // Is this userid in range and connected? (no locking here)
                    if (userid > 0 && userid <= MAXUSERS && clients[userid]) {
                        fprintf(stderr, "Sending %d bytes to uid %d\n", body_len, userid);
                        evbuf = evbuffer_new();
                        evbuffer_add(evbuf, (const void *)body, (size_t)body_len);
                        evhttp_send_reply_chunk(clients[userid], evbuf);
                        evbuffer_free(evbuf);
                    } else {
                        fprintf(stderr, "Discarding %d bytes to uid %d - user not connected\n",
                                body_len, userid);
                    }
                }
                if (uid) erl_free_term(uid);
                if (msg) erl_free_term(msg);
                erl_free_term(emsg.from);
                erl_free_term(emsg.msg);
            }
        }
        // if we got here, erlang connection died.
        // this thread is supposed to run forever
        // TODO - gracefully handle failure / reconnect / etc
        return NULL;
    }

    int main(int argc, char **argv)
    {
        int i;
        for (i = 0; i <= MAXUSERS; i++) slots[i] = i;

        // Launch the thread that runs the cnode:
        pthread_t helper;
        pthread_create(&helper, NULL, cnode_run, NULL);

        // Launch libevent httpd:
        struct evhttp *httpd;
        event_init();
        httpd = evhttp_start("0.0.0.0", 8000);
        if (httpd == NULL)
            err(1, "evhttp_start failed");
        evhttp_set_gencb(httpd, request_handler, NULL);
        event_dispatch();
        // Not reached, event_dispatch() shouldn't return
        evhttp_free(httpd);
        return 0;
    }


The maximum number of users is #defined, and, similarly to the mochiweb server, it listens on port 8000 and expects users to connect with a path like /test/<userid>. Also hardcoded are the name of the Erlang node it connects to in order to receive messages, httpdmaster@localhost, and the Erlang cookie, “secretcookie”. Change these accordingly.
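The handler above extracts the id with atoi(), which quietly accepts paths such as /test/12abc (as 12) and cannot report overflow. If stricter checking is wanted, a sketch using strtol() might look like this; parse_uid is a hypothetical helper, not part of libevent, and it rejects anything after the digits, including a query string.

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#define UID_PREFIX "/test/"

/* Hypothetical helper: return the user id from a URI of the form
 * /test/<digits>, or -1 if the URI is malformed, the id is 0, or the
 * value does not fit in an int. */
int parse_uid(const char *uri)
{
    size_t plen = strlen(UID_PREFIX);
    if (uri == NULL || strncmp(uri, UID_PREFIX, plen) != 0)
        return -1;

    const char *digits = uri + plen;
    /* strtol would accept leading spaces and signs; require a digit first */
    if (*digits < '0' || *digits > '9')
        return -1;

    char *end;
    errno = 0;
    long v = strtol(digits, &end, 10);
    if (errno == ERANGE || *end != '\0' || v <= 0 || v > INT_MAX)
        return -1;
    return (int)v;
}
```

In request_handler it could replace the strncmp()/atoi() pair, with a -1 result taking the existing “User id not found” path and the separate MAXUSERS check left as it is.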

Run the erlang node it will connect to first:
$ erl -setcookie secretcookie -sname httpdmaster@localhost

Compile and run like so:
$ gcc -o httpdcnode httpdcnode.c -lerl_interface -lei -levent -lpthread
$ ./httpdcnode

In the erlang shell, check you can see the hidden c-node:
erl> nodes(hidden).
[c1@localhost]

Now connect in your browser to http://localhost:8000/test/123. You should see the welcome message.

Now back to the erlang shell – send a message to the C node:

erl> {any, c1@localhost} ! {123, <<"Hello Libevent World">>}.

Note that we don’t have a Pid to use, so we use the alternate representation of {procname, node}. We use ‘any’ as the process name, which is ignored by the C-node.

Now you’re able to deliver comet messages via Erlang, but all the http connections are managed by a libevent C program which acts as an Erlang node.

After removing the debug print statements, I connected 1M clients to the httpdcnode server using the same client as above. The machine showed a total of just under 10GB of memory used, and the resident memory of the server process was stable at under 2GB:

So there are big savings compared to mochiweb when handling lots of connections: the resident memory per connection for the server process with libevent is just under 2KB. With everything connected, the server machine claims:
Mem: 32968672k total, 9636488k used, 23332184k free, 180k buffers
So the kernel/TCP stack is consuming roughly an additional 8KB per connection, which seems a little high, but I have no basis for comparison.
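Working backwards from the free output above: attributing all memory that isn’t the ~2GB server process to the kernel also counts the OS baseline and any other processes, so this is a rough upper bound on the kernel’s share.

```latex
\frac{2\,\text{GB}}{10^{6}} \approx 2\,\text{KB per connection (server process)},
\qquad
\frac{9\,636\,488\,\text{KiB} - 2\,000\,000\,\text{KiB}}{10^{6}} \approx 7.6\,\text{KiB per connection (kernel and everything else)}
```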

This libevent C-node server needs a bit more work. It doesn’t sensibly handle multiple connections from the same user yet, and there’s no locking, so a race condition exists if a user disconnects just as a message is about to be dispatched to them.
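One way to close the table half of that race is a mutex around the uid-to-connection table, as sketched below. Two caveats: libevent 1.x is not thread-safe, so this only protects the table; a deliver callback that calls into evhttp from the C-node thread can still race with event_dispatch() in the main thread. A common way around that is to hand each message to the event-loop thread (for example over a pipe that libevent watches) so only one thread touches evhttp. All names here are hypothetical, and MAXUSERS is shrunk for illustration. Clearing only when the stored connection matches also stops a stale close callback from an old connection removing a newer one for the same user.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define MAXUSERS 1000   /* small for illustration; httpdcnode.c uses 17*65536 */

/* Opaque connection handle; in httpdcnode.c this would be the
 * struct evhttp_request * stored in clients[]. */
typedef void *conn_t;

static conn_t clients[MAXUSERS + 1];
static pthread_mutex_t clients_lock = PTHREAD_MUTEX_INITIALIZER;

/* Register conn as the current connection for uid (replacing any older one). */
void client_set(int uid, conn_t conn)
{
    if (uid < 1 || uid > MAXUSERS) return;
    pthread_mutex_lock(&clients_lock);
    clients[uid] = conn;
    pthread_mutex_unlock(&clients_lock);
}

/* Forget uid, but only if conn is still the registered connection. */
void client_clear(int uid, conn_t conn)
{
    if (uid < 1 || uid > MAXUSERS) return;
    pthread_mutex_lock(&clients_lock);
    if (clients[uid] == conn)
        clients[uid] = NULL;
    pthread_mutex_unlock(&clients_lock);
}

/* If uid is connected, call deliver(conn, arg) while still holding the lock,
 * so the entry cannot be cleared half-way through. deliver may be NULL to
 * just test whether uid is connected. Returns 1 if connected, else 0. */
int client_with(int uid, void (*deliver)(conn_t, void *), void *arg)
{
    int connected = 0;
    if (uid < 1 || uid > MAXUSERS) return 0;
    pthread_mutex_lock(&clients_lock);
    if (clients[uid] != NULL) {
        connected = 1;
        if (deliver) deliver(clients[uid], arg);
    }
    pthread_mutex_unlock(&clients_lock);
    return connected;
}
```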

Even so, I think this could be generalized in such a way that would allow you to use Erlang for all the interesting stuff, and have a C+libevent process act as a dumb connection-pool. With a bit more wrapper code and callbacks into Erlang, you’d hardly need to know this was going on – the C program could be run as a driver or a C-node, and an Erlang wrapper could give you a decent api built on top of libevent. (see this post for an example Erlang C driver). I would like to experiment further with this.

Final Thoughts

I have enough data now to judge how much hardware would be needed if we deploy a large scale comet system for Last.fm. Even a worst case of 40KB per connection is acceptable: memory is pretty cheap at the moment, and 40GB to support a million users is not unreasonable. 10GB is even better. I will finish up the app I’m building and deploy it somewhere people can try it out. Along the way I’ll tidy up the Erlang memcached client I’m using and release that (from jungerl, with modifications for consistent hashing and some bug fixes), and some other things. Stay tuned :)


Tuesday, November 4th, 2008 programming

Erlang libketama driver – Consistent Hashing

All the data I need from memcached is assigned to servers using a consistent hashing mechanism, implemented as libketama – a shared library written in C. We use a PHP extension to wrap this, and also have a pure Java implementation. Rather than port the algorithm to Erlang, I wrote an Erlang driver.

There are 3 things covered here:

  • A small driver program written in C (using libketama)
  • Some basic testing from the shell using Perl and xxd
  • The Erlang gen_server that calls it

C driver program

    /*  Expects a one-byte length header, followed by a key (<255 bytes)
     *  Returns an ip:port string with a 1-byte length header
     */
    #include <ketama.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>

    typedef unsigned char byte;

    // read exactly len bytes from stdin; returns len, or <=0 on EOF/error
    int read_exact(byte *buf, int len)
    {
        int i, got = 0;
        do {
            if ((i = read(0, buf + got, len - got)) <= 0) return i;
            got += i;
        } while (got < len);
        return len;
    }

    int main(int argc, char **argv)
    {
        if (argc == 1) {
            printf("Usage: %s <ketama.servers file>\n", *argv);
            return 1;
        }

        ketama_continuum c;
        if (!ketama_roll(&c, argv[1])) {
            fprintf(stderr, "%s\n", ketama_error());
            return 1;
        }
        mcs *m;

        byte len;
        byte buffer[256];
        while (1) {
            if (1 != read_exact(&len, 1)) break;
            if ((int)len >= 255) break;
            if (len > 0 && read_exact(buffer, (int)len) != (int)len) break;
            buffer[len] = '\0';
            m = ketama_get_server((char *)buffer, c);
            int resplen = strlen(m->ip);
            byte l = (byte)(0xff & resplen);
            write(1, &l, 1);            // 1-byte length header
            write(1, m->ip, resplen);   // "ip:port"
        }

        return 0;
    }


Testing the driver with Perl and xxd

Before writing the Erlang bit, it’d be nice to know the driver program does what we expect. We’ll send the driver a 1-byte length header followed by the key, and expect a 1-byte length header and the value as a response. Say we’re hashing a memcached key ‘user:123’ to a server: we can do what the Erlang port does with a bit of Perl, and use the ‘xxd’ command to see the output in binary.
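The framing is simple enough to express in a few lines of C. This sketch encodes a key the way the Perl one-liner does and decodes a length-prefixed response; frame_key and unframe are hypothetical names, and on the Erlang side the {packet, 1} port option does the same job.

```c
#include <assert.h>
#include <string.h>

/* Write key as a frame (1-byte length header + key bytes) into out.
 * Returns the frame size, or -1 if the key is 255 bytes or longer (the
 * driver rejects those) or out is too small. */
int frame_key(const char *key, unsigned char *out, size_t outlen)
{
    size_t len = strlen(key);
    if (len >= 255 || outlen < len + 1)
        return -1;
    out[0] = (unsigned char)len;
    memcpy(out + 1, key, len);
    return (int)(len + 1);
}

/* Decode one frame from in[0..inlen) into a NUL-terminated string.
 * Returns the payload length, or -1 if the frame is incomplete or out is
 * too small. */
int unframe(const unsigned char *in, size_t inlen, char *out, size_t outlen)
{
    if (inlen < 1)
        return -1;
    size_t len = in[0];
    if (inlen < len + 1 || outlen < len + 1)
        return -1;
    memcpy(out, in + 1, len);
    out[len] = '\0';
    return (int)len;
}
```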

perl -e '$key="user:123"; $len=pack("C",length($key)); print $len; print $key;' | xxd -b

0000000: 00001000 01110101 01110011 01100101 01110010 00111010  .user:
0000006: 00110001 00110010 00110011                             123

Note the first byte (00001000) printed before the key is the length of the key, 8. Now let’s send this to the driver program and check the response (provide a valid ketama.servers file):

perl -e '$key="user:123"; $len=pack("C",length($key)); print $len; print $key;' | ./ketama_erlang_driver /var/ketama.servers | xxd -b

0000000: 00010000 00110001 00110000 00101110 00110000 00101110  .10.0.
0000006: 00110001 00101110 00110001 00110001 00111000 00111010  1.118:
000000c: 00110001 00110001 00110010 00110001 00110001           11211

The first byte of the response (00010000) is 16, which is the length of the server address returned by the driver, “10.0.1.118:11211”. It does what we expect; onwards…

The Erlang bit

    -module(ketama).
    -behaviour(gen_server).
    -export([start_link/0, start_link/1, start_link/2, getserver/1]).
    -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
             terminate/2, code_change/3]).

    -record(state, {port}).

    start_link() ->
        start_link("/web/site/GLOBAL/ketama.servers").

    start_link(ServersFile) ->
        start_link(ServersFile, "/usr/bin/ketama_erlang_driver").

    start_link(ServersFile, BinPath) ->
        gen_server:start_link({local, ?MODULE}, ?MODULE, [ServersFile, BinPath], []).

    getserver(Key) ->
        gen_server:call(?MODULE, {getserver, Key}).

    %%

    init([ServersFile, BinPath]) ->
        %% trap exits so a dying port arrives as an 'EXIT' message in handle_info/2
        process_flag(trap_exit, true),
        Exe = BinPath ++ " " ++ ServersFile,
        %% {packet, 1}: Erlang adds/strips the 1-byte length header for us
        Port = open_port({spawn, Exe}, [binary, {packet, 1}, use_stdio]),
        {ok, #state{port=Port}}.

    handle_call({getserver, Key}, _From, #state{port=Port} = State) ->
        Port ! {self(), {command, Key}},
        receive
            {Port, {data, Data}} ->
                {reply, Data, State}
        after 1000 -> % if it takes this long, you have serious issues.
            {stop, ketama_port_timeout, State}
        end.

    handle_cast(_Msg, State) -> {noreply, State}.

    handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
        {stop, {port_terminated, Reason}, State};
    handle_info(_Info, State) ->
        {noreply, State}.

    terminate({port_terminated, _Reason}, _State) -> ok;
    terminate(_Reason, #state{port = Port} = _State) -> port_close(Port).

    code_change(_OldVsn, State, _Extra) -> {ok, State}.



This code can be found in the erlang directory of the ketama source in svn:
svn://svn.audioscrobbler.net/misc/ketama/


Sunday, September 28th, 2008 programming