Introduction to Erlang : Message Passing

This entry is part 15 of 16 in the series Introduction to Erlang

Message Passing

The communication model (among processes) in Erlang is message passing. No much need to be said about this. Erlang processes share no memory. The way that processes communicate is via (asynchronous) message passing. Every process (even the shell) holds a mailbox queue where incoming messages are placed until received by the process. Message passing is asynchronous because the sending process does not block on send. On the other hand, receiving a message in Erlang is a blocking operation.

Characteristics

In this subsection I will describe some of the characteristics of message passing in Erlang.

Asynchronous

As I already mentioned, message passing in Erlang is a non-blocking operation.

Data Copy

The message’s data are copied from the sending process to the receiver’s message queue, so the receiver gets a fresh copy of the data.

Ordering

Erlang runtime guarantees that if two messages are sent from node A to node B and both are delivered, then the ordering of these messages is kept (causal ordering).

Successful Send

The send operation always succeeds (even if the target is a non-existing process) and evaluates to the data sent. An exception is when trying to send data to a non-existing registered process.

Sending Messages

Erlang uses the exclamation mark (!) as the operator for sending a message. ! is equivalent with the erlang:send/2 function but is more commonly used because of its compactness. The ! operator is used as following

%send message Message to the process with pid Pid
Pid ! Message

and returns the message sent. Returning the message sent allows to send multiple messages “at once”; all

Pid1 ! Message, Pid2 ! Message, Pid3 ! Message
Pid1 ! (Pid2 ! (Pid3 ! Message))
Pid1 ! Pid2 ! Pid3 ! Message

are equivalent.

Example

As I have mentioned before, the shell is nothing more than a process. As a process, it has a message queue. In order to print and empty the shell’s message queue we can use the flush/0 BIF.

1> self() ! any_erlang_term_can_be_sent.
any_erlang_term_can_be_sent %notice that the msg sent
                            %was also returned
2> flush().                             
Shell got any_erlang_term_can_be_sent
ok
3> self() ! [this, is, a, list, 'of', atoms].
[this,is,a,list,'of',atoms]
4> self() ! {this, [is, a], tuple, {'!', '!'}}.
{this,[is,a],tuple,{'!','!'}}
5> self() ! {self(), 123}.
{<0.35.0>,123}
6> flush().
Shell got [this,is,a,list,'of',atoms]
Shell got {this,[is,a],tuple,{'!','!'}}
Shell got {<0.35.0>,123}
ok
7> Pid1 = self(), Pid2 = self(), Pid3 = self().
<0.35.0>
8> Pid1 ! msg, Pid2 ! msg, Pid3 ! msg.
msg
9> flush().
Shell got msg
Shell got msg
Shell got msg
ok
10> Pid1 ! (Pid2 ! (Pid3 ! msg)).      
msg
11> flush().                     
Shell got msg
Shell got msg
Shell got msg
ok
12> Pid1 ! Pid2 ! Pid3 ! msg.    
msg
13> flush().                 
Shell got msg
Shell got msg
Shell got msg
ok

Receiving Messages

Erlang uses pattern matching for receiving messages (same as in function clause selection and the case statement). The receive statement is used to deliver messages from the message queue. Its format is the following:

receive
    Pattern1 when Guard1 ->
	Do1;
    Pattern2 when Guard2 ->
	Do2;
    _Other ->
	Catch_all
end

Notice the similarities of the clauses with the ones in a case statement. The last clause is the “catch all” clause since it will match with any message (that did not match with a previous clause). Of course having a catch all clause is optional.

Receiving Order

The message processing is done in a FIFS (First In – First Served) order. Every incoming message is placed in the tail of the process’ message queue. When a receive statement is met the following processing happens:

  1. The first message (head of the message queue) is pattern matched against the first receive clause. If match, execute the clause’s body, else go to the next step.
  2. The same message is pattern matched against the second (if any) receive clause. If match, execute the clause’s body, else go to the next step.
  3. The same message is pattern matched against the last clause. If match, execute the clause’s body, else go to the next step.
  4. The same iterative process starts again from step 1, but now with the next message from the message queue.

Of course, the message (if any) that is delivered through receive is removed from the message queue.

Examples

Ping Pong

A simple example where 2 processes exchange “ping-pong” messages.

-module(pingpong).
-export([play/1]).
 
play(N) when is_integer(N), N > 0 ->
    Pong = spawn(fun pong/0),
    ping(N, Pong).
 
ping(0,Pong) ->
    Pong ! exit,
    ok;
ping(N, Pong) ->
    Pong ! {self(), ping},
    receive
	pong ->
	    io:format("~w : pong [~w]~n", [self(), N])
    end,
    ping(N - 1, Pong).
 
pong() ->
    receive
	{From, ping} ->
	    io:format("~w : ping~n", [self()]),
	    From ! pong,
	    pong();
	exit ->
	    ok
    end.
“Details”

Some details about the above ping pong implementation are:

  • All receives are non-selective to who the sender is. A message with the correct format, even if send by an “unknown” process, will be always accepted. On the other hand, a selective (to who the sender is) receive would pattern match the message against the sender’s pid. For example, the receive of pong of the “Ping” process could be made selective as {Pong, pong} -> .... In this case, the “Pong” process would have to do its send operation as From ! {self(), pong} in order to be delivered by the “From” (ping) process.
  • The way I designed the “Pong” process it cannot be changed to perform a selective receive (at least until receiving the first message which could be used to fetch the sender’s pid, which could be of course from an “uknown” – external process). If we wanted to use a selective receive, pong/0 could be changed to pong/1, where the argument would be the pid of the “Ping” process.
