{"id":610,"date":"2011-05-26T08:24:35","date_gmt":"2011-05-26T07:24:35","guid":{"rendered":"http:\/\/trigonakis.com\/blog\/?p=610"},"modified":"2011-05-26T08:24:35","modified_gmt":"2011-05-26T07:24:35","slug":"introduction-to-erlang-message-passing","status":"publish","type":"post","link":"http:\/\/trigonakis.com\/blog\/2011\/05\/26\/introduction-to-erlang-message-passing\/","title":{"rendered":"Introduction to Erlang : Message Passing"},"content":{"rendered":"<div class=\"seriesmeta\">This entry is part 15 of 16 in the series <a href=\"http:\/\/trigonakis.com\/blog\/series\/introduction-to-erlang\/\" class=\"series-57\" title=\"Introduction to Erlang\">Introduction to Erlang<\/a><\/div><h3>Message Passing<\/h3>\n<p>The communication model (among processes) in Erlang is <i>message passing<\/i>. No much need to be said about this. Erlang processes <strong>share no memory<\/strong>. The way that processes communicate is via (asynchronous) message passing. Every process (even the shell) holds a mailbox queue where incoming messages are placed until received by the process. Message passing is asynchronous because the sending process does not block on send. On the other hand, receiving a message in Erlang is a blocking operation.<\/p>\n<h4>Characteristics<\/h4>\n<p>In this subsection I will describe some of the characteristics of message passing in Erlang.<\/p>\n<h5>Asynchronous<\/h5>\n<p>As I already mentioned, message passing in Erlang is a non-blocking operation.<\/p>\n<h5>Data Copy<\/h5>\n<p>The message&#8217;s data are copied from the sending process to the receiver&#8217;s message queue, so the receiver gets a fresh copy of the data.<\/p>\n<h5>Ordering<\/h5>\n<p>Erlang runtime guarantees that if two messages are sent from node <code>A<\/code> to node <code>B<\/code> and both are delivered, then the ordering of these messages is kept (causal ordering).<\/p>\n<h5>Successful Send<\/h5>\n<p>The send operation always succeeds (even if the target is a non-existing process) and evaluates to the data sent. An exception is when trying to send data to a non-existing registered process.<br \/>\n<!--more--><\/p>\n<h3>Sending Messages<\/h3>\n<p>Erlang uses the exclamation mark (<strong>!<\/strong>) as the operator for sending a message. ! is equivalent with the <code>erlang:send\/2<\/code> function but is more commonly used because of its compactness. The ! operator is used as following<\/p>\n<pre lang=\"erlang\">\r\n%send message Message to the process with pid Pid\r\nPid ! Message\r\n<\/pre>\n<p>and returns the message sent. Returning the message sent allows to send multiple messages &#8220;at once&#8221;; all <\/p>\n<pre lang=\"erlang\">\r\nPid1 ! Message, Pid2 ! Message, Pid3 ! Message\r\nPid1 ! (Pid2 ! (Pid3 ! Message))\r\nPid1 ! Pid2 ! Pid3 ! Message\r\n<\/pre>\n<p>are equivalent.<\/p>\n<h4>Example<\/h4>\n<p>As I have mentioned before, the shell is nothing more than a process. As a process, it has a message queue. In order to print and empty the shell&#8217;s message queue we can use the <code>flush\/0<\/code> BIF.<\/p>\n<pre lang=\"erlang\">\r\n1> self() ! any_erlang_term_can_be_sent.\r\nany_erlang_term_can_be_sent %notice that the msg sent\r\n                            %was also returned\r\n2> flush().                             \r\nShell got any_erlang_term_can_be_sent\r\nok\r\n3> self() ! [this, is, a, list, 'of', atoms].\r\n[this,is,a,list,'of',atoms]\r\n4> self() ! {this, [is, a], tuple, {'!', '!'}}.\r\n{this,[is,a],tuple,{'!','!'}}\r\n5> self() ! {self(), 123}.\r\n{<0.35.0>,123}\r\n6> flush().\r\nShell got [this,is,a,list,'of',atoms]\r\nShell got {this,[is,a],tuple,{'!','!'}}\r\nShell got {<0.35.0>,123}\r\nok\r\n7> Pid1 = self(), Pid2 = self(), Pid3 = self().\r\n<0.35.0>\r\n8> Pid1 ! msg, Pid2 ! msg, Pid3 ! msg.\r\nmsg\r\n9> flush().\r\nShell got msg\r\nShell got msg\r\nShell got msg\r\nok\r\n10> Pid1 ! (Pid2 ! (Pid3 ! msg)).      \r\nmsg\r\n11> flush().                     \r\nShell got msg\r\nShell got msg\r\nShell got msg\r\nok\r\n12> Pid1 ! Pid2 ! Pid3 ! msg.    \r\nmsg\r\n13> flush().                 \r\nShell got msg\r\nShell got msg\r\nShell got msg\r\nok\r\n<\/pre>\n<h3>Receiving Messages<\/h3>\n<p>Erlang uses pattern matching for receiving messages (same as in function clause selection and the <code>case<\/code> statement). The <code>receive<\/code> statement is used to deliver messages from the message queue. Its format is the following:<\/p>\n<pre lang=\"erlang\">\r\nreceive\r\n    Pattern1 when Guard1 ->\r\n\tDo1;\r\n    Pattern2 when Guard2 ->\r\n\tDo2;\r\n    _Other ->\r\n\tCatch_all\r\nend\r\n<\/pre>\n<p>Notice the similarities of the clauses with the ones in a <code>case<\/code> statement. The last clause is the &#8220;catch all&#8221; clause since it will match with any message (that did not match with a previous clause). Of course having a catch all clause is optional.<\/p>\n<h4>Receiving Order<\/h4>\n<p>The message processing is done in a <i>FIFS<\/i> (First In &#8211; First Served) order. Every incoming message is placed in the tail of the process&#8217; message queue. When a <code>receive<\/code> statement is met the following processing happens:<\/p>\n<ol>\n<li>The first message (head of the message queue) is pattern matched against the first receive clause. If match, execute the clause&#8217;s body, else go to the next step.<\/li>\n<li>The same message is pattern matched against the second (if any) receive clause. If match, execute the clause&#8217;s body, else go to the next step.<\/li>\n<li>&#8230;<\/li>\n<li>The same message is pattern matched against the last clause. If match, execute the clause&#8217;s body, else go to the next step.<\/li>\n<li>The same iterative process starts again from step 1, but now with the next message from the message queue.<\/li>\n<\/ol>\n<p>Of course, the message (if any) that is delivered through <code>receive<\/code> is removed from the message queue.<\/p>\n<h3>Examples<\/h3>\n<h4>Ping Pong<\/h4>\n<p>A simple example where 2 processes exchange &#8220;ping-pong&#8221; messages.<\/p>\n<pre lang=\"erlang\">\r\n-module(pingpong).\r\n-export([play\/1]).\r\n\r\nplay(N) when is_integer(N), N > 0 ->\r\n    Pong = spawn(fun pong\/0),\r\n    ping(N, Pong).\r\n\r\nping(0,Pong) ->\r\n    Pong ! exit,\r\n    ok;\r\nping(N, Pong) ->\r\n    Pong ! {self(), ping},\r\n    receive\r\n\tpong ->\r\n\t    io:format(\"~w : pong [~w]~n\", [self(), N])\r\n    end,\r\n    ping(N - 1, Pong).\r\n\r\npong() ->\r\n    receive\r\n\t{From, ping} ->\r\n\t    io:format(\"~w : ping~n\", [self()]),\r\n\t    From ! pong,\r\n\t    pong();\r\n\texit ->\r\n\t    ok\r\n    end.\t    \r\n<\/pre>\n<h5>&#8220;Details&#8221;<\/h5>\n<p>Some details about the above ping pong implementation are:<\/p>\n<ul>\n<li>All receives are <strong>non-selective<\/strong> to who the sender is. A message with the correct format, even if send by an &#8220;unknown&#8221; process, will be always accepted. On the other hand, a <strong>selective<\/strong> (to who the sender is) receive would pattern match the message against the sender&#8217;s pid. For example, the receive of pong of the &#8220;Ping&#8221; process could be made selective as <code>{Pong, pong} -> ...<\/code>. In this case, the &#8220;Pong&#8221; process would have to do its send operation as <code>From ! {self(), pong}<\/code> in order to be delivered by the &#8220;<code>From<\/code>&#8221; (ping) process.<\/li>\n<li>The way I designed the &#8220;Pong&#8221; process it cannot be changed to perform a selective receive (at least until receiving the first message which could be used to fetch the sender&#8217;s pid, which could be of course from an &#8220;uknown&#8221; &#8211; external process). If we wanted to use a selective receive, <code>pong\/0<\/code> could be changed to <code>pong\/1<\/code>, where the argument would be the pid of the &#8220;Ping&#8221; process.<\/li>\n<\/ul>\n<h5>Example run<\/h5>\n<pre lang=\"erlang\">\r\n1> c(pingpong).\r\n{ok,pingpong}\r\n2> pingpong:play(4).\r\n<0.42.0> : ping\r\n<0.35.0> : pong [4]\r\n<0.42.0> : ping\r\n<0.35.0> : pong [3]\r\n<0.42.0> : ping\r\n<0.35.0> : pong [2]\r\n<0.42.0> : ping\r\n<0.35.0> : pong [1]\r\nok\r\n<\/pre>\n<h4>Token Ring<\/h4>\n<p>This application creates <code>NumNodes<\/code> processes and arranges them in a ring (every process has one <i>next<\/i> process). Then the coordinator &#8220;inserts&#8221; a token in the first node of the ring. Every node (process) receiving the token increases its value by 1 and sends it to the next node. The application stops when the token has value greater than the <code>MAXVAL<\/code>.<\/p>\n<pre lang=\"erlang\">\r\n-module(ring).\r\n-export([create\/1, node\/2, connect\/1]).\r\n\r\n-define(MAXVAL, 100000).\r\n\r\n%creates the ring's nodes, connects them in a ring, sends the token in the ring, and\r\n%collects the exit messages from the nodes\r\ncreate(NumNodes) when is_integer(NumNodes), NumNodes > 1 ->\r\n    Nodes = [spawn(?MODULE, node, [ID, self()]) || ID <- lists:seq(1, NumNodes)],\r\n    ring:connect(Nodes),\r\n    hd(Nodes) ! {token, 0},\r\n    getexits(Nodes).\r\n\r\n%collects the exit messages from the nodes\r\ngetexits([]) ->\r\n    io:format(\"[Coord] Done.~n\"),\r\n    ok;\r\ngetexits(Nodes) ->\r\n    receive\r\n\t{Node, exit} ->\r\n\t    case lists:member(Node, Nodes) of\r\n\t\ttrue ->\r\n\t\t    getexits(lists:delete(Node, Nodes));\r\n\t\t_ ->\r\n\t\t    getexits(Nodes)\r\n\t    end\r\n    end.\r\n\r\n%little trick in order to connect the last with the first node\r\n%handle the [nd0, nd1, ..., ndN] list as [nd0, nd1, ..., ndN, nd0]\r\nconnect(N = [H | _]) ->\r\n    connect_(N ++ [H]).\r\n\r\n%connects the nodes to a ring\r\nconnect_([]) ->\r\n    connected;\r\nconnect_([_]) ->\r\n    connected;\r\nconnect_([N1, N2 | Nodes]) ->\r\n    N1 ! {self(), connect, N2},\r\n    connect_([N2 | Nodes]).\r\n\r\n%the node function. Initially waits for the next node's pid\r\nnode(ID, CrdId) ->\r\n    receive\r\n\t{CrdId, connect, NxtNdId} ->\r\n\t    io:format(\"[~p:~p] got my next ~p~n\", [ID, self(), NxtNdId]),\r\n\t    node(ID, CrdId, NxtNdId)\r\n    end.\r\n\r\n%the main functionality of a node; receive the token, increase its value and send\r\n%it to the next node on the ring\r\nnode(ID, CrdId, NxtNdId) ->\r\n    receive\r\n\t{token, Val} ->\r\n\t    if\r\n\t\tVal < ?MAXVAL ->\r\n\t\t    NxtNdId ! {token, Val + 1},\r\n\t\t    node(ID, CrdId, NxtNdId);\r\n\t\ttrue ->\r\n\t\t    io:format(\"[~p:~p] token value ~p~n\", [ID, self(), Val]),\r\n\t\t    case erlang:is_process_alive(NxtNdId) of\r\n\t\t\ttrue ->\r\n\t\t\t    NxtNdId ! {token, Val + 1};\r\n\t\t\t_ ->\r\n\t\t\t    ok\r\n\t\t    end,\r\n\t\t    CrdId ! {self(), exit},\r\n\t\t    done\r\n\t    end\r\n    end.     \r\n<\/pre>\n<h5>Example run<\/h5>\n<pre lang=\"erlang\">\r\n1> c(ring).\r\n{ok,ring}\r\n2> ring:create(5).\r\n[1:<0.42.0>] got my next <0.43.0>\r\n[2:<0.43.0>] got my next <0.44.0>\r\n[3:<0.44.0>] got my next <0.45.0>\r\n[4:<0.45.0>] got my next <0.46.0>\r\n[5:<0.46.0>] got my next <0.42.0>\r\n[1:<0.42.0>] token value 100000\r\n[2:<0.43.0>] token value 100001\r\n[3:<0.44.0>] token value 100002\r\n[4:<0.45.0>] token value 100003\r\n[5:<0.46.0>] token value 100004\r\n[Coord] Done.\r\nok\r\n<\/pre>\n<h4>Timeout<\/h4>\n<p><code>receive<\/code> is a blocking statement; it blocks until a message that matches one of the clauses is placed in the incoming message queue. Erlang allows the programmer to explicitly unblock the <code>receive<\/code> statement using a timeout (if a matching message is not delivered until the timeout expires). The complete format of <code>receive<\/code> statement, including the <code>after<\/code> construct, is:<\/p>\n<pre lang=\"erlang\">\r\nreceive\r\n    Pattern1 when Guard1 ->\r\n\tDo1;\r\n    Pattern2 when Guard2 ->\r\n\tDo2;\r\n    _Other ->\r\n\tCatch_all\r\nafter Millisecs ->\r\n        Do_timeout;\r\nend\r\n<\/pre>\n<h5>Example<\/h5>\n<pre lang=\"erlang\">\r\n-module(timeout).\r\n-export([start\/0, sleep\/1]).\r\n\r\nstart() ->\r\n    spawn(fun timeout\/0).\r\n\r\ntimeout() ->\r\n    receive\r\n\tcancel ->\r\n\t    io:format(\"Timeout canceled~n\")\r\n    after 2000 -> % 2 seconds\r\n\t    io:format(\"Timeout triggered~n\"),\r\n\t    timeout()\r\n    end.\r\n\t\r\n%a sleep function\r\nsleep(Ms) ->    \r\n    io:format(\"Sleeping for ~w ms~n\", [Ms]),\r\n    receive\r\n    after Ms ->\r\n\t    done\r\n    end.\r\n<\/pre>\n<h6>Run<\/h6>\n<pre lang=\"erlang\">\r\n1> c(timeout).\r\n{ok,timeout}\r\n2> P = timeout:start().\r\n<0.42.0>\r\nTimeout triggered\r\nTimeout triggered\r\n3> P ! cancel.\r\nTimeout canceled\r\ncancel\r\n4> timeout:sleep(1000).\r\nSleeping for 1000 ms\r\ndone\r\n5> timeout:sleep(3000).\r\nSleeping for 3000 ms\r\ndone\r\n<\/pre>\n<h3>Next<\/h3>\n<p>The next post will be a step-by-step tutorial on building a slightly bigger application in Erlang. The application will be a (shared) memory system abstraction.<\/p>\n","protected":false},"excerpt":{"rendered":"<div class=\"seriesmeta\">This entry is part 15 of 16 in the series <a href=\"http:\/\/trigonakis.com\/blog\/series\/introduction-to-erlang\/\" class=\"series-57\" title=\"Introduction to Erlang\">Introduction to Erlang<\/a><\/div><p>Message Passing The communication model (among processes) in Erlang is message passing. No much need to be said about this. Erlang processes share no memory. The way that processes communicate is via (asynchronous) message passing. Every process (even the shell) holds a mailbox queue where incoming messages are placed until received by the process. Message [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"jetpack_post_was_ever_published":false,"jetpack_publicize_message":"","jetpack_is_tweetstorm":false,"jetpack_publicize_feature_enabled":true,"jetpack_social_post_already_shared":false,"jetpack_social_options":{"image_generator_settings":{"template":"highway","enabled":false}}},"categories":[40,51,28],"tags":[70,26,71,42],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"","jetpack_sharing_enabled":true,"jetpack_shortlink":"https:\/\/wp.me\/p1ouW6-9Q","_links":{"self":[{"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/posts\/610"}],"collection":[{"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/comments?post=610"}],"version-history":[{"count":2,"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/posts\/610\/revisions"}],"predecessor-version":[{"id":612,"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/posts\/610\/revisions\/612"}],"wp:attachment":[{"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/media?parent=610"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/categories?post=610"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/trigonakis.com\/blog\/wp-json\/wp\/v2\/tags?post=610"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}