Handling routing_key for topic routing in gen_server handle_info

Sébastien BRICE
Hello there,

I am fairly new to the Erlang environment.

I am writing an email-testing application that tags each incoming email
with a randomly generated routing key on a topic exchange as it enters
my system.

Once the emails have been delivered to a queue and processed, I want to
label them again with the same randomly generated routing key and route
them to another exchange, ready for the final consumer.

This second publishing step is causing me real trouble.

I get data back from a TCP socket (the email is processed by a
third-party program, SpamAssassin) and pattern match on it in handle_info.

I rely on a gen_server to consume the messages first, using the regular
amqp_client library (amqp_client/include/amqp_client.hrl).

I use handle_info in my gen_server behaviour and then pattern match on
the parameters.

Delivered AMQP messages are detected through records in the function
heads of the handle_info callback.

***My gen_server****
handle_info({#'basic.deliver'{routing_key=Key, consumer_tag=Tag}, Content}, State) ->
     #amqp_msg{props = Properties, payload = Payload} = Content,
     #'P_basic'{message_id = MessageId, headers = Headers} = Properties,
     %% Key is bound here but is not passed on anywhere
     send_to_spamassassin:calcule_score(Payload),
     {noreply, State};
handle_info(Msg, State) ->
     case Msg of
         {_,_,Data} ->
            %% e.g. {tcp, Socket, Data} coming back from spamassassin
            scored_email:main(Data);
         {_,_} ->
            ok
     end,
     {noreply, State}.

***send_to_spamassassin function ***
calcule_score(Message) ->
     case gen_tcp:connect("localhost", 783, [{mode, binary}]) of
         {ok, Sock} ->
             …
             gen_tcp:send(Sock, Message2);
         {error,_} ->
             io:fwrite("Connection error! Quitting...~n")
     end.


The TCP socket works well for talking to SpamAssassin; it sends me back
a 3-tuple with binary string data like this:

{tcp,#Port<0.55>,<<"SPAMD/1.1 0 EX_OK\r\nContent-length: 564\r\nSpam:
True ; 7.9 / 5.0\r\n\r\nReceived: from localhost by
XXXX.ikexpress.com\n\twith SpamAssassin (version 3.4.2);\n\tThu, 15 Aug
2019 21:44:12 +0200\nX-Spam-Checker-Version: SpamAssassin 3.4.2
(2018-09-13) on\n\tXXXXX.ikexpress.com\nX-Spam-Flag: YES\nX-Spam-Level:
*******\nX-Spam-Status: Yes, score=7.9 required=5.0
tests=EMPTY_MESSAGE,MISSING_DATE,\n\tMISSING_FROM,MISSING_HEADERS,MISSING_MID,MISSING_SUBJECT,\n\tNO_HEADERS_MESSAGE,NO_RECEIVED,NO_RELAYS
autolearn=no\n\tautolearn_force=no version=3.4.2\nMIME-Version:
1.0\nContent-Type: multipart/mixed;
boundary=\"----------=_5D55B60C.D2FC2670\"\n\n">>}

The second handle_info clause matches the answer coming back on the
gen_tcp socket fine, but I still have to package it and send it to a
topic exchange (the topic_scored_email exchange).

***scored_email***
main(Argv) ->
     {ok, Connection} = amqp_connection:start(#amqp_params_network{virtual_host = <<"/">>}),
     {ok, Channel} = amqp_connection:open_channel(Connection),
     amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_scored_email">>,
                                                    type = <<"topic">>}),
     {RoutingKey, Message} = case Argv of
                                 …
                                 %DOING PATTERN MATCHING THAT WORKS HERE
                                 …
                             end,
     amqp_channel:cast(Channel,
                       #'basic.publish'{exchange = <<"topic_scored_email">>,
                                        routing_key = RoutingKey},
                       #amqp_msg{payload = Message}),

The first issue is the type of the data (a binary string), but I guess
I can work around that by parsing the binary, for example with
binary:split/3 or something similar.
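As an illustration of that parsing step, here is a minimal sketch that pulls the status line, the "Spam:" verdict and the body out of a reply shaped like the one shown above. The module name spamd_reply and the shape of the returned map are assumptions made for this example, not part of the original code, and the sketch assumes the whole reply arrived in a single binary (a TCP reply can be split across several {tcp, Socket, Data} messages).

```erlang
-module(spamd_reply).
-export([parse/1]).

%% Hypothetical helper: split a spamd reply into status line, verdict and body.
%% Returns {ok, Map} or {error, Reason}.
parse(Bin) when is_binary(Bin) ->
    %% The protocol headers end at the first blank line.
    case binary:split(Bin, <<"\r\n\r\n">>) of
        [Head, Body] ->
            [Status | HeaderLines] = binary:split(Head, <<"\r\n">>, [global]),
            case find_spam_header(HeaderLines) of
                {ok, Verdict} ->
                    {ok, Verdict#{status => Status, body => Body}};
                error ->
                    {error, no_spam_header}
            end;
        [_] ->
            %% No blank line yet: the reply is not complete.
            {error, incomplete_reply}
    end.

find_spam_header([]) ->
    error;
find_spam_header([<<"Spam: ", Rest/binary>> | _]) ->
    %% Rest looks like <<"True ; 7.9 / 5.0">>
    case binary:split(Rest, [<<" ; ">>, <<" / ">>], [global]) of
        [Flag, Score, Threshold] ->
            {ok, #{spam => Flag =:= <<"True">>,
                   score => Score,
                   threshold => Threshold}};
        _ ->
            error
    end;
find_spam_header([_ | Rest]) ->
    find_spam_header(Rest).
```

The score and threshold are left as binaries here; converting them with binary_to_float/1 is possible but would fail on values written without a decimal point, so that step is left to the caller.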

What I struggle to understand is how I could pass the right RoutingKey
along: since Erlang is functional, there is no mutable variable I can
assign it to and read back later.

That change of data format (AMQP --> raw TCP --> AMQP again) seems
impossible (to me) to achieve with the OTP abstractions.

However, I would like to reassemble every processed message with the
right routing key, the one matched in the first handle_info clause.

How could I modify my code to do that? I come from imperative languages
and have reached my limit here…

Yours

PS: I know this is more of a RabbitMQ issue and I might have more
success posting on Stack Overflow or the RabbitMQ Google group, but I
feel erlang-questions could come in handy on this topic.

_______________________________________________
erlang-questions mailing list
[hidden email]
http://erlang.org/mailman/listinfo/erlang-questions
Re: Handling routing_key for topic routing in gen_server handle_info

Dmitry Belyaev-3
If we remove all the unrelated implementation details, we are left with this picture:

handle_info({incoming, Id, SomeDetails}, State) ->
   start_processing(SomeDetails),
   {noreply, State};
handle_info({processed, Result}, State) ->
   Id = ???, % how to get the value
   finalise(Id, Result),
   {noreply, State}.

I can suggest the following approaches, assuming you cannot force the processor to accept your value and pass it back.
1. If you can switch to synchronous processing (which may be desirable to prevent overloading the processor), you can store the Id in the process State.
   handle_info({incoming, Id, SomeDetails}, #{current_id := undefined} = State) ->
      start_processing(SomeDetails),
      {noreply, State#{current_id => Id}};
   handle_info({processed, Result}, #{current_id := Id} = State) ->
      finalise(Id, Result),
      {noreply, State#{current_id => undefined}}.

2. If the processing must be asynchronous and several messages may be in flight at once, you can:
  a) spawn a new process per message that knows the Id, starts the processing itself and receives the processed message (the TCP response)
    handle_info({incoming, Id, SomeDetails}, State) ->
      spawn_link(fun() ->
         start_processing(SomeDetails),
         receive
            {processed, Result} ->
                finalise(Id, Result)
         end
      end),
      {noreply, State}.

  b) keep a mapping from sockets to Ids in the state
    handle_info({incoming, Id, SomeDetails}, State) ->
      Socket = start_processing(SomeDetails),
      {noreply, State#{Socket => Id}};
    handle_info({processed, Socket, Result}, State) ->
      #{Socket := Id} = State,
      finalise(Id, Result),
      {noreply, maps:remove(Socket, State)}.
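
Option b) could be fleshed out roughly as follows. This is only a sketch under assumptions: the module name key_tracker, the {incoming, Key, Socket} message and the Publish fun are hypothetical stand-ins so that the example runs without RabbitMQ or spamd. In the real server the first clause would be the 'basic.deliver' clause, Socket would come from gen_tcp:connect/3 called inside the gen_server (so the {tcp, Socket, Data} reply is delivered to it), and Publish would wrap amqp_channel:cast/3.

```erlang
-module(key_tracker).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

%% Publish is a fun(RoutingKey, Data), supplied by the caller (assumption).
start_link(Publish) ->
    gen_server:start_link(?MODULE, Publish, []).

init(Publish) ->
    %% pending maps each open socket to the routing key it is working for.
    {ok, #{publish => Publish, pending => #{}}}.

%% Stand-in for the 'basic.deliver' clause: remember Key under the socket
%% that carries this message to the scorer.
handle_info({incoming, Key, Socket}, #{pending := Pending} = State) ->
    {noreply, State#{pending := Pending#{Socket => Key}}};
%% Active-mode gen_tcp sockets deliver {tcp, Socket, Data} to their owner.
handle_info({tcp, Socket, Data}, #{pending := Pending, publish := Publish} = State) ->
    case maps:take(Socket, Pending) of
        {Key, Rest} ->
            %% The key is recovered from the state, not from the TCP data.
            Publish(Key, Data),
            {noreply, State#{pending := Rest}};
        error ->
            %% Reply on a socket we are not tracking: ignore it.
            {noreply, State}
    end;
handle_info(_Other, State) ->
    {noreply, State}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

The point is that the gen_server state plays the role an assigned variable would play in an imperative language: each callback receives the current state and returns the next one, so the routing key survives between the AMQP delivery and the TCP reply.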


Kind regards,
Dmitry Belyaev


On Fri, Aug 16, 2019 at 11:51 PM Sébastien BRICE <[hidden email]> wrote:
> [...]