Integrales por suma de Riemann: MPI & MapReduce

La Suma de Riemann es una técnica de integración numérica para calcular valores de integrales definidas (bajo ciertas condiciones, como la continuidad). Visualmente lo que estamos haciendo es calcular el área por “debajo” de la curva definida por la función en cuestión.

Para calcular esa superficie, este método divide el continuo del eje x en “barras” o rectángulos, como se ve en la imagen:

Riemann_Integration_3Estos rectángulos van desde el eje x hasta la curva, dependiendo el enfoque que se tome (sumas izquierdas, derechas, máximas, mínimas, etc.)

El error en este tipo de sumas es alto respecto a otros métodos, pero mientras las bases de los rectángulos sean más chicas (tiendan a cero), más converge la suma hacia el resultado de la integral.

Un programa en lenguaje C que hace ese cálculo para la función x2 en el intervalo [0;10] con 100 rectángulos (pasos):

#include <stdio.h>
#include <stdlib.h>

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      float resultado = 0.0;        
      float incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;
      float semiIncremento = incremento/2;
      float x = XINICIAL;
      int i;
      for (i=0;i<PASOS;i++)
      {
         resultado += incremento * (calcularFuncion(x)+calcularFuncion
                                             (x+incremento))/(float)2;
         x+=incremento;
      }

      printf("El resultado de es %f \n",resultado);
}

float calcularFuncion(float x)
{
  return x*x;
}

Un código realmente simple. Si quisiéramos procesarlo en un cluster utilizando MPI (OpenMP) en C, tenemos dos opciones:

  1. Que el nodo master le pase a los workers cada posición x del rectángulo a calcular, el worker calcule todos sus rectángulos asignados, sume sus superficies y devuelva un valor al nodo master.
  2. Que el nodo master divida la curva a asignar en tantas partes como workers disponibles tenga el cluster y les asigne una posición de inicio y final para que cada worker calcule con este método las superficies parciales y devuelva un valor al nodo master.

Veamos una implementación posible en C (con MPI) de la primer opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100
#define MASTER 0
#define TAG_X 1
#define TAG_RESULT_PARCIAL 2

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, dest,i,workers;  
      float resultado,incremento,x,resultParcial;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 

      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    dest=1;
    x=XINICIAL;
    // Hacemos un Round Robin
    for (i=0;i<PASOS;i++)
    {
      MPI_Send(&x, 1, MPI_FLOAT, dest, TAG_X, MPI_COMM_WORLD); 
      dest++;
      if (dest==size) dest=1;
      x+=incremento;
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                          MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }

    printf("El resultado de es %f \n",resultado);

 }
 else
 {
    for (i=0; i<PASOS-(((int)(PASOS/workers))*workers)>=rank?
                                          (int)(PASOS/workers)+1:
                                          (int)(PASOS/workers); i++) 
    {
      MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_X, MPI_COMM_WORLD, 
                                                          &status); 
      resultParcial += incremento * (calcularFuncion(x)+
                            calcularFuncion(x+incremento))/(float)2;

    }
    MPI_Send(&resultParcial,1, MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL,
                                                    MPI_COMM_WORLD);
      }      
}

float calcularFuncion(float x)
{
  return x*x;
}

Veamos una implementación posible en C (con MPI) de la segunda opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 1000
#define MASTER 0
#define TAG_INICIO 1
#define TAG_FIN 2
#define TAG_RESULT_PARCIAL 3

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, i,workers;  
      float resultado,incremento,x,resultParcial,xfinal;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 
      resultParcial=0;
      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    for (i=MASTER+1;i<size;i++)
    {
      x=XINICIAL+(i-1)*XFINAL/(size-1);
      MPI_Send(&x, 1, MPI_FLOAT, i, TAG_INICIO, MPI_COMM_WORLD); 
      xfinal=XINICIAL+i*XFINAL/(size-1);
      MPI_Send(&xfinal, 1, MPI_FLOAT, i, TAG_FIN, MPI_COMM_WORLD); 
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                            MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }
    printf("El resultado de es %f \n",resultado);    
}
else
{
     MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_INICIO, MPI_COMM_WORLD, 
                                                          &status); 
     MPI_Recv(&xfinal, 1, MPI_FLOAT, MASTER, TAG_FIN, MPI_COMM_WORLD, 
                                                          &status); 
     while (x<xfinal)
     {
        resultParcial += incremento * (calcularFuncion(x)+
                           calcularFuncion(x+incremento))/(float)2;
        x+=incremento;
     }
     MPI_Send(&resultParcial,1,MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL
                                                 , MPI_COMM_WORLD);
 }             
}

float calcularFuncion(float x)
{
  return x*x;
}

En términos de consumo de red, parece ser mejor la segunda opción.

A continuación vamos a ver un planteo de este mismo problema implementado con MapReduce de Hadoop en su versión 1. Por motivos de simplicidad no se utilizó el File System distribuido, que es una de las ventajas que tiene este enfoque:

La interfaz IFuncion es la responsable de darles una “cara” consumible a las funciones:

package ar.edu.rdipasquale;

public interface IFuncion {
    public abstract Double calcularY(Double x);
}

La Función x2 se implementa de la siguiente manera:

package ar.edu.rdipasquale;

public class FuncionXCuadrado implements IFuncion{

    public FuncionXCuadrado() {
    }

    @Override
    public Double calcularY(Double x)
    {
        return x*x;
    }
}

