This section elaborates on the basic objects for composition introduced in Section 1.2: processes and sessions. Specification and composition mechanisms will be discussed in Section 3.
In our underlying model, each document and appliance has a state that consists of a set of value assignments for that given entity's variables. In a reliable distributed system, the states of components should be modified only in systematic ways. For example, only authenticated processes should be permitted to modify the state of a given process (e.g., an appointments calendar). In addition, some processes may have privileges that other processes do not enjoy. For example, processes corresponding to the chair of a meeting may have privileges that processes of ordinary members do not possess. Furthermore, a reliable distributed system will provide safety mechanisms (e.g., a guarantee that disallows two appointments for the same person from being scheduled for exactly the same time).
To enable reliable application development, we encapsulate the state of an entity within a process that manages that entity (i.e., document or appliance), as illustrated in Figure 2. The state can be changed only by servicing requests received from other processes. From an implementation standpoint, each process is a multithreaded persistent Java object that can communicate with other processes using UDP [Pos80].
A process cannot modify the state of another process directly; however, a process P can request that a process Q modify its own state in a manner prescribed by P, as illustrated in Figure 3. The kinds of requests that P can make of Q depend on the relationship between P and Q; for instance, if P is Q's boss, then P can make requests that Q's subordinates cannot make. The process structure facilitates communication of requests, and it also supports verification that requestors have appropriate capabilities.
Figure 3: To modify the state of remote process Q, process P sends requests asynchronously.
A group of cooperating processes (i.e., a session) may be distributed on the Internet, anywhere in the world; in one session, all the processes might be in the same room, and in another session the processes might be on different continents. In some sessions, processes might have to interact with people during the session; for instance, a calendar process might need to get the acquiescence of the owner of the calendar before appointments are set. The time taken by a person to react to a signal from a process can vary significantly. Therefore, the delay between sending a message and the eventual response to the message can vary a great deal. For this reason, asynchronous buffered message-passing mechanisms are used, as illustrated in Figure 3.
Therefore, the underlying communication mechanism is not a synchronous remote procedure call (RPC) [BN84]; a process P cannot modify Q's state by executing an RPC on Q. Rather, a process P can send a message to Q requesting that Q execute an asynchronous (i.e., one-way) RPC, and this message is placed in one of process Q's incoming message queues. Process Q determines how its incoming queues are managed; for instance, it may give priority to one queue over another.
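The following Java sketch illustrates the request pattern described above: the sender only enqueues a one-way request, and the receiving process alone changes its state and decides which of its incoming queues to service first. The class name, the two queue names, and the manager-first policy are hypothetical choices made for illustration; they are not taken from the implementation, and networking is replaced by in-memory calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch; names are illustrative, not the paper's API.
final class CalendarProcessSketch {
    // Encapsulated state: no other process can touch this list directly.
    private final List<String> appointments = new ArrayList<>();
    // Two incoming queues; the receiver alone decides how to prioritize them.
    private final Queue<String> managerQueue = new ArrayDeque<>();
    private final Queue<String> colleagueQueue = new ArrayDeque<>();

    // One-way request: the sender enqueues the message and returns at once.
    synchronized void request(boolean fromManager, String appointment) {
        (fromManager ? managerQueue : colleagueQueue).add(appointment);
    }

    // Services one pending request, giving the manager queue priority
    // (an assumed policy). Returns false when both queues are empty.
    synchronized boolean serviceOne() {
        String next = managerQueue.isEmpty() ? colleagueQueue.poll() : managerQueue.poll();
        if (next == null) {
            return false;
        }
        appointments.add(next);
        return true;
    }

    // Read-only copy of the state, for inspection.
    synchronized List<String> snapshot() {
        return new ArrayList<>(appointments);
    }
}
```

Note that a call to `request` leaves the appointment list unchanged until the receiving process itself calls `serviceOne`, which mirrors the separation between sending a request and modifying state.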
Our model provides a dynamic mechanism that allows processes to create and destroy input and output queues during sessions, as illustrated in Figure 4.
Figure 4: Process P creates a new input queue X.
A set of incoming message queues and a set of outgoing message queues are associated with each process. A message queue of a process P is a local object of P; message queues can be created or eliminated just like any other object.
As illustrated in Figure 5, an output message queue can be bound to an arbitrary number of input queues. A message at the head of an output queue is sent to every input queue to which it is bound, after which the message is deleted from the output queue; each input queue gets an identical copy of the message. Assume for the time being that all messages are delivered, even though the actual protocol (discussed briefly later) allows for dropped messages and employs timeouts.
Figure 5: An example of process input and output queue connections: process P's output queue is bound to process S's input queue; process Q's output queue is bound to broadcast to the input queues of processes R, S, and T. Messages are fairly merged on process S's input queue. Our message-passing mechanism ensures FIFO delivery of messages on any given channel.
Also as illustrated in Figure 5, an input queue can be bound to an arbitrary number of output queues. Messages from an output queue to an input queue are delivered in the order (i.e., first-in first-out or FIFO) that they are sent along the corresponding channel. The sequence of messages delivered to an input queue is a fair merge of the sequences of messages sent to the input queue from all the output queues to which it is bound.
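The binding topology of Figure 5 can be sketched in a few lines of Java. This is a single-threaded, in-memory illustration under the earlier assumption that all messages are delivered; the class and method names (`InQueueSketch`, `OutQueueSketch`, `sendHead`) are hypothetical. It shows an output queue copying its head message to every bound input queue, and an input queue receiving an interleaving that preserves the order of each individual channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory input queue.
final class InQueueSketch {
    private final Queue<String> messages = new ArrayDeque<>();

    void deliver(String message) {
        messages.add(message);
    }

    // Messages in arrival order, for inspection.
    List<String> contents() {
        return new ArrayList<>(messages);
    }
}

// Hypothetical in-memory output queue bound to any number of input queues.
final class OutQueueSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<InQueueSketch> bound = new ArrayList<>();

    void bind(InQueueSketch in) {
        bound.add(in);
    }

    // Appends a message to the rear of the output queue.
    void append(String message) {
        pending.add(message);
    }

    // Removes the head message and delivers an identical copy to every bound
    // input queue. Returns false if the output queue is empty.
    boolean sendHead() {
        String head = pending.poll();
        if (head == null) {
            return false;
        }
        for (InQueueSketch in : bound) {
            in.deliver(head);
        }
        return true;
    }
}
```

For example, if P's output queue is bound to S, Q's output queue is bound to R, S, and T, and the heads are sent in the order P, Q, P, then S holds P's first message, Q's message, and P's second message in that order. Any interleaving that keeps each sender's messages in order would be an acceptable merge; fairness additionally requires that no channel is delayed forever, which this sequential sketch does not model.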
Message queues are typed; the type of a message queue specifies exactly which types of messages can be placed in the queue. An output queue is bound to an input queue only if every message type that can appear in the output queue can also appear in the input queue; binding is discussed further below.
A process may have many input queues. Each input queue restricts the types of messages that can be placed in the queue. A set of processes is associated with each input queue; only the output queues of these processes can be bound to the input queue. In our implementation, this condition is ensured by the binding mechanism provided by our infrastructure [CRS96]. Thus, the infrastructure facilitates control of messages that can be delivered to the input queue of a process. For instance, an input queue may restrict binding so that only ``manager'' processes can bind to it.
This set of processes is specified either as an enumerated list or by attributes. Our current design allows the specification to be either a list or ``any,'' but there are more sophisticated schemes that fit our overall plan; for instance, an input queue of type colleague of a person's calendar process can be restricted to be bound only to output queues of calendar processes of people in that person's work group.
A message sent to an input queue from an invalid output queue is not delivered; in our implementation, an exception is thrown in the sending process. Furthermore, only messages sent by processes in a specified set can be placed in the input queue. Our present design does not support security; for instance, it does not prevent a rogue process from pretending to be another process.
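A minimal Java sketch of these two restrictions follows. It assumes that a message type is represented by a Java class and that the permitted set is either an enumerated set of sender names or ``any'' (modeled here as `null`); the class name, the constructor, and the choice of exception types are hypothetical. Consistent with the limitation noted above, the sender identity is trusted as given and nothing prevents impersonation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of a typed, access-controlled input queue.
final class GuardedInQueueSketch {
    private final Class<?> acceptedType;
    private final Set<String> allowedSenders; // null stands for "any"
    private final Queue<Object> messages = new ArrayDeque<>();

    GuardedInQueueSketch(Class<?> acceptedType, Set<String> allowedSenders) {
        this.acceptedType = acceptedType;
        this.allowedSenders = allowedSenders;
    }

    // Invoked on behalf of the sender, so a rejected message surfaces as an
    // exception in the sending process. The sender name is not authenticated.
    void deliver(String senderId, Object message) {
        if (allowedSenders != null && !allowedSenders.contains(senderId)) {
            throw new IllegalStateException("sender not permitted: " + senderId);
        }
        if (!acceptedType.isInstance(message)) {
            throw new IllegalArgumentException("message type not accepted from " + senderId);
        }
        messages.add(message);
    }

    int size() {
        return messages.size();
    }
}
```

An attribute-based restriction, such as the work-group example above, could replace the enumerated set with a predicate over sender attributes; that variant is not shown here.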
One of the methods that can be invoked on an output queue binds the output queue to a set of input queues. Binding an output queue to an input queue sets up a FIFO channel from the output queue to the input queue. Parallel composition among a set of processes can be achieved by binding the input and output queues appropriately; we discuss parallel composition of processes in Section 3.1. Our model provides a dynamic mechanism that allows the bindings of input and output queues to change during sessions, as shown in Figure 6.
Figure 6: In changing the binding of an output queue to a different set of input queues (or, in this case, to a different single input queue), a process can perform a quickBind or a safeBind. Note that the process that has the output queue changes the binding, using an asynchronous request. The binding mechanism ensures reliable reconnections when a safeBind is called.
Each input queue has a unique global address. This address consists of an IP address, a socket number, and a local address for the input queue on its host processor. A process can bind one of its output queues to an input queue of another process (or to one of its own input queues). The design has two kinds of bind methods: a quickBind and a safeBind. The quick version does not check the type and access control of the input queue to which it binds; if the binding is invalid, an exception is thrown in the sending process when the first invalid message is sent along the channel. The safe version completes the binding only after checking that the binding is valid in terms of type and access control, and an exception is thrown if the binding is invalid.
Every message includes in its header the identity of the process that sent the message and the message type. The communications layer delivers the message to an input queue only if the type and access control are valid. Each message is checked at the point of delivery to the input queue, because the access control list for an input queue can be changed at any point in the computation.
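The difference between the two bind methods, and the reason for checking each message at delivery, can be illustrated with the Java sketch below. Only the method names quickBind and safeBind come from the design; the surrounding classes, fields, and the use of `IllegalStateException` are assumptions, and the network is replaced by direct method calls. Type compatibility is approximated with `Class.isAssignableFrom`.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch contrasting quickBind and safeBind.
final class BindSketch {
    static final class InQueue {
        final Class<?> type;
        final Set<String> acl = new HashSet<>(); // may change at any time
        final List<Object> received = new ArrayList<>();

        InQueue(Class<?> type) {
            this.type = type;
        }

        boolean accepts(String sender, Class<?> messageType) {
            return acl.contains(sender) && type.isAssignableFrom(messageType);
        }
    }

    static final class OutQueue {
        private final String owner;
        private final Class<?> type;
        private final List<InQueue> bound = new ArrayList<>();

        OutQueue(String owner, Class<?> type) {
            this.owner = owner;
            this.type = type;
        }

        // quickBind: no validation now; an invalid binding is detected only
        // when the first invalid message is sent.
        void quickBind(InQueue in) {
            bound.add(in);
        }

        // safeBind: validate type and access control before binding.
        void safeBind(InQueue in) {
            if (!in.accepts(owner, type)) {
                throw new IllegalStateException("invalid binding for " + owner);
            }
            bound.add(in);
        }

        // Each message is checked again at delivery, because the access
        // control list may have changed since the binding was made.
        void send(Object message) {
            for (InQueue in : bound) {
                if (!in.accepts(owner, message.getClass())) {
                    throw new IllegalStateException("delivery refused for " + owner);
                }
                in.received.add(message);
            }
        }
    }
}
```

In this sketch, a sender that passes safeBind can still have a later message refused if it is removed from the access control list afterward, which is why a bind-time check alone is not sufficient.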
In our implementation, a process is a Java program that has a collection of files (persistent storage) and which interacts with other processes by operations on its message queues. Since the program accesses files, it is implemented as a Java application program and not as an applet. The input and output message queues of a process are local objects of the process.
A method on an output queue (a) changes the set of input queues to which the output queue is bound either by binding another input queue using quick or safe binding or by deleting an input queue from the set, or (b) appends a message to the rear of the queue. A method on an input queue (a) changes the access control list for the queue by adding or removing processes from the list, (b) waits until the queue is nonempty and then returns the message at the head of the queue (and deletes this message from the queue), or (c) returns a value indicating whether the queue is empty or nonempty.
A process can do anything a Java program can do; for instance, it can have one thread for each input queue, where each thread waits for a message in its input queue. We do not restrict how a process handles messages or files. We are developing systematic ways for manipulating threads and messages [SC96], analyzing application performance [Rif96], and reasoning about parallel programs [Tho96, Mas97], but these methods are not discussed in this paper.
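As one illustration of the thread-per-input-queue arrangement mentioned above, the sketch below uses `java.util.concurrent.LinkedBlockingQueue` as a stand-in for an input queue, since its `take` method waits until the queue is nonempty and then removes and returns the head. This is a substitution for illustration only; the queues described in this paper are not claimed to be built on this class, and the `STOP` marker and all names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: one worker thread per input queue.
final class ThreadPerQueueSketch {
    static final String STOP = "STOP"; // assumed end-of-session marker

    // Thread-safe log of handled messages, shared by all workers.
    final List<String> handled = new CopyOnWriteArrayList<>();

    // Starts a thread that blocks on one input queue until STOP arrives.
    Thread startWorker(String queueName, BlockingQueue<String> inputQueue) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String message = inputQueue.take(); // waits while empty
                    if (STOP.equals(message)) {
                        return;
                    }
                    handled.add(queueName + ":" + message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    // Small demonstration with two input queues.
    static List<String> runDemo() {
        ThreadPerQueueSketch process = new ThreadPerQueueSketch();
        BlockingQueue<String> colleague = new LinkedBlockingQueue<>();
        BlockingQueue<String> manager = new LinkedBlockingQueue<>();
        Thread t1 = process.startWorker("colleague", colleague);
        Thread t2 = process.startWorker("manager", manager);
        colleague.add("a");
        colleague.add("b");
        colleague.add(STOP);
        manager.add("x");
        manager.add(STOP);
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return process.handled;
    }
}
```

Messages from a single queue are handled in order, while the relative order of messages from different queues depends on thread scheduling.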
If each appliance and document attached to the Internet is encapsulated within a process, a computer may have to support hundreds or thousands of persistent processes. Efficiency requirements limit the number of concurrently executing processes on a computer. Our scheduling layer limits the number of concurrently executing processes to those that are active -- i.e., participating in sessions.
A request to a process to participate in a session is sent to the home address of the process, where the request is trapped by a scheduler. If the process is already executing, the scheduler passes the message on to the process. If the process is not executing, the scheduler causes the process to execute (forks the process) and then passes it the message.
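The scheduler's behavior can be sketched as follows. This is an in-memory illustration only: creating a `ManagedProcess` object stands in for forking a real persistent process, and every name in the block is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of activation on demand by a scheduler.
final class SchedulerSketch {
    static final class ManagedProcess {
        final List<String> inbox = new ArrayList<>();
    }

    // Only processes that are currently executing appear in this map.
    private final Map<String, ManagedProcess> executing = new HashMap<>();
    private int activations = 0;

    // Traps a request sent to a home address, starts the process if it is
    // not already executing, and then passes the request on.
    void deliver(String homeAddress, String request) {
        ManagedProcess process = executing.get(homeAddress);
        if (process == null) {
            process = new ManagedProcess(); // stands in for forking
            executing.put(homeAddress, process);
            activations++;
        }
        process.inbox.add(request);
    }

    int activations() {
        return activations;
    }

    List<String> inboxOf(String homeAddress) {
        ManagedProcess process = executing.get(homeAddress);
        return process == null ? new ArrayList<>() : new ArrayList<>(process.inbox);
    }
}
```

Two requests to the same home address cause a single activation, so the number of executing processes tracks the number of processes that have actually been asked to participate.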
Mobile processes are dealt with in the following way. Each process has an unchanging ``home'' address. This home address can be found by using search engines on the Web, for instance. The home address includes the unchanging address of an input queue to which requests to participate in sessions are sent. Thus, in phase 1 of session initiation, the initiator sends request messages to permanent home addresses. A member process that agrees to participate in the session replies with the addresses of its input queues; these addresses can be dynamic. The address of an input queue for a mobile process can be on a different processor than its home address. In phase 2, processes bind their output queues to input queue addresses returned in phase 1. For example, in Figure 7, the calendar processes could reside on different machines from their home addresses; when an initiator attempts to set up a session, it performs this two-phase protocol to locate processes and commit them to the session (initiate-and-commit).
The present design requires a process to be immobile during its participation in a session: it cannot change the addresses of its input queues during a session though it can change the addresses after the end of one session and before the start of the next. The design can be extended, by using message indirection for instance, to deal with mobility during a session.
Consider the example from Section 1.1 of a secretary setting up a BOF meeting with members from different sites. Prior to the session, each committee member has installed a calendar process on her or his machine. Each calendar process operates within a single address space, communicates with files by standard I/O operations, and communicates with other calendar processes through communication requests. For the actual implementation, an Internet address is associated with each process.
A session is an instance of an application, implemented as a network of processes. As illustrated in Figure 7, the BOF scheduling session consists of many different types of processes: an initiator process that sets up connections and relays address information, user calendar processes with access to the appointment calendars of individual users, and a secretary process that coordinates the collection of information and the decision and broadcasting of a suitable meeting time. Programs corresponding to each process type are installed on the appropriate machines; for the session in Figure 7, the calendar user processes and secretary process are processes running on their respective users' desktop computers.
Figure 7: The initiator process uses the invoker's address directory to set up a session between existing calendar and secretary processes.
Associated with each session is an initial process -- the initiator process -- that is responsible for linking processes together. In the BOF scheduling example, someone (e.g., a person or a person's process) sets up the initiator process. Processes are composed in parallel to form a session in two phases, as follows.
In phase 1, the initiator sends a request to each of the processes in the session's initial membership list; this request is a message asking the recipient to participate in the session. Each session has a unique identity: (process number, sequence number). A member process responds to the request either by refusing to participate or by agreeing to participate. It may refuse to participate because (for instance) its access control does not permit this participation, or because it is already participating in another session, and that session's specification forbids the process from concurrently participating in more than one session. If it agrees to participate, it replies with the (global) addresses of its input queues that are to be connected to the output queues of other processes in the session.
In phase 2, after receiving replies from all member processes (or timing out), the initiator sends a second message to all of the members, informing them either to initiate or to abort the session. A message to initiate the session contains the addresses of the input queues to which each member process is to bind its output queues. A process, on receiving the initiate message, binds its output queues to appropriate input queues and starts its threads, and thus begins its participation in the session. After completing their tasks, the member processes close the session, having each modified their local states.
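The two phases can be summarized in a short Java sketch. Several simplifying assumptions are made and should be read as such: messages are modeled as direct method calls, timeouts are omitted, the initiator aborts if any member refuses (the text does not specify the abort criterion), each member offers a single input queue address, and every member binds to every returned address. All class, method, and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of two-phase session initiation.
final class SessionInitSketch {
    static final class Member {
        final String inputQueueAddress;
        final boolean willing;
        final List<String> boundTo = new ArrayList<>();
        String status = "idle";

        Member(String inputQueueAddress, boolean willing) {
            this.inputQueueAddress = inputQueueAddress;
            this.willing = willing;
        }

        // Phase 1: reply with an input queue address, or null to refuse.
        String invite(String sessionId) {
            return willing ? inputQueueAddress : null;
        }

        // Phase 2 (initiate): bind output queues to the given addresses.
        void initiate(String sessionId, List<String> addresses) {
            boundTo.addAll(addresses);
            status = "in " + sessionId;
        }

        // Phase 2 (abort): do not join the session.
        void abort(String sessionId) {
            status = "aborted";
        }
    }

    // Runs both phases and returns true if the session was initiated.
    static boolean startSession(int processNumber, int sequenceNumber, List<Member> members) {
        // Session identity: (process number, sequence number).
        String sessionId = processNumber + ":" + sequenceNumber;

        // Phase 1: collect replies from all members.
        List<String> addresses = new ArrayList<>();
        boolean allAgreed = true;
        for (Member member : members) {
            String reply = member.invite(sessionId);
            if (reply == null) {
                allAgreed = false;
            } else {
                addresses.add(reply);
            }
        }

        // Phase 2: tell every member to initiate or to abort.
        for (Member member : members) {
            if (allAgreed) {
                member.initiate(sessionId, addresses);
            } else {
                member.abort(sessionId);
            }
        }
        return allAgreed;
    }
}
```

Because no member binds any output queue until phase 2, a refusal in phase 1 leaves every member's bindings untouched.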