Quantcast

TCP client

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

TCP client

Stavros Filargyropoulos
Hey,

Just starting out on erlang, and trying to implement a message broker using erlang. I have a gen_fsm process for each client that handles the command and data receiving and I was wondering what is the best way for the main broker process to send data to the client.

Do I just send a message to the client process and handle the sending inside `handle_info`? How would I handle race conditions in this case? E.g. When the client process terminates just after the main process sent the message.

My code looks something like this at the moment:

-module(pq_test).
-export([init/1,handle_info/3,handle_event/3]).
-export([start/0]).
-export([recv_cmd/2,recv_data/2]).

-record(state, {socket, buffer, current_command}).

%% Run the listener (start_link/0) in a new process linked to the caller.
%% Returns the pid of that process.
start() ->
    erlang:spawn_link(fun() -> start_link() end).

%% Open a passive-mode, binary listening socket on TCP port 11880 and enter
%% the accept loop. Crashes with badmatch if the socket cannot be opened.
start_link() ->
    ListenOptions = [{active, false}, binary],
    {ok, Listener} = gen_tcp:listen(11880, ListenOptions),
    accept(Listener).

%% Accept loop: wait for a client, start a dedicated gen_fsm for it, hand the
%% socket over to that process, then wait for the next client.
accept(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    %% Start the per-client FSM unregistered. Registering every client as
    %% {local, ?MODULE} made the second connection return
    %% {error, {already_started, _}}, which crashed this loop on the badmatch.
    {ok, Pid} = gen_fsm:start_link(?MODULE, Socket, []),
    erlang:display(Pid),
    %% Socket messages must be delivered to the FSM, not to this acceptor.
    gen_tcp:controlling_process(Socket, Pid),
    accept(ListenSocket).

%% gen_fsm callback. Switches the client socket to active mode so incoming
%% data arrives as messages, and starts in state recv_cmd with an empty
%% buffer and no command in progress.
init(Socket) ->
    inet:setopts(Socket, [{active, true}]),
    Initial = #state{socket = Socket, buffer = <<>>, current_command = null},
    {ok, recv_cmd, Initial}.

%% Interpret a tokenised command.
%%
%% `publish Topic Key Length` moves the FSM to recv_data, expecting Length
%% payload bytes. Anything else (including a publish whose length is not a
%% non-negative integer) is reported and leaves the FSM in recv_cmd.
%% Always returns {next_state, StateName, #state{}} with an empty buffer.
cmd_parse(publish, [Topic, Key, Length], #state{} = State) ->
    %% string:to_integer/1 returns {error, no_integer} on bad input, which a
    %% bare `{Len, _} = ...` match would happily accept with Len = error.
    %% Length comes from the network, so validate it explicitly.
    case string:to_integer(Length) of
        {Len, _Rest} when is_integer(Len), Len >= 0 ->
            erlang:display(Len),
            io:format("Publish -~s- -~s- -~B-~n", [Topic, Key, Len]),
            {next_state, recv_data,
             State#state{buffer = <<>>, current_command = {publish, Len}}};
        _InvalidLength ->
            io:format("Invalid publish length~n"),
            {next_state, recv_cmd,
             State#state{buffer = <<>>, current_command = null}}
    end;
cmd_parse(_, _, #state{} = State) ->
    io:format("Unknown command~n"),
    {next_state, recv_cmd,
     State#state{buffer = <<>>, current_command = null}}.

%% Tokenise one command line (splitting on space, CR and LF) and dispatch it
%% to cmd_parse/3. A line with no tokens leaves the FSM in recv_cmd with a
%% cleared buffer.
cmd_process(CommandLine, #state{socket = Socket} = State) ->
    io:format("1~n"),
    case string:tokens(binary_to_list(CommandLine), [$\s, $\r, $\n]) of
        [Command | Args] ->
            cmd_parse(command_name(Command), Args, State);
        [] ->
            {next_state, recv_cmd,
             #state{socket = Socket, buffer = <<>>, current_command = null}}
    end.

%% Map a command word received from the network to a fixed atom.
%% list_to_atom/1 must not be used here: atoms are never garbage collected,
%% so a client sending arbitrary words could exhaust the atom table. Every
%% unrecognised word collapses into `unknown`, which cmd_parse/3 handles in
%% its catch-all clause.
command_name("publish") -> publish;
command_name(_Other) -> unknown.

%% The bytes received so far do not form a complete command line yet: stay
%% in recv_cmd and keep them as the buffer for the next chunk.
cmd_more(CommandLine, #state{} = State) ->
    Waiting = State#state{buffer = CommandLine, current_command = null},
    {next_state, recv_cmd, Waiting}.

%% State recv_cmd: collect bytes until a full, newline-terminated command
%% line is available, then process it.
%%
%% TCP is a byte stream, so a chunk may end mid-line, exactly at a newline,
%% or carry further bytes after the newline (the next command or the start
%% of a payload). The line is therefore cut at the FIRST newline and any
%% remaining bytes are fed to whichever state the command selected, instead
%% of only checking whether the last byte of the chunk is a newline.
recv_cmd({data, Data}, #state{buffer = Buffer} = State) ->
    Pending = <<Buffer/binary, Data/binary>>,
    case binary:split(Pending, <<"\n">>) of
        [_NoNewlineYet] ->
            cmd_more(Pending, State);
        [Line, <<>>] ->
            cmd_process(Line, State);
        [Line, Rest] ->
            case cmd_process(Line, State) of
                {next_state, recv_cmd, NextData} ->
                    recv_cmd({data, Rest}, NextData);
                {next_state, recv_data, NextData} ->
                    recv_data({data, Rest}, NextData)
            end
    end.

%% State recv_data: accumulate the payload of a publish command.
%% current_command holds {publish, Len}, where Len is the number of payload
%% bytes still expected.
recv_data({data, Data},
          #state{buffer = Buffer, current_command = {publish, Len}} = State)
  when Len > byte_size(Data) ->
    %% Payload still incomplete: buffer the chunk and lower the count.
    {next_state, recv_data,
     State#state{buffer = <<Buffer/binary, Data/binary>>,
                 current_command = {publish, Len - byte_size(Data)}}};
recv_data({data, Data},
          #state{buffer = Buffer, current_command = {publish, Len}} = State)
  when is_integer(Len), Len >= 0 ->
    %% This chunk completes the payload. Take exactly Len bytes; whatever
    %% follows belongs to the next command and must not be swallowed.
    <<Payload:Len/binary, Rest/binary>> = Data,
    io:format("Got all data: ~s~n", [<<Buffer/binary, Payload/binary>>]),
    Idle = State#state{buffer = <<>>, current_command = null},
    case Rest of
        <<>> -> {next_state, recv_cmd, Idle};
        _ -> recv_cmd({data, Rest}, Idle)
    end;
recv_data({data, Data}, #state{socket = Socket, buffer = Buffer}) ->
    %% Fallback for any other current_command value: treat the whole chunk
    %% as the end of the payload.
    io:format("Got all data: ~s~n", [<<Buffer/binary, Data/binary>>]),
    {next_state, recv_cmd,
     #state{socket = Socket, buffer = <<>>, current_command = null}}.

%% gen_fsm callback for plain messages.
%%
%% {tcp, Socket, Bytes}  - forwarded as {data, Bytes} to the handler of the
%%                         current state (recv_cmd/2 or recv_data/2).
%% {tcp_closed, Socket}  - the peer disconnected; stop normally so the client
%%                         process does not linger forever.
%% anything else         - logged and ignored.
%%
%% NOTE(review): on stop gen_fsm runs terminate/3; this module does not
%% export one, which is only optional on newer OTP releases - confirm.
handle_info({tcp, _Socket, Message}, StateName, State) ->
    ?MODULE:StateName({data, Message}, State);
handle_info({tcp_closed, _Socket}, _StateName, State) ->
    {stop, normal, State};
handle_info(Event, StateName, State) ->
    io:format("handle info~n"),
    erlang:display(Event),
    {next_state, StateName, State}.

%% gen_fsm callback for all-state events. No such events are defined, so the
%% event is logged and the FSM stays in its current state. The callback must
%% return a next_state/stop tuple; returning the bare `ok` from io:format/1
%% would make gen_fsm terminate the process with a bad return value.
handle_event(_Event, StateName, State) ->
    io:format("handle event"),
    {next_state, StateName, State}.


_______________________________________________
erlang-questions mailing list
[hidden email]
http://erlang.org/mailman/listinfo/erlang-questions
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: TCP client

Ingela Andin
Hi!

gen_fsm will be deprecated in OTP 20, so I recommend using gen_statem instead. In gen_statem you can have the semantics of selective receive by using postpone.

Regards Ingela 
 

2017-04-20 22:34 GMT+02:00 Stavros Filargyropoulos <[hidden email]>:
Hey,

Just starting out on erlang, and trying to implement a message broker using erlang. I have a gen_fsm process for each client that handles the command and data receiving and I was wondering what is the best way for the main broker process to send data to the client.

Do I just send a message to the client process and handle the sending inside `handle_info`? How would I handle race conditions in this case? E.g. When the client process terminates just after the main process sent the message.

My code looks something like this at the moment:

-module(pq_test).
-export([init/1,handle_info/3,handle_event/3]).
-export([start/0]).
-export([recv_cmd/2,recv_data/2]).

-record(state, {socket, buffer, current_command}).

%% Run the listener (start_link/0) in a new process linked to the caller.
%% Returns the pid of that process.
start() ->
    erlang:spawn_link(fun() -> start_link() end).

%% Open a passive-mode, binary listening socket on TCP port 11880 and enter
%% the accept loop. Crashes with badmatch if the socket cannot be opened.
start_link() ->
    ListenOptions = [{active, false}, binary],
    {ok, Listener} = gen_tcp:listen(11880, ListenOptions),
    accept(Listener).

%% Accept loop: wait for a client, start a dedicated gen_fsm for it, hand the
%% socket over to that process, then wait for the next client.
accept(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    %% Start the per-client FSM unregistered. Registering every client as
    %% {local, ?MODULE} made the second connection return
    %% {error, {already_started, _}}, which crashed this loop on the badmatch.
    {ok, Pid} = gen_fsm:start_link(?MODULE, Socket, []),
    erlang:display(Pid),
    %% Socket messages must be delivered to the FSM, not to this acceptor.
    gen_tcp:controlling_process(Socket, Pid),
    accept(ListenSocket).

%% gen_fsm callback. Switches the client socket to active mode so incoming
%% data arrives as messages, and starts in state recv_cmd with an empty
%% buffer and no command in progress.
init(Socket) ->
    inet:setopts(Socket, [{active, true}]),
    Initial = #state{socket = Socket, buffer = <<>>, current_command = null},
    {ok, recv_cmd, Initial}.

%% Interpret a tokenised command.
%%
%% `publish Topic Key Length` moves the FSM to recv_data, expecting Length
%% payload bytes. Anything else (including a publish whose length is not a
%% non-negative integer) is reported and leaves the FSM in recv_cmd.
%% Always returns {next_state, StateName, #state{}} with an empty buffer.
cmd_parse(publish, [Topic, Key, Length], #state{} = State) ->
    %% string:to_integer/1 returns {error, no_integer} on bad input, which a
    %% bare `{Len, _} = ...` match would happily accept with Len = error.
    %% Length comes from the network, so validate it explicitly.
    case string:to_integer(Length) of
        {Len, _Rest} when is_integer(Len), Len >= 0 ->
            erlang:display(Len),
            io:format("Publish -~s- -~s- -~B-~n", [Topic, Key, Len]),
            {next_state, recv_data,
             State#state{buffer = <<>>, current_command = {publish, Len}}};
        _InvalidLength ->
            io:format("Invalid publish length~n"),
            {next_state, recv_cmd,
             State#state{buffer = <<>>, current_command = null}}
    end;
cmd_parse(_, _, #state{} = State) ->
    io:format("Unknown command~n"),
    {next_state, recv_cmd,
     State#state{buffer = <<>>, current_command = null}}.

%% Tokenise one command line (splitting on space, CR and LF) and dispatch it
%% to cmd_parse/3. A line with no tokens leaves the FSM in recv_cmd with a
%% cleared buffer.
cmd_process(CommandLine, #state{socket = Socket} = State) ->
    io:format("1~n"),
    case string:tokens(binary_to_list(CommandLine), [$\s, $\r, $\n]) of
        [Command | Args] ->
            cmd_parse(command_name(Command), Args, State);
        [] ->
            {next_state, recv_cmd,
             #state{socket = Socket, buffer = <<>>, current_command = null}}
    end.

%% Map a command word received from the network to a fixed atom.
%% list_to_atom/1 must not be used here: atoms are never garbage collected,
%% so a client sending arbitrary words could exhaust the atom table. Every
%% unrecognised word collapses into `unknown`, which cmd_parse/3 handles in
%% its catch-all clause.
command_name("publish") -> publish;
command_name(_Other) -> unknown.

%% The bytes received so far do not form a complete command line yet: stay
%% in recv_cmd and keep them as the buffer for the next chunk.
cmd_more(CommandLine, #state{} = State) ->
    Waiting = State#state{buffer = CommandLine, current_command = null},
    {next_state, recv_cmd, Waiting}.

%% State recv_cmd: collect bytes until a full, newline-terminated command
%% line is available, then process it.
%%
%% TCP is a byte stream, so a chunk may end mid-line, exactly at a newline,
%% or carry further bytes after the newline (the next command or the start
%% of a payload). The line is therefore cut at the FIRST newline and any
%% remaining bytes are fed to whichever state the command selected, instead
%% of only checking whether the last byte of the chunk is a newline.
recv_cmd({data, Data}, #state{buffer = Buffer} = State) ->
    Pending = <<Buffer/binary, Data/binary>>,
    case binary:split(Pending, <<"\n">>) of
        [_NoNewlineYet] ->
            cmd_more(Pending, State);
        [Line, <<>>] ->
            cmd_process(Line, State);
        [Line, Rest] ->
            case cmd_process(Line, State) of
                {next_state, recv_cmd, NextData} ->
                    recv_cmd({data, Rest}, NextData);
                {next_state, recv_data, NextData} ->
                    recv_data({data, Rest}, NextData)
            end
    end.

%% State recv_data: accumulate the payload of a publish command.
%% current_command holds {publish, Len}, where Len is the number of payload
%% bytes still expected.
recv_data({data, Data},
          #state{buffer = Buffer, current_command = {publish, Len}} = State)
  when Len > byte_size(Data) ->
    %% Payload still incomplete: buffer the chunk and lower the count.
    {next_state, recv_data,
     State#state{buffer = <<Buffer/binary, Data/binary>>,
                 current_command = {publish, Len - byte_size(Data)}}};
recv_data({data, Data},
          #state{buffer = Buffer, current_command = {publish, Len}} = State)
  when is_integer(Len), Len >= 0 ->
    %% This chunk completes the payload. Take exactly Len bytes; whatever
    %% follows belongs to the next command and must not be swallowed.
    <<Payload:Len/binary, Rest/binary>> = Data,
    io:format("Got all data: ~s~n", [<<Buffer/binary, Payload/binary>>]),
    Idle = State#state{buffer = <<>>, current_command = null},
    case Rest of
        <<>> -> {next_state, recv_cmd, Idle};
        _ -> recv_cmd({data, Rest}, Idle)
    end;
recv_data({data, Data}, #state{socket = Socket, buffer = Buffer}) ->
    %% Fallback for any other current_command value: treat the whole chunk
    %% as the end of the payload.
    io:format("Got all data: ~s~n", [<<Buffer/binary, Data/binary>>]),
    {next_state, recv_cmd,
     #state{socket = Socket, buffer = <<>>, current_command = null}}.

%% gen_fsm callback for plain messages.
%%
%% {tcp, Socket, Bytes}  - forwarded as {data, Bytes} to the handler of the
%%                         current state (recv_cmd/2 or recv_data/2).
%% {tcp_closed, Socket}  - the peer disconnected; stop normally so the client
%%                         process does not linger forever.
%% anything else         - logged and ignored.
%%
%% NOTE(review): on stop gen_fsm runs terminate/3; this module does not
%% export one, which is only optional on newer OTP releases - confirm.
handle_info({tcp, _Socket, Message}, StateName, State) ->
    ?MODULE:StateName({data, Message}, State);
handle_info({tcp_closed, _Socket}, _StateName, State) ->
    {stop, normal, State};
handle_info(Event, StateName, State) ->
    io:format("handle info~n"),
    erlang:display(Event),
    {next_state, StateName, State}.

%% gen_fsm callback for all-state events. No such events are defined, so the
%% event is logged and the FSM stays in its current state. The callback must
%% return a next_state/stop tuple; returning the bare `ok` from io:format/1
%% would make gen_fsm terminate the process with a bad return value.
handle_event(_Event, StateName, State) ->
    io:format("handle event"),
    {next_state, StateName, State}.


_______________________________________________
erlang-questions mailing list
[hidden email]
http://erlang.org/mailman/listinfo/erlang-questions



_______________________________________________
erlang-questions mailing list
[hidden email]
http://erlang.org/mailman/listinfo/erlang-questions
Loading...