La clase que se encarga de configurar el job y ejecutarlo se llama Riemann. Por simplicidad, no utilizará el file system distribuido y generará un archivo con una clave y un valor. La clave será la función (el id de la función) a integrar y el valor será el comienzo del rectángulo en el eje x:

package ar.edu.rdipasquale;

import java.io.PrintWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Riemann extends Configured implements Tool {

    private static final Double XINICIAL=(double)0;
    private static final Double XFINAL=(double)10;
    private static final int PASOS=100;
    private static final int FUNCIONES_A_INTEGRAR=1;

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(Riemann.class);
          conf.setJobName("Riemann");      
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(DoubleWritable.class);

          Double x=XINICIAL;

          // Habría que aprovechar el HDFS en vez del local... 
          PrintWriter out = new PrintWriter("funcionesaintegrar.txt");
          for (int i=0;i<FUNCIONES_A_INTEGRAR;i++)
              for (int j=0;j<PASOS;j++)
              {
                  out.println(String.valueOf(i+1)+";"+
                                                   String.valueOf(x));
                  x=(j+1)*XFINAL/PASOS;
              }
          out.flush();
          out.close();

          conf.setMapperClass(MapRiemann.class);
          conf.setCombinerClass(ReduceRiemann.class);
          conf.setReducerClass(ReduceRiemann.class);

          conf.setInputFormat(TextInputFormat.class);
          conf.setOutputFormat(TextOutputFormat.class);

          FileInputFormat.setInputPaths(conf, 
                                  new Path("funcionesaintegrar.txt"));
          FileOutputFormat.setOutputPath(conf, new Path("salida"));

          conf.set("riemann.funcion", 
                               "ar.edu.rdipasquale.FuncionXCuadrado");
          conf.set("riemann.incremento", 
                             String.valueOf((XFINAL-XINICIAL)/PASOS));
          JobClient.runJob(conf);        
          return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Riemann(), 
                                                               args);
        System.exit(res);
    }
}

El Mapper será la Clase MapRiemann que tomará cada línea del archivo en cuestión y emitirá la superficie de cada rectángulo:

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapRiemann extends MapReduceBase implements 
                   Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    private IFuncion funcion;
    private Double incremento;

    @Override
    public void configure(JobConf job) {
        try {
            funcion = (IFuncion)Class.forName(
                          job.get("riemann.funcion")).newInstance();
            incremento=Double.parseDouble(
                                     job.get("riemann.incremento"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable arg0, Text value,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line,";");
        String funcionAIntegrar=tokenizer.nextToken();
        Double valor=Double.parseDouble(tokenizer.nextToken());
        output.collect(new Text(funcionAIntegrar), 
               new DoubleWritable(incremento*
                           (funcion.calcularY(valor)+
                           funcion.calcularY(valor+incremento))/2));
    }
}

El Reducer está implementado en la clase ReduceRiemann y se encargará de sumar todas las superficies calculadas por los mappers (para cada función, obviamente, no mezclará peras con manzanas):

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ReduceRiemann extends MapReduceBase implements 
                  Reducer<Text, DoubleWritable, Text, DoubleWritable> 
{
    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

      double sum = 0;
      while (values.hasNext()) 
          sum += values.next().get();
      output.collect(key, new DoubleWritable(sum));                
    }   
}

Como combiner se utiliza la misma clase utilizada como reducer para poder optimizar la performance realizando algunas agregaciones locales, lo que puede observarse en la configuración del job en la clase Riemann.

Sería tonto hacer una comparativa de performance entre los dos enfoques, ya que son herramientas que se utilizan para fines distintos.

El nivel de performance en el cálculo en la plataforma C+MPI es probablemente inalcanzable.

La plataforma MapReduce (en Hadoop) se va a destacar en trabajos orientados a grandes volúmenes de datos, donde minimizará el costo de red frente a la plataforma MPI.

Por otro lado la plataforma MPI se aplica a lenguajes de programación general, y si bien MapReduce puede utilizarse en C++, Java, Python y otros, los algoritmos deben expresarse en términos de concatenación de Mappers y Reducers, lo cual agrega una complejidad, entre las que está la no existencia de un estado, es decir, que son implementaciones stateless (esto en algoritmos de grafos puede llevar a dolores de cabeza, o a soluciones por debajo del óptimo en términos de performance).

A favor del enfoque MapReduce vamos a decir que es tolerante a fallos, en una buena instalación, en un cluster lo suficientemente grande y bien administrado, se nos puede romper un nodo en la mitad del proceso y el mismo finalizará sin ningún problema. Eso es muy difícil de alcanzar con C-MPI. Asimismo el nivel de seguimiento de las tareas está administrado por la herramienta provista por Hadoop, de modo que el monitoreo ya viene de fábrica.

En un clúster heterogéneo, las soluciones que brindamos en C serían tan performantes como el nodo menos performante del cluster, ya que asignamos la carga de manera lineal. En cambio en MapReduce la carga es administrada por el framework, de modo que asignará a cada uno según sus necesidades y les pedirá según sus posibilidades (Ahí está el verdadero Socialismo del siglo XXI en los clusters de computadoras).

Anuncios

Deje un comentario

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión / Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión / Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión / Cambiar )

Google+ photo

Estás comentando usando tu cuenta de Google+. Cerrar sesión / Cambiar )

Conectando a %s