gen_server & wait/notify

Chris Waymire
I need some help finding a pattern/solution to a problem that is proving more difficult than it should be, it seems.

I have a gen_server module that needs to take an asynchronous request/response operation and expose it as a synchronous operation. Typically I would use receive to block and have the asynchronous operation send a message to unblock it, but I believe that won't work with a gen_server, since it loops over receive and forwards messages to handle_info. The code below demonstrates the core of what I'm talking about. I'm not sure what the solution would be to achieve this functionality while using the gen_server behaviour.



handle_call({request, Data = #data{uid = UID}}, _From, State) ->
    make_async_req(Data),
    receive
        {UID, Response} -> {reply, Response, State}
    end.

handle_info({response, Response = #response{uid = UID}}, State) ->
    self() ! {UID, Response},
    {noreply, State}.

Thanks,

-- Chris

_______________________________________________
erlang-questions mailing list
[hidden email]
http://erlang.org/mailman/listinfo/erlang-questions

Re: gen_server & wait/notify

zxq9-2
On Thursday, 2017-09-28 17:58:08 Chris Waymire wrote:
> handle_call({request, Data = #data{uid = UID}}, _From, State) ->
>     make_async_req(Data),
>     receive
>         {UID, Response} -> {reply, Response, State}
>     end.
>
> handle_info({response, Response = #response{uid = UID}}, State) ->
>     self() ! {UID, Response},
>     {noreply, State}.


Your naked `receive` will work just fine. Control has not yet returned to
the gen_server module to await another message to dispatch, so a naked
`receive` in handler code is exactly where the process will block, and you
can receive any arbitrary message you want there. Just be careful not to
match on any system or gen_server message types and you'll get the behavior
you expect (though your mailbox may be filling up with other messages in
the meantime).
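
To make the blocking variant concrete, here is a minimal sketch of a selective `receive` inside handle_call/3. The module name blocking_demo, the helper fake_async_req/3, the {response, UID, Response} message shape and the 1000 ms timeout are all assumptions made up for illustration, not anything from the original code. Note that the response has to land directly in the server's own mailbox: a relay through handle_info would never run while handle_call is still blocked.

```erlang
-module(blocking_demo).
-behaviour(gen_server).

-export([start_link/0, request/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Blocks the caller until the server has received the matching response.
request(Server, Payload) ->
    gen_server:call(Server, {request, Payload}).

init([]) ->
    {ok, #{}}.

handle_call({request, Payload}, _From, State) ->
    UID = make_ref(),
    ok = fake_async_req(self(), UID, Payload),
    %% Selective receive: only a message carrying this UID matches here.
    %% Everything else stays queued in the mailbox until we return.
    receive
        {response, UID, Response} ->
            {reply, {ok, Response}, State}
    after 1000 ->
        %% Assumed timeout so the server cannot block forever.
        {reply, {error, timeout}, State}
    end.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Msg, State) ->
    {noreply, State}.

%% Hypothetical stand-in for the real asynchronous operation: a helper
%% process sends the tagged response straight back to the server.
fake_async_req(ReplyTo, UID, Payload) ->
    _ = spawn(fun() -> ReplyTo ! {response, UID, {echo, Payload}} end),
    ok.
```

With this shape the server handles one request at a time; every other call, cast and info message waits in the mailbox until the receive completes or times out.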

-Craig

Re: gen_server & wait/notify

Vlad Dumitrescu-2
In reply to this post by Chris Waymire
Hi!

If you want your gen_server to be able to service other requests during the async request, then you want something like below (modulo error handling and typos and bugs)

best regards,
Vlad

handle_call({request, Data = #data{uid = UID}}, From, State) ->
    make_async_req(Data),
    % Remember who is waiting for this UID; returning noreply keeps the
    % caller blocked in gen_server:call until we reply explicitly.
    NewState = State#state{pending = [{UID, From} | State#state.pending]},
    {noreply, NewState}.

handle_info({response, Response = #response{uid = UID}}, State = #state{pending = Pending}) ->
    % Find the waiting caller for this UID and unblock it.
    {value, {UID, From}, NewPending} = lists:keytake(UID, 1, Pending),
    gen_server:reply(From, Response),
    NewState = State#state{pending = NewPending},
    {noreply, NewState}.



Re: gen_server & wait/notify

zxq9-2
In reply to this post by zxq9-2
One note of caution is that you might actually be fighting against the
natural order of things. Consider carefully whether you need this and what
it is achieving for you. You may really be better off with casts. See if
there is a way for you to design the system to be entirely async, and if
not, why not. What state would going async put at risk? Is it an
ordering issue? Are you relaying a message and waiting for a response
that will be returned? If so, why is the originating process not just
receiving the response directly? (Do you really need a middle-man?)

Another way to achieve this without blocking is to use references to
tag messages and keep a digest of them. That prevents your gen_server
from becoming unresponsive to system messages or blocking indefinitely
in the event the sender of the message you are waiting on crashes.

This is actually what the `From` argument to handle_call/3 is for:

handle_call({request, Data = #data{uid = UID}},
            {Sender, Tag},
            State = #s{queue = Queue}) ->
    % Forward the request tagged with the caller's Tag, then park the caller.
    ok = make_async_request(Data, Tag),
    {noreply, State#s{queue = [{Tag, UID, Sender} | Queue]}};
% ...

handle_cast({response, {Tag, UID}, Message}, State = #s{queue = Queue}) ->
    case lists:keyfind(Tag, 1, Queue) of
        Ticket = {Tag, UID, Sender} ->
            NewState = State#s{queue = lists:delete(Ticket, Queue)},
            % NB: handle_cast/2 cannot return a reply tuple; as the follow-ups
            % in this thread note, call gen_server:reply({Sender, Tag}, Message)
            % and return {noreply, NewState} instead.
            {reply, Message, NewState};
        false ->
            LogString = "Received bad message: ~tp",
            ok = log(warning, LogString, [{{Tag, UID}, Message}]),
            {noreply, State}
    end;
% ...

There may be any number of ways you might want to phrase that or structure
it, but basically that's how you can safely stow such a pending response
value and sender, and get the response back out to them without any weird
blockages or unresponsiveness.
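
On the point about never getting a response: one possible way to keep a parked caller from waiting forever is to arm a timer per pending entry. The following is only a sketch under assumptions; the module name expiry_demo, the 200 ms expiry, the {expired, UID} and {response, UID, Response} message shapes are invented for illustration, and the request is deliberately never sent anywhere so that the expiry path is the one exercised.

```erlang
-module(expiry_demo).
-behaviour(gen_server).

-export([start_link/0, request/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Assumed expiry, kept well below the 5 second default of gen_server:call/2.
-define(EXPIRY_MS, 200).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

request(Server, Payload) ->
    gen_server:call(Server, {request, Payload}).

init([]) ->
    {ok, #{}}.

handle_call({request, _Payload}, From, Pending) ->
    UID = make_ref(),
    %% The real async request would be issued here. This sketch sends
    %% nothing, which simulates a responder that crashed or went away.
    Timer = erlang:send_after(?EXPIRY_MS, self(), {expired, UID}),
    {noreply, Pending#{UID => {From, Timer}}}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({response, UID, Response}, Pending) ->
    case maps:take(UID, Pending) of
        {{From, Timer}, Rest} ->
            _ = erlang:cancel_timer(Timer),
            gen_server:reply(From, {ok, Response}),
            {noreply, Rest};
        error ->
            %% Unknown or already expired UID: drop it.
            {noreply, Pending}
    end;
handle_info({expired, UID}, Pending) ->
    case maps:take(UID, Pending) of
        {{From, _Timer}, Rest} ->
            gen_server:reply(From, {error, timeout}),
            {noreply, Rest};
        error ->
            %% The response won the race against the timer.
            {noreply, Pending}
    end;
handle_info(_Other, Pending) ->
    {noreply, Pending}.
```

The server stays responsive throughout, and the caller gets {error, timeout} as an ordinary return value rather than an exit from gen_server:call/2.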

-Craig

Re: gen_server & wait/notify

Dmitry Kolesnikov-2
+1 to Craig

Using a blocking receive within a gen_server is possible, but then the value of the async call in your design is questionable. Either make it fully async with a state-machine-like approach, or make it sync.

Best Regards,
Dmitry 



Re: gen_server & wait/notify

Chris Waymire
In reply to this post by zxq9-2
I'll try to provide some context around what I am doing. Basically I am writing a client for a custom MQTT network that, in addition to the basic MQTT functionality, also provides a request/response system. The idea is that when a user of the app executes the "request" function, a uuid will be attached to their message payload and published onto the network. Another endpoint that is subscribed to that topic will receive the message, process it, and publish a response with a reference to the original message id. The original sender then picks up this message and accepts it as the response to their request. There's more going on there, but that's the gist of it. So when the request function is called I need the caller to block, but not the MQTT client itself, as it will still need to be processing messages on the network.


Re: gen_server & wait/notify

Mikael Pettersson-5
See Vlad's reply a few days ago. What you're describing is easily solvable by:
1. having the gen_server's handle_call for this API call stash away the From and uuid in its internal state and return noreply, which leaves the caller blocked, and
2. when the response comes in (tagged with that uuid), matching it to the corresponding From and calling gen_server:reply on that one, which unblocks the caller.
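
The two steps above can be sketched as a small self-contained module. Everything named here is an assumption for illustration only: the module deferred_demo, the helper fake_async_req/3 (which just echoes the payload back after 50 ms), the {response, UID, Response} message shape and the pending_count call. A map keyed by uuid stands in for whatever state the real client keeps.

```erlang
-module(deferred_demo).
-behaviour(gen_server).

-export([start_link/0, request/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Synchronous from the caller's point of view.
request(Server, Payload) ->
    gen_server:call(Server, {request, Payload}).

%% State is a map of UID => From for callers still waiting.
init([]) ->
    {ok, #{}}.

%% Step 1: stash From under the uuid and return noreply.
handle_call({request, Payload}, From, Pending) ->
    UID = make_ref(),
    ok = fake_async_req(self(), UID, Payload),
    {noreply, Pending#{UID => From}};
handle_call(pending_count, _From, Pending) ->
    {reply, maps:size(Pending), Pending}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Step 2: match the response to its From and unblock that caller.
handle_info({response, UID, Response}, Pending) ->
    case maps:take(UID, Pending) of
        {From, Rest} ->
            gen_server:reply(From, {ok, Response}),
            {noreply, Rest};
        error ->
            %% No caller is waiting for this uuid; ignore it.
            {noreply, Pending}
    end;
handle_info(_Other, Pending) ->
    {noreply, Pending}.

%% Hypothetical stand-in for publishing a request and later receiving
%% the response from the network.
fake_async_req(ReplyTo, UID, Payload) ->
    _ = erlang:send_after(50, ReplyTo, {response, UID, {echo, Payload}}),
    ok.
```

Between the two steps the server is free to handle other messages. The caller remains subject to the timeout of gen_server:call/2, which defaults to 5 seconds, so a slow responder may call for gen_server:call/3 with a longer timeout.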



Re: gen_server & wait/notify

Chris Waymire
Ahhh.. I was not familiar with how the noreply response works from handle_call. If I'm understanding it correctly though, in the snippet he provided, I would need to change {reply, Message, NewState} to instead call gen_server:reply/2, which then unblocks the caller. That is exactly what I need. I figured there had to be something to do that without having to cobble together some kludgy code.

Thanks.

-- Chris

