使用 OTP 原理的非阻塞 TCP 服务器

发布于 2024-11-10 02:38:08 字数 212 浏览 3 评论 0原文

我开始学习 Erlang,所以我尝试写“hello, world!”并发编程,IRC 机器人。

我已经使用 Erlang 编写了一个,没有任何 OTP 细节(管理程序、应用程序等行为)。我希望使用 OTP 原则重写它,但不幸的是我无法找出使用 OTP 进行套接字编程的“正确”方法。

似乎唯一合理的方法是手动创建另一个进程并将其链接到主管,但肯定有人在某个地方之前已经这样做过。

I'm starting to learn Erlang, so I'm trying to write the "hello, world!" of concurrent programming, an IRC bot.

I've already written one using Erlang without any OTP niceties (supervisor, application, etc. behaviours). I'm looking to rewrite it using OTP principles but unfortunately I can't figure out the "right" way to do socket programming with OTP.

It seems the only reasonable way is to create another process manually and link it to the supervisor, but surely someone, somewhere, has done this before.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

听风念你 2024-11-17 02:38:09

很高兴您已经开始学习 Erlang/OTP!

以下资源非常有用:

  • OTP 设计原则。如果您还没有阅读过,请仔细阅读。请注意 OTP 是面向对象 (OO) 的常见误解:事实并非如此!忘记关于“继承”的一切。仅仅通过“扩展”标准模块来构建完整的系统是不可能的。
  • 消息系统
    <块引用>

    必须使用这些函数来实现进程对系统消息的使用

  • 特殊流程。特殊流程是符合 OTP 标准的流程,可以与主管很好地集成。

这是我的项目中的一些代码。我也是一名Erlang学习者,所以请不要太相信代码。

-module(gen_tcpserver).

%% Public API
-export([start_link/2]).

%% start_link reference
-export([init/2]).

%% System internal API
-export([system_continue/3, system_terminate/4, system_code_change/4]).

-define(ACCEPT_TIMEOUT, 250).

-record(server_state, {socket=undefined,
                       args,
                       func}).

%% ListenArgs are given to gen_tcp:listen
%% AcceptFun(Socket) -> ok, blocks the TCP accept loop
start_link(ListenArgs, AcceptFun) ->
    State = #server_state{args=ListenArgs,func=AcceptFun},
    proc_lib:start_link(?MODULE, init, [self(), State]).

init(Parent, State) ->
    {Port, Options} = State#server_state.args,
    {ok, ListenSocket} = gen_tcp:listen(Port, Options),
    NewState = State#server_state{socket=ListenSocket},
    Debug = sys:debug_options([]),
    proc_lib:init_ack(Parent, {ok, self()}),
    loop(Parent, Debug, NewState).

