Integrales por suma de Riemann: MPI & MapReduce

La Suma de Riemann es una técnica de integración numérica para calcular valores de integrales definidas (bajo ciertas condiciones, como la continuidad). Visualmente lo que estamos haciendo es calcular el área por “debajo” de la curva definida por la función en cuestión.

Para calcular esa superficie, este método divide el continuo del eje x en “barras” o rectángulos, como se ve en la imagen:

Riemann_Integration_3Estos rectángulos van desde el eje x hasta la curva, dependiendo el enfoque que se tome (sumas izquierdas, derechas, máximas, mínimas, etc.)

El error en este tipo de sumas es alto respecto a otros métodos, pero mientras las bases de los rectángulos sean más chicas (tiendan a cero), más converge la suma hacia el resultado de la integral.

Un programa en lenguaje C que hace ese cálculo para la función x2 en el intervalo [0;10] con 100 rectángulos (pasos):

#include <stdio.h>
#include <stdlib.h>

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      float resultado = 0.0;        
      float incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;
      float semiIncremento = incremento/2;
      float x = XINICIAL;
      int i;
      for (i=0;i<PASOS;i++)
      {
         resultado += incremento * (calcularFuncion(x)+calcularFuncion
                                             (x+incremento))/(float)2;
         x+=incremento;
      }

      printf("El resultado de es %f \n",resultado);
}

float calcularFuncion(float x)
{
  return x*x;
}

Un código realmente simple. Si quisiéramos procesarlo en un cluster utilizando MPI (OpenMP) en C, tenemos dos opciones:

  1. Que el nodo master le pase a los workers cada posición x del rectángulo a calcular, el worker calcule todos sus rectángulos asignados, sume sus superficies y devuelva un valor al nodo master.
  2. Que el nodo master divida la curva a asignar en tantas partes como workers disponibles tenga el cluster y les asigne una posición de inicio y final para que cada worker calcule con este método las superficies parciales y devuelva un valor al nodo master.

Veamos una implementación posible en C (con MPI) de la primer opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100
#define MASTER 0
#define TAG_X 1
#define TAG_RESULT_PARCIAL 2

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, dest,i,workers;  
      float resultado,incremento,x,resultParcial;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 

      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    dest=1;
    x=XINICIAL;
    // Hacemos un Round Robin
    for (i=0;i<PASOS;i++)
    {
      MPI_Send(&x, 1, MPI_FLOAT, dest, TAG_X, MPI_COMM_WORLD); 
      dest++;
      if (dest==size) dest=1;
      x+=incremento;
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                          MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }

    printf("El resultado de es %f \n",resultado);

 }
 else
 {
    for (i=0; i<PASOS-(((int)(PASOS/workers))*workers)>=rank?
                                          (int)(PASOS/workers)+1:
                                          (int)(PASOS/workers); i++) 
    {
      MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_X, MPI_COMM_WORLD, 
                                                          &status); 
      resultParcial += incremento * (calcularFuncion(x)+
                            calcularFuncion(x+incremento))/(float)2;

    }
    MPI_Send(&resultParcial,1, MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL,
                                                    MPI_COMM_WORLD);
      }      
}

float calcularFuncion(float x)
{
  return x*x;
}

Veamos una implementación posible en C (con MPI) de la segunda opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 1000
#define MASTER 0
#define TAG_INICIO 1
#define TAG_FIN 2
#define TAG_RESULT_PARCIAL 3

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, i,workers;  
      float resultado,incremento,x,resultParcial,xfinal;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 
      resultParcial=0;
      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    for (i=MASTER+1;i<size;i++)
    {
      x=XINICIAL+(i-1)*XFINAL/(size-1);
      MPI_Send(&x, 1, MPI_FLOAT, i, TAG_INICIO, MPI_COMM_WORLD); 
      xfinal=XINICIAL+i*XFINAL/(size-1);
      MPI_Send(&xfinal, 1, MPI_FLOAT, i, TAG_FIN, MPI_COMM_WORLD); 
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                            MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }
    printf("El resultado de es %f \n",resultado);    
}
else
{
     MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_INICIO, MPI_COMM_WORLD, 
                                                          &status); 
     MPI_Recv(&xfinal, 1, MPI_FLOAT, MASTER, TAG_FIN, MPI_COMM_WORLD, 
                                                          &status); 
     while (x<xfinal)
     {
        resultParcial += incremento * (calcularFuncion(x)+
                           calcularFuncion(x+incremento))/(float)2;
        x+=incremento;
     }
     MPI_Send(&resultParcial,1,MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL
                                                 , MPI_COMM_WORLD);
 }             
}

