Conclusiones de Paralelizar GA con MPI

Habiendo implementado un algoritmo genético simple (en particular una minimización de la función valle de Rosenbrock) con el fin de utilizarlo como benchmark, puedo destacar ciertas conclusiones.

El método de paralelización utilizado fue el básico para estos casos, es decir, el monolítico en términos de algoritmos genéticos, o sea, es decir existirá una sola población global que evoluciona. Este mecanismo no es el mejor desde el punto de vista del paralelismo, pero la intención es utilizarlo como benchmark, por lo que pude relajar la exigencia.

Las operaciones que se paralelizaron fueron:

  • Evaluación del Fitness
  • Crossover y Mutación

Mientras que las operaciones que quedaron seriales fueron:

  • Inicialización
  • Selección

No se utilizó una condición de optimalidad, ya que la idea fue implementar algo comparativo, por lo que se forzó siempre la ejecución de 100.000 generaciones, aunque en la mayoría de los casos, antes de las 2.000 generaciones se había alcanzado un valor más que aceptable.

Por otro lado, dado que la representación para los cromosomas fue la clásica tira de bits, no se representó un contínuo, sino una discretización del mismo, quedando excluido el punto en donde se halla el mínimo global de la función (1,1). De esta manera el algoritmo explora una franja con forma de banana que se da en la representación tridimensional de la función donde se encuentran los valores mínimos de la misma.

Por tanto debe quedar claro que el objetivo no fue optimizar la función de Rosenbrock, sino implementar un algoritmo genético paralelo comparable con fines de benchmark.

La parametrización de las corridas ha sido la siguiente:

POBLACION_SIZE 200
MAX_GENERACIONES 100000
CROSSOVER_PROBABILITY 0,65
MUTATION_PROBABILITY 0,035

El resultado arrojado por la versión serial en un dual core (2Ghz) con 2Gb RAM fue:

Operación Tiempo en ms %
Inicializacion 0 0,00%
Computo del Fitness 728085 78,69%
Operaciones Geneticas (Cross+Mut+Reempl) 191311 20,68%
Seleccion 5880 0,64%
925276

Cabe destacar que una corrida serial en un procesador i7 con 8Gb de RAM toma 70 segundos (aproximadamente el 8% respecto a la corrida descripta).

Las corridas paralelas han arrojado los siguientes resultados:

Nodos Workers Tiempo Total Inicializacion Computo del Fitness Operaciones Geneticas Seleccion
1 0 925276 0 728085 191311 5880
2 1 457484 0,077 347019 104263 6085
3 2 548708 0,082 349236 192929 6432
4 3 502676 0,082 318536 177568 6458
5 4 489597 0,081 288175 194847 6445
6 5 494720 0,116 281966 206180 6464
7 6 349025 0,082 206147 136326 6432
8 7 349025 0,082 206147 136326 6432
9 8 294204 0,077 171204 116748 6137
10 9 286247 0,077 162048 117613 6471
11 10 407975 0,078 236078 165234 6494
12 11 468207 0,078 267160 194435 6449
13 12 582009 0,078 340445 234938 6439

TiemposPorWorker

Con respecto a los tiempos de las operaciones genéticas en paralelo vemos que no se evidencia un incremento en la performance que sea beneficioso en términos reales. Esto puede deberse a que las operaciones genéticas, en este caso, son operaciones a nivel de bits, que suelen ser sumamente eficientes a nivel de procesamiento; y que, por otro lado, implican una transferencia de datos importantes: se envían 2 individuos (en este caso cabe recordar que cada individuo consiste en un arreglo de 13 bytes) y se devuelven 2 individuos, por tanto cada operación implica una transferencia de 52 bytes entre master y worker. Por tanto, la mejora en términos de procesamiento no compite con el consumo de recursos de red provocado por esta estrategia.

