Generic I/O and binary I/O

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Generic I/O and binary I/O

Robert Virding-4
I have now written a first draft of a generic i/o module, gen_io.  It
provides a generic synchronous interface to the i/o protocol and uses
callbacks for the device specific stuff.  There is "some"
documentation in the beginning of the file.

I have also reworked my binary i/o module, bin_io, to use gen_io.  It
is a reasonably small example of using gen_io.

There is also a first draft of tcp_io which provides an i/o interface
to a socket.  It is not complete, specifically it does not correctly
handle when the port closes for the generic i/o interface.  I'll fix
it.  N.B. this is probably not the most efficient way of doing
formatted i/o to a socket but it does allow writing using generic i/o
functions.

I am going to convert file_io_server, the main file interface, to use
gen_io to see if gen_io is sufficient.  If this works then I reckon
that gen_io will do for synchronous i/o.  Asynchronous i/o, like for
the tty interface, is another matter and I think that it might be
difficult to generalise it.  I will try however.

Comments?

Robert

-------------- next part --------------
%%% File    : gen_io.erl
%%% Author  : Robert Virding <rv>
%%% Purpose :
%%% Created : 21 Nov 2001 by Robert Virding <rv>

%% Mod:init([Argument]) ->
%%      {ok,State}
%%      {stop,Reason}
%%
%% Mod:handle_request(Request, State) ->
%%      {reply,From,Reply,State}
%%      {noreply,State}
%%      {stop,Reason,From,Reply,State}
%%      {stop,Reason,State}
%%  Request is the full unparsed request.
%%
%% Mod:terminate(Reason, State) -> ok
%%
%% Mod:read_chars(State) ->
%%      {ok,[Char],State}
%%      {error,Reason,State}
%%
%% Mod:push_chars([Char], State) ->
%%      {ok,State}
%%      {error,Reason,State}
%%  After we have read all the characters we need we push the rest
%%  back into the state.  N.B. the char list may be empty.
%%
%% Mod:write_chars([Char], State) ->
%%      {ok,State}
%%      {error,Reason,State}

-module(gen_io).
-author('rv').

-compile(export_all).
-export([start/3,start_link/3]).
-export([request/2]).

%% start(Mod, [Arg], [Option]) -> {ok,Pid} | {error,Reason}.
%% start_link(Mod, [Arg], [Option]) -> {ok,Pid} | {error,Reason}.
%%
%%  Start an i/o server process, either unlinked (start/3) or linked
%%  to the caller (start_link/3).
%%   Mod      is the callback module
%%   [Arg]    is handed to Mod:init/1
%%   [Option] is passed along to the new process; nothing in this
%%            module interprets it yet

start(Mod, Args, Opts) ->
    SpawnBif = spawn,
    do_start(SpawnBif, Mod, Args, Opts).

start_link(CbMod, InitArgs, Options) ->
    %% Identical to start/3 except that the server is created with
    %% spawn_link, so it is linked to the caller.
    LinkingSpawn = spawn_link,
    do_start(LinkingSpawn, CbMod, InitArgs, Options).

%% do_start(SpawnBif, Mod, [Arg], [Option]) -> {ok,Pid} | {error,Reason}.
%%  Create the server with erlang:SpawnBif/1 and block until it either
%%  acknowledges a successful init with {Ref,ok} or goes down.

do_start(SpawnBif, Mod, Args, Opts) ->
    Parent = self(),
    Tag = make_ref(),
    Server = erlang:SpawnBif(fun () -> init(Parent, Tag, Mod, Args, Opts) end),
    %% The monitor lets us notice a server which dies before acking.
    Monitor = erlang:monitor(process, Server),
    receive
        {'DOWN',Monitor,_,_,Why} ->
            {error,Why};
        {Tag,ok} ->
            erlang:demonitor(Monitor),
            %% A 'DOWN' may already be queued if the server died right
            %% after acking; report that as a failed start.
            receive
                {'DOWN',Monitor,_,_,Why} ->
                    {error,Why}
            after 0 ->
                    {ok,Server}
            end
    end.

%% init(StarterPid, StartRef, IoModule, [Arg], [Opt]) -> no return.
%%  Body of the freshly spawned server.  Runs IoModule:init/1; on
%%  {ok,State} it acks the starter with {StartRef,ok} and enters the
%%  main loop, otherwise the process exits.  The options are not used.

