Archive for September, 2008
Transcoding HTTP mp3 streaming proxy in bash
Here’s how to make a proxy for streaming mp3s. It transcodes on-the-fly to 64kbps MP3 using lame. When transcoding is finished, it calls the ./posthandler.sh script, which can either just delete the file, or potentially archive it so you don’t need to transcode it again.
-
#!/bin/bash
#
# Transcoding HTTP mp3 streaming proxy.
#
# Reads one HTTP request line from stdin (e.g. when run from `nc -c`),
# fetches the URL embedded in the request path with lynx, transcodes it to
# constant-bitrate MP3 with lame, and streams the result to stdout while
# also saving a copy to $OUTFILE. Once lame finishes successfully,
# ./posthandler.sh is called with the path of the saved file.

# HTTP request lines end in CRLF; `read` only strips the LF.
CR=$'\r'

# -r: do not treat backslashes in the request line as escapes.
read -r method url version

method="${method%$CR}"
url="${url%$CR}"
version="${version%$CR}"

echo -ne "HTTP/1.0 200 OK\r\nContent-type: audio/mpeg\r\n\r\n"

BR=64 # bitrate to transcode to.

PIPE="/tmp/$$.pipe"
mkfifo "$PIPE"

OUTFILE="./tmp.$$.$BR.mp3"
# -f: the file normally does not exist yet, which is not an error.
rm -f "$OUTFILE"

# The request path is "/<real url>"; drop the leading slash.
url="${url#/}"
echo "** GET $url" >&2

# tee -i ignores interrupts, so the transcode (and the posthandler call)
# still completes if the client disconnects mid-stream.
nohup lynx -source "$url" \
  | (lame --preset cbr "$BR" --mp3input - - 2>/dev/null \
      && (echo "** Finished transcoding $url" >&2 ; \
          ./posthandler.sh "$OUTFILE" &)) \
  | tee -i "$PIPE" > "$OUTFILE" &

# Stream the transcoded data to the client as it is produced.
cat < "$PIPE"
rm "$PIPE"
One interesting limitation seems to be the buffer size of a fifo pipe in linux. Even though the transcoding step is pretty quick, if a client is connected the transcoding only manages to fill the pipe a couple of hundred k ahead of what is being read.
The -i flag to `tee` means it ignores interrupts, and will finish transcoding the file and call the posthandler even if the client disconnects.
Run it like this:
while [ 1 ]; do nc -vlp 8080 -c './transstreamer.sh' ; done
Then hit up a url of your choice using your awesome new proxy:
mpg321 "http://localhost:8080/http://freedownloads.last.fm/download/105468518/Letters%2BFrom%2BThe%2BBoatman.mp3"
Not the most scalable solution, but a mildly amusing quick hack.
Erlang libketama driver – Consistent Hashing
All the data I need from memcached is assigned to servers using a consistent hashing mechanism, implemented as libketama – a shared library written in C. We use a php extension to wrap this, and also have a pure java implementation. Rather than port the algorithm to Erlang, I wrote an Erlang driver.
There are 3 things covered here:
- A small driver program written in C (using libketama)
- Some basic testing from the shell using Perl and xxd
- The Erlang gen_server that calls it
C driver program
-
/* Expects a one-byte length header, followed by a key (<255bytes)
-
* Returns an ip:port string with 1 byte len header
-
*
-
*/
-
#include <ketama.h>
-
#include <stdio.h>
-
#include <stdlib.h>
-
#include <unistd.h>
-
#include <string.h>
-
-
typedef unsigned char byte;
-
-
/*
 * Read exactly `len` bytes from stdin (fd 0) into `buf`.
 *
 * Keeps calling read() until `len` bytes have been collected. Returns `len`
 * on success; if any read() returns 0 (EOF) or a negative value (error),
 * that value is returned immediately and `buf` may be partially filled.
 */