float calcularFuncion(float x)
{
  return x*x;
}

En términos de consumo de red, parece ser mejor la segunda opción.

A continuación vamos a ver un planteo de este mismo problema implementado con MapReduce de Hadoop en su versión 1. Por motivos de simplicidad no se utilizó el File System distribuido, que es una de las ventajas que tiene este enfoque:

La interfaz IFuncion es la responsable de darles una “cara” consumible a las funciones:

package ar.edu.rdipasquale;

public interface IFuncion {
    public abstract Double calcularY(Double x);
}

La Función x2 se implementa de la siguiente manera:

package ar.edu.rdipasquale;

public class FuncionXCuadrado implements IFuncion{

    public FuncionXCuadrado() {
    }

    @Override
    public Double calcularY(Double x)
    {
        return x*x;
    }
}

La clase que se encarga de configurar el job y ejecutarlo se llama Riemann. Por simplicidad, no utilizará el file system distribuido y generará un archivo con una clave y un valor. La clave será la función (el id de la función) a integrar y el valor será el comienzo del rectángulo en el eje x:

package ar.edu.rdipasquale;

import java.io.PrintWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Riemann extends Configured implements Tool {

    private static final Double XINICIAL=(double)0;
    private static final Double XFINAL=(double)10;
    private static final int PASOS=100;
    private static final int FUNCIONES_A_INTEGRAR=1;

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(Riemann.class);
          conf.setJobName("Riemann");      
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(DoubleWritable.class);

          Double x=XINICIAL;

          // Habría que aprovechar el HDFS en vez del local... 
          PrintWriter out = new PrintWriter("funcionesaintegrar.txt");
          for (int i=0;i<FUNCIONES_A_INTEGRAR;i++)
              for (int j=0;j<PASOS;j++)
              {
                  out.println(String.valueOf(i+1)+";"+
                                                   String.valueOf(x));
                  x=(j+1)*XFINAL/PASOS;
              }
          out.flush();
          out.close();

          conf.setMapperClass(MapRiemann.class);
          conf.setCombinerClass(ReduceRiemann.class);
          conf.setReducerClass(ReduceRiemann.class);

          conf.setInputFormat(TextInputFormat.class);
          conf.setOutputFormat(TextOutputFormat.class);

          FileInputFormat.setInputPaths(conf, 
                                  new Path("funcionesaintegrar.txt"));
          FileOutputFormat.setOutputPath(conf, new Path("salida"));

          conf.set("riemann.funcion", 
                               "ar.edu.rdipasquale.FuncionXCuadrado");
          conf.set("riemann.incremento", 
                             String.valueOf((XFINAL-XINICIAL)/PASOS));
          JobClient.runJob(conf);        
          return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Riemann(), 
                                                               args);
        System.exit(res);
    }
}