En cambio, en la evaluación del fitness se da otro escenario, favorable al paralelismo. En este caso, por cada individuo se envían 13 bytes y se reciben 4 bytes. En esta operación, el consumo de CPU tiene un peso más significativo que el overhead provocado por la utilización de la red (hasta cierto punto). Con la presente configuración, se encontró un mínimo tiempo de procesamiento con 10 nodos (9 workers) para la evaluación del fitness, como para el tiempo global, de una magnitud de 4,5 y 3,23 veces respectivamente.

Como conclusión general se puede extraer:

  • En GA simples, las operaciones genéticas consumen poco procesador respecto al consumo de red de enviar 2 individuos y recibir otros dos individuos.
  • La operación que más fácilmente se paraleliza es el cálculo del fitness.
  • La paralelización es muy sensible a los parámetros: Cambiará mucho si el tamaño del individuo se multiplica por 100 (vuelva totalmente inviable la paralelización de las operaciones genéticas), o si el tamaño de la población se multiplica, podríamos empezar a obtener buenos resultados paralelizando las operaciones genéticas. Otro dato sensible es el consumo de procesador durante la evaluación del fitness. Si este consumo se multiplica por 100, el rendimiento del paralelismo incrementará, dado que es la función más fácilmente paralelizable.
  • El ancho de banda es un impedimento para la escalabilidad de la solución en paralelo con MPI (GA simple y monolítico). Más aún si el GA consume o genera datos centralizados.

Ansi-C, Eclipse y Math.h

Escribiendo un algoritmo genético simple de benchmark para el cluster (una función de Rosenbrock) en ANSI-C, MPI con Eclipse me topé con un problema… Resulta que al utilizar la función pow de la librería Math, muchas veces (y no siempre) el compilador arrojaba undefined reference to ‘pow’.

Después de mucho pensarlo, parecía que el criterio era que cuando utilizaba una variable (y no un literal) entero en el segundo miembro de la función… Igual, tratando de encontrar una relación de causa efecto me topé con una correlación… Inútil como casi toda mi erudicción.

Googleando me di cuenta que hay un problema por el que el linker no incluye a la librería math. Entonces intenté agregar el parámetro -lm al compilador sin mucho éxito (Eclipse). Hasta que me encontré con el amigo Dystopia que dio en el clavo.

Configurando un Cluster

He configurado un cluster casero con 4 nodos, las versiones del software son estables, quizás no sean las últimas. Lo único que lamento, es no haber podido configurar Hadoop serie 2 con YARN… La próxima será. Las versiones del software que configuré son:

Luego de bajar los paquetes, los pasos para la instalación y configuración son los siguientes:

    • Primero vamos a crear un usuario en todos los nodos para que corra hadoop:
      sudo useradd -d /home/hadoopuser -m hadoopuser
      sudo passwd hadoopuser
    • Y vamos a agregarlo a un grupo con tal de darle permisos de lectura, escritura y ejecución en los recursos de Hadoop.
    • Vamos a sanear los nombres de host en todos los servidores y la unificación de /etc/hosts si no contamos con un DNS. Esto parece trivial, pero es con lo que más dolores de cabeza nos da Hadoop.
    • Crear un directorio para la instalación de Hadoop, al que setearemos en la variable de entorno HADOOP_HOME
    • Descomprimir allí dentro el tar de hadoop (tar xfz hadoop-xxxx)
    • Verificar que esté definida la variable JAVA_HOME
    • Verificar la configuración de NTP en el cluster.
    • Agregar al Path del directorio bin de Hadoop.
    • Crear un directorio para los logs de hadoop.
    • Verificar loopback en cada máquina:
      sudo /etc/hosts

      Verificar que haya 2 entradas a 127.0.0.1 (localhost y el nombre de la máquina) ya que ubuntu suele usar el 127.0.1.1

    • Crear directorios data, log, data/mapred/local/ , data/mapred/system , data/namenode y data/datanode dentro de la estructura de Hadoop.
    • Hay que habilitar en el namenode la posibilidad de que se comunique vía ssh a todos los esclavos sin tener que utilizar una password. Para eso hay un buen tutorial aquí. Desde el namenode (hacer su al usuario que va a correr hadoop):
      ssh-keygen -t rsa
      ssh-copy-id hadoopuser@remotemachine (esto para cada nodo)
    • Hacer ssh a cada nodo para ver que pueda acceder sin necesidad de poner pwd.
    • Editar dentro del directorio conf (HADOOP_CONF_DIR), hdfs-site.xml y setear el directorio donde se almacenará los datos del namenode (namespace y transaction log), y agregar la lista (separada por comas) de directorios (todos previamente creados) donde se almacenan los datos. Usé 32Mb de blocksize para el file system distribuido… El default es 128Mb, me parece mucho para un fin académico. Por ejemplo:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
     <property>
         <name>dfs.name.dir</name>
         <value>/opt/hadoop/hadoop-1.1.2/data/namenode</value>
     </property>
     <property>
         <name>dfs.blocksize</name>
         <value>33554432</value>
     </property>
     <property>
         <name>dfs.data.dir</name>
         <value>/opt/hadoop/hadoop-1.1.2/data/datanode</value>
     </property>