int read_exact(byte *buf, int len)
{
    int total = 0;

    for (;;) {
        int n = read(0, buf + total, len - total);
        if (n <= 0) {
            return n;
        }
        total += n;
        if (total >= len) {
            return len;
        }
    }
}
-
-
int main(int argc, char **argv)
-
{
-
if(argc==1){
-
return 1;
-
}
-
-
ketama_continuum c;
-
ketama_roll( &c, *++argv );
-
mcs *m;
-
-
byte len;
-
byte buffer[256];
-
while ( 1 ) {
-
if( 1 != read_exact(&len, 1) ) break;
-
if( (int)len >= 255 ) break;
-
read_exact((byte *)&buffer, (int)len);
-
buffer[len] = ‘\0‘;
-
m = ketama_get_server( (char *) &buffer, c );
-
sprintf((char *)&buffer, "%s",m->ip);
-
int respleni = strlen(m->ip);
-
char l = (0xff & respleni);
-
write(1, &l, 1);
-
write(1, (char*)&buffer, respleni);
-
}
-
-
return 0;
-
}
Testing the driver with Perl and xxd
Before writing the Erlang bit, it’d be nice to know the driver program does what we expect. We will send the driver a 1-byte length header followed by the key, and expect a 1-byte length header and the value as a response. Say we’re hashing a memcached key ‘user:123’ to a server; we can do what the Erlang port does with a bit of Perl, and use the ‘xxd’ command to see the output in binary.
perl -e '$key="user:123"; $len=pack("C",length($key)); print $len; print $key;' | xxd -b
0000000: 00001000 01110101 01110011 01100101 01110010 00111010  .user:
0000006: 00110001 00110010 00110011                             123
Note the first byte (00001000) printed before the key is the length of the key, 8. Now let’s send this to the driver program and check the response (provide a valid ketama.servers file):
perl -e '$key="user:123"; $len=pack("C",length($key)); print $len; print $key;' | ./ketama_erlang_driver /var/ketama.servers | xxd -b
0000000: 00010000 00110001 00110000 00101110 00110000 00101110  .10.0.
0000006: 00110001 00101110 00110001 00110001 00111000 00111010  1.118:
000000c: 00110001 00110001 00110010 00110001 00110001           11211
The first byte of the response (00010000) is 16, which is the length of the server address returned by the driver, “10.0.1.118:11211” – It does what we expect, onwards…
The Erlang bit
-
%% gen_server that owns a port to the external ketama driver program and
%% answers "which server does this key hash to?" requests.
%%
%% The port uses {packet, 1}: each request and each response is framed with
%% a one-byte length header.
-module(ketama).
-behaviour(gen_server).