El Mapper será la Clase MapRiemann que tomará cada línea del archivo en cuestión y emitirá la superficie de cada rectángulo:

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapRiemann extends MapReduceBase implements 
                   Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    private IFuncion funcion;
    private Double incremento;

    @Override
    public void configure(JobConf job) {
        try {
            funcion = (IFuncion)Class.forName(
                          job.get("riemann.funcion")).newInstance();
            incremento=Double.parseDouble(
                                     job.get("riemann.incremento"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable arg0, Text value,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line,";");
        String funcionAIntegrar=tokenizer.nextToken();
        Double valor=Double.parseDouble(tokenizer.nextToken());
        output.collect(new Text(funcionAIntegrar), 
               new DoubleWritable(incremento*
                           (funcion.calcularY(valor)+
                           funcion.calcularY(valor+incremento))/2));
    }
}

El Reducer está implementado en la clase ReduceRiemann y se encargará de sumar todas las superficies calculadas por los mappers (para cada función, obviamente, no mezclará peras con manzanas):

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ReduceRiemann extends MapReduceBase implements 
                  Reducer<Text, DoubleWritable, Text, DoubleWritable> 
{
    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

      double sum = 0;
      while (values.hasNext()) 
          sum += values.next().get();
      output.collect(key, new DoubleWritable(sum));                
    }   
}

Como combiner se utiliza la misma clase utilizada como reducer para poder optimizar la performance realizando algunas agregaciones locales, lo que puede observarse en la configuración del job en la clase Riemann.

Sería tonto hacer una comparativa de performance entre los dos enfoques, ya que son herramientas que se utilizan para fines distintos.

El nivel de performance en el cálculo en la plataforma C+MPI es probablemente inalcanzable.

La plataforma MapReduce (en Hadoop) se va a destacar en trabajos orientados a grandes volúmenes de datos, donde minimizará el costo de red frente a la plataforma MPI.

Por otro lado la plataforma MPI se aplica a lenguajes de programación general, y si bien MapReduce puede utilizarse en C++, Java, Python y otros, los algoritmos deben expresarse en términos de concatenación de Mappers y Reducers, lo cual agrega una complejidad, entre las que está la no existencia de un estado, es decir, que son implementaciones stateless (esto en algoritmos de grafos puede llevar a dolores de cabeza, o a soluciones por debajo del óptimo en términos de performance).

A favor del enfoque MapReduce vamos a decir que es tolerante a fallos, en una buena instalación, en un cluster lo suficientemente grande y bien administrado, se nos puede romper un nodo en la mitad del proceso y el mismo finalizará sin ningún problema. Eso es muy difícil de alcanzar con C-MPI. Asimismo el nivel de seguimiento de las tareas está administrado por la herramienta provista por Hadoop, de modo que el monitoreo ya viene de fábrica.

En un clúster heterogéneo, las soluciones que brindamos en C serían tan performantes como el nodo menos performante del cluster, ya que asignamos la carga de manera lineal. En cambio en MapReduce la carga es administrada por el framework, de modo que asignará a cada uno según sus necesidades y les pedirá según sus posibilidades (Ahí está el verdadero Socialismo del siglo XXI en los clusters de computadoras).

Anuncios

¿Y entonces, para qué GA con MapReduce?

MapReduce es un framework para procesamiento de grandes volúmenes de datos en paralelo. Google tiene una patente sobre el concepto en EEUU, aunque sería discutible, ya que está inspirado en las operaciones Map y Reduce de Lisp y otros lenguajes funcionales.

El enfoque llamado BigData (también acuñado por Google) habla de manejar grandes volúmenes de datos. Los grandes volúmenes de datos no asustan a los motores de bases de datos relacionales tradicionales (RDBMS), lo que si acaba por dejarlos inutilizados es la complejidad de las relaciones… Facebook no podría montar una red social en forma de grafo de orden 5000 (cantidad máxima de amigos) eficientemente si sólo dependiera de RDBMS.

MapReduce podría ser el nivel más bajo de lenguaje utilizado para explotar estas bases de datos. No es conveniente hacer comparaciones, como muchos autores han hecho con herramientas OLAP, ya que es lo mismo que comparar un Audi Quattro R8 con el termo Lumilagro con el que estoy cebando mate. No hay mejor, ni peor… No tienen nada que ver, no me imagino extrayendo agua del radiador del Audi para hacer mate ni viajar a 300km/h con mi termo (salvo en caída libre, lo dicho, tiene ventajas el termo).

