The Bulk Synchronous Parallel model for graph processing

The BSP Model

The Bulk Synchronous Parallel (BSP) is a model for designing parallel algorithms. The BSP model was developed by Leslie Valiant of Harvard University during the 1980s and published in 1990.

Computation proceeds as a sequence of iterations, called supersteps. Each superstep involves a phase of parallel computations, that emits messages. Messages are routed to their final destination on the cluster and are ready for effective delivery at the beginning of the next superstep.

There is a barrier between consecutive supersteps. This means that:

  1. The messages sent in any current superstep get delivered to the destination only in the next superstep
  2. Computations of the next superstep starts only after every computation of the current superstep have completed.

Values are retained across barriers. In graph terms, this means that the value of any vertex or edge attribute at the beginning of a superstep is equal to the corresponding value at the end of the previous superstep.

For graph processing, the computation unit is either a vertex or an edge of the graph. The messages are sent towards either vertices or edges of the graph. Some implementation of BSP are called vertex-centric when the parallel computations are performed on the vertices of the graph, and the messages are sent to vertices. Other implementation, like GraphX, are edge-centric: the parallel computations are performed on the edges of the graph.

The computation phase of a superstep is a parallel invocation of a user defined program on every active vertex or edge of the graph. A vertex or edge is inactive if it has itself transitioned to the inactive state. It transitions to the active state upon reception of a message.

The user defined program usually processes the messages sent to an individual vertex or edge, updates some values associated with the vertex or the edge, and then send messages to other vertices or edges in the graph. Actual implementations of the BSP model for graph processing limit the set of possible recipients to the neighborood of each vertex.

Practical implementations of BSP for Graph processing

Giraph

Giraph is a vertex-centric implementation of BSP for graph processing. In this platform, processing is done on each active vertex of the graph, and messages are sent to other vertices of the graph. Giraph is written in Java, vertex processing is specified by subclassing the org.apache.giraph.graph.BasicComputation class and overriding the compute() method.

The compute() function does all the processing, first initializing the value associated to the vertices at the first superstep, and processing vertices by updating their value when they first receive a message.

An example implementation of the Breadth First Search (BFS) algorithm with Giraph is the following:

public class SimpleBFS
        extends
    BasicComputation<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
    // A computation in which vertex ids are Long, vertex data are Double,
    // edge data are Float and BSP messages are made of one Double number.

    // the source vertex of the search
    Vertex<LongWritable, DoubleWritable, FloatWritable> srcVertex;
    // Helper function: send messages to all the neigbhor vertices
    private void SendMessages(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) {
        for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
            sendMessage(edge.getTargetVertexId(), new DoubleWritable(1));
        }
    }

    @Override
    public void compute(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
            Iterable messages) throws IOException {
        if (getSuperstep() == 0) {
            // 0 for the source vertex
            if (vertex ==  srcVertex) {
                vertex.setValue(new DoubleWritable(0));
                SendMessages(vertex);
            }
            else {
                vertex.setValue(new DoubleWritable(Integer.MAX_VALUE));
            }
        }
        else {
            if (vertex.getValue().get() == Integer.MAX_VALUE) {
                vertex.setValue(new DoubleWritable(getSuperstep()));
                SendMessages(vertex);
            }
            vertex.voteToHalt();  // request to be inactive.
        }
    }
}

At the first superstep, values associated with the vertices are initialized: the source vertex receive 0, all the other are given the largest possible number. On all subsequent superstep, a vertex is only processed one time, when its current attribute is still Integer.MAX_VALUE. In this case, its attribute is set to the current superstep number which is, for the BFS algorithm, its distance from the source vertex.

At the end of all superstep except the first one, all vertices call the voteToHalt() function to set themselves as inactive. They will stay in this state until they receive a message: if they already have been processed at a previous superstep, this new message will have no effect, otherwise it will be used to update their attribute value.

GraphX

GraphX is written in Scala and includes an edge-centric implementation of the Bulk Synchronous Parallel model. It is modeled after the Pregel implementation developped by Google. Parallel computation of the supersteps are done on each edge of the graph, and messages are sent to vertices.

Contrary to Giraph, there is not a unique program run in parallel on each edge, but a set of three user-defined programs:

  • the first one, called the vertex program is in charge of updating the vertex attribute from the received messages;
  • the second one is in charge of sending messages to other vertices : the possible recipients are the two vertices connected by the current processed edge;
  • the third one is used to implement any commutative and associative operation to merge two messages in one. This helps to reduce the number of messages to handle and process.

A possible implementation of BFS with GraphX is:

// working copy of the input graph, put 0 to the source vertex only
val bfsGraph : Graph[ Int, ED ] = graph.mapVertices(
        (vid, attr) => if (vid == root) 0 else Int.MaxValue )

// pregel with messages of type Int
bfsGraph.pregel[ Int ](initialMsg = Int.MaxValue) (
    // vertex program
    (vid : VertexId, attr : Int, msg : Int) => Math.min(msg, attr),
    // sendMsg: send to dst vertex if not yet processed
    (edgeT : EdgeTriplet[Int, ED]) =>
        if (edgeT.srcAttr != Int.MaxValue &&
            edgeT.dstAttr == Int.MaxValue)
            Iterator( (edgeT.dstId, edgeT.srcAttr + 1) )
        else Iterator.empty,
    // mergeMsg
    Math.min(_, _) )

Contrary to Giraph, the BSP/Pregel implementation of GraphX does not give any mean to know the current superstep count. However, it gives the possibility to specify an initial message sent to all vertices, prior to the first superstep. For the BFS algorithm, this initial message is useless, it is easier to initialize the graph with the correct value before processing it. Because the initial message with the value Int.MaxValue is sent to all vertices, the vertex program has to take care to not remove any previously set attribute value.

The parallel computations of GraphX are done on the edges, giving access to both the source and destination vertex. This allows to process the same graph and considering it either as a directed or undirected graph. In this BFS example, the graph is considered as directed, and edges are taken in their natural order, from the vertex registered as source to the destination one. With a small change in the code, the edges could be traversed in the reverse direction, propagating the messages from the destination vertex to the source vertex, without changing the way the graph is represented in memory by GraphX.

Leave a Reply

Your email address will not be published.