Erlang processes communicate by message passing. Processes share no memory; the only way they can communicate is by sending (asynchronous) messages. Every process (even the shell) has a mailbox queue where incoming messages are placed until the process receives them. Message passing is asynchronous because the sending process does not block on send. Receiving a message, on the other hand, is a blocking operation.
Characteristics
In this subsection I will describe some of the characteristics of message passing in Erlang.
Asynchronous
As I already mentioned, sending a message in Erlang is a non-blocking operation: the sender continues immediately, without waiting for the message to be received.
Data Copy
The message’s data are copied from the sending process to the receiver’s message queue, so the receiver gets a fresh copy of the data.
Ordering
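The Erlang runtime guarantees that if two messages are sent from process A to process B and both are delivered, they arrive in the order in which they were sent. The guarantee holds per pair of processes; it says nothing about how messages from different senders interleave.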
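The ordering guarantee can be observed with a minimal sketch like the one below. The module name order_demo and the {Tag, N} message format are assumptions made for this illustration only.

```erlang
-module(order_demo).
-export([run/0]).

%% One sender sends the numbers 1..5 to one receiver (the caller).
%% Both processes form a single sender/receiver pair, so the messages
%% are received in the order in which they were sent.
run() ->
    Parent = self(),
    Tag = make_ref(),   % unique tag, so stray messages are not picked up
    spawn(fun() ->
              lists:foreach(fun(N) -> Parent ! {Tag, N} end,
                            lists:seq(1, 5))
          end),
    %% Take the next tagged message five times, in arrival order.
    [receive {Tag, N} -> N end || _ <- lists:seq(1, 5)].
```

Calling order_demo:run() should evaluate to [1,2,3,4,5].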
Successful Send
The send operation always succeeds (even if the target pid belongs to a process that no longer exists) and evaluates to the message sent. The exception is sending to a registered name under which no process is currently registered, which raises a badarg error.
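Both cases can be checked with a small sketch. The module name send_demo is hypothetical, and the atom no_such_name is assumed not to be registered on the node where this runs.

```erlang
-module(send_demo).
-export([run/0]).

run() ->
    %% Spawn a process that exits at once and wait until it is really gone.
    {Pid, Ref} = spawn_monitor(fun() -> ok end),
    receive
        {'DOWN', Ref, process, Pid, _Reason} -> ok
    end,
    %% Sending to a pid whose process no longer exists still succeeds
    %% and evaluates to the message.
    hello = (Pid ! hello),
    %% Sending to a name that is not registered raises badarg.
    %% (no_such_name is assumed to be unregistered.)
    Result = try no_such_name ! hello
             catch error:badarg -> badarg
             end,
    {dead_pid_ok, Result}.
```

Under these assumptions send_demo:run() evaluates to {dead_pid_ok,badarg}.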
Sending Messages
Erlang uses the exclamation mark (!) as the operator for sending a message. ! is equivalent to the erlang:send/2 function, but it is more commonly used because of its compactness. The ! operator is used as follows
%send message Message to the process with pid Pid
Pid ! Message
and returns the message sent. Because the send expression evaluates to the message, sends can be chained to deliver the same message to several processes “at once”, for example Pid1 ! Pid2 ! Pid3 ! Message.
As mentioned before, the shell is nothing more than a process, and as a process it has a message queue. To print and empty the shell’s message queue we can use the shell’s flush/0 command.
1> self() ! any_erlang_term_can_be_sent.
any_erlang_term_can_be_sent    % notice that the msg sent was also returned
2> flush().
Shell got any_erlang_term_can_be_sent
ok
3> self() ! [this, is, a, list, 'of', atoms].
[this,is,a,list,'of',atoms]
4> self() ! {this, [is, a], tuple, {'!', '!'}}.
{this,[is,a],tuple,{'!','!'}}
5> self() ! {self(), 123}.
{<0.35.0>,123}
6> flush().
Shell got [this,is,a,list,'of',atoms]
Shell got {this,[is,a],tuple,{'!','!'}}
Shell got {<0.35.0>,123}
ok
7> Pid1 = self(), Pid2 = self(), Pid3 = self().
<0.35.0>
8> Pid1 ! msg, Pid2 ! msg, Pid3 ! msg.
msg
9> flush().
Shell got msg
Shell got msg
Shell got msg
ok
10> Pid1 ! (Pid2 ! (Pid3 ! msg)).
msg
11> flush().
Shell got msg
Shell got msg
Shell got msg
ok
12> Pid1 ! Pid2 ! Pid3 ! msg.
msg
13> flush().
Shell got msg
Shell got msg
Shell got msg
ok
Receiving Messages
Erlang uses pattern matching for receiving messages (the same mechanism as in function clause selection and case expressions). The receive statement retrieves messages from the message queue. Its format is the following:
receive
    Pattern1 when Guard1 ->
        Do1;
    Pattern2 when Guard2 ->
        Do2;
    _Other ->
        Catch_all
end
Notice how similar the clauses are to the ones in a case expression. The last clause is the “catch all” clause, since it matches any message that did not match a previous clause. Having a catch-all clause is optional.
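As a concrete instance of this format, here is a minimal sketch with two guarded clauses and a catch-all. The module name recv_demo and the {number, N} message format are made up for the illustration.

```erlang
-module(recv_demo).
-export([classify/0, run/0]).

%% Takes one message from the caller's queue and classifies it.
classify() ->
    receive
        {number, N} when is_integer(N), N >= 0 ->
            {non_negative, N};
        {number, N} when is_integer(N) ->
            {negative, N};
        _Other ->
            %% catch-all: any other message is consumed here
            unknown
    end.

%% Sends three messages to the calling process and classifies them
%% one by one, in arrival order.
run() ->
    self() ! {number, 7},
    self() ! {number, -3},
    self() ! hello,
    A = classify(),
    B = classify(),
    C = classify(),
    [A, B, C].
```

In a process whose message queue is otherwise empty, recv_demo:run() evaluates to [{non_negative,7},{negative,-3},unknown].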
Receiving Order
Messages are processed in arrival order (first in, first served). Every incoming message is appended to the tail of the process’ message queue. When a receive statement is reached, the following processing happens:
1. The first message (the head of the message queue) is pattern matched against the first receive clause. If it matches, the clause’s body is executed; otherwise go to the next step.
2. The same message is pattern matched against the second receive clause (if any). If it matches, the clause’s body is executed; otherwise go to the next step.
3. …
4. The same message is pattern matched against the last clause. If it matches, the clause’s body is executed; otherwise go to the next step.
5. The same process starts again from step 1, but now with the next message in the message queue. If no message in the queue matches any clause, the process blocks until a new message arrives.
Of course, the message (if any) that is delivered through receive is removed from the message queue.
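The matching procedure means that a receive can take a message from the middle of the queue while earlier, non-matching messages stay where they are. A minimal sketch, with the hypothetical module name selective_demo:

```erlang
-module(selective_demo).
-export([run/0]).

run() ->
    Tag = make_ref(),          % unique tag, so stray messages cannot match
    self() ! {Tag, a},         % queued first
    self() ! {Tag, b},         % queued second
    %% {Tag, a} is at the head of the queue but does not match this
    %% pattern, so it is skipped and left in the queue.
    First = receive {Tag, b} -> b end,
    %% {Tag, a} is still there and is delivered now.
    Second = receive {Tag, a} -> a end,
    {First, Second}.
```

selective_demo:run() evaluates to {b,a}: the second message is delivered first because it is the first one that matches.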
Examples
Ping Pong
A simple example where 2 processes exchange “ping-pong” messages.
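A minimal sketch of such an exchange is given here, assuming a pong/0 process that replies to whoever sent the ping. The module name pingpong, the start/1 and ping/2 functions, and the stop message are illustrative assumptions, not necessarily how the original listing was written.

```erlang
-module(pingpong).
-export([start/1, ping/2, pong/0]).

%% Starts a "Pong" process and a "Ping" process that pings it N times.
start(N) when is_integer(N), N > 0 ->
    Pong = spawn(?MODULE, pong, []),
    spawn(?MODULE, ping, [N, Pong]).

ping(0, Pong) ->
    Pong ! stop,
    io:format("Ping finished~n");
ping(N, Pong) ->
    Pong ! {self(), ping},
    receive
        pong ->                 % non-selective: any pong is accepted
            io:format("Ping received pong~n")
    end,
    ping(N - 1, Pong).

pong() ->
    receive
        {From, ping} ->         % non-selective: any sender is accepted
            io:format("Pong received ping~n"),
            From ! pong,
            pong();
        stop ->
            io:format("Pong finished~n")
    end.
```

After compiling, pingpong:start(3) would print three ping/pong rounds followed by the two “finished” lines.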
Some details about the above ping pong implementation are:
All receives are non-selective with respect to the sender. A message with the correct format will always be accepted, even if it is sent by an “unknown” process. A receive that is selective with respect to the sender would instead pattern match the message against the sender’s pid. For example, the receive of pong in the “Ping” process could be made selective as {Pong, pong} -> ..., where Pong is already bound to the pid of the “Pong” process. In this case, the “Pong” process would have to send its reply as From ! {self(), pong} in order for it to be accepted by the “From” (ping) process.
The way I designed the “Pong” process, it cannot be changed to perform a selective receive, at least not until it receives its first message, which it could use to fetch the sender’s pid (and that message could of course come from an “unknown”, external process). If we wanted to use a selective receive, pong/0 could be changed to pong/1, where the argument would be the pid of the “Ping” process.
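The selective variant described above could look roughly like this. It is only a sketch: the module name selective_pingpong, the run/0 driver, and the stop message are assumptions introduced for the example.

```erlang
-module(selective_pingpong).
-export([run/0, pong/1]).

run() ->
    Ping = self(),
    Pong = spawn(?MODULE, pong, [Ping]),
    %% A ping from an "unknown" process never matches in pong/1; it is
    %% simply left in Pong's message queue.
    spawn(fun() -> Pong ! {self(), ping} end),
    Pong ! {Ping, ping},
    receive
        {Pong, pong} -> ok      % Pong is bound: only its reply matches
    end,
    Pong ! {Ping, stop},
    ok.

%% pong/1 takes the pid of the "Ping" process and only accepts
%% messages that carry exactly that pid.
pong(PingPid) ->
    receive
        {PingPid, ping} ->
            PingPid ! {self(), pong},
            pong(PingPid);
        {PingPid, stop} ->
            ok
    end.
```

Because PingPid and Pong are already bound when the patterns are evaluated, each pattern acts as an equality check on the sender’s pid rather than binding a new variable.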
Ring
This application creates NumNodes processes and arranges them in a ring (every process has one next process). The coordinator then “inserts” a token into the first node of the ring. Every node (process) that receives the token increases its value by 1 and sends it to the next node. A node stops once the token it receives has reached MAXVAL: it prints the value, passes the token on so that the remaining nodes stop as well, and notifies the coordinator.
-module(ring).
-export([create/1, node/2, connect/1]).

-define(MAXVAL, 100000).

%% creates the ring's nodes, connects them in a ring, sends the token in the ring,
%% and collects the exit messages from the nodes
create(NumNodes) when is_integer(NumNodes), NumNodes > 1 ->
    Nodes = [spawn(?MODULE, node, [ID, self()]) || ID <- lists:seq(1, NumNodes)],
    ring:connect(Nodes),
    hd(Nodes) ! {token, 0},
    getexits(Nodes).

%% collects the exit messages from the nodes
getexits([]) ->
    io:format("[Coord] Done.~n"),
    ok;
getexits(Nodes) ->
    receive
        {Node, exit} ->
            case lists:member(Node, Nodes) of
                true ->
                    getexits(lists:delete(Node, Nodes));
                _ ->
                    getexits(Nodes)
            end
    end.

%% little trick in order to connect the last with the first node:
%% handle the [nd0, nd1, ..., ndN] list as [nd0, nd1, ..., ndN, nd0]
connect(N = [H | _]) ->
    connect_(N ++ [H]).

%% connects the nodes to a ring
connect_([]) ->
    connected;
connect_([_]) ->
    connected;
connect_([N1, N2 | Nodes]) ->
    N1 ! {self(), connect, N2},
    connect_([N2 | Nodes]).

%% the node function. Initially waits for the next node's pid
node(ID, CrdId) ->
    receive
        {CrdId, connect, NxtNdId} ->
            io:format("[~p:~p] got my next ~p~n", [ID, self(), NxtNdId]),
            node(ID, CrdId, NxtNdId)
    end.

%% the main functionality of a node: receive the token, increase its value
%% and send it to the next node on the ring
node(ID, CrdId, NxtNdId) ->
    receive
        {token, Val} ->
            if
                Val < ?MAXVAL ->
                    NxtNdId ! {token, Val + 1},
                    node(ID, CrdId, NxtNdId);
                true ->
                    io:format("[~p:~p] token value ~p~n", [ID, self(), Val]),
                    case erlang:is_process_alive(NxtNdId) of
                        true ->
                            NxtNdId ! {token, Val + 1};
                        _ ->
                            ok
                    end,
                    CrdId ! {self(), exit},
                    done
            end
    end.
Example run
1> c(ring).
{ok,ring}
2> ring:create(5).
[1:<0.42.0>] got my next <0.43.0>
[2:<0.43.0>] got my next <0.44.0>
[3:<0.44.0>] got my next <0.45.0>
[4:<0.45.0>] got my next <0.46.0>
[5:<0.46.0>] got my next <0.42.0>
[1:<0.42.0>] token value 100000
[2:<0.43.0>] token value 100001
[3:<0.44.0>] token value 100002
[4:<0.45.0>] token value 100003
[5:<0.46.0>] token value 100004
[Coord] Done.
ok
Timeout
receive is a blocking statement; it blocks until a message that matches one of its clauses is placed in the incoming message queue. Erlang allows the programmer to explicitly unblock the receive statement using a timeout, which fires if no matching message is delivered before it expires. The timeout is written as an after clause placed after the last receive clause, after Timeout -> Body, where Timeout is given in milliseconds (or is the atom infinity). The following module uses it for a cancellable periodic timeout and for a sleep function:
-module(timeout).
-export([start/0, sleep/1]).

start() ->
    spawn(fun timeout/0).

timeout() ->
    receive
        cancel ->
            io:format("Timeout canceled~n")
    after 2000 -> % 2 seconds
        io:format("Timeout triggered~n"),
        timeout()
    end.

%% a sleep function
sleep(Ms) ->
    io:format("Sleeping for ~w ms~n", [Ms]),
    receive
    after Ms ->
        done
    end.
Run
1> c(timeout).
{ok,timeout}
2> P = timeout:start().
<0.42.0>
Timeout triggered
Timeout triggered
3> P ! cancel.
Timeout canceled
cancel
4> timeout:sleep(1000).
Sleeping for 1000 ms
done
5> timeout:sleep(3000).
Sleeping for 3000 ms
done
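Another common use of after is to bound how long a process waits for a reply. The sketch below assumes a hypothetical module timeout_demo and a made-up {reply, Value} message format.

```erlang
-module(timeout_demo).
-export([wait_reply/1, run/0]).

%% Waits up to Ms milliseconds for a {reply, Value} message.
wait_reply(Ms) ->
    receive
        {reply, Value} ->
            {ok, Value}
    after Ms ->
        timeout
    end.

run() ->
    %% Nothing has been sent yet, so the first wait expires.
    First = wait_reply(100),
    %% Now a reply is already in the queue, so the second wait
    %% returns immediately.
    self() ! {reply, 42},
    Second = wait_reply(100),
    {First, Second}.
```

In a process with no pending {reply, _} messages, timeout_demo:run() evaluates to {timeout,{ok,42}}.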
Next
The next post will be a step-by-step tutorial on building a slightly bigger application in Erlang. The application will be a (shared) memory system abstraction.