mnesia

Getting to know ejabberd and writing modules

I started poking around in the ejabberd source code to see what I could learn. I couldn’t find much high-level documentation about how the various bits of ejabberd talk to each other, so I’m piecing it together myself.

After compiling ejabberd, I wrote a PHP script to use with its external authentication system. Here’s a version that supports just two hardcoded users:

ejabberd.cfg:
{auth_method, external}.
{extauth_program, "/tmp/auth.php"}.


auth.php:

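#!/usr/bin/php
<?php
// ejabberd extauth protocol: each request is a 2-byte big-endian length
// followed by "op:user:server[:password]"; each reply is pack("nn", 2, 1|0).
$fh = fopen("php://stdin", 'r');
if (!$fh) {
    die("Cannot open STDIN\n");
}
$users = array('user1' => 'password1', 'user2' => 'password2');

do {
    // Read exact byte counts: fgets() stops at a newline byte,
    // which can legitimately appear in the length prefix or the password.
    $lenBytes = stream_get_contents($fh, 2);
    if (strlen($lenBytes) < 2) {
        exit(0); // ejabberd closed the pipe
    }
    $len = unpack('n', $lenBytes);
    $len = $len[1];
    if ($len < 1) continue;
    $msg = stream_get_contents($fh, $len);
    // A limit of 4 keeps any ':' inside the password intact.
    $toks = explode(':', $msg, 4);
    $method = array_shift($toks);
    switch ($method) {
        case 'auth':
            list($username, $server, $password) = $toks;
            // isset() plus strict comparison: a loose == would let an
            // unknown user in with an empty password (null == "" is true).
            if (isset($users[$username]) && $users[$username] === $password) {
                print pack("nn", 2, 1); // ok
            } else {
                print pack("nn", 2, 0); // fail
            }
            break;

        case 'isuser':
            list($username, $server) = $toks;
            if (isset($users[$username])) {
                print pack("nn", 2, 1); // yes
            } else {
                print pack("nn", 2, 0); // nope
            }
            break;

        default:
            print pack("nn", 2, 0); // fail
    }
} while (true);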
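Since ejabberd itself is written in Erlang, it can help to see the same framing expressed with binary pattern matching. As the script above handles it, each request is a 2-byte big-endian length followed by colon-separated fields, and each reply is 4 bytes: the length 2, then 1 for success or 0 for failure. This is a minimal sketch for testing the protocol logic in isolation; the module `extauth_frame` and its functions are hypothetical, not part of ejabberd, and it is not a drop-in replacement for auth.php.

```erlang
%% Hypothetical helper module (not part of ejabberd) that models the extauth
%% framing auth.php speaks, so the protocol logic can be tested on its own.
-module(extauth_frame).
-export([encode_request/1, decode_request/1, encode_reply/1, handle/2]).

%% Frame a request the way ejabberd sends it: <<Len:16/big, "op:field:...">>.
encode_request(Fields) ->
    Body = iolist_to_binary(lists:join(<<":">>, Fields)),
    <<(byte_size(Body)):16/big, Body/binary>>.

%% Take one complete frame off the front of Bin.
%% Returns {ok, Fields, Rest}, or 'more' if Bin does not yet hold a whole frame.
decode_request(<<Len:16/big, Body:Len/binary, Rest/binary>>) ->
    %% At most 3 splits, so a ':' inside the password survives
    %% (auth:User:Server:Password is the longest request).
    {ok, split(Body, 3), Rest};
decode_request(_Partial) ->
    more.

%% Split on ':' at most N times, leaving the remainder as the last field.
split(Bin, 0) ->
    [Bin];
split(Bin, N) ->
    case binary:split(Bin, <<":">>) of
        [Head, Tail] -> [Head | split(Tail, N - 1)];
        [Whole] -> [Whole]
    end.

%% Every reply is <<2:16, Result:16>>: 1 means yes/ok, 0 means no/fail.
encode_reply(true) -> <<2:16/big, 1:16/big>>;
encode_reply(false) -> <<2:16/big, 0:16/big>>.

%% Answer one decoded request against a map of User => Password.
handle([<<"auth">>, User, _Server, Password], Users) ->
    encode_reply(maps:find(User, Users) =:= {ok, Password});
handle([<<"isuser">>, User, _Server], Users) ->
    encode_reply(maps:is_key(User, Users));
handle(_Other, _Users) ->
    encode_reply(false).
```

An unknown user never authenticates here, even with an empty password, because maps:find/2 returns `error` rather than a value that could compare equal to the supplied password.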


I stripped down the ejabberd config to just load what I considered the bare essentials. Here is the modules section I’m testing with:

From ejabberd.cfg:
{modules,
[
{mod_caps, []},
{mod_disco, []},
{mod_roster, []},
{mod_pubsub, [ % requires mod_caps
{access_createnode, pubsub_createnode},
{plugins, ["default", "pep"]}
]},
{mod_mnesiaweb, []},
{mod_thriftctl, []}
]}.

mod_disco deals with service discovery, so clients can find out what the server supports. mod_roster manages rosters (buddy lists etc.), stored in mnesia. mod_pubsub is enabled because I want to use User Tune, an extension that lets you broadcast the name of the song you are playing to everyone in your roster. mod_caps provides XEP-0115, an extension for broadcasting and dynamically discovering client, device, or generic entity capabilities; mod_pubsub requires it.

I’ve removed the module that allows users to register, although I made a few accounts with it first whilst testing. The last two modules, mod_mnesiaweb and mod_thriftctl, are ones I wrote.

mod_mnesiaweb

To help figure out what’s going on inside of ejabberd, it’s useful to be able to easily browse the mnesia database. Yaws comes with an appmod that does this, called ymnesia. This ejabberd module will start yaws in embedded mode and run this appmod, enabling you to explore the mnesia database from a web browser.

Yaws observation: yaws didn’t appear to build ymnesia by default, so I edited the Makefile in src and added “ymnesia” to the module list. Also, if ./configure fails, the package you are probably missing is libpam0g-dev.

mod_mnesiaweb:

% Ejabberd module that runs yaws in embedded mode,
% and loads the ymnesia appmod for browsing mnesia.
-module(mod_mnesiaweb).
-author('rj@last.fm').

-include("/usr/local/lib/yaws/include/yaws.hrl").

-behaviour(gen_mod).
-export([start/2, stop/1]).

start(_Host, Opts) ->
    Port = gen_mod:get_opt(port, Opts, 8001),
    code:add_path("/usr/local/lib/yaws/ebin"),
    application:set_env(yaws, embedded, true),
    application:start(yaws),
    GC = yaws_config:make_default_gconf(false, "yawstest"),
    SC = #sconf{
        port = Port,
        servername = "ejabnesia",
        listen = {0,0,0,0},
        appmods = [{"showdb", ymnesia}],
        docroot = "wwwroot"
        },
    yaws_api:setconf(GC, [[SC]]),
    ok.

stop(_Host) ->
    application:stop(yaws),
    ok.


To compile it:
erlc -pa ${EJAB_SRC} -I ${EJAB_SRC} mod_mnesiaweb.erl
where EJAB_SRC is the ejabberd-2.X.X/src directory, after you’ve compiled from source (so the beams are there too).

Copy the resulting mod_mnesiaweb.beam to /var/lib/ejabberd/ebin so ejabberd finds it, and it should work. Hit up http://localhost:8001/showdb/ in your browser and you can explore the mnesia database.

Use the match syntax to filter tables. For example, to find everyone in my roster, I use this in the input box next to roster:
{roster,{"RJ",'_', {'_','_',[]}}, '_','_','_','_','_','_','_','_'}

Not pretty, but it gets the job done. You can just view the entire table, copy a record then replace fields with ‘_’ to build queries.
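The same query can also be issued from Erlang code instead of the ymnesia input box. In this minimal sketch the `roster` record is a local stand-in that I am assuming mirrors the field list in ejabberd 2.x's mod_roster.hrl (real ejabberd code would `-include("mod_roster.hrl")` instead). The `_ = '_'` record syntax fills every field you don't name with the wildcard, which saves typing out all the `'_'` placeholders. The module name and the sample users are made up, and it runs against a throwaway RAM-only Mnesia.

```erlang
%% Hypothetical demo module: querying roster entries with a match pattern.
-module(roster_match_demo).
-export([demo/0, contacts_of/1]).

%% Assumed to mirror ejabberd 2.x mod_roster.hrl; include the real header in ejabberd code.
-record(roster, {usj, us, jid, name = "", subscription = none, ask = none,
                 groups = [], askmessage = [], xs = []}).

%% Everyone in User's roster, i.e. the same pattern as
%% {roster, {"RJ", '_', {'_','_',[]}}, '_', ...} typed into ymnesia.
contacts_of(User) ->
    Pattern = #roster{usj = {User, '_', {'_', '_', []}}, _ = '_'},
    mnesia:dirty_match_object(Pattern).

%% Throwaway RAM-only table with two made-up roster entries.
demo() ->
    ok = mnesia:start(),
    {atomic, ok} = mnesia:create_table(roster,
                       [{attributes, record_info(fields, roster)}]),
    ok = mnesia:dirty_write(#roster{usj = {"RJ", "localhost", {"alice", "localhost", []}},
                                    us = {"RJ", "localhost"},
                                    jid = {"alice", "localhost", []}}),
    ok = mnesia:dirty_write(#roster{usj = {"bob", "localhost", {"RJ", "localhost", []}},
                                    us = {"bob", "localhost"},
                                    jid = {"RJ", "localhost", []}}),
    contacts_of("RJ").
```

Only the first entry matches: bob has RJ in his roster, but the pattern fixes the owner part of the key to "RJ".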

mod_thriftctl

Next up I wanted to try the Erlang Thrift bindings (written by the folks at Amie St.), and expose some useful functionality for controlling the server.

If you aren’t familiar with Thrift, I recommend reading about it first. In a nutshell, you write your API using an IDL (a .thrift file) and the thrift compiler generates client libraries and server code in various languages. It’s an RPC mechanism, and useful in a mixed-language environment.

mod_thriftctl.thrift:
#!/usr/local/bin/thrift -php -erl

struct JabberUser {
1: string name,
2: string server
}

service Ejabthrift {
/* add ruser to roster of luser, and vice versa. also routes presence to users if online */
void add_friend( 1: JabberUser luser,
2: JabberUser ruser
),

/* remove ruser from luser's roster */
void remove_friend( 1: JabberUser luser, 2: JabberUser ruser ),

/* make it look like fromuser sent a message to touser */
void spoof_message( 1: JabberUser fromuser, 2: JabberUser touser, 3: string message, 4: string subject ),
/* .. or a chat message */
void spoof_chat( 1: JabberUser fromuser, 2: JabberUser touser, 3: string message, 4: string thread ),

/* sends PEP usertune message, see http://xmpp.org/extensions/xep-0118.html */
void publish_np ( 1: JabberUser fromuser, 2: string artist, 3: string album, 4: string track, 5: i32 tracklength, 6: i32 tracknum )
}

Run that .thrift file, and you get gen-php and gen-erl directories containing the PHP client code and the Erlang files needed to build a server.

Here’s the ejabberd module, which starts a thrift server:

mod_thriftctl:

%
% A module to control ejabberd with a thrift interface.
%
-module(mod_thriftctl).
-author('rj@last.fm').

% ejabberd headers:
-include("ejabberd.hrl").
-include("mod_roster.hrl").
-include("jlib.hrl").

% thrift server headers:
-include("thrift.hrl").
-include("transport/tSocket.hrl").
-include("protocol/tBinaryProtocol.hrl").
-include("server/tErlServer.hrl").
-include("transport/tErlAcceptor.hrl").

% we are an ejabberd module:
-behaviour(gen_mod).
-export([start/2, stop/1]).

% our thrift service:
-include("ejabthrift_thrift.hrl").
-include("mod_thriftctl_types.hrl").
-export([   add_friend/2, remove_friend/2,
            spoof_message/4, spoof_chat/4,
            publish_np/6
        ]).

% convert a thrift jabberUser record into an ejabberd jid.
% jlib:make_jid/3 also fills in the stringprep'd luser/lserver/lresource
% fields that routing relies on (it returns error for an invalid JID).
ju2jid(#jabberUser{name = Name, server = Server}) ->
    jlib:make_jid(Name, Server, "").

spoof_message( FromU, ToU, Msg, Subject ) ->
    F = ju2jid(FromU),
    T = ju2jid(ToU),
    XmlBody = {xmlelement, "message",
               [
                {"from", jlib:jid_to_string(F)},
                {"to", jlib:jid_to_string(T)}
               ],
               [
               {xmlelement, "subject", [], [{xmlcdata, Subject}]},
               {xmlelement, "body", [], [{xmlcdata, Msg}]}
               ]
              },
    ejabberd_router:route(F, T, XmlBody).

spoof_chat( FromU, ToU, Msg, Thread ) ->
    F = ju2jid(FromU),
    T = ju2jid(ToU),
    XmlBody = {xmlelement, "message",
               [{"type", "chat"},
                {"from", jlib:jid_to_string(F)},
                {"to", jlib:jid_to_string(T)}
               ],
               [
               {xmlelement, "thread", [], [{xmlcdata, Thread}]},
               {xmlelement, "body", [], [{xmlcdata, Msg}]}
               ]
              },
    ejabberd_router:route(F, T, XmlBody).

publish_np( FromU, ArtistS, AlbumS, TrackS, LengthI, TrackNumI ) ->
    From = ju2jid(FromU),
    % The usertune message must contain binaries, not strings or ints
    FromStr     = jlib:jid_to_string(From),
    Artist      = list_to_binary(ArtistS),
    Album       = list_to_binary(AlbumS),
    Track       = list_to_binary(TrackS),
    Length      = list_to_binary(io_lib:format("~w",[LengthI])),
    TrackNum    = list_to_binary(io_lib:format("~w",[TrackNumI])),
    Xml = {xmlelement,"iq",
                [{"from", FromStr},
                 {"type","set"},
                 {"id","pub1"}],
                [{xmlcdata,<<"\n  ">>},
                 {xmlelement,"pubsub",
                  [{"xmlns","http://jabber.org/protocol/pubsub"}],
                  [{xmlcdata,<<"\n    ">>},
                   {xmlelement,"publish",
                    [{"node","http://jabber.org/protocol/tune"}],
                    [{xmlcdata,<<"\n      ">>},
                     {xmlelement,"item",[],
                      [{xmlcdata,<<"\n        ">>},
                       {xmlelement,"tune",
                        [{"xmlns","http://jabber.org/protocol/tune"}],
                        [{xmlcdata,<<"\n          ">>},
                         {xmlelement,"artist",[],
                          [{xmlcdata, Artist}]},
                         {xmlcdata,<<"\n          ">>},
                         {xmlelement,"length",[],[{xmlcdata, Length}]},
                         {xmlcdata,<<"\n          ">>},
                         {xmlelement,"source",[],
                          [{xmlcdata, Album}]},
                         {xmlcdata,<<"\n          ">>},
                         {xmlelement,"title",[],
                          [{xmlcdata, Track}]},
                         {xmlcdata,<<"\n          ">>},
                         {xmlelement,"track",[],[{xmlcdata, TrackNum}]},
                         {xmlcdata,<<"\n        ">>}]},
                       {xmlcdata,<<"\n      ">>}]},
                     {xmlcdata,<<"\n    ">>}]},
                   {xmlcdata,<<"\n  ">>}]},
                 {xmlcdata,<<"\n">>}]},
    % PEP means you act as a pubsub node yourself,
    % so it's addressed to yourself and is broadcast to your friends automatically:
    ejabberd_router:route(From, From, Xml),
    ok.

% adds bi-directional friend relationship immediately for both users.
add_friend(     #jabberUser{name=LU, server=LS},
                #jabberUser{name=RU, server=RS}) ->
    AskMessage = "",
    Group = "",
    Subtype = both,
    subscribe(LU, LS, RU, RS, RU, Group, Subtype, AskMessage),
    subscribe(RU, RS, LU, LS, LU, Group, Subtype, AskMessage),
    % the roster record stores an atom, but XML attribute values must be strings
    route_rosteritem(LU, LS, RU, RS, RU, Group, atom_to_list(Subtype)),
    route_rosteritem(RU, RS, LU, LS, LU, Group, atom_to_list(Subtype)),
    ok.

remove_friend( #jabberUser{name=LU, server=LS}, #jabberUser{name=RU, server=RS} ) ->
    unsubscribe(LU, LS, RU, RS),
    unsubscribe(RU, RS, LU, LS),
    route_rosteritem(LU, LS, RU, RS, "", "", "remove"),
    route_rosteritem(RU, RS, LU, LS, "", "", "remove"),
    ok.

% the roster table's primary key is usj = {User, Server, {RUser, RServer, Resource}}
unsubscribe(LocalUser, LocalServer, RemoteUser, RemoteServer) ->
    Key = {LocalUser, LocalServer, {RemoteUser, RemoteServer, []}},
    mnesia:transaction(fun() -> mnesia:delete(roster, Key, write) end).

route_rosteritem(LocalUser, LocalServer, RemoteUser, RemoteServer, Nick, Group, Subscription) ->
    LJID = jlib:make_jid(LocalUser, LocalServer, ""),
    RJID = jlib:make_jid(RemoteUser, RemoteServer, ""),
    ToS = jlib:jid_to_string(LJID),
    ItemJIDS = jlib:jid_to_string(RJID),
    GroupXML = {xmlelement, "group", [], [{xmlcdata, Group}]},
    Item = {xmlelement, "item",
        [{"jid", ItemJIDS},
         {"name", Nick},
         {"subscription", Subscription}],
        [GroupXML]},
    Query = {xmlelement, "query", [{"xmlns", ?NS_ROSTER}], [Item]},
    Packet = {xmlelement, "iq", [{"type", "set"}, {"to", ToS}], [Query]},
    ejabberd_router:route(LJID, LJID, Packet).


subscribe(LocalUser, LocalServer, RemoteUser, RemoteServer, Nick, Group, Subscription, Xattrs) ->
    R = #roster{usj = {LocalUser,LocalServer,{RemoteUser,RemoteServer,[]}},
                us = {LocalUser,LocalServer},
                jid = {RemoteUser,RemoteServer,[]},
                name = Nick,
                subscription = Subscription, % none, to=you see him, from=he sees you, both
                ask = none, % out=send request, in=somebody requests you, none
                groups = [Group],
                askmessage = Xattrs, % example: [{"category","conference"}]
                xs = []
               },
    mnesia:transaction(fun() -> mnesia:write(R) end).

start(Host, Opts) ->
    ?INFO("mod_ejabthrift start().",[]),
    %% get options
    Port = gen_mod:get_opt(port, Opts, 9000),

    spawn(fun()-> thrift:start() end),
    ?INFO("mod_ejabthrift thrift:start().",[]),

    Handler   = ?MODULE,
    Processor = ejabthrift_thrift,

    TF = tBufferedTransportFactory:new(),
    PF = tBinaryProtocolFactory:new(),

    ServerTransport = tErlAcceptor,
    ServerFlavor    = tErlServer,

    Server = oop:start_new(ServerFlavor, [Port, Handler, Processor, ServerTransport, TF, PF]),

    case ?R0(Server, effectful_serve) of
    ok    ->
        ?INFO("mod_ejabthrift: Thrift server (~s) listening on port ~w",[Host, Port]),
        % put Server into process dictionary (needed for clean stop)
        put(thrift_server_reference, Server),
        ok;
    Error ->
        ?ERROR_MSG("mod_ejabthrift: Error starting thrift server: ~w", [Error]),
        Error
    end.

stop(_Host) ->
    % NOTE: the process dictionary is per-process, so this only finds the
    % server if stop/1 runs in the same process that ran start/2.
    ?C0(get(thrift_server_reference), stop),
    ok.


To build, first build the gen-erl code:

erlc -pa ${EJAB_SRC} -I ${EJAB_SRC} -I ${ERL_THRIFT}/include -I ./gen-erl -o ./gen-erl ./gen-erl/*.erl

Where ERL_THRIFT is the lib/erl directory from the amiethrift code, git://repo.or.cz/amiethrift.git

Then compile the module:

erlc -pa ${EJAB_SRC} -I ${EJAB_SRC} -I ${ERL_THRIFT}/include -I ./gen-erl *.erl

To install, copy all the beam files to the ejabberd ebin dir:

sudo cp *.beam gen-erl/*.beam /var/lib/ejabberd/ebin/

This is inspired by mod_xmlrpc, which is in ejabberd-modules. As the start function shows, that’s all it takes to start a thrift server, and it’s now trivial to call into ejabberd from other languages. For example, if you started listening to a song using a Flash player on the website, a PHP web service could make a User Tune announcement on your behalf, or spoof messages from you boasting about how much you love listening to Paris Hilton.

If anyone knows where I can read about the ejabberd architecture / design, so I don’t have to piece it all together myself, please let me know.


Sunday, November 23rd, 2008 programming

On bulk loading data into Mnesia

Consider this a work-in-progress; I will update this post if I find a ‘better’ way to do fast bulk loading

The time has come to replace my ets-based storage backend with something non-volatile. I considered a dets/ets hybrid, but I really need this to be replicated to at least a second node for HA / failover. Mnesia beckoned.

The problem:

  • 15 million [fairly simple] records
  • 1 Mnesia table: bag, disc_copies, just 1 node, 1 additional index
  • Hardware is a quad-core 2 GHz CPU, 16 GB RAM, 8x 74 GB 15k RPM SCSI disks in RAID 6
  • Takes ages* to load and spews a load of “Mnesia is overloaded” warnings

* My definition of ‘takes ages’: Much longer than PostgreSQL \copy or MySQL LOAD DATA INFILE

At this point all I want is a quick way to bulk-load some data into a disc_copies table on a single node, so I can get on with running some tests.

Here is the table creation code:
mnesia:create_table(subscription,
[
{disc_copies, [node()]},
{attributes, record_info(fields, subscription)},
{index, [subscribee]}, %index subscribee too
{type, bag}
]
)

The subscription record is fairly simple:
-record(subscription, {subscriber, subscribee}).
#subscription{subscriber = {resource, user, 123}, subscribee = {resource, artist, 456}}

I’m starting erlang like so:
erl +A 128 -mnesia dir '"/home/erlang/mnesia_dir"' -boot start_sasl

The interesting part there is really +A 128, which starts a pool of 128 async threads that the emulator uses for file I/O; in my runs it spread the CPU load better between the 4 cores.

Attempt 0) ‘by the book’ one transaction to rule them all

Something like this:
mnesia:transaction(fun()-> [ mnesia:write(S) || S <- Subs ] end)

Time taken: Too long, I gave up after 12 hours
Number of “Mnesia overloaded” warnings: lots
Conclusion: Must be a better way
TODO: actually run this test and time it.

Attempt 1) dirty_write

There isn’t really any need to do this in a transaction, so I tried dirty_write.
[ mnesia:dirty_write(S) || S <- Subs ]

And here’s the warning in full:
=ERROR REPORT==== 13-Oct-2008::16:53:57 ===
Mnesia('mynode@myhost'): ** WARNING ** Mnesia is overloaded: {dump_log,
write_threshold}

Time taken: 890 secs
Number of “Mnesia overloaded” warnings: lots
Conclusion: Workable, but nothing to boast about. Those warnings are annoying

Attempt 2) dirty_write, defer index creation

A common trick with traditional RDBMS would be to bulk load the data into the table and add the indexes afterwards. In some scenarios you can avoid costly incremental index update operations. If you are doing this in one gigantic transaction it shouldn’t matter, and I’m not really sure how mnesia works under the hood (something I plan to rectify if I end up using it for real).
I tried a similar approach by commenting out the {index, [subscribee]} line above, doing the load, then calling mnesia:add_table_index(subscription, subscribee) to add the index once all the data was loaded. Note that mnesia was still building the primary index on the fly, but that can’t be helped.
Time taken: 883 secs (679s load + 204s index creation)
Number of “Mnesia overloaded” warnings: lots
Conclusion: Insignificant, meh
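To reproduce Attempt 2, here is a minimal sketch of the deferred-index load, with each phase timed by timer:tc/1. I’m assuming wall-clock timing of this kind; the post doesn’t say exactly how the numbers above were measured. The module name and the CopyType parameter are hypothetical: pass disc_copies for a real run (which needs a disc schema), or ram_copies on a throwaway node. It assumes Mnesia is already running.

```erlang
%% Hypothetical module: Attempt 2 (load first, add the secondary index afterwards).
-module(deferred_index_load).
-export([load/2]).

-record(subscription, {subscriber, subscribee}).

%% Returns {LoadSeconds, IndexSeconds}.
load(Subs, CopyType) ->
    %% No {index, [subscribee]} here: the index is added after the load.
    {atomic, ok} = mnesia:create_table(subscription,
                       [{CopyType, [node()]},
                        {attributes, record_info(fields, subscription)},
                        {type, bag}]),
    {LoadUs, ok} = timer:tc(fun() ->
                       lists:foreach(fun mnesia:dirty_write/1, Subs)
                   end),
    %% Build the subscribee index in one pass over the loaded table.
    {IndexUs, {atomic, ok}} = timer:tc(fun() ->
                       mnesia:add_table_index(subscription, subscribee)
                   end),
    {LoadUs / 1.0e6, IndexUs / 1.0e6}.
```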

Attempt 3) mnesia:ets() trickery

This is slightly perverted, but I tried it because I suspected that incrementally updating the on-disk data wasn’t especially efficient. The idea is to create a ram_copies table and use the mnesia:ets() function to write directly to the underlying ets table (it doesn’t get much faster than ets), then convert the table to disc_copies. There are caveats; to quote The Fine Manual:

Calls the Fun in a raw context which is not protected by a transaction. The Mnesia function calls performed in the Fun are performed directly on the local ets tables on the assumption that the local storage type is ram_copies and the tables are not replicated to other nodes. Subscriptions are not triggered and checkpoints are not updated, but it is extremely fast.

I can live with that. I don’t mind if replication takes a while to setup when I put this into production – I’ll gladly take any optimisations I can get at this stage (testing/development).

Loading a list of subscriptions looks like this:
mnesia:ets(fun()-> [mnesia:dirty_write(S) || S <- Subs] end).
And to convert this into disc_copies once data is loaded in:
mnesia:change_table_copy_type(subscription, node(), disc_copies).

Time taken: 745 secs (699s load + 46s convert to disc_copies)
Number of “Mnesia overloaded” warnings: none!
Conclusion: Fastest yet, bit hacky
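Put together, Attempt 3 might look like the minimal sketch below. The module name is hypothetical and it assumes Mnesia is already running. load/1 creates the table as ram_copies so that mnesia:ets/1’s assumptions hold, loads it, and returns the row count; to_disc/0 is the conversion step, which needs a disc-based schema.

```erlang
%% Hypothetical module: Attempt 3 (load via mnesia:ets/1, then convert).
-module(ets_trick_load).
-export([load/1, to_disc/0]).

-record(subscription, {subscriber, subscribee}).

load(Subs) ->
    %% Start life as ram_copies on this node only, as mnesia:ets/1 assumes.
    {atomic, ok} = mnesia:create_table(subscription,
                       [{ram_copies, [node()]},
                        {attributes, record_info(fields, subscription)},
                        {index, [subscribee]},
                        {type, bag}]),
    %% Raw context: no transaction, subscriptions and checkpoints not updated.
    mnesia:ets(fun() -> lists:foreach(fun mnesia:dirty_write/1, Subs) end),
    mnesia:table_info(subscription, size).

%% Persist the loaded table. This needs a disc-resident schema,
%% i.e. mnesia:create_schema([node()]) was run before mnesia:start().
to_disc() ->
    mnesia:change_table_copy_type(subscription, node(), disc_copies).
```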

Summary

At least the ets() trick doesn’t spew a million warnings. I also need to examine the output of mnesia:dump_to_textfile and see if loading data from that format is any faster.
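A first step towards the dump_to_textfile idea is to look at what the dump actually contains. This minimal sketch (hypothetical module name, running Mnesia assumed) writes the local tables out with mnesia:dump_to_textfile/1 and reads the file back with file:consult/1, which should work since the dump is written as plain Erlang terms. Timing a reload from that file is left as the TODO below says.

```erlang
%% Hypothetical module: inspect the output of mnesia:dump_to_textfile/1.
-module(textfile_dump).
-export([dump_and_read/1]).

%% Dump the local tables (schema excluded), then parse the file back
%% into a list of terms: a table-definitions term followed by the records.
dump_and_read(File) ->
    ok = mnesia:dump_to_textfile(File),
    {ok, Terms} = file:consult(File),
    Terms.
```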

TODO:

  • Examine / test using the dump_to_textfile method
  • Run full transactional load and time it
  • Try similar thing with PostgreSQL


Monday, October 13th, 2008 hacks, programming