init(Starter, StartRef, IoM, Args, _Opts) ->
    case catch IoM:init(Args) of
        {'EXIT',Why} ->
            %% The callback crashed; die with the same reason.
            exit(Why);
        {stop,Why} ->
            exit(Why);
        {ok,State} ->
            Starter ! {StartRef,ok},
            loop(IoM, State);
        Unexpected ->
            exit({bad_return_value,Unexpected})
    end.

%% loop(IoModule, State) -> void.
%%  Main io loop.  This never returns anything, just terminates.
%%
%%  {io_request,From,ReplyAs,Request} messages with a pid as From are
%%  handled here through io_request/3 and always answered with an
%%  io_reply.  Every other message is handed unparsed to
%%  IoModule:handle_request/2.  If that callback crashes or returns
%%  something unexpected the server terminates through terminate/3, so
%%  IoModule:terminate/2 still gets called.

loop(IoM, St0) ->
    receive
        {io_request,From,ReplyAs,Req} when is_pid(From) ->
            %% Handle general io requests.
            case io_request(Req, IoM, St0) of
                {ok,Rep,St1} ->
                    io_reply(From, ReplyAs, Rep),
                    loop(IoM, St1);
                {error,Rep,St1} ->
                    io_reply(From, ReplyAs, Rep),
                    loop(IoM, St1);
                {stop,Reason,Rep,St1} ->
                    terminate(Reason, From, {io_reply,ReplyAs,Rep}, IoM, St1)
            end;
        Request ->
            %% Device specific requests.
            case catch IoM:handle_request(Request, St0) of
                {reply,From,Rep,St1} ->
                    From ! Rep,
                    loop(IoM, St1);
                {noreply,St1} ->
                    loop(IoM, St1);
                {stop,Reason,From,Rep,St1} ->
                    terminate(Reason, From, Rep, IoM, St1);
                {stop,Reason,St1} ->
                    terminate(Reason, IoM, St1);
                {'EXIT',Reason} ->
                    %% The callback crashed; we only have the old state.
                    terminate(Reason, IoM, St0);
                Other ->
                    terminate({bad_return_value,Other}, IoM, St0)
            end
    end.

%% io_reply(From, ReplyAs, Reply) -> Message.
%%  Send the standard {io_reply,ReplyAs,Reply} message to From.

io_reply(From, ReplyAs, Reply) ->
    Message = {io_reply, ReplyAs, Reply},
    From ! Message.

%% terminate(Reason, To, LastReply, IoModule, State) -> no return.
%%  Deliver a final reply to To and then shut down through terminate/3.

terminate(Why, To, LastReply, IoM, State) ->
    To ! LastReply,
    terminate(Why, IoM, State).

%% terminate(Reason, IoModule, State) -> no return.
%%  Give the callback module a chance to clean up and then exit with
%%  Reason.  A crash inside IoModule:terminate/2 is ignored.

terminate(Why, IoM, State) ->
    catch IoM:terminate(Why, State),
    exit(Why).

%% io_request(Request, IoModule, State) ->
%%      {ok,Reply,State} | {error,Reply,State} | {stop,Reason,Reply,State}.
%%
%%  Handle general io requests:
%%   {put_chars,Chars}
%%      write Chars with IoModule:write_chars/2
%%   {put_chars,Mod,Func,Args}
%%      write whatever apply(Mod, Func, Args) returns
%%   {get_until,Prompt,Mod,Func,ExtraArgs}
%%      read until Mod:Func says it is done; the prompt is ignored
%%   {requests,[Request]}
%%      run the requests in order, stopping at the first which is not ok
%%  Anything else is answered with an error reply.

io_request({put_chars,Chars}, IoM, St0) ->
    case catch IoM:write_chars(Chars, St0) of
        {ok,St1} -> {ok,ok,St1};
        %% A failed write stops the server, but with exit reason normal.
        {error,Reason,St1} -> {stop,normal,{error,Reason},St1};
        %% Crash or bad return value, we only have the old state.
        Other -> {stop,Other,{error,Other},St0}
    end;
io_request({put_chars,Mod,Func,Args}, IoM, St) ->
    case catch apply(Mod, Func, Args) of
        {'EXIT',_Reason} -> {error,{error,Func},St};
        Cs -> io_request({put_chars,Cs}, IoM, St)
    end;