-export([start_link/0, start_link/1, start_link/2, getserver/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-record(state, {port}).

%% Start the server with the default servers file and driver binary.
start_link() ->
    start_link("/web/site/GLOBAL/ketama.servers").

%% Start the server with a custom servers file and the default driver binary.
start_link(ServersFile) ->
    start_link(ServersFile, "/usr/bin/ketama_erlang_driver").

%% Start the server, registered locally as `ketama`.
%% ServersFile: path of the ketama servers file handed to the driver.
%% BinPath:     path of the driver executable.
start_link(ServersFile, BinPath) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [ServersFile, BinPath], []).

%% Look up the server for Key (iodata of at most 255 bytes, because of the
%% one-byte packet header). Returns the driver's reply as a binary. The
%% call exits if the driver dies or does not answer within one second.
getserver(Key) ->
    gen_server:call(?MODULE, {getserver, Key}).

%%
%% gen_server callbacks
%%

init([ServersFile, BinPath]) ->
    %% Without trap_exit the port's exit never arrives as an 'EXIT'
    %% message, so the handle_info clause below could never run.
    process_flag(trap_exit, true),
    Exe = BinPath ++ " " ++ ServersFile,
    Port = open_port({spawn, Exe}, [binary, {packet, 1}, use_stdio]),
    {ok, #state{port = Port}}.

handle_call({getserver, Key}, _From, #state{port = Port} = State) ->
    Port ! {self(), {command, Key}},
    receive
        {Port, {data, Data}} ->
            {reply, Data, State};
        %% The driver died while we were waiting; no point waiting for
        %% the timeout.
        {'EXIT', Port, Reason} ->
            {stop, {port_terminated, Reason}, State}
    after 1000 -> % if it takes this long, you have serious issues.
        {stop, ketama_port_timeout, State}
    end.

handle_cast(_Msg, State) -> {noreply, State}.

handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State};
%% Ignore anything else (e.g. exits of other linked processes) instead of
%% crashing with function_clause.
handle_info(_Info, State) ->
    {noreply, State}.

%% The port is already gone when we stop because it terminated; in every
%% other case close it so the driver process exits.
terminate({port_terminated, _Reason}, _State) -> ok;
terminate(_Reason, #state{port = Port} = _State) -> port_close(Port).

code_change(_OldVsn, State, _Extra) -> {ok, State}.
This code can be found in the erlang directory of the ketama source in svn:
svn://svn.audioscrobbler.net/misc/ketama/
Reading Serialized PHP Objects from Erlang
I started writing some Erlang recently. The vast majority of data I need to access from Erlang resides in cached, serialized php objects. Here’s what I came up with to turn a serialized php object into a sort of nested Erlang proplist thing.
Serializing a PHP array such as array(123, "hello", 3.14, array("a" => "foo", "b" => "bar")) gives:
a:4:{i:0;i:123;i:1;s:5:"hello";i:2;d:3.14;i:3;a:2:{s:1:"a";s:3:"foo";s:1:"b";s:3:"bar";}}
It’s not hard to see how the (relatively undocumented) PHP serialization format works. Here’s what it becomes in Erlang:
1> php:unserialize("a:4:{i:0;i:123;i:1;s:5:\"hello\";i:2;d:3.14;i:3;a:2:{s:1:\"a\";s:3:\"foo\";s:1:\"b\";s:3:\"bar\";}}").
{[[{0,123},
{1,<<"hello">>},
{2,3.14},
{3,[{a,<<"foo">>},{b,<<"bar">>}]}]],
[]}
Here’s what it does with objects:
-
<?php
-
// Example object whose serialized form is parsed by the Erlang call below.
class ExampleClass {
    var $id = 123;
    var $name = "RJ";
    // Present in the serialized output shown below, so declared here too.
    var $languages = array("php", "erlang", "etc");
}

$s = new ExampleClass();
// Print the serialized form that is fed to php:unserialize/1.
echo serialize($s);
-
?>
2> php:unserialize("O:12:\"ExampleClass\":3:{s:2:\"id\";i:123;s:4:\"name\";s:2:\"RJ\";s:9:\"languages\";a:3:{i:0;s:3:\"php\";i:1;s:6:\"erlang\";i:2;s:3:\"etc\";}}").
{[{class,"ExampleClass",
[{id,123},
{name,<<"RJ">>},
{languages,[{0,<<"php">>},
{1,<<"erlang">>},
{2,<<"etc">>}]}]}],
[]}
Due to a combination of PHP’s “relaxed” type system, an old database abstraction library, and munging things in and out of memcached, we sometimes end up with numeric properties, such as ‘id’, represented as strings by PHP. To mitigate this, I ended up with some nasty code that forces certain properties to a predefined type (“id” is always an int, etc.). Yuk. Anyway, here’s the Erlang module:
-
%%
%% Takes a serialized php object and turns it into an erlang data structure
%%
-module(php).
-author('Richard Jones <rj at last.fm>').
-export([unserialize/1]).

%% Usage: {Result, Leftover} = php:unserialize(...)
%%
%% Accepts the serialized data as a binary or as a string (list of bytes).
%% Result is a one-element list holding the parsed top-level value;
%% Leftover is whatever input followed it. Values map as follows:
%%   N;        -> null
%%   b:0; b:1; -> false | true
%%   i:..;     -> integer
%%   d:..;     -> float (or integer when PHP printed no fraction)
%%   s:..;     -> binary
%%   r:..;     -> {php_ref, Integer}
%%   a:..{}    -> [{Key, Value}]
%%   O:..{}    -> {class, ClassnameString, [{Property, Value}]}
%% String keys and property names are lower-cased and turned into atoms.
%% Malformed or truncated input raises an error.
-spec unserialize(binary() | string()) -> {[term()], string()}.
unserialize(S) when is_binary(S) -> unserialize(binary_to_list(S));
unserialize(S) when is_list(S) -> takeval(S, 1).

%% Internal stuff

%% Parse Num consecutive values from Str; returns them in input order
%% together with the unparsed remainder.
takeval(Str, Num) ->
    {Parsed, Remains} = takeval(Str, Num, []),
    {lists:reverse(Parsed), Remains}.

%% All values taken: swallow the closing brace of the enclosing array or
%% object if there is one, otherwise hand back the rest untouched.
takeval([$} | Leftover], 0, Acc) -> {Acc, Leftover};
takeval(Str, 0, Acc) -> {Acc, Str};
takeval(Str, Num, Acc) ->
    {Val, Rest} = phpval(Str),
    takeval(Rest, Num - 1, [Val | Acc]).

%%
%% Parse individual php values.
%% a "phpval" here is T:val; where T is the type code for int, object, array etc..
%% Every clause returns {Value, RestOfInput}.
%%

%% Input ended while a value was still expected.
phpval([]) -> error(unexpected_end_of_input);

%% Simple ones:
phpval([$} | Rest]) -> phpval(Rest);                % skip }
phpval([$N, $; | Rest]) -> {null, Rest};            % null
phpval([$b, $:, $1, $; | Rest]) -> {true, Rest};    % true
phpval([$b, $:, $0, $; | Rest]) -> {false, Rest};   % false