</configuration>
    • En mapred-site.xml se configura la ubicación del job.tracker (en nuestro caso es el mimo equipo que el namenode):
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property> 
    <name>mapred.job.tracker</name> 
    <value>192.168.1.33:9001</value> 
  </property>
  <property> 
    <name>mapred.system.dir</name>
    <value>/opt/hadoop/hadoop-1.1.2/data/mapred/system/</value> 
  </property>
  <property>
    <name>mapred.local.dir</name>
    <value>/opt/hadoop/hadoop-1.1.2/data/mapred/local/</value> 
</property>
</configuration>
        • Configuración del archivo slaves. Este es como el machinefiles de MPI. Una lista de todos los workers.
        • Configuración del archivo masters. Contiene una lista de todos los hosts masters (en nuestro caso, hay uno sólo).
        • Configuramos el archivo taskcontroller.cfg:
mapred.local.dir=/opt/hadoop/hadoop-1.1.2/mapredlocaldir
hadoop.log.dir=/opt/hadoop/hadoop-1.1.2/log
mapred.tasktracker.tasks.sleeptime-before-sigkill=60
mapreduce.tasktracker.group=hadoopuser
      • El archivo core-site.xml configura los parámetros del file system distribuido:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
         <name>fs.default.name</name>
         <value>hdfs://192.168.1.33:9000</value>
  </property>
</configuration>
      • Sincronizar Hadoop con todos los clientes
      • Ahora configuramos ZooKeeper
      • Dentro de la estructura de Hadoop, armamos un directorio para su deployment y descomprimimos su tarball.
      • Creamos los siguientes directorios (dentro de la estructura):
sudo mkdir -p /usr/local/ZooKeeper/data 
sudo mkdir -p /usr/local/ZooKeeper/datalog 
sudo chown hadoopuser -R /usr/local/ZooKeeper
sudo mkdir /usr/local/ZooKeeper/var/datalog 
sudo chown hadoopuser /usr/local/ZooKeeper/var/datalog
      • Vamos a instalarlo en el master de forma standalone, pero en un cluster serio debería haber corriendo 3 copias.
      • Dentro del /conf de zookeeper, editar java.env y setear JAVA_HOME y ZK_HOME, por ejemplo:
JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 
ZK_HOME=/opt/hadoop/zookeeper-3.4.5
      • Copiar zoo_sample.cfg a zoo.cfg y cambiar el contenido de modo que quede de la siguiente manera:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/ZooKeeper/var/data