io_request({get_until,_Prompt,Mod,Func,ExtraArgs}, IoM, St) ->
    get_until(Mod, Func, ExtraArgs, IoM, St);
io_request({requests,Reqs}, IoM, St) when is_list(Reqs) ->
    io_request_loop(Reqs, IoM, {ok,ok,St});
io_request(Unknown, _IoM, St) ->
    Reason = {error,Unknown},
    {error,{error,Reason},St}.

%% io_request_loop([Request], IoModule, Result) -> Result.
%%  Run the requests one after the other for as long as the previous
%%  result was ok.  The first result which is not ok is returned
%%  as it is and the remaining requests are dropped.

io_request_loop([Next|Rest], IoM, {ok,_Reply,State}) ->
    io_request_loop(Rest, IoM, io_request(Next, IoM, State));
io_request_loop([_Next|_Rest], _IoM, NotOk) ->
    NotOk;
io_request_loop([], _IoM, Last) ->
    Last.

%% get_until(Module, Func, [ExtraArg], IoModule, State) ->
%%      {ok,Reply,State} | {error,Reply,State} | {stop,Reason,Reply,State}.
%%  Run the get_until loop: keep reading characters through IoModule
%%  and feeding them to Module:Func until the scan function has had
%%  enough.  Whatever it did not consume is pushed back into the state
%%  for the next call.

get_until(Mod, Func, ExtraArgs, IoM, St) ->
    %% The scan function is started off with an empty continuation.
    FirstCont = {more,[]},
    get_until_loop(Mod, Func, ExtraArgs, IoM, St, FirstCont).

%% get_until_loop(Mod, Func, [ExtraArg], IoModule, State, ScanResult) ->
%%      {ok,Reply,State} | {error,Reply,State} | {stop,Reason,Reply,State}.
%%  ScanResult is what the scan function returned last time:
%%   {more,Cont}      - read more characters and scan again
%%   {done,Result,Rest} - finished, push Rest back and reply with Result
%%   anything else    - the scan function failed, reply {error,Func}

get_until_loop(Mod, Func, Extra, IoM, St0, {more,Cont}) ->
    case catch IoM:read_chars(St0) of
        {ok,Chars,St1} ->
            Scanned = (catch apply(Mod, Func, [Cont,Chars|Extra])),
            get_until_loop(Mod, Func, Extra, IoM, St1, Scanned);
        {error,Why,St1} ->
            {stop,Why,{error,Why},St1};
        Bad ->
            %% Crash or bad return value, we only have the old state.
            {stop,Bad,{error,Bad},St0}
    end;
get_until_loop(_Mod, _Func, _Extra, IoM, St0, {done,Result,Rest}) ->
    case catch IoM:push_chars(Rest, St0) of
        {ok,St1} ->
            {ok,Result,St1};
        {error,Why,St1} ->
            {stop,Why,{error,Why},St1};
        Bad ->
            {stop,Bad,{error,Bad},St0}
    end;
get_until_loop(_Mod, Func, _Extra, _IoM, St, _Unexpected) ->
    {error,{error,Func},St}.

%% request(IoServer, Request) -> Reply | {error,Reason}.
%%  Send a standard io request to an io server and wait for the reply.
%%  IoServer is a pid or a registered name.  Reply is whatever the
%%  server put in its io_reply.  {error,terminated} is returned if the
%%  server goes away before replying and {error,arguments} if the name
%%  is not registered.

request(Pid, Request) when is_pid(Pid) ->
    %% Monitor the server so we do not wait forever if it dies.
    Mref = erlang:monitor(process,Pid),
    Pid ! {io_request,self(),Pid,Request},
    wait_io_mon_reply(Pid,Mref);
request(Name, Request) when is_atom(Name) ->
    case whereis(Name) of
        undefined ->
            {error, arguments};
        Pid ->
            request(Pid, Request)
    end.

%% wait_io_mon_reply(IoServer, MonitorRef) -> Reply | {error,terminated}.
%%  Wait for the io_reply tagged with IoServer, or for a sign that the
%%  server has gone away: the 'DOWN' from our monitor or an 'EXIT'
%%  message from the server.  The monitor is always removed and its
%%  'DOWN' message flushed before returning, so nothing is left behind
%%  in the caller's message queue.