%% r seems to be a recursive reference to something, represented as an int.
phpval([$r, $: | Rest]) ->
    {RefNum, [$; | Rest1]} = string:to_integer(Rest),
    {{php_ref, RefNum}, Rest1};

%% int
phpval([$i, $: | Rest]) ->
    {Num, [$; | Rest1]} = string:to_integer(Rest),
    {Num, Rest1};

%% double / float
%% NB: php floats can be ints, and string:to_float doesn't like that.
phpval([$d, $: | Rest]) ->
    {Num, [$; | Rest1]} =
        case string:to_float(Rest) of
            {error, no_float} -> string:to_integer(Rest);
            {_, _} = Parsed -> Parsed
        end,
    {Num, Rest1};

%% string  s:5:"hello";
phpval([$s, $: | Rest]) ->
    {Len, [$:, $" | Rest1]} = string:to_integer(Rest),
    %% Matching the closing quote and semicolon validates the length.
    {Chars, [$", $; | Rest2]} = lists:split(Len, Rest1),
    {list_to_binary(Chars), Rest2};

%% array  a:2:{...}
phpval([$a, $: | Rest]) ->
    {NumEntries, [$:, ${ | Rest1]} = string:to_integer(Rest),
    %% Each entry is a key followed by a value.
    {Array, Rest2} = takeval(Rest1, NumEntries * 2),
    {arraytidy(Array), Rest2};

%% object  O:4:"User":53:{...}
phpval([$O, $: | Rest]) ->
    {ClassnameLen, [$:, $" | Rest1]} = string:to_integer(Rest),
    {Classname, [$", $: | Rest1b]} = lists:split(ClassnameLen, Rest1),
    {NumEntries, [$:, ${ | Rest2]} = string:to_integer(Rest1b),
    {Classvals, Rest3} = takeval(Rest2, NumEntries * 2),
    {{class, Classname, arraytidy(Classvals)}, Rest3}.

%%
%% Helpers:
%%

%% convert [ k1,v1,k2,v2,k3,v3 ] into [ {k1,v1}, {k2,v2}, {k3,v3} ]
arraytidy([K, V | Rest]) -> [{atomize(K), V} | arraytidy(Rest)];
arraytidy([]) -> [].

%% Make properties or keys into atoms; non-string keys are left as they are.
%% NOTE(review): list_to_atom/1 on cache contents can exhaust the atom
%% table if keys are attacker-controlled or unbounded. Left unchanged
%% because callers depend on atom keys; consider list_to_existing_atom/1.
atomize(K) when is_binary(K) ->
    atomize(binary_to_list(K));
atomize(K) when is_list(K) ->
    list_to_atom(string:to_lower(K));
atomize(K) -> K.
About Me
Tags
Recent Posts
- Rewriting Playdar: C++ to Erlang, massive savings
- Erlang talk at London Hackspace
- Anti-RDBMS: A list of distributed key-value stores
- How we use IRC at Last.fm
- Getting to know ejabberd and writing modules
- ssh hack: connect directly to machine via a firewall box
- A Million-user Comet Application with Mochiweb, Part 3
- A Million-user Comet Application with Mochiweb, Part 2
- A Million-user Comet Application with Mochiweb, Part 1
- On bulk loading data into Mnesia