dataLogDir=/usr/local/ZooKeeper/var/datalog
# the port at which the clients will connect
clientPort=2181
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=192.168.1.33:2181
      • Sincronizar zookeeper con los clientes.
      • En el master, en /usr/local/ZooKeeper/data crear un file myid conteniendo un “1”.
      • Para ver si levanta, en el /bin del server hacemos ./zkServer.sh start para conectarnos desde los clientes, $ZK_HOME/bin/zkCli.sh -server master1:2181 y allí, ls / y finalmente para bajarlo, zkServer.sh stop en el server.
      • Ahora vamos a configurar HBase, para lo que descomprimimos el tarball dentro de la estructura de Hadoop, en un directorio especial para tal fin.
      • HBase debiera correr con un usuario distinto que hadoop, pero esta no es una instalación tan securitizada, por lo que se utilizará el mismo usuario que en hadoop.
      • Hay que formatear el HDFS:
hdfs namenode -format
      • En /conf de HBase vamos a configurar los regionservers (en nuestro caso son iguales que los datanodes) en un file llamado regionservers conteniendo una lista de los servidores para tal fin.
      • En hbase-env.sh configuramos las variables de entorno que hagan falta, como por ejemplo:
export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HADOOP_CONF_DIR=/opt/hadoop/hadoop-1.1.2/conf
export HBASE_MANAGES_ZK=false
export HADOOP_HOME=/opt/hadoop/hadoop-1.1.2
export HBASE_HOME=/opt/hadoop/hbase-0.92.2
      • Las configuraciones más importantes ocurren en hbase-site.xml:
<configuration>
  <property> 
    <name>hbase.rootdir</name> 
    <value>hdfs://192.168.1.33:9000/hbase</value> 
  </property> 
  <property> 
    <name>hbase.cluster.distributed</name> 
    <value>true</value> 
  </property> 
     <property> 
         <name>hbase.tmp.dir</name> 
         <value>/tmp</value> 
     </property> 
     <property> 
         <name>hbase.zookeeper.quorum</name> 
         <value>192.168.1.33</value> 
     </property> 
</configuration>
        • Verificar si el límite de archivos abiertos es 1024:
           ulimit -n

          Verificar la cantidad de procesos (unlimitted):

           ulimit

          Cambiar el límite:

          sudo nano /etc/security/limits.conf
          hadoop user soft nofile 65535
          hadoop user hard nofile 65535
          hadoop user soft nproc 32000
          hadoop user hard nproc 32000

          Copiar las librerias correctas:

          rm /opt/hadoop/hbase-0.92.2/lib/hadoop-core-*.jar
          rm /opt/hadoop/hbase-0.92.2/lib/zookeeper-*.jar
          cp /opt/hadoop/hadoop-1.1.2/hadoop-core-*.jar /opt/hadoop/hbase-0.92.2/lib/
          cp /opt/hadoop/hadoop-1.1.2/lib/commons-configuration-1.6.jar /opt/hadoop/hbase-0.92.2/lib/
          cp /opt/hadoop/zookeeper-3.4.5/zookeeper-*.jar /opt/hadoop/hbase-0.92.2/lib/
        • Sincronizar con los regionservers.
        • Para levantar todo, armé un script único:
sudo su - hadoopuser -c "/opt/hadoop/hadoop-1.1.2/bin/start-all.sh"
sudo su - hadoopuser -c "/opt/hadoop/zookeeper-3.4.5/bin/zkServer.sh start"
sudo su - hadoopuser -c "/opt/hadoop/hbase-0.92.2/bin/start-hbase.sh"
    • Para bajar todo, armé también un script:
sudo su - hadoopuser -c "/opt/hadoop/hbase-0.92.2/bin/stop-hbase.sh"
sudo su - hadoopuser -c "/opt/hadoop/zookeeper-3.4.5/bin/zkServer.sh stop"
sudo su - hadoopuser -c "/opt/hadoop/hadoop-1.1.2/bin/stop-all.sh"

Algunas notas sobre “Scaling Genetic Algorithms using MapReduce”