wait_io_mon_reply(From, Mref) ->
    receive
        {io_reply,From,Reply} ->
            erlang:demonitor(Mref),
            receive
                {'DOWN', Mref, _, _, _} -> true
            after 0 -> true
            end,
            Reply;
        {'EXIT', From, _What} ->
            %% Remove the monitor before flushing.  Otherwise a 'DOWN'
            %% which has not arrived yet would turn up later.
            erlang:demonitor(Mref),
            receive
                {'DOWN', Mref, _, _, _} -> true
            after 0 -> true
            end,
            {error,terminated};
        {'DOWN', Mref, _, _, _} ->
            %% Also drop a matching 'EXIT' if one is already queued.
            receive
                {'EXIT', From, _What} -> true
            after 0 -> true
            end,
            {error,terminated}
    end.
-------------- next part --------------
%% File    : bin_io.erl
%% Author  : Robert Virding
%% Purpose : Open a binary for standard i/o requests.

-module(bin_io).

%% The main user interface.
-export([open_read/1,open_create/0,open_append/1,close/1]).

%% The gen_io callbacks.
-export([init/1,terminate/2,handle_request/2,
         read_chars/1,write_chars/2,push_chars/2]).

%% Server state.
%%  mode - read | write, set once by init/1
%%  bin  - read mode: the part of the binary which has not been read yet
%%         write mode: the initial contents (empty for create)
%%  buf  - read mode: characters pushed back by push_chars/2
%%         write mode: i/o list of everything written so far
-record(bin_io, {mode,bin,buf}).

-define(READ_SIZE, 256). %Bytes per chunk read

%% The main interface.

%% open_read(Binary) -> {ok,Pid} | {error,Reason}.
%%  Start a linked i/o server which reads from Binary.

open_read(Bin) ->
    InitArgs = [read,Bin],
    gen_io:start_link(?MODULE, InitArgs, []).

%% open_create() -> {ok,Pid} | {error,Reason}.
%%  Start a linked i/o server which collects output, starting from an
%%  empty binary.

open_create() ->
    InitArgs = [create],
    gen_io:start_link(?MODULE, InitArgs, []).

%% open_append(Binary) -> {ok,Pid} | {error,Reason}.
%%  Start a linked i/o server which collects output after the existing
%%  contents of Binary.

open_append(Bin) ->
    InitArgs = [append,Bin],
    gen_io:start_link(?MODULE, InitArgs, []).

%% close(Io) -> Reply | {error,terminated}.
%%  Ask the server to close and wait for its bin_reply, which is
%%  returned as it is.  The server is monitored while we wait, so if it
%%  is already dead or dies without replying we return
%%  {error,terminated} instead of hanging forever.

close(Io) ->
    Mref = erlang:monitor(process, Io),
    Io ! {bin_request,self(),Io,close},
    receive
        {bin_reply,Io,Rep} ->
            %% Drop the monitor and any 'DOWN' it already delivered;
            %% the server exits straight after replying to a close.
            erlang:demonitor(Mref),
            receive
                {'DOWN',Mref,_,_,_} -> true
            after 0 -> true
            end,
            Rep;
        {'DOWN',Mref,_,_,_} ->
            {error,terminated}
    end.

%% init([Arg]) -> {ok,State}.
%%  gen_io callback, builds the initial state:
%%   [read,Bin]   - read from the binary Bin
%%   [create]     - write, starting from an empty binary
%%   [append,Bin] - write, starting from the binary Bin
%%  Any other argument list fails with function_clause.

