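How to write a port driver in Erlang to communicate with the outside world
Many people have asked in forum posts how to write a port that lets Erlang interact with the outside world. David King wrote an article some time ago that addresses this question thoroughly and in considerable depth. See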
http://www.ketralnis.com/roller/dking/entry/20070903
Supervision Trees and Ports in Erlang
08:31PM Sep 03, 2007 in category Erlang by David King
Erlang provides a whole lot of infrastructure for doing tasks commonly associated with building giant fault-tolerant systems. But what about when you have to talk to an external non-Erlang program? Can we make our communication to that program fault-tolerant using only native Erlang/OTP components?
The problem
I'm writing a blogging engine using Erlyweb and Mnesia, and one of my tasks is to pass the user's blog entry through a formatter. I use two: one for Markdown, and one to sanitise for XSS attacks. Since there's no Markdown or HTML-sanitiser implementation for Erlang, we need to communicate with the Perl implementations (markdown, HTML-TagFilter).
We'll want some fault-tolerance (in case one of our external programs crashes, we want to re-launch it), and we don't want to introduce a bottleneck by serialising access to our external programs, so we'll launch several copies of it to spread the load and allow some concurrency. In addition, we'll want to leave room to move all of the processing to its own node in the event that our blog gets very popular.
We'll communicate using an Erlang Port. A Port is a connection to an external program, implemented using Unix pipes. So our external program just has to know how to read from stdin and write to stdout. We're going to keep this all pretty generic to the actual external program. You should be able to do the same for any other program that can take data in length-encoded packets (or can be wrapped in one that does, like our Perl wrapper) for which you want to launch multiple workers.
To launch our workers, we'll use Erlang's gen_server interface, which is a way to write a generic server without having to write the implementation details of the server, keeping state, message-handling and responding, etc. Instead, you write a gen_server with a set of callbacks, and all you write is the code to actually handle the messages that you want to respond to. Basically, gen_server is a module that can take another module with appropriate callbacks and handle all of the tasks of making it a server that looks more or less like (pseudo-code)
loop(State) ->
    Message = receive Some_Message
    if Message is an internal message, like shutdown or startup or code-replacement
        let gen_server handle it
        loop(State)
    else it's a message that the programmer wants to respond to
        pass it to the programmer, let them handle it, get new state
        loop(NewState)
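To make the pseudo-code concrete, here is a minimal hand-rolled server loop in plain Erlang. It is only a sketch of the idea, not how gen_server is actually implemented; the module name tiny_server, its message shapes, and the counter it keeps are all made up for illustration.

```erlang
-module(tiny_server).
-export([start/0, call/2, stop/1]).

%% Start the loop with an initial state (a counter, purely illustrative).
start() ->
    spawn(fun() -> loop(0) end).

%% Synchronous request: send a tagged message and wait for the tagged reply.
call(Pid, Request) ->
    Ref = erlang:monitor(process, Pid),
    Pid ! {call, self(), Ref, Request},
    receive
        {Ref, Reply} ->
            erlang:demonitor(Ref, [flush]),
            Reply;
        {'DOWN', Ref, process, Pid, Reason} ->
            exit(Reason)
    end.

stop(Pid) ->
    Pid ! stop,
    ok.

loop(State) ->
    receive
        stop ->
            %% "internal" message: handled by the loop itself
            ok;
        {call, From, Ref, Request} ->
            %% "user" message: hand it to the callback, keep the new state
            {Reply, NewState} = handle(Request, State),
            From ! {Ref, Reply},
            loop(NewState)
    end.

%% The only part a gen_server user would normally write.
handle({add, N}, Count) -> {Count + N, Count + N};
handle(get, Count)      -> {Count, Count}.
```

With gen_server, everything except the handle/2 part is provided for you, along with timeouts, code replacement and supervision hooks.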
Our gen_server will have to keep track of its connection to our external program, and on request it will have to send data to the external program and retrieve the result. It will also create its process group (pg2) if necessary and add itself to it.
Erlang does something similar for supervisors. A supervisor is a process whose entire job is to watch another process or set of processes. In our case, we just want it to make sure that our external processes are up and running, and to re-launch them in case they crash, and to bring up all of our workers at once so that we don't have to launch them individually. We'll have one supervisor for Markdown which will watch all of our Markdown gen_servers, and another for all of our sanitiser gen_servers. To make managing them easier, we'll use another supervisor that will watch those two supervisors, re-launching them if they crash, and enabling us to bring both up, and all of the associated gen_servers, at once. So the plan is to have a formatter supervisor that will launch a markdown supervisor and a sanitiser supervisor, each of which will launch N workers.
I'll show you working code, and try to explain the fuzzy bits, but I encourage you to research the functions that compose it in Erlang's own docs so that you really know what is going on.
external_program.pl
Since all of this relies on an actual external program, here's a shell to use for a Perl program:
#!/usr/bin/env perl
use warnings;

# turn on auto-flushing
my $cfh = select (STDOUT);
$| = 1;
select ($cfh);

# stub: returns its input unchanged
sub process {
    return $_[0];
}

until(eof(STDIN)) {
    my ($length_bytes,$length,$data,$return);

    read STDIN,$length_bytes,4; # four-byte length-header
    $length=unpack("N",$length_bytes);

    read STDIN,$data,$length;

    $return=&process($data);

    print STDOUT pack("N",length($return));
    print STDOUT $return;
}
It communicates with Erlang via stdin/stdout, and handles packets with 4-byte length headers (with network byte-order, since that's how Erlang sends it). That is just a stub program that returns the data coming in to it, but you can replace process with whatever you want to process/format the data. I have two versions of this, one that applies markdown, and one that sanitises input with HTML-TagFilter.
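For readers who want to see the framing from the Erlang side, the following sketch builds and parses the same 4-byte, big-endian length header by hand. In the real code the {packet,4} option does this automatically; the module name packet4 and its functions are hypothetical and exist only to illustrate the wire format.

```erlang
-module(packet4).
-export([encode/1, decode/1]).

%% Prepend a 4-byte big-endian (network byte-order) length header.
encode(Payload) when is_binary(Payload) ->
    <<(byte_size(Payload)):32/big-unsigned, Payload/binary>>.

%% Try to take one complete packet off the front of a buffer.
%% Returns {ok, Payload, Rest} or {more, Buffer} if the packet is incomplete.
decode(<<Len:32/big-unsigned, Payload:Len/binary, Rest/binary>>) ->
    {ok, Payload, Rest};
decode(Buffer) when is_binary(Buffer) ->
    {more, Buffer}.
```

This is the same layout that the Perl stub produces with pack("N", ...) and consumes with unpack("N", ...).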
external_program.erl
Now for the Erlang bits. First, we'll construct the gen_server to actually communicate with the Perl program. We'll communicate using a Port (which uses a Unix pipe). We want to send data in atomic packets so that the Perl program knows when we're done sending data and so that we know when the Perl program is done responding. To do this, we'll send and receive data in packets of up to 2^32 bytes. That is, we'll send our Perl program a four-byte header (in network byte-order) indicating how much data is to be sent, then we'll send the data, and Perl will send back a four-byte header indicating how much data it is about to send, and then it will send that data. Erlang will handle the packets and headers on its side with the {packet,4} option to open_port/2, but we'll have to handle the packetising ourselves from the Perl side (already done in the above Perl program). This one is pretty generic, so you could use it to communicate with any outside program that can send data in packets, not just a formatter. Here's the code:
-module(external_program).
-behaviour(gen_server).

% API
-export([start_link/3,
         filter/2]).

% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         code_change/3,
         terminate/2]).

-record(state,{port}).

start_link(Type,Id,ExtProg) ->
    gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).

handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State}.
terminate({port_terminated, _Reason}, _State) ->
    ok;
terminate(_Reason, #state{port = Port} = _State) ->
    port_close(Port).
handle_cast(_Msg, State) ->
    {noreply, State}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

init({Type,ExtProg}) ->
    process_flag(trap_exit, true),
    ok=pg2:create(Type), % idempotent
    ok=pg2:join(Type,self()),
    Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
    {ok, #state{port = Port}}.

handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
    port_command(Port,Data),
    receive {Port,{data,Data2}} ->
            {reply,Data2,State}
    end.

filter(Type,Data) ->
    gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).
And here's the blow-by-blow. Skip ahead to externalprogram_supervisor.erl if you understand it. First, we start our module with
-behaviour(gen_server).
This tells the compiler to warn us if we forget any of the callback functions that gen_server expects to find. I won't introduce the exports here; I'll introduce the functions as we come to them.
We use a record to encode our state, even though the only item in our state is the connection to the external program
-record(state,{port}).
We do this in case we later decide to add anything to the state, it will save us from modifying all of the function headers that refer to the state. Remember that our entire state between calls must go into this record, so if we want to later add information like the time the program was launched (for statistical tracking), or the system user that launched it, this is where it goes.
start_link/3 is how we'll start our gen_server. It's the function that we call from another module (our supervisor), not one called as a gen_server callback, so it can look however we want, but our supervisor is expecting it to return {ok,Pid}.
start_link(Type,Id,ExtProg) ->
    gen_server:start_link({local,Id},?MODULE,{Type,ExtProg},_Options=[]).
It calls gen_server:start_link/4, which takes:
* {local,Id}: This is how gen_server will register the PID of the server. local means that it will only be registered on the local node, and Id is an atom passed to us by our supervisor that will appear in process listings. We don't use this anywhere, it's just nice to have for viewing in appmon and other tools, and it's optional (gen_server:start_link/3 takes only the other three arguments)
* the call-back module (i.e. this module)
* the arguments to the gen_server's init/1 function (which we'll get to)
* any special options to gen_server (we don't use any)
It returns {ok,Pid}, where Pid is the Pid of the launched process. While we used gen_server:start_link/4, which takes a registered name for the gen_server, we don't use that name to address the gen_server. Instead, we're going to use the pg2 group name that we set in init/1 below. This is because we're going to launch several copies of this gen_server, so addressing any one of them directly isn't practical.
handle_info/2 is a callback function that is called whenever our server receives a message that gen_server doesn't understand.
handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
    {stop, {port_terminated, Reason}, State}.
The only message that we handle is the Port indicating that it has shut down (for instance, if it crashes). In this case, there's nothing that we can do anymore (since our whole reason for being is to communicate with this Port), so we signal to our caller that we intend to shut down, and gen_server will shut us down, which includes calling our terminate/2 function and doing other shutdown tasks.
terminate/2 is a callback function that gen_server calls whenever it is shutting us down for any reason so that we can free up any resources that we have (like our Port). It passes us the Reason for shutdown and the last State of the server.
terminate({port_terminated, _Reason}, _State) ->
    ok;
terminate(_Reason, #state{port = Port} = _State) ->
    port_close(Port).
We have two clauses to handle a shutdown. If we are shutting down because our port terminated (because handle_info said so), then we just let gen_server finish shutting us down. If we shut down for any other reason, then we close the Port before we die so that it's not left open forever. terminate's return value is ignored.
handle_cast/2 handles asynchronous messages to our server
handle_cast(_Msg, State) ->
    {noreply, State}.
gen_servers can handle two types of in-band messages: synchronous messages generated by calls to gen_server:call/2, and asynchronous messages generated by gen_server:cast/2. (gen_server handles the difference.) Since we're only handling synchronous messages, we just don't return a reply if we receive any asynchronous messages, but if you really wanted to, you could make a synchronous call and send it as a response.
code_change/3 is called when our code is being dynamically replaced
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
Erlang supports replacing code while the server is running, with no downtime. In the event that the code is replaced, a gen_server temporarily stops responding to messages (enqueueing them as they come in), waits for the code to be replaced, calls the call-back module's new code_change function, picks up the new state, and starts running again. This could be used if we were to upgrade our code to one that required a new member of the state record, for instance, to upgrade the running state to the state that the new version uses. (If the function returns anything but {ok,NewState}, the gen_server crashes, which would be fine in our case since our supervisor would just restart it under the new code anyway.) Unless we plan to convert the state of the server, which for now we don't, we just return {ok,State}.
init/1 is called by gen_server just before the server-loop is run.
init({Type,ExtProg}) ->
    process_flag(trap_exit, true),
    ok=pg2:create(Type), % idempotent
    ok=pg2:join(Type,self()),
    Port = open_port({spawn, ExtProg}, [binary,{packet,4}]),
    {ok, #state{port = Port}}.
Its argument comes from gen_server:start_link, which we called in our own start_link/3 function, and is our Type (which becomes our pg2 group name) and the path to the external program that we want to launch (the Perl programs mentioned above). We first indicate that we want to trap exits, so that we can receive 'EXIT' messages from our Port in case it dies. These messages are not recognised by gen_server, so they get passed to our handle_info/2 callback function. Then we create our pg2 group (this operation is idempotent, which means that doing it several times has the same effect as doing it once), and join this process to the group. Then we open up the external program itself using open_port. It takes a tuple of {Type,Path}, which we use to indicate that we are launching an external program (the spawn type) and where to find it, and a list of options, which we use to tell it that we want to communicate using binaries instead of lists, and that we want it to handle communication to our Port with packets with a length-header of four bytes. Then we tell gen_server that all is well and to start the server with a state record containing a reference to our Port.
handle_call/3 is called whenever we receive a synchronous request from an outside caller; this is the actual guts of our server.
handle_call(_Command={filter,Data},_From,#state{port=Port}=State) ->
    port_command(Port,Data),
    receive {Port,{data,Data2}} ->
            {reply,Data2,State}
    end.
We just receive the command (which is a {filter,Data} tuple; it doesn't have to be, but that's how we're going to call it later), send it to the Port, get the reply from the Port, and tell gen_server to reply that back to the caller. We didn't have to write any of the server code, just the implementation of our code.
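Note that the receive in handle_call waits indefinitely: if the external program hangs or dies mid-request, the server blocks there. One possible variation, sketched below under assumptions, adds a timeout and also watches for the port's 'EXIT' message (which only arrives if the calling process traps exits, as init/1 arranges). The module name port_call and the function roundtrip/3 are hypothetical and not part of the original code.

```erlang
-module(port_call).
-export([roundtrip/3]).

%% Send one packet to the port and wait up to Timeout milliseconds
%% for one packet back.
roundtrip(Port, Data, Timeout) ->
    true = port_command(Port, Data),
    receive
        {Port, {data, Reply}} ->
            {ok, Reply};
        {'EXIT', Port, Reason} ->
            %% only delivered when the caller has trap_exit set
            {error, {port_terminated, Reason}}
    after Timeout ->
        {error, timeout}
    end.
```

A quick way to try it is to use cat as the external program, since it echoes the length header and the payload back unchanged. After a timeout, a late reply could still arrive and be mistaken for the answer to the next request, so stopping the server (and letting the supervisor restart it) is probably the safer reaction.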
Finally, here's how we actually communicate with our server after it's launched:
filter(Type,Data) ->
    gen_server:call(pg2:get_closest_pid(Type),{filter,Data}).
We make a synchronous call to the "closest" Pid that is a member of the pg2 group that our programs join when launching. ("Closest" just means that pg2 will first look on the local node before looking on other nodes.) pg2:get_closest_pid randomises which worker it returns within a group, so this should spread the load if we get many simultaneous requests. This could be expanded to try again in the event of a timeout (assuming that the timed-out server will crash and be re-launched, or that pg2:get_closest_pid will return a different Pid next time).
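The retry idea can be sketched as a small wrapper. This is an illustration only: the module name retry_call and the function with_retry/2 are hypothetical. It takes a fun rather than calling pg2 directly, so it does not depend on any particular process-group library (pg2 itself was removed in OTP 24 in favour of pg).

```erlang
-module(retry_call).
-export([with_retry/2]).

%% Run Fun(); if it exits with a gen_server-style timeout and attempts
%% remain, run it again. Any other failure, or a timeout on the last
%% attempt, propagates to the caller.
with_retry(Fun, Attempts) when is_function(Fun, 0), Attempts > 0 ->
    try
        Fun()
    catch
        exit:{timeout, _} when Attempts > 1 ->
            with_retry(Fun, Attempts - 1)
    end.
```

With the code from this article, a call might look like retry_call:with_retry(fun() -> external_program:filter(markdown, Data) end, 3), so that each attempt picks a worker afresh.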
At this point you should be able to create and call a gen_server.
externalprogram_supervisor.erl
Now for the supervisor. Again, we'll keep this generic. It handles multiple external_program workers, launching NumWorkers (passed to start_link/3). We'll use the (aptly named) supervisor behaviour. supervisor launches a group of processes, watches them, and re-launches them if they die. It also provides a way to bring up or down a group of processes all at once, just by launching or shutting down the supervisor. I'm going to explain the functions out of order for clarity.
-module(externalprogram_supervisor).
-behavior(supervisor).

% API
-export([start_link/3]).

% supervisor callbacks
-export([init/1]).

start_link(Type,NumWorkers,ExtProg) ->
    supervisor:start_link({local,
                           list_to_atom(atom_to_list(Type) ++ "_supervisor")},
                          ?MODULE,
                          {Type,NumWorkers,ExtProg}).

init({Type, NumWorkers,ExtProg}) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [begin
           Id=list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
           {Id,
            {external_program,
             start_link,
             [Type,Id,ExtProg]},
            permanent,
            _Timeout=10*1000,
            worker,
            [external_program]}
       end
       || Which <- lists:seq(1,NumWorkers)]}}.
Again, -behaviour(supervisor). tells the compiler to warn us if we forget any callback functions required by supervisor.
supervisor only has one callback, init/1:
init({Type, NumWorkers,ExtProg}) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [begin
           Id=list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
           {Id,
            {external_program,
             start_link,
             [Type,Id,ExtProg]},
            permanent,
            _Timeout=10*1000,
            worker,
            [external_program]}
       end
       || Which <- lists:seq(1,NumWorkers)]}}.
It's only called once when the supervisor is created (or re-started after crashing). It gets its arguments from supervisor:start_link, which we call from our local start_link (that we'll explain in a moment). init is expected to return a restart strategy and a list of Childspecs, where a Childspec looks like (straight from the Erlang supervisor documentation):
{Id,StartFunc,Restart,Shutdown,Type,Modules},
Id = term()
StartFunc = {M,F,A}
M = F = atom()
A = [term()]
Restart = permanent | transient | temporary
Shutdown = brutal_kill | int()>=0 | infinity
Type = worker | supervisor
Modules = [Module] | dynamic
Module = atom()
You can read up on what all of that means in the supervisor documentation, but we return a list of them consisting of NumWorkers items (see the list-comprehension) such that each is launched by calling external_program:start_link/3 (specified by the StartFunc in the Childspec), passing each worker its Type, Id and ExtProg. The Id is just an atom that we generate from the Type and the worker-number (which is just determined by launch order). You'll recall that it is passed to gen_server to register the worker's PID, so that in debug listings we can see markdown3 instead of <0.15.3>.
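For comparison, newer Erlang/OTP releases also accept child specifications written as maps, which some find easier to read than the positional tuple. The sketch below generates the same N worker specs in map form; the module name childspecs and the function worker_specs/3 are hypothetical, and the values simply mirror the tuple version above.

```erlang
-module(childspecs).
-export([worker_specs/3]).

%% Build one map-style child spec per worker, named Type1..TypeN.
worker_specs(Type, NumWorkers, ExtProg) ->
    [begin
         Id = list_to_atom(atom_to_list(Type) ++ integer_to_list(Which)),
         #{id       => Id,
           start    => {external_program, start_link, [Type, Id, ExtProg]},
           restart  => permanent,
           shutdown => 10 * 1000,          % milliseconds to wait on shutdown
           type     => worker,
           modules  => [external_program]}
     end
     || Which <- lists:seq(1, NumWorkers)].
```

supervisor:check_childspecs/1 can be used to check that a generated list is syntactically valid before handing it to a supervisor.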
The supervisor itself is created by calling our local start_link/3, which calls supervisor:start_link/2 (which actually creates the supervisor, calls our init, etc). This supervisor will be called from formatter_supervisor, defined below, but you could call it done at this point and have a working implementation of an N-worker external program.
1> externalprogram_supervisor:start_link(my_server,5,"/usr/local/bin/external_program.pl").
{ok,<0.1976.0>}
2> pg2:get_members(my_server).
[<0.1981.0>,<0.1980.0>,<0.1979.0>,<0.1978.0>,<0.1977.0>]
3> gen_server:call(pg2:get_closest_pid(my_server),{filter,<<"This is some random text!">>}).
<<"This is some random text!">>
formatter_supervisor.erl
Going over this once more, heh.
I also have an example here, taken from a book.
First, the C driver part:
erl_common.c implements marshal/demarshal
/* io.h on Windows, unistd.h elsewhere (guard macro assumed to be _WIN32) */
#ifdef _WIN32
#include <io.h>
#else
#include <unistd.h>
#endif

typedef unsigned char byte;

int read_cmd(byte *buf);
int write_cmd(byte *buf, int len);
int read_exact(byte *buf, int len);
int write_exact(byte *buf, int len);

/* Read one packet: a 2-byte big-endian length header, then the payload. */
int read_cmd(byte *buf) {
    int len;
    if(read_exact(buf, 2) != 2)
        return -1;
    len = (buf[0] << 8) | buf[1];
    return read_exact(buf, len);
}

/* Write one packet: a 2-byte big-endian length header, then the payload. */
int write_cmd(byte *buf, int len) {
    byte li;
    li = (len >> 8) & 0xff;
    write_exact(&li, 1);
    li = len & 0xff;
    write_exact(&li, 1);
    return write_exact(buf, len);
}

/* Read exactly len bytes from stdin (fd 0), looping over short reads. */
int read_exact(byte *buf, int len) {
    int i, got = 0;
    do {
        if((i = read(0, buf + got, len - got)) <= 0)
            return i;
        got += i;
    } while(got < len);
    return len;
}

/* Write exactly len bytes to stdout (fd 1), looping over short writes. */
int write_exact(byte *buf, int len) {
    int i, wrote = 0;
    do {
        if((i = write(1, buf + wrote, len - wrote)) <= 0)
            return i;
        wrote += i;
    } while(wrote < len);
    return len;
}
Our original goal was to have N workers running for two external programs. So far all of the code we've written has been generic to the external program, but now we're going to get into (very slightly) more specific stuff. The formatter_supervisor will be a supervisor that will launch and watch two supervisors, one for Markdown and one for our sanitiser. They'll both run the same externalprogram_supervisor code, they'll just be launched with different arguments.
-module(formatter_supervisor).
-behavior(supervisor).

% External exports
-export([start_link/0]).

% supervisor callbacks
-export([init/1]).

start_link() ->
    supervisor:start_link(?MODULE, '_').

% formatters are assumed to create pg2s registered under their
% type-names (at the moment, markdown and sanitiser)
init(_Args) ->
    {ok,
     {{one_for_one,
       3,
       10},
      [{markdown_supervisor,
        {externalprogram_supervisor,
         start_link,
         [markdown,
          myapp_config:markdown_workers(),
          myapp_config:markdown_program()]},
        permanent,
        _Timeout1=infinity,
        supervisor,
        [externalprogram_supervisor]},
       {sanitiser_supervisor,
        {externalprogram_supervisor,
         start_link,
         [sanitiser,
          myapp_config:sanitiser_workers(),
          myapp_config:sanitiser_program()]},
        permanent,
        _Timeout2=infinity,
        supervisor,
        [externalprogram_supervisor]}]}}.
That looks almost the same as our externalprogram_supervisor, except that the ChildSpecs returned by init are different. We don't have a local wrapper for starting each child; the ChildSpecs just call externalprogram_supervisor:start_link directly (since there's nothing we need to do with it after launching it). The ChildSpec list looks like this:
[{markdown_supervisor,
  {externalprogram_supervisor,
   start_link,
   [markdown,
    myapp_config:markdown_workers(),
    myapp_config:markdown_program()]},
  permanent,
  _Timeout1=infinity,
  supervisor,
  [externalprogram_supervisor]},
 {sanitiser_supervisor,
  {externalprogram_supervisor,
   start_link,
   [sanitiser,
    myapp_config:sanitiser_workers(),
    myapp_config:sanitiser_program()]},
  permanent,
  _Timeout2=infinity,
  supervisor,
  [externalprogram_supervisor]}]}}.
Nothing too special there, except that we pass in the Type atom to externalprogram_supervisor:start_link (markdown or sanitiser), which becomes the name of the pg2 group. The shutdown timeout is set to infinity for supervisors of supervisors (as recommended by the Erlang docs) to give the indirect children all the time they need to shut down, and we've indicated that the child type is supervisor, so that Erlang knows that we're building a supervision tree. Of course, the arguments to externalprogram_supervisor:start_link assume that myapp_config:markdown_workers/0 and friends have been defined, but you could just replace those with literal values, too. myapp_config:markdown_workers/0 is assumed to return an integer (in my case, five), which is how many copies of the program you want to launch, and myapp_config:markdown_program/0 returns the full path to the program to launch (like "/usr/local/myapp/lib/markdown.pl"). formatter_supervisor is the supervisor that will be started from my application's main supervisor.
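The article does not show myapp_config, so here is one way such a module might look. Everything in it is an assumption made for illustration: the application name myapp, the environment keys, the default of five workers, and the sanitiser path. It reads values from the application environment and falls back to defaults when nothing is configured.

```erlang
-module(myapp_config).
-export([markdown_workers/0, markdown_program/0,
         sanitiser_workers/0, sanitiser_program/0]).

%% Each function looks up a key in the (hypothetical) myapp application
%% environment and falls back to a default when the key is not set.
markdown_workers() ->
    application:get_env(myapp, markdown_workers, 5).

markdown_program() ->
    application:get_env(myapp, markdown_program,
                        "/usr/local/myapp/lib/markdown.pl").

sanitiser_workers() ->
    application:get_env(myapp, sanitiser_workers, 5).

sanitiser_program() ->
    %% hypothetical path, by analogy with the markdown one
    application:get_env(myapp, sanitiser_program,
                        "/usr/local/myapp/lib/sanitiser.pl").
```

Keeping these values behind functions means the supervisors never need to change when the worker counts or program paths do.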
format.erl
The actual communication with the programs should be pretty easy now, we just ask external_program to do its magic. Here it is:
-module(format).

% formatters are defined and launched by the formatter_supervisor

-export([markdown/1,
         sanitise/1]).

markdown(Data) ->
    external_program:filter(markdown,Data).

sanitise(Data) ->
    external_program:filter(sanitiser,Data).
It should work with lists or binaries. Add a use Text::Markdown; to the top of our Perl program (at the beginning of this entry) and replace its process function with:
sub process {
    return Text::Markdown::markdown($_[0]);
}
Launch our formatter_supervisor with formatter_supervisor:start_link() and now we have a working Markdown implementation:
(name@node)106> format:markdown("I *really* like cookies").
<<"<p>I <em>really</em> like cookies</p>\n">>