Archive for October, 2008

A Million-user Comet Application with Mochiweb, Part 2

In Part 1, we built a (somewhat useless) mochiweb comet application that sent clients a message every 10 seconds. We tuned the Linux kernel, and built a tool to establish a lot of connections in order to test performance and memory usage. We found that it took around 45KB per connection.

Part 2 is about turning the application into something useful, and saving memory:

  • Implement a message router with a login/logout/send API
  • Update the mochiweb app to receive messages from the router
  • Set up a distributed Erlang system so we can run the router on a different node/host from mochiweb
  • Write a tool to spam the router with lots of messages
  • Graph memory usage over 24hrs, and optimise the mochiweb app to save memory.

This means we are decoupling the message sending logic from the mochiweb app. In tandem with the floodtest tool from part 1, we can benchmark a setup closer to a production scenario.

Implementing the message router

The router API is just 3 functions:

  • login(Id, Pid) registers a process (of pid Pid) to receive messages for Id
  • logout(Pid) stops that process receiving messages for any Id
  • send(Id, Msg) sends the message Msg to any client logged in as Id

Note that, by design, it is possible for one process to login with multiple different Ids.

This example router module uses 2 ets tables to store bidirectional mappings between Pids and Ids. (pid2id and id2pid in the #state record below.)

router.erl:

  -module(router).
  -behaviour(gen_server).

  -export([start_link/0]).
  -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
           terminate/2, code_change/3]).

  -export([send/2, login/2, logout/1]).

  %% Look the router up by its global name on every call, so clients on any
  %% connected node reach the same server process.
  -define(SERVER, global:whereis_name(?MODULE)).

  %% Bidirectional mapping between ids and pids, kept in two ETS bag tables:
  %% pid2id holds {Pid, Id} rows and id2pid holds {Id, Pid} rows.
  -record(state, {pid2id, id2pid}).

  %%% Client API

  %% Start the router and register it globally under the name 'router'.
  start_link() ->
      gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).

  %% Send {router_msg, Msg} to every process logged in as Id.
  %% Returns ok whether or not anyone is logged in as Id.
  send(Id, Msg) ->
      gen_server:call(?SERVER, {send, Id, Msg}).

  %% Register Pid to receive messages addressed to Id. One pid may log in
  %% under several ids; the router links to Pid so that it is logged out
  %% automatically when it exits.
  login(Id, Pid) when is_pid(Pid) ->
      gen_server:call(?SERVER, {login, Id, Pid}).

  %% Remove every id registration held by Pid and unlink from it.
  logout(Pid) when is_pid(Pid) ->
      gen_server:call(?SERVER, {logout, Pid}).

  %%% gen_server callbacks

  init([]) ->
      % trap exits so the death of a linked (logged-in) pid arrives as an
      % 'EXIT' message in handle_info/2 instead of killing the router
      process_flag(trap_exit, true),
      {ok, #state{pid2id = ets:new(?MODULE, [bag]),
                  id2pid = ets:new(?MODULE, [bag])}}.

  handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) ->
      ets:insert(State#state.pid2id, {Pid, Id}),
      ets:insert(State#state.id2pid, {Id, Pid}),
      link(Pid), % tell us if they exit, so we can log them out
      io:format("~w logged in as ~w\n", [Pid, Id]),
      {reply, ok, State};
  handle_call({logout, Pid}, _From, State) when is_pid(Pid) ->
      ok = do_logout(Pid, State),
      {reply, ok, State};
  handle_call({send, Id, Msg}, _From, State) ->
      Routed = {router_msg, Msg},
      % deliver to every pid currently logged in as this Id
      lists:foreach(fun({_Id, Pid}) -> Pid ! Routed end,
                    ets:lookup(State#state.id2pid, Id)),
      {reply, ok, State}.

  %% A linked, logged-in process died: force a logout so no stale routes
  %% remain. Any other message is unexpected and is only logged.
  handle_info({'EXIT', Pid, _Why}, State) ->
      ok = do_logout(Pid, State),
      {noreply, State};
  handle_info(Other, State) ->
      io:format("Caught unhandled message: ~w\n", [Other]),
      {noreply, State}.

  handle_cast(_Msg, State) ->
      {noreply, State}.

  terminate(_Reason, _State) ->
      ok.

  code_change(_OldVsn, State, _Extra) ->
      {ok, State}.

  %%% Internal functions

  %% Unlink from Pid and delete every pid->id and id->pid row belonging to
  %% it. Harmless for a pid that is not logged in. Shared by the explicit
  %% logout call and the 'EXIT' handler.
  do_logout(Pid, State) ->
      unlink(Pid),
      Rows = ets:lookup(State#state.pid2id, Pid),
      ets:delete(State#state.pid2id, Pid),
      % each {Pid, Id} row has a mirror {Id, Pid} row in id2pid
      lists:foreach(fun({_Pid, Id}) ->
                            ets:delete_object(State#state.id2pid, {Id, Pid})
                    end, Rows),
      io:format("pid ~w logged out\n", [Pid]),
      ok.


Updating the mochiweb application

Let’s assume a user is represented by an integer Id based on the URL they connect to mochiweb with, and use that id to register with the message router. Instead of blocking for 10 seconds then sending something, the mochiweb loop will block on receiving messages from the router, and send an HTTP chunk to the client for every message the router sends it:

  • Client connects to mochiweb at http://localhost:8000/test/123
  • Mochiweb app registers the pid for that connection against the id 123 with the message router
  • If you send a message to the router addressed to id 123, it will be relayed to the correct mochiweb process, and appear in the browser for that user

Here’s the updated version of mochiconntest_web.erl:

  -module(mochiconntest_web).

  -export([start/1, stop/0, loop/2]).

  %% External API

  %% Start the mochiweb listener. The docroot option is taken out of Options
  %% and passed to loop/2; the remaining options go to mochiweb_http:start/1.
  start(Options) ->
      {DocRoot, Options1} = get_option(docroot, Options),
      Loop = fun (Req) ->
                     ?MODULE:loop(Req, DocRoot)
             end,
      % we'll set our maximum to 1 million connections. (default: 2048)
      mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

  %% Stop the listener started by start/1.
  stop() ->
      mochiweb_http:stop(?MODULE).

  %% Handle one HTTP request. GET/HEAD /test/<Id> becomes a long-lived comet
  %% stream (see subscribe/2). Other paths get 404. Methods other than GET,
  %% HEAD and POST get 501. The docroot argument is not used here.
  loop(Req, _DocRoot) ->
      "/" ++ Path = Req:get(path),
      case Req:get(method) of
          Method when Method =:= 'GET'; Method =:= 'HEAD' ->
              case Path of
                  "test/" ++ IdStr ->
                      subscribe(Req, IdStr);
                  _ ->
                      Req:not_found()
              end;
          'POST' ->
              Req:not_found();
          _ ->
              Req:respond({501, [], []})
      end.

  %% Internal API

  %% Parse the integer Id from the URL, open a chunked response, log this
  %% process into the router under that Id and stream routed messages.
  %% The Id is client input, so a non-numeric one is answered with 400
  %% instead of crashing the connection process with a badmatch.
  subscribe(Req, IdStr) ->
      case string:to_integer(IdStr) of
          {Id, _Rest} when is_integer(Id) ->
              Response = Req:ok({"text/html; charset=utf-8",
                                 [{"Server","Mochiweb-Test"}],
                                 chunked}),
              % login using an integer rather than a string
              ok = router:login(Id, self()),
              feed(Response, Id, 1);
          {error, _Reason} ->
              Req:respond({400, [], []})
      end.

  %% Wait for the next {router_msg, Msg} from the router, write it to the
  %% client as one HTTP chunk, and loop forever. N numbers the messages.
  %% Msg is formatted with ~s, so it must be a (possibly deep) string, a
  %% binary or an atom.
  feed(Response, Id, N) ->
      receive
          {router_msg, Msg} ->
              % ASCII quotes only: characters above 255 are not valid bytes
              % in the iolist written to the socket
              Html = io_lib:format("Recvd msg #~w: '~s'", [N, Msg]),
              Response:write_chunk(Html)
      end,
      feed(Response, Id, N + 1).

  %% Split Option out of a proplist: {Value, RemainingOptions}.
  get_option(Option, Options) ->
      {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.


It’s Alive!

Now let’s bring it to life – we’ll use 2 erlang shells, one for mochiweb and one for the router. Edit start-dev.sh, used to start mochiweb, and add the following additional parameters to erl:

  • -sname n1 to name the Erlang node 'n1'
  • +K true to enable kernel-poll. Seems daft not to when dealing with lots of connections
  • +P 134217727 the default maximum number of processes you can spawn is 32768. Considering we need one process per connection (and I don’t know of any good reason not to) I suggest just setting this to the maximum possible value. 134,217,727 is the max according to “man erl”.

Now run make && ./start-dev.sh and you should see a prompt like this: (n1@localhost)1> – your mochiweb app is now running and the erlang node has a name.

Now run another erlang shell like so:
erl -sname n2
Currently those two erlang instances don’t know about each other, fix that:
(n2@localhost)1> nodes().
[]
(n2@localhost)2> net_adm:ping(n1@localhost).
pong
(n2@localhost)3> nodes().
[n1@localhost]

Now compile and start the router from this shell:
(n2@localhost)4> c(router).
{ok,router}
(n2@localhost)5> router:start_link().
{ok,<0.38.0>}

Now for the fun bit, go to http://localhost:8000/test/123 in your browser (or use lynx --source "http://localhost:8000/test/123" from the console). Check the shell you launched the router in; you should see that one user has logged in.

You can now send messages to the router and watch them appear in your browser. Only send strings for now: the feed function formats messages with ~s via io_lib:format, which accepts strings (as well as binaries and atoms) but crashes on other terms such as integers or tuples:

Just borrow the shell you used to launch the router:

(n2@localhost)6> router:send(123, "Hello World").
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").

Check your browser, you’ve got comet :)

Running in a distributed erlang system

It makes sense to run the router and mochiweb front-end(s) on different machines. Assuming you have a couple of spare machines to test this on, you should start the erlang shells as distributed nodes, i.e. use -name n1@host1.example.com instead of -sname n1 (and the same for n2). Make sure they can see each other by using net_adm:ping(...) as above.

Note that start_link/0 in router.erl registers the router process globally under the name 'router'. Because we use the following macro to locate the router in calls to gen_server, it already works in a distributed system:

-define(SERVER, global:whereis_name(?MODULE)).

A global name registry for processes in a distributed system is just one of the things you get for free with Erlang.

Generating lots of messages

In a real environment we might see a long-tail like usage pattern, with some very active users and many infrequent users. However for this test we’ll just indiscriminately spam random users with fake messages.

msggen.erl:

  -module(msggen).
  -export([start/3]).

  %% Send Num fake messages through the router, each to a uniformly random
  %% user Id in 1..Max, sleeping Interval ms after each send. Returns ok
  %% once all messages are sent, or immediately when Num =< 0.
  start(Num, _Interval, _Max) when Num =< 0 ->
      ok;
  start(Num, Interval, Max) ->
      % rand seeds itself uniquely per process; the old random module starts
      % every unseeded process from the same fixed seed, so generators run in
      % parallel would all target the same sequence of Ids
      Id = rand:uniform(Max),
      % integer_to_list: appending a bare integer would build an improper list
      router:send(Id, "Fake message Num = " ++ integer_to_list(Num)),
      receive after Interval -> start(Num - 1, Interval, Max) end.



This will send Num messages to random user Ids between 1 and Max, waiting Interval ms between each send.

You can see this in action if you run the router and the mochiweb app, connect with your browser to http://localhost:8000/test/3 then run:

erl -sname test
(test@localhost)1> net_adm:ping(n1@localhost).
pong
(test@localhost)2> c(msggen).
{ok,msggen}
(test@localhost)3> msggen:start(20, 10, 5).
ok

This will send 20 messages to random Ids between 1-5, with a 10ms wait between messages. Chances are Id 3 will receive a message or four.

We can even run a few of these in parallel to simulate multiple sources for messages. Here’s an example of spawning 10 processes that each send 20 messages to ids 1-5 with a 100ms delay between each message:

[ spawn(fun() -> msggen:start(20, 100, 5), io:format("~w finished.\n", [self()]) end) || _ <- lists:seq(1,10) ].
[<0.97.0>,<0.98.0>,<0.99.0>,<0.100.0>,<0.101.0>,<0.102.0>,
<0.103.0>,<0.104.0>,<0.105.0>,<0.106.0>]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.

C10K again, with feeling

We have the pieces we need to run another larger-scale test now; clients connect to our mochiweb app, which registers them with the message router. We can generate a high volume of fake messages to fire at the router, which will send them to any registered clients. Let’s run the 10,000 concurrent-user test again from Part 1, but this time we’ll leave all the clients connected for a while as we blast lots of messages through the system.

Assuming you followed the instructions in Part 1 to tune your kernel and increase your max files ulimit etc, this should be easy. You already have the mochiweb app and router running, so let’s dump more traffic on it.

Without any clients connected, the mochiweb beam process uses around 40MB (resident):

$ ps -o rss= -p `pgrep -f 'sname n1'`
40156

This finds the process ID of the command with 'sname n1' in it (our mochiweb Erlang process), then uses ps formatting options to print its RSS value – the resident memory size in KB.

I concocted this hideous one-liner to print the timestamp (human readable and a unixtime in case we need it later), current memory usage of mochiweb (resident KB), and the number of currently established connections every 60 seconds – leave this running on the mochiweb machine in a spare terminal:

$ MOCHIPID=`pgrep -f 'name n1'`; while [ 1 ] ; do NUMCON=`netstat -n | awk '/ESTABLISHED/ && $4=="127.0.0.1:8000"' | wc -l`; MEM=`ps -o rss= -p $MOCHIPID`; echo -e "`date`\t`date +%s`\t$MEM\t$NUMCON"; sleep 60; done | tee -a mochimem.log

If anyone knows a better way to plot memory usage for a single process over time please leave a comment..

Now launch the floodtest tool from Part 1 in a new erl shell:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).

This will establish 100 new connections per second until all 10,000 clients are connected.
You’ll see it quickly reaches 10k connections:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
Stats: {825,0,0}
Stats: {1629,0,0}
Stats: {2397,0,0}
Stats: {3218,0,0}
Stats: {4057,0,0}
Stats: {4837,0,0}
Stats: {5565,0,0}
Stats: {6295,0,0}
Stats: {7022,0,0}
Stats: {7727,0,0}
Stats: {8415,0,0}
Stats: {9116,0,0}
Stats: {9792,0,0}
Stats: {10000,0,0}
...

Check the hideous memory usage one-liner output:
Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001

It reached 10k concurrent connections (plus one I had open in firefox) and the resident memory size of mochiweb is around 90MB (90964KB).

Now unleash some messages:

erl> [ spawn(fun() -> msggen:start(1000000, 100, 10000) end) || _ <- lists:seq(1,100) ].
[<0.65.0>,<0.66.0>,<0.67.0>,<0.68.0>,<0.69.0>,<0.70.0>,
<0.71.0>,<0.72.0>,<0.73.0>,<0.74.0>,<0.75.0>,<0.76.0>,
<0.77.0>,<0.78.0>,<0.79.0>,<0.80.0>,<0.81.0>,<0.82.0>,
<0.83.0>,<0.84.0>,<0.85.0>,<0.86.0>,<0.87.0>,<0.88.0>,
<0.89.0>,<0.90.0>,<0.91.0>,<0.92.0>,<0.93.0>|...]

That’s 100 processes each sending a million messages at a rate of 10 messages a second to random Ids from 1 to 10,000. That means the router is seeing 1000 messages per second, and on average each of our 10k clients will get one message every 10 seconds.

Check the output in the floodtest shell, and you’ll see clients are receiving http chunks (remember it was {NumConnected, NumClosed, NumChunksRecvd}):
...
Stats: {10000,0,5912}
Stats: {10000,0,15496}
Stats: {10000,0,25145}
Stats: {10000,0,34755}
Stats: {10000,0,44342}
...

A million messages at a rate of 10 per second per process will take 27 hours to complete. Here’s how the memory usage looks after just 10 mins:
Mon Oct 20 16:57:24 BST 2008 1224518244 40388 1
Mon Oct 20 16:58:25 BST 2008 1224518305 41120 263
Mon Oct 20 16:59:27 BST 2008 1224518367 65252 5267
Mon Oct 20 17:00:32 BST 2008 1224518432 89008 9836
Mon Oct 20 17:01:37 BST 2008 1224518497 90748 10001
Mon Oct 20 17:02:41 BST 2008 1224518561 90964 10001
Mon Oct 20 17:03:46 BST 2008 1224518626 90964 10001
Mon Oct 20 17:04:51 BST 2008 1224518691 90964 10001
Mon Oct 20 17:05:55 BST 2008 1224518755 90980 10001
Mon Oct 20 17:07:00 BST 2008 1224518820 91120 10001
Mon Oct 20 17:08:05 BST 2008 1224518885 98664 10001
Mon Oct 20 17:09:10 BST 2008 1224518950 106752 10001
Mon Oct 20 17:10:15 BST 2008 1224519015 114044 10001
Mon Oct 20 17:11:20 BST 2008 1224519080 119468 10001
Mon Oct 20 17:12:25 BST 2008 1224519145 125360 10001

You can see the size already crept up from 40MB to 90MB when all 10k clients were connected, and to 125MB after running a bit longer.

It’s worth pointing out that the floodtest shell is almost CPU-bound, the msggen shell is using 2% CPU and the router and mochiweb less than 1%. (ie, only simulating lots of clients is using much CPU – the server app itself is very light on the CPU). It helps to have multiple machines, or a multicore CPU for testing.

Results after running for 24 hours

I ran this for 24 hours, whilst logging memory usage of the mochiweb process to mochimem.log. This is with 10,000 connected clients, and 1000 messages per second being sent to random clients.

The following bit of bash/awk was used to trick gnuplot into turning the mochimem.log file into a graph:

(echo -e "set terminal png size 500,300\nset xlabel \"Minutes Elapsed\"\nset ylabel \"Mem (KB)\"\nset title \"Mem usage with 10k active connections, 1000 msg/sec\"\nplot \"-\" using 1:2 with lines notitle" ; awk 'BEGIN{FS="\t";} NR%10==0 {if(!t){t=$2} mins=($2-t)/60; printf("%d %d\n",mins,$3)}' mochimem.log ; echo -e "end" ) | gnuplot > mochimem.png

Graph of memory usage with c10k, 1000msg/sec, 24hrs

Memory usage with c10k, 1000msg/sec, 24hrs

This graph shows the memory usage (with 10k active connections and 1000 msgs/sec) levels off at around 250MB over a 24 hour period. The two big drops, once near the start and once at the end of the test, are when I ran this in the mochiweb erlang process, just out of curiosity:

erl> [erlang:garbage_collect(P) || P <- erlang:processes()].

This forces all processes to garbage collect, and reclaimed around 100MB of memory – next up we investigate ways to save memory without resorting to manually forcing garbage collection.

Reducing memory usage in mochiweb

Seeing as the mochiweb app is just sending messages and then immediately forgetting them, the memory usage shouldn’t need to increase with the number of messages sent.

I’m a novice when it comes to Erlang memory management, but I’m going to assume that if I can force it to garbage collect more often, it will allow us to reclaim much of that memory, and ultimately let us serve more users with less overall system memory. We might burn a bit more CPU in the process, but that’s an acceptable trade-off.

Digging around in the erlang docs yields this option:

erlang:system_flag(fullsweep_after, Number)

Number is a non-negative integer which indicates how many times generational garbage collections can be done without forcing a fullsweep collection. The value applies to new processes; processes already running are not affected.
In low-memory systems (especially without virtual memory), setting the value to 0 can help to conserve memory.
An alternative way to set this value is through the (operating system) environment variable ERL_FULLSWEEP_AFTER.

Sounds intriguing, but it only applies to new processes and would affect all processes in the VM, not just our mochiweb processes.

Next up is this:

erlang:system_flag(min_heap_size, MinHeapSize)

Sets the default minimum heap size for processes. The size is given in words. The new min_heap_size only affects processes spawned after the change of min_heap_size has been made. The min_heap_size can be set for individual processes by use of spawn_opt/N or process_flag/2.

Could be useful, but I’m pretty sure our mochiweb processes need a bigger heap than the default value anyway. I’d like to avoid needing to patch the mochiweb source to add spawn options if possible.

Next to catch my eye was this:

erlang:hibernate(Module, Function, Args)

Puts the calling process into a wait state where its memory allocation has been reduced as much as possible, which is useful if the process does not expect to receive any messages in the near future.

The process will be awakened when a message is sent to it, and control will resume in Module:Function with the arguments given by Args with the call stack emptied, meaning that the process will terminate when that function returns. Thus erlang:hibernate/3 will never return to its caller.

If the process has any message in its message queue, the process will be awakened immediately in the same way as described above.

In more technical terms, what erlang:hibernate/3 does is the following. It discards the call stack for the process. Then it garbage collects the process. After the garbage collection, all live data is in one continuous heap. The heap is then shrunken to the exact same size as the live data which it holds (even if that size is less than the minimum heap size for the process).

If the size of the live data in the process is less than the minimum heap size, the first garbage collection occurring after the process has been awakened will ensure that the heap size is changed to a size not smaller than the minimum heap size.

Note that emptying the call stack means that any surrounding catch is removed and has to be re-inserted after hibernation. One effect of this is that processes started using proc_lib (also indirectly, such as gen_server processes), should use proc_lib:hibernate/3 instead to ensure that the exception handler continues to work when the process wakes up.

This sounds reasonable – let’s try hibernating after every message and see what happens.

Edit mochiconntest_web.erl and change the following:

  • Make the last line of the feed(Response, Id, N) function call hibernate instead of calling itself
  • Call hibernate immediately after logging into the router, rather than calling feed and blocking on receive
  • Remember to export feed/3 so hibernate can call back into the function on wake-up

Updated mochiconntest_web.erl with hibernation between messages:

  -module(mochiconntest_web).

  -export([start/1, stop/0, loop/2, feed/3]).

  %% External API

  %% Start the mochiweb listener. The docroot option is taken out of Options
  %% and passed to loop/2; the remaining options go to mochiweb_http:start/1.
  start(Options) ->
      {DocRoot, Options1} = get_option(docroot, Options),
      Loop = fun (Req) ->
                     ?MODULE:loop(Req, DocRoot)
             end,
      % we'll set our maximum to 1 million connections. (default: 2048)
      mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

  %% Stop the listener started by start/1.
  stop() ->
      mochiweb_http:stop(?MODULE).

  %% Handle one HTTP request. GET/HEAD /test/<Id> becomes a long-lived comet
  %% stream (see subscribe/2). Other paths get 404. Methods other than GET,
  %% HEAD and POST get 501. The docroot argument is not used here.
  loop(Req, _DocRoot) ->
      "/" ++ Path = Req:get(path),
      case Req:get(method) of
          Method when Method =:= 'GET'; Method =:= 'HEAD' ->
              case Path of
                  "test/" ++ IdStr ->
                      subscribe(Req, IdStr);
                  _ ->
                      Req:not_found()
              end;
          'POST' ->
              Req:not_found();
          _ ->
              Req:respond({501, [], []})
      end.

  %% Called by proc_lib:hibernate/3 each time the hibernating connection
  %% process wakes: handle one {router_msg, Msg}, write it as an HTTP chunk,
  %% then hibernate again until the next message. N numbers the messages.
  %% Exported only so hibernate can call back into it.
  feed(Response, Id, N) ->
      receive
          {router_msg, Msg} ->
              % ASCII quotes only: characters above 255 are not valid bytes
              % in the iolist written to the socket
              Html = io_lib:format("Recvd msg #~w: '~s'<br/>", [N, render(Msg)]),
              Response:write_chunk(Html)
      end,
      % Hibernate this process until it receives a message:
      proc_lib:hibernate(?MODULE, feed, [Response, Id, N + 1]).

  %% Internal API

  %% Parse the integer Id from the URL, open a chunked response and log this
  %% process into the router, then hibernate until the first message.
  %% The Id is client input, so a non-numeric one is answered with 400
  %% instead of crashing the connection process with a badmatch.
  subscribe(Req, IdStr) ->
      case string:to_integer(IdStr) of
          {Id, _Rest} when is_integer(Id) ->
              Response = Req:ok({"text/html; charset=utf-8",
                                 [{"Server","Mochiweb-Test"}],
                                 chunked}),
              ok = router:login(Id, self()),
              % Hibernate this process until it receives a message:
              proc_lib:hibernate(?MODULE, feed, [Response, Id, 1]);
          {error, _Reason} ->
              Req:respond({400, [], []})
      end.

  %% Turn a routed message into text. Strings, binaries and atoms are shown
  %% verbatim (~w would print a string as a list of integers). Any other term
  %% (integers, tuples, improper lists) is shown in Erlang syntax rather than
  %% crashing the connection.
  render(Msg) ->
      try
          io_lib:format("~s", [Msg])
      catch
          error:badarg -> io_lib:format("~p", [Msg])
      end.

  %% Split Option out of a proplist: {Value, RemainingOptions}.
  get_option(Option, Options) ->
      {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.


I made these changes, ran make to rebuild mochiweb, then redid the same c10k test (1000msgs/sec for 24hrs).

Results after running for 24 hours w/ proc_lib:hibernate()

Memory usage with c10k, 1000msg/sec, 24hrs, using hibernate()

Memory usage with c10k, 1000msg/sec, 24hrs, using hibernate()

Judicious use of hibernate means the mochiweb application memory levels out at 78MB Resident with 10k connections, much better than the 450MB we saw in Part 1. There was no significant increase in CPU usage.

Summary

We made a comet application on Mochiweb that lets us push arbitrary messages to users identified by an integer ID. After pumping 1000 msgs/sec through it for 24 hours, with 10,000 connected users, we observed it using 80MB, or 8KB per user. We even made pretty graphs.

This is quite an improvement from the 45KB per user we saw in Part 1. The savings are attributed to making the application behave in a more realistic way, and to the use of hibernate for mochiweb processes between messages.

Next Steps

In Part 3, I’ll turn it up to 1 million connected clients. I will be deploying the test app on a multi-cpu 64-bit server with plenty of RAM. This will show what difference, if any, running on a 64-bit VM makes. I’ll also detail some additional tricks and tuning needed in order to simulate 1 million client connections.

The application will evolve into a sort of pub-sub system, where subscriptions are associated with user Ids and stored by the app, rather than provided by clients when they connect. We’ll load in a typical social-network dataset: friends. This will allow a user to login with their user Id and automatically receive any event generated by one of their friends.

UPDATED: Part 3 is now online.

Tags: , ,

Thursday, October 23rd, 2008 programming 21 Comments

A Million-user Comet Application with Mochiweb, Part 1

In this series I will detail what I found out empirically about how mochiweb performs with lots of open connections, and show how to build a comet application using mochiweb, where each mochiweb connection is registered with a router which dispatches messages to various users. We end up with a working application that can cope with a million concurrent connections, and crucially, knowing how much RAM we need to make it work.

In part one:

  • Build a basic comet mochiweb app that sends clients a message every 10 seconds.
  • Tune the Linux kernel to handle lots of TCP connections
  • Build a flood-testing tool to open lots of connections (ye olde C10k test)
  • Examine how much memory this requires per connection.

Future posts in this series will cover how to build a real message routing system, additional tricks to reduce memory usage, and more testing with 100k and 1m concurrent connections.

I assume you know your way around the Linux command line, and know a bit of Erlang.

Building a Mochiweb test application

In brief:

  1. Install and build Mochiweb
  2. Run: /your-mochiweb-path/scripts/new_mochiweb.erl mochiconntest
  3. cd mochiconntest and edit src/mochiconntest_web.erl

This code (mochiconntest_web.erl) just accepts connections and uses chunked transfer to send an initial welcome message, and one message every 10 seconds to every client.

  -module(mochiconntest_web).
  -export([start/1, stop/0, loop/2]).

  %% External API

  %% Start the mochiweb listener. The docroot option is taken out of Options
  %% and passed to loop/2; the remaining options go to mochiweb_http:start/1.
  start(Options) ->
      {DocRoot, Options1} = get_option(docroot, Options),
      Loop = fun (Req) ->
                     ?MODULE:loop(Req, DocRoot)
             end,
      % we'll set our maximum to 1 million connections. (default: 2048)
      mochiweb_http:start([{max, 1000000}, {name, ?MODULE}, {loop, Loop} | Options1]).

  %% Stop the listener started by start/1.
  stop() ->
      mochiweb_http:stop(?MODULE).

  %% Handle one HTTP request. GET/HEAD /test/<Id> opens a chunked response,
  %% writes a welcome chunk and then one chunk every 10 seconds forever.
  %% Other paths get 404. Methods other than GET, HEAD and POST get 501.
  %% The docroot argument is not used here.
  loop(Req, _DocRoot) ->
      "/" ++ Path = Req:get(path),
      case Req:get(method) of
          Method when Method =:= 'GET'; Method =:= 'HEAD' ->
              case Path of
                  "test/" ++ Id ->
                      Response = Req:ok({"text/html; charset=utf-8",
                                         [{"Server","Mochiweb-Test"}],
                                         chunked}),
                      % Id comes from the request URL and is echoed into a
                      % text/html page, so escape it to avoid reflecting
                      % client-supplied markup back to the browser
                      SafeId = html_escape(Id),
                      Response:write_chunk("Mochiconntest welcomes you! Your Id: " ++ SafeId ++ "\n"),
                      feed(Response, SafeId, 1);
                  _ ->
                      Req:not_found()
              end;
          'POST' ->
              Req:not_found();
          _ ->
              Req:respond({501, [], []})
      end.

  %% Every 10 seconds write chunk number N for this (already escaped) Id,
  %% forever.
  feed(Response, Id, N) ->
      receive
      after 10000 ->
          Msg = io_lib:format("Chunk ~w for id ~s\n", [N, Id]),
          Response:write_chunk(Msg)
      end,
      feed(Response, Id, N + 1).

  %% Internal API

  %% Split Option out of a proplist: {Value, RemainingOptions}.
  get_option(Option, Options) ->
      {proplists:get_value(Option, Options), proplists:delete(Option, Options)}.

  %% Escape the HTML-significant characters of a string.
  html_escape(Str) ->
      lists:flatmap(fun escape_char/1, Str).

  %% Map one character to its HTML-safe representation.
  escape_char($<) -> "&lt;";
  escape_char($>) -> "&gt;";
  escape_char($&) -> "&amp;";
  escape_char($") -> "&quot;";
  escape_char($') -> "&#39;";
  escape_char(C) -> [C].


Start your mochiweb app

make && ./start-dev.sh
By default mochiweb listens on port 8000, on all interfaces. If you are doing this on the desktop, you can test with any web browser. Just navigate to http://localhost:8000/test/foo.

Here’s the command-line test:

$ lynx --source "http://localhost:8000/test/foo"
Mochiconntest welcomes you! Your Id: foo<br/>
Chunk 1 for id foo<br/>
Chunk 2 for id foo<br/>
Chunk 3 for id foo<br/>
^C

Yep, it works. Now let’s make it suffer.

Tuning the Linux Kernel for many tcp connections

Save yourself some time and tune the kernel tcp settings before testing with lots of connections, or your test will fail and you’ll see lots of Out of socket memory messages (and if you are masquerading, nf_conntrack: table full, dropping packet.)

Here are the sysctl settings I ended up with – YMMV, but these will probably do:

# General gigabit tuning:
net.core.rmem_max = 16777216
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 87380 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.ipv4.tcp_syncookies = 1
# this gives the kernel more memory for tcp
# which you need with many (100k+) open socket connections
net.ipv4.tcp_mem = 50576   64768   98152
net.core.netdev_max_backlog = 2500
# I was also masquerading the port comet was on, you might not need this
# (on newer kernels this setting is named net.netfilter.nf_conntrack_max)
net.ipv4.netfilter.ip_conntrack_max = 1048576

Put these in /etc/sysctl.conf then run sysctl -p to apply them. No need to reboot, now your kernel should be able to handle a lot more open connections, yay.

Creating a lot of connections

There are many ways to do this. Tsung is quite sexy, and there are plenty of other less-sexy ways to spam an httpd with lots of requests (ab, httperf, httpload etc). None of them are ideally suited for testing a comet application, and I’d been looking for an excuse to try the Erlang http client, so I wrote a basic test to make lots of connections.
Just because you can doesn’t mean you should: one process per connection would definitely be a waste here. I’m using one process to load urls from a file, and another process to establish and receive messages from all http connections (and one process as a timer to print a report every 10 seconds). All data received from the server is discarded, but it does increment a counter so we can keep track of how many HTTP chunks were delivered.


floodtest.erl

  %% floodtest: open many long-lived streaming HTTP connections (e.g. to a
  %% comet server) and periodically report how they are doing.
  -module(floodtest).
  -export([start/2, timer/2, recv/1]).

  %% @doc Open one streaming GET per URL listed (one per line) in Filename,
  %% starting a new request every Wait milliseconds, and print
  %% "Stats: {Active, Closed, Chunks}" every 10 seconds.
  %% Never returns: the calling process becomes the receive loop.
  start(Filename, Wait) ->
      inets:start(),
      Self = self(),
      %% Linked so the helpers do not outlive the receive loop; an orphaned
      %% timer would otherwise keep messaging a dead pid forever.
      spawn_link(?MODULE, timer, [10000, Self]),
      spawn_link(fun() ->
                         loadurls(Filename,
                                  fun(Url) -> Self ! {loadurl, Url} end,
                                  Wait)
                 end),
      recv({0, 0, 0}).

  %% @doc Receive loop. Stats is {Active, Closed, Chunks}: connections
  %% currently streaming, connections that finished or failed, and body
  %% chunks received so far. Handles {stats} (print Stats), {loadurl, Url}
  %% (start a request) and the asynchronous httpc replies. Never returns.
  recv(Stats) ->
      recv(Stats, sets:new()).

  %% Started holds the ids of requests that reached stream_start, so that a
  %% request failing before it ever connected is not subtracted from Active.
  recv({Active, Closed, Chunks} = Stats, Started) ->
      receive
          {stats} ->
              io:format("Stats: ~w~n", [Stats]),
              recv(Stats, Started);
          {http, {Ref, stream_start, _Headers}} ->
              recv({Active + 1, Closed, Chunks}, sets:add_element(Ref, Started));
          {http, {_Ref, stream, _BodyPart}} ->
              recv({Active, Closed, Chunks + 1}, Started);
          {http, {Ref, stream_end, _Headers}} ->
              closed(Ref, Stats, Started);
          {http, {Ref, {error, Why}}} ->
              io:format("Closed: ~w~n", [Why]),
              closed(Ref, Stats, Started);
          {http, {Ref, {{_Version, Status, _Phrase}, _Headers, _Body}}} ->
              %% httpc only streams 200/206 replies; any other status arrives
              %% as one complete response. Without this clause such messages
              %% would never be matched and would pile up in the mailbox.
              io:format("Closed: HTTP ~w~n", [Status]),
              closed(Ref, Stats, Started);
          {loadurl, Url} ->
              %% The HTTP version is an HTTPOption (3rd argument) given as a
              %% string; it is not one of the request Options.
              case httpc:request(get, {Url, []}, [{version, "HTTP/1.1"}],
                                 [{sync, false}, {stream, self},
                                  {body_format, binary}]) of
                  {ok, _RequestId} ->
                      recv(Stats, Started);
                  {error, Why} ->
                      io:format("Request failed: ~ts: ~w~n", [Url, Why]),
                      recv({Active, Closed + 1, Chunks}, Started)
              end
      end.

  %% Count request Ref as closed and continue the loop. Only requests that
  %% reached stream_start were ever counted in Active, so only those are
  %% subtracted from it.
  closed(Ref, {Active, Closed, Chunks}, Started) ->
      case sets:is_element(Ref, Started) of
          true ->
              recv({Active - 1, Closed + 1, Chunks},
                   sets:del_element(Ref, Started));
          false ->
              recv({Active, Closed + 1, Chunks}, Started)
      end.

  %% @doc Send {stats} to Who every T milliseconds, forever.
  timer(T, Who) ->
      receive
      after T ->
          Who ! {stats}
      end,
      timer(T, Who).

  %% Fold Proc(Line, Acc) over the lines of file Name, opened with Mode, and
  %% return the final accumulator. Lines are passed with their trailing
  %% newline. The file is closed even if Proc or a read fails.
  for_each_line_in_file(Name, Proc, Mode, Accum0) ->
      {ok, Device} = file:open(Name, Mode),
      try
          for_each_line(Device, Proc, Accum0)
      after
          file:close(Device)
      end.

  for_each_line(Device, Proc, Accum) ->
      case io:get_line(Device, "") of
          eof ->
              Accum;
          {error, Reason} ->
              %% Fail loudly instead of handing the error tuple to Proc.
              error({read_error, Reason});
          Line ->
              for_each_line(Device, Proc, Proc(Line, Accum))
      end.

  %% Call Callback(Url) for each non-blank line of Filename, sleeping Wait
  %% milliseconds after each one to throttle connection setup.
  loadurls(Filename, Callback, Wait) ->
      for_each_line_in_file(Filename,
                            fun(Line, Acc) ->
                                    case chomp(Line) of
                                        "" ->
                                            Acc;
                                        Url ->
                                            Callback(Url),
                                            timer:sleep(Wait),
                                            Acc
                                    end
                            end,
                            [read], []).

  %% Strip the line terminator, whether "\n" or "\r\n".
  chomp(Line) ->
      string:strip(string:strip(Line, right, $\n), right, $\r).



Each connection we make needs a socket, and thus a file descriptor (plus, on the client side, an ephemeral port), and by default the number of open file descriptors is limited to 1024. To avoid the “Too many open files” problem you’ll need to raise the ulimit for your shell. This can be changed permanently in /etc/security/limits.conf, but that requires a logout/login. For now you can just sudo and modify the current shell (su back to your non-priv’ed user after calling ulimit if you don’t want to run as root):

$ sudo bash
# ulimit -n 999999
# erl

You might as well increase the ephemeral port range to the maximum too:
# echo "1024 65535" > /proc/sys/net/ipv4/ip_local_port_range

Generate a file of URLs to feed to the floodtest program:
( for i in `seq 1 10000`; do echo "http://localhost:8000/test/$i" ; done ) > /tmp/mochi-urls.txt

From the erlang prompt you can now compile and launch floodtest.erl:
erl> c(floodtest).
erl> floodtest:start("/tmp/mochi-urls.txt", 100).

This will establish 10 new connections per second (ie, 1 connection every 100ms).

It will output stats in the form {Active, Closed, Chunks}, where Active is the number of connections currently established, Closed is the number that were terminated for some reason, and Chunks is the number of chunks served by chunked transfer from mochiweb. Closed should stay at 0, and Chunks should be more than Active, because each active connection will receive multiple chunks (1 every 10 seconds).


The resident size of the mochiweb beam process with 10,000 active connections was 450MB – that’s 45KB per connection. CPU utilization on the machine was practically nothing, as expected.

Assessment so far

That was a reasonable first attempt. 45KB per-connection seems a bit high – I could probably cook something up in C using libevent that could do this with closer to 4.5KB per connection (just a guess, if anyone has experience please leave a comment). If you factor in the amount of code and time it took to do this in Erlang compared with C, I think the increased memory usage is more excusable.


In future posts I’ll cover building a message router (so we can uncomment lines 25 and 41-43 in mochiconntest_web.erl) and talk about some ways to reduce the overall memory usage. I’ll also share the results of testing with 100k and 1M connections.

UPDATED: Part 2 and Part 3 are online now.

Tags: , , , , , ,

Wednesday, October 15th, 2008 programming 41 Comments

On bulk loading data into Mnesia

Consider this a work-in-progress; I will update this post if I find a ‘better’ way to do fast bulk loading

The time has come to replace my ets-based storage backend with something non-volatile. I considered a dets/ets hybrid, but I really need this to be replicated to at least a second node for HA / failover. Mnesia beckoned.

The problem:

  • 15 million [fairly simple] records
  • 1 Mnesia table: bag, disc_copies, just 1 node, 1 additional index
  • Hardware is a quad-core 2GHz CPU, 16GB Ram, 8x 74Gig 15k rpm scsi disks in RAID-6
  • Takes ages* to load and spews a load of “Mnesia is overloaded” warnings

* My definition of ‘takes ages’: Much longer than PostgreSQL \copy or MySQL LOAD DATA INFILE

At this point all I want is a quick way to bulk-load some data into a disc_copies table on a single node, so I can get on with running some tests.

Here is the table creation code:
mnesia:create_table(subscription,
[
{disc_copies, [node()]},
{attributes, record_info(fields, subscription)},
{index, [subscribee]}, %index subscribee too
{type, bag}
]
)

The subscription record is fairly simple:
{subscription, subscriber={resource, user, 123}, subscribee={resource, artist, 456}}

I’m starting erlang like so:
erl +A 128 -mnesia dir '"/home/erlang/mnesia_dir"' -boot start_sasl

The interesting thing there is really the +A 128, which gives the emulator a pool of 128 async threads for file I/O; with it, the load was spread better between the 4 cores.

Attempt 0) ‘by the book’ one transaction to rule them all

Something like this:
mnesia:transaction(fun()-> [ mnesia:write(S) || S <- Subs ] end)

Time taken: Too long, I gave up after 12 hours
Number of “Mnesia overloaded” warnings: lots
Conclusion: Must be a better way
TODO: actually run this test and time it.

Attempt 1) dirty_write

There isn’t really any need to do this in a transaction, so I tried dirty_write.
[ mnesia:dirty_write(S) || S <- Subs ]

And here’s the warning in full:
=ERROR REPORT==== 13-Oct-2008::16:53:57 ===
Mnesia('mynode@myhost'): ** WARNING ** Mnesia is overloaded: {dump_log,
write_threshold}

Time taken: 890 secs
Number of “Mnesia overloaded” warnings: lots
Conclusion: Workable, but nothing to boast about. Those warnings are annoying

Attempt 2) dirty_write, defer index creation

A common trick with traditional RDBMS would be to bulk load the data into the table and add the indexes afterwards. In some scenarios you can avoid costly incremental index update operations. If you are doing this in one gigantic transaction it shouldn’t matter, and I’m not really sure how mnesia works under the hood (something I plan to rectify if I end up using it for real).
I tried a similar approach by commenting out the {index, [subscribee]} line above, doing the load, then using mnesia:add_table_index(subscription, subscribee) afterwards to add the index once all the data was loaded. Note that mnesia was still building the primary index on the fly, but that can’t be helped.
Time taken: 883 secs (679s load + 204s index creation)
Number of “Mnesia overloaded” warnings: lots
Conclusion: Insignificant, meh

Attempt 3) mnesia:ets() trickery

This is slightly perverted, but I tried it because I suspected that incrementally updating the on-disk data wasn’t especially optimal. The idea is to make a ram_copies table and use the mnesia:ets() function to write directly to the ets table (doesn’t get much faster than ets). The table can then be converted to disc_copies. There are caveats – to quote The Fine Manual:

Call the Fun in a raw context which is not protected by a transaction. The Mnesia function calls performed in the Fun are performed directly on the local ets tables on the assumption that the local storage type is ram_copies and the tables are not replicated to other nodes. Subscriptions are not triggered and checkpoints are not updated, but it is extremely fast.

I can live with that. I don’t mind if replication takes a while to setup when I put this into production – I’ll gladly take any optimisations I can get at this stage (testing/development).

Loading a list of subscriptions looks like this:
mnesia:ets(fun()-> [mnesia:dirty_write(S) || S <- Subs] end).
And to convert this into disc_copies once data is loaded in:
mnesia:change_table_copy_type(subscription, node(), disc_copies).

Time taken: 745 secs (699s load + 46s convert to disc_copies)
Number of “Mnesia overloaded” warnings: none!
Conclusion: Fastest yet, bit hacky

Summary

At least the ets() trick doesn’t spew a million warnings. I also need to examine the output of mnesia:dump_to_textfile and see if loading data from that format is any faster.

TODO:

  • Examine / test using the dump_to_textfile method
  • Run full transactional load and time it
  • Try similar thing with PostgreSQL

Tags: ,

Monday, October 13th, 2008 hacks, programming 7 Comments

Updated bash PS1

Made a minor tweak to my .bashrc after browsing dotfiles.org for some ideas. One neat trick I gleaned was detecting when the exit code of the last command ($?) was non-zero and altering the prompt. This will be useful for quickly seeing at a glance if some enormous load of output from make was successful or not.

Note the prompt goes red on failure

Note the prompt goes red on failure

Here are the bits from my updated .bashrc:

  # define useful aliases for color codes
  # (\[ and \] tell bash the escape takes no screen space, so line
  # editing and wrapping stay correct)
  sh_norm="\[\033[0m\]"
  sh_black="\[\033[0;30m\]"
  sh_darkgray="\[\033[1;30m\]"
  sh_blue="\[\033[0;34m\]"
  sh_light_blue="\[\033[1;34m\]"
  sh_green="\[\033[0;32m\]"
  sh_light_green="\[\033[1;32m\]"
  sh_cyan="\[\033[0;36m\]"
  sh_light_cyan="\[\033[1;36m\]"
  sh_red="\[\033[0;31m\]"
  sh_light_red="\[\033[1;31m\]"
  sh_purple="\[\033[0;35m\]"
  sh_light_purple="\[\033[1;35m\]"
  sh_brown="\[\033[0;33m\]"
  sh_yellow="\[\033[1;33m\]"
  sh_light_gray="\[\033[0;37m\]"
  sh_white="\[\033[1;37m\]"

  # Colour the hostname: red for production hosts, yellow for staging,
  # green everywhere else.
  case "$(hostname)" in
      "livehost"|"production_server"|"sauron")
          HOSTCOLOUR=${sh_red}
          ;;
      "staging-node")
          HOSTCOLOUR=${sh_yellow}
          ;;
      *)
          HOSTCOLOUR=${sh_green}
          ;;
  esac

  # Runs before every prompt; as the first command, $? is still the exit
  # status of the command the user just ran.
  export PROMPT_COMMAND='if [ $? -ne 0 ]; then ERROR_FLAG=1; else ERROR_FLAG=; fi; '
  # The colours are expanded now; the single-quoted ${ERROR_FLAG:+...} parts
  # are left for bash to expand each time the prompt is drawn, so the $ turns
  # red after a failed command.
  export PS1="${sh_white}\u@${HOSTCOLOUR}\h${sh_norm}\w\n${sh_norm}"'${ERROR_FLAG:+'"${sh_light_red}"'}\$${ERROR_FLAG:+'"${sh_norm}"'} '



I’m also using the hostname to decide what colour the host appears in the prompt. My home directory, and thus .bashrc, is mounted on most hosts I log in to, and this serves as a reminder if I’m logged in to a production host. Green is the default, and it’s overridden for various special hosts.

Tags:

Saturday, October 11th, 2008 hacks 2 Comments