init([read,Bin]) when is_binary(Bin) ->
    {ok,#bin_io{mode=read,bin=Bin,buf=[]}};
init([create]) ->
    {ok,#bin_io{mode=write,bin= <<>>,buf=[]}};
init([append,Bin]) when is_binary(Bin) ->
    {ok,#bin_io{mode=write,bin=Bin,buf=[]}}.

%% terminate(Reason, State) -> ok.
%%  gen_io callback.  There is nothing to clean up here.

terminate(_Reason, _State) ->
    ok.

%% handle_request(Request, State) ->
%%      {reply,From,Reply,State} |
%%      {noreply,State} |
%%      {stop,Reason,From,Reply,State} |
%%      {stop,Reason,State}.
%%
%%  Handle bin_io specific requests.  Only close is known: the server
%%  is stopped with reason normal and the reply is ok in read mode or
%%  {ok,Binary} with everything written in write mode.  Other
%%  bin_requests are answered with an error and all other messages are
%%  silently dropped.

handle_request({bin_request,From,ReplyAs,close}, #bin_io{mode=Mode}=St)
  when is_pid(From) ->
    Rep = case Mode of
              read -> ok;
              %% Initial contents followed by the buffered output.
              write -> {ok,list_to_binary([St#bin_io.bin,St#bin_io.buf])}
          end,
    {stop,normal,From,{bin_reply,ReplyAs,Rep},St#bin_io{buf=[]}};
handle_request({bin_request,From,ReplyAs,Other}, #bin_io{}=St)
  when is_pid(From) ->
    Rep = {error,{request,Other}},
    {reply,From,{bin_reply,ReplyAs,Rep},St};
handle_request(_Unknown, State) ->
    %% Just ignore unknown requests.
    {noreply,State}.

%% write_chars(Chars, State) ->
%%      {ok,State} | {error,Reason,State}.
%%  gen_io callback.  In write mode Chars, a binary or anything
%%  list_to_binary/1 accepts, is added to the end of the output
%%  buffer.  In read mode the result is {error,badmode,State}.

write_chars(Cs, #bin_io{mode=write,buf=Buf}=St) when is_binary(Cs) ->
    %% A binary tail is fine, the buffer only has to be an i/o list.
    {ok,St#bin_io{buf=[Buf|Cs]}};
write_chars(Cs, #bin_io{mode=write,buf=Buf}=St) ->
    %% Convert straight away so bad data is reported to the writer.
    case catch list_to_binary(Cs) of
        {'EXIT',Reason} -> {error,Reason,St};
        MoreBin -> {ok,St#bin_io{buf=[Buf|MoreBin]}}
    end;
write_chars(_Cs, #bin_io{mode=read}=St) ->
    {error,badmode,St}.

%% read_chars(State) ->
%%      {ok,Chars,State} | {error,Reason,State}.
%%  gen_io callback.  Pushed back characters are returned first.
%%  After that the binary is handed out as lists of at most READ_SIZE
%%  bytes, and eof once it is used up.  In write mode the result is
%%  {error,badmode,State}.

read_chars(#bin_io{mode=read,buf=[],bin=Bin}=St) ->
    case Bin of
        <<>> ->
            {ok,eof,St};
        <<Chunk:?READ_SIZE/binary,Rest/binary>> ->
            {ok,binary_to_list(Chunk),St#bin_io{bin=Rest}};
        _ ->
            %% Less than a full chunk left, take it all.
            {ok,binary_to_list(Bin),St#bin_io{bin= <<>>}}
    end;
read_chars(#bin_io{mode=read,buf=Pushed}=St) ->
    {ok,Pushed,St#bin_io{buf=[]}};
read_chars(#bin_io{mode=write}=St) ->
    {error,badmode,St}.

%% push_chars([Char], State) ->
%%      {ok,State} | {error,Reason,State}.
%%  gen_io callback.  Keep the unused characters in the buffer so the
%%  next read_chars/1 returns them.  Only an empty buffer is accepted.

push_chars(Chars, St = #bin_io{buf=[]}) ->
    Pushed = St#bin_io{buf=Chars},
    {ok,Pushed}.
-------------- next part --------------
%% File    : tcp_io.erl
%% Author  : Robert Virding
%% Purpose : Open a tcp connection for standard i/o requests.

-module(tcp_io).

%% The main user interface.
-export([open/2,open/3,close/1]).

%% The gen_io callbacks.
-export([init/1,terminate/2,handle_request/2,
         read_chars/1,write_chars/2,push_chars/2]).

%% Server state for one connection.
-record(tcp_io, {server, %Host given to gen_tcp:connect/3
                 port, %Port given to gen_tcp:connect/3
                 sock=none, %Connected socket, set by init/1
                 ibuf=none}). %Pushed back input, init/1 sets it to []

%% open(Server, Port) -> {ok,Pid} | {error,Reason}.
%%  Same as open/3 with no extra socket options.

open(Server, Port) ->
    NoExtraOpts = [],
    open(Server, Port, NoExtraOpts).

%% open(Server, Port, [SockOpt]) -> {ok,Pid} | {error,Reason}.
%%  Start a linked i/o server which connects to Server:Port.

open(Server, Port, SockOpts) ->
    InitArgs = [Server,Port,SockOpts],
    gen_io:start_link(?MODULE, InitArgs, []).

%% close(Io) -> Reply | {error,terminated}.
%%  Ask the server to close and wait for its tcp_reply, which is
%%  returned as it is.  The server is monitored while we wait, so if it
%%  is already dead or dies without replying we return
%%  {error,terminated} instead of hanging forever.

close(Io) ->
    Mref = erlang:monitor(process, Io),
    Io ! {tcp_request,self(),Io,close},
    receive
        {tcp_reply,Io,Rep} ->
            %% Drop the monitor and any 'DOWN' it already delivered;
            %% the server exits straight after replying to a close.
            erlang:demonitor(Mref),
            receive
                {'DOWN',Mref,_,_,_} -> true
            after 0 -> true
            end,
            Rep;
        {'DOWN',Mref,_,_,_} ->
            {error,terminated}
    end.

%% init([Server,Port,[SockOpt]]) -> {ok,State} | {stop,Reason}.
%%  gen_io callback.  Connect to Server:Port.  The socket is opened
%%  with list and {active,false} in front of the caller's options.

init([Server,Port,SockOpts]) ->
    ConnectOpts = [list,{active,false}|SockOpts],
    case gen_tcp:connect(Server, Port, ConnectOpts) of
        {error,Reason} ->
            {stop,Reason};
        {ok,Sock} ->
            {ok,#tcp_io{server=Server,port=Port,sock=Sock,ibuf=[]}}
    end.

%% terminate(Reason, State) -> ok.
%%  gen_io callback.  All there is to do is to close the socket, the
%%  reason does not matter.

terminate(_Reason, #tcp_io{sock=Sock}) ->
    gen_tcp:close(Sock).

%% handle_request(Request, State) ->
%%      {reply,From,Reply,State} |
%%      {stop,Reason,From,Reply,State} |
%%      {noreply,State}.
%%
%%  Handle tcp_io specific requests.  Only close is known: it is
%%  answered with ok and stops the server with reason normal.  Other
%%  tcp_requests are answered with an error and all other messages are
%%  silently dropped.

handle_request({tcp_request,From,ReplyAs,close}, #tcp_io{}=St)
  when is_pid(From) ->
    {stop,normal,From,{tcp_reply,ReplyAs,ok},St};
handle_request({tcp_request,From,ReplyAs,Other}, #tcp_io{}=St)
  when is_pid(From) ->
    Rep = {error,{request,Other}},
    {reply,From,{tcp_reply,ReplyAs,Rep},St};
handle_request(_Unknown, St) ->
    %% Just ignore unknown requests.
    {noreply,St}.

%% write_chars(Chars, State) ->
%%      {ok,State} | {error,Reason,State}.
%%  gen_io callback.  Send Chars on the socket.  The state is never
%%  changed.

write_chars(Chars, #tcp_io{sock=Sock}=St) ->
    case gen_tcp:send(Sock, Chars) of
        {error,Reason} ->
            {error,Reason,St};
        ok ->
            {ok,St}
    end.

%% read_chars(State) ->
%%      {ok,Chars,State} | {error,Reason,State}.
%%  gen_io callback.  Pushed back characters are returned first.
%%  Otherwise whatever gen_tcp:recv(Socket, 0) delivers is returned.
%%  A closed socket is reported as eof.

read_chars(#tcp_io{sock=Sock,ibuf=[]}=St) ->
    case gen_tcp:recv(Sock, 0) of
        {error,closed} ->
            {ok,eof,St};
        {error,Reason} ->
            {error,Reason,St};
        {ok,Chars} ->
            {ok,Chars,St}
    end;
read_chars(#tcp_io{ibuf=Pushed}=St) ->
    {ok,Pushed,St#tcp_io{ibuf=[]}}.

%% push_chars([Char], State) ->
%%      {ok,State} | {error,Reason,State}.
%%  gen_io callback.  Keep the unused characters in the input buffer
%%  so the next read_chars/1 returns them.  Only an empty buffer is
%%  accepted.

push_chars(Chars, St = #tcp_io{ibuf=[]}) ->
    Pushed = St#tcp_io{ibuf=Chars},
    {ok,Pushed}.