Example run
1> c(pingpong).
{ok,pingpong}
2> pingpong:play(4).
<0.42.0> : ping
<0.35.0> : pong [4]
<0.42.0> : ping
<0.35.0> : pong [3]
<0.42.0> : ping
<0.35.0> : pong [2]
<0.42.0> : ping
<0.35.0> : pong [1]
ok

Token Ring

This application creates NumNodes processes and arranges them in a ring (every process has one next process). Then the coordinator “inserts” a token in the first node of the ring. Every node (process) receiving the token increases its value by 1 and sends it to the next node. The application stops when the token has value greater than the MAXVAL.

-module(ring).
-export([create/1, node/2, connect/1]).
 
-define(MAXVAL, 100000).
 
%creates the ring's nodes, connects them in a ring, sends the token in the ring, and
%collects the exit messages from the nodes
create(NumNodes) when is_integer(NumNodes), NumNodes > 1 ->
    Nodes = [spawn(?MODULE, node, [ID, self()]) || ID <- lists:seq(1, NumNodes)],
    ring:connect(Nodes),
    hd(Nodes) ! {token, 0},
    getexits(Nodes).
 
%collects the exit messages from the nodes
getexits([]) ->
    io:format("[Coord] Done.~n"),
    ok;
getexits(Nodes) ->
    receive
	{Node, exit} ->
	    case lists:member(Node, Nodes) of
		true ->
		    getexits(lists:delete(Node, Nodes));
		_ ->
		    getexits(Nodes)
	    end
    end.
 
%little trick in order to connect the last with the first node
%handle the [nd0, nd1, ..., ndN] list as [nd0, nd1, ..., ndN, nd0]
connect(N = [H | _]) ->
    connect_(N ++ [H]).
 
%connects the nodes to a ring
connect_([]) ->
    connected;
connect_([_]) ->
    connected;
connect_([N1, N2 | Nodes]) ->
    N1 ! {self(), connect, N2},
    connect_([N2 | Nodes]).
 
%the node function. Initially waits for the next node's pid
node(ID, CrdId) ->
    receive
	{CrdId, connect, NxtNdId} ->
	    io:format("[~p:~p] got my next ~p~n", [ID, self(), NxtNdId]),
	    node(ID, CrdId, NxtNdId)
    end.
 
%the main functionality of a node; receive the token, increase its value and send
%it to the next node on the ring
node(ID, CrdId, NxtNdId) ->
    receive
	{token, Val} ->
	    if
		Val < ?MAXVAL ->
		    NxtNdId ! {token, Val + 1},
		    node(ID, CrdId, NxtNdId);
		true ->
		    io:format("[~p:~p] token value ~p~n", [ID, self(), Val]),
		    case erlang:is_process_alive(NxtNdId) of
			true ->
			    NxtNdId ! {token, Val + 1};
			_ ->
			    ok
		    end,
		    CrdId ! {self(), exit},
		    done
	    end
    end.
Example run
1> c(ring).
{ok,ring}
2> ring:create(5).
[1:<0.42.0>] got my next <0.43.0>
[2:<0.43.0>] got my next <0.44.0>
[3:<0.44.0>] got my next <0.45.0>
[4:<0.45.0>] got my next <0.46.0>
[5:<0.46.0>] got my next <0.42.0>
[1:<0.42.0>] token value 100000
[2:<0.43.0>] token value 100001
[3:<0.44.0>] token value 100002
[4:<0.45.0>] token value 100003
[5:<0.46.0>] token value 100004
[Coord] Done.
ok

Timeout

receive is a blocking statement; it blocks until a message that matches one of the clauses is placed in the incoming message queue. Erlang allows the programmer to explicitly unblock the receive statement using a timeout (if a matching message is not delivered until the timeout expires). The complete format of receive statement, including the after construct, is:

receive
    Pattern1 when Guard1 ->
	Do1;
    Pattern2 when Guard2 ->
	Do2;
    _Other ->
	Catch_all
after Millisecs ->
        Do_timeout;
end
Example
-module(timeout).
-export([start/0, sleep/1]).
 
start() ->
    spawn(fun timeout/0).
 
timeout() ->
    receive
	cancel ->
	    io:format("Timeout canceled~n")
    after 2000 -> % 2 seconds
	    io:format("Timeout triggered~n"),
	    timeout()
    end.
 
%a sleep function
sleep(Ms) ->    
    io:format("Sleeping for ~w ms~n", [Ms]),
    receive
    after Ms ->
	    done
    end.
Run
1> c(timeout).
{ok,timeout}
2> P = timeout:start().
<0.42.0>
Timeout triggered
Timeout triggered
3> P ! cancel.
Timeout canceled
cancel
4> timeout:sleep(1000).
Sleeping for 1000 ms
done
5> timeout:sleep(3000).
Sleeping for 3000 ms
done

Next

The next post will be a step-by-step tutorial on building a slightly bigger application in Erlang. The application will be a (shared) memory system abstraction.

Series NavigationIntroduction to Erlang : Concurrency (Processes)Introduction to Erlang : Shared Memory Example

One Response to “Introduction to Erlang : Message Passing”

Leave a Reply

*

Posts’ Calendar
May 2011
M T W T F S S
« Apr   Jun »
 1
2345678
9101112131415
16171819202122
23242526272829
3031