Lo que si nos interesa es que podemos escribir software, demostrar su correctitud y completitud (si no usamos lo que se llama combiners) y ponerlo a ejecutar en forma paralela sin atender demasiado al mecanismo de paralelismo. Eso si, tenemos que pensar nuestros algoritmos en términos de pipelines de operaciones map y reduce… Si sirve de consuelo, peor les va a los muchachos de la computación cuántica…

Hay otros métodos de ejecución en paralelo que permiten un manejo “más” nativo de los algoritmos como tantas implementaciones MPI que existen, pero requieren un conocimiento más exhaustivo sobre la arquitectura en la que están corriendo, no son agnósticos de la plataforma.

Por otro lado, existen varias implementaciones de MapReduce (tanto comerciales como libres) y existen en el mundo centenares de clusters MapReduce en funcionamiento. De hecho, herramientas como Hadoop (la elegida para hacer mi implementación por simplicidad, documentación y licencia). Por tanto expresar nuestro software en términos de MapReduce, habilita que lo podamos correr en la nube (ya sea comercial o académica), o que construyamos un clúster a partir de hardware obsoleto. Incluso si queremos invertir en hardware, es claro que es mucho más barato comprar 100 dispositivos de X procesamiento que uno de 100X procesamiento. Incluso se están utilizando las placas de video para paralelizar operaciones (lo que entiendo que es explotable en Hadoop).

Sobre paralelismo en algoritmos genéticos (de ahora en más, PGA), hay ríos de tinta escritos, investigaciones e implementaciones desde hace 30 años. Sobre implementaciones con MapReduce, muy poco:

Genetic Algorithms by using MapReduce 

Fei Teng y Doga Tuncay de la Universidad de Indiana proponen implementar el problema OneMax (maximizar los 1 en una cadena de bits) sobre Hadoop y Twister demostrando mayor performance con la implementación de la iteratividad con Twister.

Parallelization of genetic algorithms using Hadoop Map-Reduce

Dino Kečo y Abdulhamit Subasi de la Universidad Burch de Sarajevo también abordan el problema de implementar una solución para OneMax en MapReduce (con Hadoop), focalizándose en cómo encajar el paralelismo que necesita la solución específica de este problema con GAs en el esquema de MapReduce.

An Extension of MapReduce for Parallelizing Genetic Algorithms

Chao Jin, Christian Vecchiola y Rajkumar Buyya de la Universidad de Melbourne utilizaron una grila de procesamiento Microsoft (con Aneka) para construir una extensión del modelo de procesamiento MapReduce para que encaje mejor con los problemas que hay para compatibilizar los PGA con MapReduce. Parece una buena solución que se aparta de la norma y requiere de la presencia de este nuevo producto en los clúster MapReduce en vez de las implementaciones standard.

Scaling Populations of a Genetic Algorithm for Job Shop Scheduling Problems using MapReduce

Di Wei Huang y Jimmy Lin de la Universidad de Maryland se propusieron demostrar que para solucionar Job Shop Scheduling Problem con GA conviene agrandar el tamaño de las poblaciones y para eso utilizaron MapReduce con excelentes resultados. La implementación fue específica e iterativa para este problema, por lo que no es muy general, pero la conclusión es asombrosa. Llegaron a operar con poblaciones de 107 individuos. E incluso concluyen que la cantidad de generaciones necesarias se disminuye notablemente con este enfoque.

A Parallel Genetic Algorithm Based on Hadoop MapReduce for the Automatic Generation of JUnit Test Suites

Linda Di Geronimo, Filomena Ferrucci, Alfonso Murolo y Federica Sarro de la Universidad de Salerno proponen utilizar PGA para generar suites de pruebas unitarias que maximicen ciertas métricas de pruebas unitarias (como branch coverage, code coverage y otras).

Scaling Simple and Compact Genetic Algorithms using MapReduce

Abhishek Verma, Xavier Llora, David E. Goldberg y Roy H. Campbell de la Universidad de Illinois proponen estudiar las perspectivas de escalabilidad que puede ofrecer la implementación de PGA con MapReduce. Para eso toman una implementación simple (la más simple posible) de algoritmo genético y realizan experimentos con hasta 108 variables.