En esta publicación, los autores (Abhishek Verma, Xavier Llor`, David E. Goldberg y Roy H. Campbell) porponen la utilización del framework MapReduce (en particular con la implementación de Hadoop) para implementar GA paralelos. Fijan posición en que la implementación tradicional basada en MPI requiere mucho conocimiento sobre la máquina donde se monta, y que, por el contrario, MapReduce permite un nivel de escalabilidad que no introduce ningún cuello de botella.

En particular, se enfocan en las variantes tradicionales y sencilla de los GA para brindar una implementación de referencia.

De los pasos del algoritmo clásico genético, la evaluación de los fitness tiene una analogía directa con la función Map, la cuál evaluará el fitness del individuo y guardará track (en un counter) del mejor individuo, finalmente persistiéndolo en el HDFS, el proceso cliente (el que inició la tarea MapReduce) chequea recurrentemente en este archivo si se ha alcanzado el criterio de convergencia.

Para poder implementar la selección distribuida, hay que tener control o mantener estado global, lo cual es un tanto complejo en MapReduce, por lo que el único componente que puede realizar este tipo de tarea es el Partitioner. El problema puntual por el cual no conviene utilizar la implementación por defecto es que durante la etapa de Shuffling (el momento en el que el framework reparte, mezcla y distribuye las emisiones de los mappers a los reducers) es porque utiliza una función de hash sobre la key (módulo la cantidad de reducers) para distribuir la carga, generando inconvenientes de convergencia del GA. Por otro lado, mientras el algoritmo genético pogrese, muchos individuos serán iguales, por lo que serán tratados por el mismo reducer, con la reducción de performance que esto implicaría. Es por eso que los autores utilizaron un partitioner especial que en vez de utilizar un hash, utiliza un número pseudoaleatorio cuya semilla es (mapperId * time).

El mecanismo de selección que han utilizado es el de torneo, ya que es el único que puede implementarse con facilidad en MapReduce, ya que la ruleta implicaría el mantenimiento de un estado global y un par de iteraciones más. Para esto han decidido tomar individuos al azar y ver quien es el ganador n veces (n es el tamaño de la población) del torneo para someterlo a la cruza. La operación de crossover se realiza en conjunto con la selección de dos individuos.

El tipo de crossover que utilizaron es el uniforme.

Cuando las poblaciones son muy grandes, la inicialización serial de los individuos tomó mucho tiempo, por lo que decidieron agregar una tanda de procesamiento map reduce en el que el map genera un individuo al azar y el reduce es trivial.

Las medidas de performance que realizaron fueron hechas con la implementación de un problema trivial utilizado para benchmark llamado OneMax Problem que consiste en encontrar la máxima cantidad de 1’s en los cromosomas de los individuos, es decir, un individuo tal que todos sus alelos sean 1’s. El análisis realizado arrojó que:

  • Análisis de convergencia: Para 104 variables, converge en 220 iteraciones con un promedio de 149 segundos por iteración.
  • Escalabilidad con carga constante por nodo: Incrementando la cantidad de recursos, no cambia el tiempo de procesamiento por iteración.
  • Escalabilidad con carga constante general: Fijaron la cantidad de variables en 50.000 e incrementaron el número de mappers, logrando una reducción del tiempo de iteración. Aunque el speed-up general queda limitado por la ley de Amdahl (“la mejora obtenida en el rendimiento de un sistema debido a la alteración de uno de sus componentes está limitada por la fracción de tiempo que se utiliza dicho componente”) dado el overhead de Hadoop (unos 10 segundos para iniciar y terminar un job).
  • Escalabilidad con incrementos en el tamaño del problema: Utilizaron el máximo de recursos y fueron incrementando el número de variables, logrando un incremento razonable con incrementos de población superlineales (n log n).

El gran inconveniente de trabajar con MPI en clústers standard es la poca tolerancia a fallos del framework. MapReduce no tiene este inconveniente. Introducen ciertas críticas a la implementación llamada MRPGA respecto a la escalabilidad.

Otros enfoques han tratado de modificar MapReduce para que se parezca a algo más consumible desde los GA, lo interesante de este trabajo es que no han intentado martillar MapReduce, sino todo lo contrario.

Como línea de investigación proponen comparar rendimientos con MPI y demostrar la importancia de GA escalables en las aplicaciones prácticas.

Por otro lado cabe destacar que no se proveen detalles de la implementación a bajo nivel, de modo que mucho se puede conjeturar sobre la implementación específica que han realizado, y que no se han aventurado a explorar alternativas un poco más complejas que el GA básico.

Integrales por suma de Riemann: MPI & MapReduce

La Suma de Riemann es una técnica de integración numérica para calcular valores de integrales definidas (bajo ciertas condiciones, como la continuidad). Visualmente lo que estamos haciendo es calcular el área por “debajo” de la curva definida por la función en cuestión.

Para calcular esa superficie, este método divide el continuo del eje x en “barras” o rectángulos, como se ve en la imagen:

Riemann_Integration_3Estos rectángulos van desde el eje x hasta la curva, dependiendo el enfoque que se tome (sumas izquierdas, derechas, máximas, mínimas, etc.)

El error en este tipo de sumas es alto respecto a otros métodos, pero mientras las bases de los rectángulos sean más chicas (tiendan a cero), más converge la suma hacia el resultado de la integral.

Un programa en lenguaje C que hace ese cálculo para la función x2 en el intervalo [0;10] con 100 rectángulos (pasos):

#include <stdio.h>
#include <stdlib.h>

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      float resultado = 0.0;        
      float incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;
      float semiIncremento = incremento/2;
      float x = XINICIAL;
      int i;
      for (i=0;i<PASOS;i++)
      {
         resultado += incremento * (calcularFuncion(x)+calcularFuncion
                                             (x+incremento))/(float)2;
         x+=incremento;
      }

      printf("El resultado de es %f \n",resultado);
}

float calcularFuncion(float x)
{
  return x*x;
}

Un código realmente simple. Si quisiéramos procesarlo en un cluster utilizando MPI (OpenMP) en C, tenemos dos opciones:

  1. Que el nodo master le pase a los workers cada posición x del rectángulo a calcular, el worker calcule todos sus rectángulos asignados, sume sus superficies y devuelva un valor al nodo master.
  2. Que el nodo master divida la curva a asignar en tantas partes como workers disponibles tenga el cluster y les asigne una posición de inicio y final para que cada worker calcule con este método las superficies parciales y devuelva un valor al nodo master.

Veamos una implementación posible en C (con MPI) de la primer opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100
#define MASTER 0
#define TAG_X 1
#define TAG_RESULT_PARCIAL 2

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, dest,i,workers;  
      float resultado,incremento,x,resultParcial;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 

      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    dest=1;
    x=XINICIAL;
    // Hacemos un Round Robin
    for (i=0;i<PASOS;i++)
    {
      MPI_Send(&x, 1, MPI_FLOAT, dest, TAG_X, MPI_COMM_WORLD); 
      dest++;
      if (dest==size) dest=1;
      x+=incremento;
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                          MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }

    printf("El resultado de es %f \n",resultado);

 }
 else
 {
    for (i=0; i<PASOS-(((int)(PASOS/workers))*workers)>=rank?
                                          (int)(PASOS/workers)+1:
                                          (int)(PASOS/workers); i++) 
    {
      MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_X, MPI_COMM_WORLD, 
                                                          &status); 
      resultParcial += incremento * (calcularFuncion(x)+
                            calcularFuncion(x+incremento))/(float)2;

    }
    MPI_Send(&resultParcial,1, MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL,
                                                    MPI_COMM_WORLD);
      }      
}

float calcularFuncion(float x)
{
  return x*x;
}

Veamos una implementación posible en C (con MPI) de la segunda opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 1000
#define MASTER 0
#define TAG_INICIO 1
#define TAG_FIN 2
#define TAG_RESULT_PARCIAL 3

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, i,workers;  
      float resultado,incremento,x,resultParcial,xfinal;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 
      resultParcial=0;
      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    for (i=MASTER+1;i<size;i++)
    {
      x=XINICIAL+(i-1)*XFINAL/(size-1);
      MPI_Send(&x, 1, MPI_FLOAT, i, TAG_INICIO, MPI_COMM_WORLD); 
      xfinal=XINICIAL+i*XFINAL/(size-1);
      MPI_Send(&xfinal, 1, MPI_FLOAT, i, TAG_FIN, MPI_COMM_WORLD); 
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                            MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }
    printf("El resultado de es %f \n",resultado);    
}
else
{
     MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_INICIO, MPI_COMM_WORLD, 
                                                          &status); 
     MPI_Recv(&xfinal, 1, MPI_FLOAT, MASTER, TAG_FIN, MPI_COMM_WORLD, 
                                                          &status); 
     while (x<xfinal)
     {
        resultParcial += incremento * (calcularFuncion(x)+
                           calcularFuncion(x+incremento))/(float)2;
        x+=incremento;
     }
     MPI_Send(&resultParcial,1,MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL
                                                 , MPI_COMM_WORLD);
 }             
}

float calcularFuncion(float x)
{
  return x*x;
}

En términos de consumo de red, parece ser mejor la segunda opción.

A continuación vamos a ver un planteo de este mismo problema implementado con MapReduce de Hadoop en su versión 1. Por motivos de simplicidad no se utilizó el File System distribuido, que es una de las ventajas que tiene este enfoque:

La interfaz IFuncion es la responsable de darles una “cara” consumible a las funciones:

package ar.edu.rdipasquale;

public interface IFuncion {
    public abstract Double calcularY(Double x);
}

La Función x2 se implementa de la siguiente manera:

package ar.edu.rdipasquale;

public class FuncionXCuadrado implements IFuncion{

    public FuncionXCuadrado() {
    }

    @Override
    public Double calcularY(Double x)
    {
        return x*x;
    }
}

La clase que se encarga de configurar el job y ejecutarlo se llama Riemann. Por simplicidad, no utilizará el file system distribuido y generará un archivo con una clave y un valor. La clave será la función (el id de la función) a integrar y el valor será el comienzo del rectángulo en el eje x:

package ar.edu.rdipasquale;

import java.io.PrintWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Riemann extends Configured implements Tool {

    private static final Double XINICIAL=(double)0;
    private static final Double XFINAL=(double)10;
    private static final int PASOS=100;
    private static final int FUNCIONES_A_INTEGRAR=1;

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(Riemann.class);
          conf.setJobName("Riemann");      
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(DoubleWritable.class);

          Double x=XINICIAL;

          // Habría que aprovechar el HDFS en vez del local... 
          PrintWriter out = new PrintWriter("funcionesaintegrar.txt");
          for (int i=0;i<FUNCIONES_A_INTEGRAR;i++)
              for (int j=0;j<PASOS;j++)
              {
                  out.println(String.valueOf(i+1)+";"+
                                                   String.valueOf(x));
                  x=(j+1)*XFINAL/PASOS;
              }
          out.flush();
          out.close();

          conf.setMapperClass(MapRiemann.class);
          conf.setCombinerClass(ReduceRiemann.class);
          conf.setReducerClass(ReduceRiemann.class);

          conf.setInputFormat(TextInputFormat.class);
          conf.setOutputFormat(TextOutputFormat.class);

          FileInputFormat.setInputPaths(conf, 
                                  new Path("funcionesaintegrar.txt"));
          FileOutputFormat.setOutputPath(conf, new Path("salida"));

          conf.set("riemann.funcion", 
                               "ar.edu.rdipasquale.FuncionXCuadrado");
          conf.set("riemann.incremento", 
                             String.valueOf((XFINAL-XINICIAL)/PASOS));
          JobClient.runJob(conf);        
          return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Riemann(), 
                                                               args);
        System.exit(res);
    }
}

El Mapper será la Clase MapRiemann que tomará cada línea del archivo en cuestión y emitirá la superficie de cada rectángulo:

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapRiemann extends MapReduceBase implements 
                   Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    private IFuncion funcion;
    private Double incremento;

    @Override
    public void configure(JobConf job) {
        try {
            funcion = (IFuncion)Class.forName(
                          job.get("riemann.funcion")).newInstance();
            incremento=Double.parseDouble(
                                     job.get("riemann.incremento"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable arg0, Text value,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line,";");
        String funcionAIntegrar=tokenizer.nextToken();
        Double valor=Double.parseDouble(tokenizer.nextToken());
        output.collect(new Text(funcionAIntegrar), 
               new DoubleWritable(incremento*
                           (funcion.calcularY(valor)+
                           funcion.calcularY(valor+incremento))/2));
    }
}

El Reducer está implementado en la clase ReduceRiemann y se encargará de sumar todas las superficies calculadas por los mappers (para cada función, obviamente, no mezclará peras con manzanas):

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ReduceRiemann extends MapReduceBase implements 
                  Reducer<Text, DoubleWritable, Text, DoubleWritable> 
{
    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

      double sum = 0;
      while (values.hasNext()) 
          sum += values.next().get();
      output.collect(key, new DoubleWritable(sum));                
    }   
}

Como combiner se utiliza la misma clase utilizada como reducer para poder optimizar la performance realizando algunas agregaciones locales, lo que puede observarse en la configuración del job en la clase Riemann.

Sería tonto hacer una comparativa de performance entre los dos enfoques, ya que son herramientas que se utilizan para fines distintos.

El nivel de performance en el cálculo en la plataforma C+MPI es probablemente inalcanzable.

La plataforma MapReduce (en Hadoop) se va a destacar en trabajos orientados a grandes volúmenes de datos, donde minimizará el costo de red frente a la plataforma MPI.

Por otro lado la plataforma MPI se aplica a lenguajes de programación general, y si bien MapReduce puede utilizarse en C++, Java, Python y otros, los algoritmos deben expresarse en términos de concatenación de Mappers y Reducers, lo cual agrega una complejidad, entre las que está la no existencia de un estado, es decir, que son implementaciones stateless (esto en algoritmos de grafos puede llevar a dolores de cabeza, o a soluciones por debajo del óptimo en términos de performance).

A favor del enfoque MapReduce vamos a decir que es tolerante a fallos, en una buena instalación, en un cluster lo suficientemente grande y bien administrado, se nos puede romper un nodo en la mitad del proceso y el mismo finalizará sin ningún problema. Eso es muy difícil de alcanzar con C-MPI. Asimismo el nivel de seguimiento de las tareas está administrado por la herramienta provista por Hadoop, de modo que el monitoreo ya viene de fábrica.

En un clúster heterogéneo, las soluciones que brindamos en C serían tan performantes como el nodo menos performante del cluster, ya que asignamos la carga de manera lineal. En cambio en MapReduce la carga es administrada por el framework, de modo que asignará a cada uno según sus necesidades y les pedirá según sus posibilidades (Ahí está el verdadero Socialismo del siglo XXI en los clusters de computadoras).

We Rock

Hoy tomé contacto con un clúster académico Rocks, que es una distribución de Linux especialmente preparada para Clusters de computadoras. Es una distribución Fedora->RedHat Enterprise->CentOS.

Tiene un soporte nativo (no de CentOS) para MPI (C/C++) y me cae muy simpática la política de “plugins” llamados Rolls (El famoso Rock & Roll) con funcionalidad adicional a la básica.

También noté que facilita bastante la instalación de un cluster Hadoop, que típicamente no tiene mucho drama con el sistema operativo donde se monta, pero que requiere una instalación por nodo, lo que lleva a utilizar herramientas de gestión de instalaciones, máquinas virtuales, o similares. De este modo, habilita una instalación rápida. Queda investigar detalles tales como que no se pisen las funcionalidades de NFS con las de HDFS.