loop(Parent, Debug, State) ->
    case gen_tcp:accept(State#server_state.socket, ?ACCEPT_TIMEOUT) of
        {ok, Socket} when Debug =:= [] -> ok = (State#server_state.func)(Socket);
        {ok, Socket} ->
            sys:handle_debug(Debug, fun print_event/3, undefined, {accepted, Socket}),
            ok = (State#server_state.func)(Socket);
        {error, timeout} -> ok;
        {error, closed} when Debug =:= [] ->
            sys:handle_debug(Debug, fun print_event/3, undefined, {closed}),
            exit(normal);
        {error, closed} -> exit(normal)
    end,
    flush(Parent, Debug, State).

flush(Parent, Debug, State) ->
    receive
        {system, From, Msg} ->
            sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, State)
        after 0 ->
            loop(Parent, Debug, State)
    end.

print_event(Device, Event, _Extra) ->
    io:format(Device, "*DBG* TCP event = ~p~n", [Event]).

system_continue(Parent, Debug, State) ->
    loop(Parent, Debug, State).

system_terminate(Reason, _Parent, _Debug, State) ->
    gen_tcp:close(State#server_state.socket),
    exit(Reason).

system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.

请注意,这是一个合规的 OTP 流程(可以由主管管理)。您应该使用 AcceptFun 来生成(=更快)一个新的工作子进程。不过我还没有彻底测试过。

1> {ok, A} = gen_tcpserver:start_link({8080,[]},fun(Socket)->gen_tcp:close(Socket) end).
{ok,<0.93.0>}
2> sys:trace(A, true).
ok
*DBG* TCP event = {accepted,#Port<0.2102>}
*DBG* TCP event = {accepted,#Port<0.2103>}
3> 

2>ok 之后,我将 Google Chrome 浏览器指向端口 8080:这是对 TCP 的一个很好的测试!)

Great that you've began learning Erlang/OTP!

The following resources are very useful:

  • The OTP Design Principles. Read this carefully, if you already haven't. Note the common misconception that OTP is object orientated (OO): it's not! Forget everything about "inheritance". It's not possible to merely build complete systems by "extending" standard modules.
  • The Messaging System:

    These functions must be used to implement the use of system messages for a process

  • The Special Processes. A special process is an OTP-compliant process that can integrate well with supervisors.

This is some code I have in my project. I am an Erlang learner too, so don't trust the code too much, please.

-module(gen_tcpserver).

%% Public API
-export([start_link/2]).

%% start_link reference
-export([init/2]).

%% System internal API
-export([system_continue/3, system_terminate/4, system_code_change/4]).

-define(ACCEPT_TIMEOUT, 250).

-record(server_state, {socket=undefined,
                       args,
                       func}).

%% ListenArgs are given to gen_tcp:listen
%% AcceptFun(Socket) -> ok, blocks the TCP accept loop
start_link(ListenArgs, AcceptFun) ->
    State = #server_state{args=ListenArgs,func=AcceptFun},
    proc_lib:start_link(?MODULE, init, [self(), State]).

init(Parent, State) ->
    {Port, Options} = State#server_state.args,
    {ok, ListenSocket} = gen_tcp:listen(Port, Options),
    NewState = State#server_state{socket=ListenSocket},
    Debug = sys:debug_options([]),
    proc_lib:init_ack(Parent, {ok, self()}),
    loop(Parent, Debug, NewState).

loop(Parent, Debug, State) ->
    case gen_tcp:accept(State#server_state.socket, ?ACCEPT_TIMEOUT) of
        {ok, Socket} when Debug =:= [] -> ok = (State#server_state.func)(Socket);
        {ok, Socket} ->
            sys:handle_debug(Debug, fun print_event/3, undefined, {accepted, Socket}),
            ok = (State#server_state.func)(Socket);
        {error, timeout} -> ok;
        {error, closed} when Debug =:= [] ->
            sys:handle_debug(Debug, fun print_event/3, undefined, {closed}),
            exit(normal);
        {error, closed} -> exit(normal)
    end,
    flush(Parent, Debug, State).

flush(Parent, Debug, State) ->
    receive
        {system, From, Msg} ->
            sys:handle_system_msg(Msg, From, Parent, ?MODULE, Debug, State)
        after 0 ->
            loop(Parent, Debug, State)
    end.

print_event(Device, Event, _Extra) ->
    io:format(Device, "*DBG* TCP event = ~p~n", [Event]).

system_continue(Parent, Debug, State) ->
    loop(Parent, Debug, State).

system_terminate(Reason, _Parent, _Debug, State) ->
    gen_tcp:close(State#server_state.socket),
    exit(Reason).

system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.

Note that this is a compliant OTP process (it can be managed by a supervisor). You should use AcceptFun to spawn (=faster) a new worker child. I have not yet tested it thorough though.

1> {ok, A} = gen_tcpserver:start_link({8080,[]},fun(Socket)->gen_tcp:close(Socket) end).
{ok,<0.93.0>}
2> sys:trace(A, true).
ok
*DBG* TCP event = {accepted,#Port<0.2102>}
*DBG* TCP event = {accepted,#Port<0.2103>}
3> 

(After 2>'s ok I pointed my Google Chrome browser to port 8080: a great test for TCP!)

不知在何时 2024-11-17 02:38:09

实现异步 TCP 侦听器的另一种方法是使用 supervisor_bridge

这是我写的一些代码来展示这一点(未经测试):

-module(connection_bridge).

-behaviour(supervisor_bridge).

% supervisor_bridge export
-export([init/1, terminate/2]).

% internal proc_lib:start_link
-export([accept_init/3]).

%% Port: see gen_tcp:listen(Port, _).
%% Options: see gen_tcp:listen(_, Options).
%% ConnectionHandler: Module:Function(Arguments)->pid() or fun/0->pid()
%% ConnectionHandler: return pid that will receive TCP messages
init({Port, Options, ConnectionHandler}) ->
    case gen_tcp:listen(Port, Options) of
        {ok, ListenSocket} ->
            {ok, ServerPid} = proc_lib:start_link(?MODULE, accept_init,
                [self(), ListenSocket, ConnectionHandler], 1000),
            {ok, ServerPid, ListenSocket};
        OtherResult -> OtherResult
    end.

terminate(_Reason, ListenSocket) ->
    gen_tcp:close(ListenSocket).

accept_init(ParentPid, ListenSocket, ConnectionHandler) ->
    proc_lib:init_ack(ParentPid, {ok, self()}),
    accept_loop(ListenSocket, ConnectionHandler).

accept_loop(ListenSocket, ConnectionHandler) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, ClientSocket} ->
            Pid = case ConnectionHandler of
                {Module, Function, Arguments} ->
                    apply(Module, Function, Arguments);
                Function when is_function(Function, 0) ->
                    Function()
            end,
            ok = gen_tcp:controlling_process(ClientSocket, Pid),
            accept_loop(ListenSocket, ConnectionHandler);
        {error, closed} ->
            error({shutdown, tcp_closed});
        {error, Reason} ->
            error(Reason)
    end.

比我的其他答案更容易理解。 connection_bridge 也可以扩展以支持 UDP 和 SCTP。

Another way to implement an asynchronous TCP listener is by using supervisor_bridge.

Here is some code that I wrote to show this (not tested):

-module(connection_bridge).

-behaviour(supervisor_bridge).

% supervisor_bridge export
-export([init/1, terminate/2]).

% internal proc_lib:start_link
-export([accept_init/3]).

%% Port: see gen_tcp:listen(Port, _).
%% Options: see gen_tcp:listen(_, Options).
%% ConnectionHandler: Module:Function(Arguments)->pid() or fun/0->pid()
%% ConnectionHandler: return pid that will receive TCP messages
init({Port, Options, ConnectionHandler}) ->
    case gen_tcp:listen(Port, Options) of
        {ok, ListenSocket} ->
            {ok, ServerPid} = proc_lib:start_link(?MODULE, accept_init,
                [self(), ListenSocket, ConnectionHandler], 1000),
            {ok, ServerPid, ListenSocket};
        OtherResult -> OtherResult
    end.

terminate(_Reason, ListenSocket) ->
    gen_tcp:close(ListenSocket).

accept_init(ParentPid, ListenSocket, ConnectionHandler) ->
    proc_lib:init_ack(ParentPid, {ok, self()}),
    accept_loop(ListenSocket, ConnectionHandler).

accept_loop(ListenSocket, ConnectionHandler) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, ClientSocket} ->
            Pid = case ConnectionHandler of
                {Module, Function, Arguments} ->
                    apply(Module, Function, Arguments);
                Function when is_function(Function, 0) ->
                    Function()
            end,
            ok = gen_tcp:controlling_process(ClientSocket, Pid),
            accept_loop(ListenSocket, ConnectionHandler);
        {error, closed} ->
            error({shutdown, tcp_closed});
        {error, Reason} ->
            error(Reason)
    end.

A lot easier to understand than my other answer. The connection_bridge can be extended to support UDP and SCTP too.

眼睛会笑 2024-11-17 02:38:08

我认为这就是您正在寻找的:
http://www.trapexit.org/Building_a_Non-blocking_TCP_server_using_OTP_principles
这是关于如何使用 OTP 构建非阻塞 TCP 服务器的完整教程(当然,有完整的文档和解释)。

I think this is what you're looking for:
http://www.trapexit.org/Building_a_Non-blocking_TCP_server_using_OTP_principles
It's a full tutorial about how to build a non-blocking TCP server using OTP (of course, is fully documented and explained).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文