Problemas con la memoria del Cluster Hadoop

Habiendo entendido que los nodos del cluster quedaron configurados con la mitad de memoria que se suponía iban a tener, comenzamos a experimentar problemas de memoria (no relacionados con el mal de Alzheimer que nos acosa a los señores mayores).

Dado este problema comenzamos a hacer lo que todo “hombre de la casa” debe hacer: No leer el manual y empezar a tocar parámetros al azar hasta que funcione. Bueno, al poco tiempo se evidenció que no es la mejor técnica. Lástima… Estaba preparando un artículo sobre eso…

Por suerte encontré la información debidamente tabulada por la gente de Hortonworks aquí. Fue de gran utilidad.

Anuncios

Configurando un Cluster

He configurado un cluster casero con 4 nodos, las versiones del software son estables, quizás no sean las últimas. Lo único que lamento, es no haber podido configurar Hadoop serie 2 con YARN… La próxima será. Las versiones del software que configuré son:

Luego de bajar los paquetes, los pasos para la instalación y configuración son los siguientes:

    • Primero vamos a crear un usuario en todos los nodos para que corra hadoop:
      sudo useradd -d /home/hadoopuser -m hadoopuser
      sudo passwd hadoopuser
    • Y vamos a agregarlo a un grupo con tal de darle permisos de lectura, escritura y ejecución en los recursos de Hadoop.
    • Vamos a sanear los nombres de host en todos los servidores y la unificación de /etc/hosts si no contamos con un DNS. Esto parece trivial, pero es con lo que más dolores de cabeza nos da Hadoop.
    • Crear un directorio para la instalación de Hadoop, al que setearemos en la variable de entorno HADOOP_HOME
    • Descomprimir allí dentro el tar de hadoop (tar xfz hadoop-xxxx)
    • Verificar que esté definida la variable JAVA_HOME
    • Verificar la configuración de NTP en el cluster.
    • Agregar al Path del directorio bin de Hadoop.
    • Crear un directorio para los logs de hadoop.
    • Verificar loopback en cada máquina:
      sudo /etc/hosts

      Verificar que haya 2 entradas a 127.0.0.1 (localhost y el nombre de la máquina) ya que ubuntu suele usar el 127.0.1.1

    • Crear directorios data, log, data/mapred/local/ , data/mapred/system , data/namenode y data/datanode dentro de la estructura de Hadoop.
    • Hay que habilitar en el namenode la posibilidad de que se comunique vía ssh a todos los esclavos sin tener que utilizar una password. Para eso hay un buen tutorial aquí. Desde el namenode (hacer su al usuario que va a correr hadoop):
      ssh-keygen -t rsa
      ssh-copy-id hadoopuser@remotemachine (esto para cada nodo)
    • Hacer ssh a cada nodo para ver que pueda acceder sin necesidad de poner pwd.
    • Editar dentro del directorio conf (HADOOP_CONF_DIR), hdfs-site.xml y setear el directorio donde se almacenará los datos del namenode (namespace y transaction log), y agregar la lista (separada por comas) de directorios (todos previamente creados) donde se almacenan los datos. Usé 32Mb de blocksize para el file system distribuido… El default es 128Mb, me parece mucho para un fin académico. Por ejemplo:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
     <property>
         <name>dfs.name.dir</name>
         <value>/opt/hadoop/hadoop-1.1.2/data/namenode</value>
     </property>
     <property>
         <name>dfs.blocksize</name>
         <value>33554432</value>
     </property>
     <property>
         <name>dfs.data.dir</name>
         <value>/opt/hadoop/hadoop-1.1.2/data/datanode</value>
     </property>
</configuration>
    • En mapred-site.xml se configura la ubicación del job.tracker (en nuestro caso es el mimo equipo que el namenode):
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property> 
    <name>mapred.job.tracker</name> 
    <value>192.168.1.33:9001</value> 
  </property>
  <property> 
    <name>mapred.system.dir</name>
    <value>/opt/hadoop/hadoop-1.1.2/data/mapred/system/</value> 
  </property>
  <property>
    <name>mapred.local.dir</name>
    <value>/opt/hadoop/hadoop-1.1.2/data/mapred/local/</value> 
</property>
</configuration>
        • Configuración del archivo slaves. Este es como el machinefiles de MPI. Una lista de todos los workers.
        • Configuración del archivo masters. Contiene una lista de todos los hosts masters (en nuestro caso, hay uno sólo).
        • Configuramos el archivo taskcontroller.cfg:
mapred.local.dir=/opt/hadoop/hadoop-1.1.2/mapredlocaldir
hadoop.log.dir=/opt/hadoop/hadoop-1.1.2/log
mapred.tasktracker.tasks.sleeptime-before-sigkill=60
mapreduce.tasktracker.group=hadoopuser
      • El archivo core-site.xml configura los parámetros del file system distribuido:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
         <name>fs.default.name</name>
         <value>hdfs://192.168.1.33:9000</value>
  </property>
</configuration>
      • Sincronizar Hadoop con todos los clientes
      • Ahora configuramos ZooKeeper
      • Dentro de la estructura de Hadoop, armamos un directorio para su deployment y descomprimimos su tarball.
      • Creamos los siguientes directorios (dentro de la estructura):
sudo mkdir -p /usr/local/ZooKeeper/data 
sudo mkdir -p /usr/local/ZooKeeper/datalog 
sudo chown hadoopuser -R /usr/local/ZooKeeper
sudo mkdir /usr/local/ZooKeeper/var/datalog 
sudo chown hadoopuser /usr/local/ZooKeeper/var/datalog
      • Vamos a instalarlo en el master de forma standalone, pero en un cluster serio debería haber corriendo 3 copias.
      • Dentro del /conf de zookeeper, editar java.env y setear JAVA_HOME y ZK_HOME, por ejemplo:
JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 
ZK_HOME=/opt/hadoop/zookeeper-3.4.5
      • Copiar zoo_sample.cfg a zoo.cfg y cambiar el contenido de modo que quede de la siguiente manera:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/ZooKeeper/var/data
dataLogDir=/usr/local/ZooKeeper/var/datalog
# the port at which the clients will connect
clientPort=2181
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=192.168.1.33:2181
      • Sincronizar zookeeper con los clientes.
      • En el master, en /usr/local/ZooKeeper/data crear un file myid conteniendo un “1”.
      • Para ver si levanta, en el /bin del server hacemos ./zkServer.sh start para conectarnos desde los clientes, $ZK_HOME/bin/zkCli.sh -server master1:2181 y allí, ls / y finalmente para bajarlo, zkServer.sh stop en el server.
      • Ahora vamos a configurar HBase, para lo que descomprimimos el tarball dentro de la estructura de Hadoop, en un directorio especial para tal fin.
      • HBase debiera correr con un usuario distinto que hadoop, pero esta no es una instalación tan securitizada, por lo que se utilizará el mismo usuario que en hadoop.
      • Hay que formatear el HDFS:
hdfs namenode -format
      • En /conf de HBase vamos a configurar los regionservers (en nuestro caso son iguales que los datanodes) en un file llamado regionservers conteniendo una lista de los servidores para tal fin.
      • En hbase-env.sh configuramos las variables de entorno que hagan falta, como por ejemplo:
export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export HADOOP_CONF_DIR=/opt/hadoop/hadoop-1.1.2/conf
export HBASE_MANAGES_ZK=false
export HADOOP_HOME=/opt/hadoop/hadoop-1.1.2
export HBASE_HOME=/opt/hadoop/hbase-0.92.2
      • Las configuraciones más importantes ocurren en hbase-site.xml:
<configuration>
  <property> 
    <name>hbase.rootdir</name> 
    <value>hdfs://192.168.1.33:9000/hbase</value> 
  </property> 
  <property> 
    <name>hbase.cluster.distributed</name> 
    <value>true</value> 
  </property> 
     <property> 
         <name>hbase.tmp.dir</name> 
         <value>/tmp</value> 
     </property> 
     <property> 
         <name>hbase.zookeeper.quorum</name> 
         <value>192.168.1.33</value> 
     </property> 
</configuration>
        • Verificar si el límite de archivos abiertos es 1024:
           ulimit -n

          Verificar la cantidad de procesos (unlimitted):

           ulimit

          Cambiar el límite:

          sudo nano /etc/security/limits.conf
          hadoop user soft nofile 65535
          hadoop user hard nofile 65535
          hadoop user soft nproc 32000
          hadoop user hard nproc 32000

          Copiar las librerias correctas:

          rm /opt/hadoop/hbase-0.92.2/lib/hadoop-core-*.jar
          rm /opt/hadoop/hbase-0.92.2/lib/zookeeper-*.jar
          cp /opt/hadoop/hadoop-1.1.2/hadoop-core-*.jar /opt/hadoop/hbase-0.92.2/lib/
          cp /opt/hadoop/hadoop-1.1.2/lib/commons-configuration-1.6.jar /opt/hadoop/hbase-0.92.2/lib/
          cp /opt/hadoop/zookeeper-3.4.5/zookeeper-*.jar /opt/hadoop/hbase-0.92.2/lib/
        • Sincronizar con los regionservers.
        • Para levantar todo, armé un script único:
sudo su - hadoopuser -c "/opt/hadoop/hadoop-1.1.2/bin/start-all.sh"
sudo su - hadoopuser -c "/opt/hadoop/zookeeper-3.4.5/bin/zkServer.sh start"
sudo su - hadoopuser -c "/opt/hadoop/hbase-0.92.2/bin/start-hbase.sh"
    • Para bajar todo, armé también un script:
sudo su - hadoopuser -c "/opt/hadoop/hbase-0.92.2/bin/stop-hbase.sh"
sudo su - hadoopuser -c "/opt/hadoop/zookeeper-3.4.5/bin/zkServer.sh stop"
sudo su - hadoopuser -c "/opt/hadoop/hadoop-1.1.2/bin/stop-all.sh"

Algunas notas sobre “Scaling Genetic Algorithms using MapReduce”

En esta publicación, los autores (Abhishek Verma, Xavier Llor`, David E. Goldberg y Roy H. Campbell) porponen la utilización del framework MapReduce (en particular con la implementación de Hadoop) para implementar GA paralelos. Fijan posición en que la implementación tradicional basada en MPI requiere mucho conocimiento sobre la máquina donde se monta, y que, por el contrario, MapReduce permite un nivel de escalabilidad que no introduce ningún cuello de botella.

En particular, se enfocan en las variantes tradicionales y sencilla de los GA para brindar una implementación de referencia.

De los pasos del algoritmo clásico genético, la evaluación de los fitness tiene una analogía directa con la función Map, la cuál evaluará el fitness del individuo y guardará track (en un counter) del mejor individuo, finalmente persistiéndolo en el HDFS, el proceso cliente (el que inició la tarea MapReduce) chequea recurrentemente en este archivo si se ha alcanzado el criterio de convergencia.

Para poder implementar la selección distribuida, hay que tener control o mantener estado global, lo cual es un tanto complejo en MapReduce, por lo que el único componente que puede realizar este tipo de tarea es el Partitioner. El problema puntual por el cual no conviene utilizar la implementación por defecto es que durante la etapa de Shuffling (el momento en el que el framework reparte, mezcla y distribuye las emisiones de los mappers a los reducers) es porque utiliza una función de hash sobre la key (módulo la cantidad de reducers) para distribuir la carga, generando inconvenientes de convergencia del GA. Por otro lado, mientras el algoritmo genético pogrese, muchos individuos serán iguales, por lo que serán tratados por el mismo reducer, con la reducción de performance que esto implicaría. Es por eso que los autores utilizaron un partitioner especial que en vez de utilizar un hash, utiliza un número pseudoaleatorio cuya semilla es (mapperId * time).

El mecanismo de selección que han utilizado es el de torneo, ya que es el único que puede implementarse con facilidad en MapReduce, ya que la ruleta implicaría el mantenimiento de un estado global y un par de iteraciones más. Para esto han decidido tomar individuos al azar y ver quien es el ganador n veces (n es el tamaño de la población) del torneo para someterlo a la cruza. La operación de crossover se realiza en conjunto con la selección de dos individuos.

El tipo de crossover que utilizaron es el uniforme.

Cuando las poblaciones son muy grandes, la inicialización serial de los individuos tomó mucho tiempo, por lo que decidieron agregar una tanda de procesamiento map reduce en el que el map genera un individuo al azar y el reduce es trivial.

Las medidas de performance que realizaron fueron hechas con la implementación de un problema trivial utilizado para benchmark llamado OneMax Problem que consiste en encontrar la máxima cantidad de 1’s en los cromosomas de los individuos, es decir, un individuo tal que todos sus alelos sean 1’s. El análisis realizado arrojó que:

  • Análisis de convergencia: Para 104 variables, converge en 220 iteraciones con un promedio de 149 segundos por iteración.
  • Escalabilidad con carga constante por nodo: Incrementando la cantidad de recursos, no cambia el tiempo de procesamiento por iteración.
  • Escalabilidad con carga constante general: Fijaron la cantidad de variables en 50.000 e incrementaron el número de mappers, logrando una reducción del tiempo de iteración. Aunque el speed-up general queda limitado por la ley de Amdahl (“la mejora obtenida en el rendimiento de un sistema debido a la alteración de uno de sus componentes está limitada por la fracción de tiempo que se utiliza dicho componente”) dado el overhead de Hadoop (unos 10 segundos para iniciar y terminar un job).
  • Escalabilidad con incrementos en el tamaño del problema: Utilizaron el máximo de recursos y fueron incrementando el número de variables, logrando un incremento razonable con incrementos de población superlineales (n log n).

El gran inconveniente de trabajar con MPI en clústers standard es la poca tolerancia a fallos del framework. MapReduce no tiene este inconveniente. Introducen ciertas críticas a la implementación llamada MRPGA respecto a la escalabilidad.

Otros enfoques han tratado de modificar MapReduce para que se parezca a algo más consumible desde los GA, lo interesante de este trabajo es que no han intentado martillar MapReduce, sino todo lo contrario.

Como línea de investigación proponen comparar rendimientos con MPI y demostrar la importancia de GA escalables en las aplicaciones prácticas.

Por otro lado cabe destacar que no se proveen detalles de la implementación a bajo nivel, de modo que mucho se puede conjeturar sobre la implementación específica que han realizado, y que no se han aventurado a explorar alternativas un poco más complejas que el GA básico.

Integrales por suma de Riemann: MPI & MapReduce

La Suma de Riemann es una técnica de integración numérica para calcular valores de integrales definidas (bajo ciertas condiciones, como la continuidad). Visualmente lo que estamos haciendo es calcular el área por “debajo” de la curva definida por la función en cuestión.

Para calcular esa superficie, este método divide el continuo del eje x en “barras” o rectángulos, como se ve en la imagen:

Riemann_Integration_3Estos rectángulos van desde el eje x hasta la curva, dependiendo el enfoque que se tome (sumas izquierdas, derechas, máximas, mínimas, etc.)

El error en este tipo de sumas es alto respecto a otros métodos, pero mientras las bases de los rectángulos sean más chicas (tiendan a cero), más converge la suma hacia el resultado de la integral.

Un programa en lenguaje C que hace ese cálculo para la función x2 en el intervalo [0;10] con 100 rectángulos (pasos):

#include <stdio.h>
#include <stdlib.h>

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      float resultado = 0.0;        
      float incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;
      float semiIncremento = incremento/2;
      float x = XINICIAL;
      int i;
      for (i=0;i<PASOS;i++)
      {
         resultado += incremento * (calcularFuncion(x)+calcularFuncion
                                             (x+incremento))/(float)2;
         x+=incremento;
      }

      printf("El resultado de es %f \n",resultado);
}

float calcularFuncion(float x)
{
  return x*x;
}

Un código realmente simple. Si quisiéramos procesarlo en un cluster utilizando MPI (OpenMP) en C, tenemos dos opciones:

  1. Que el nodo master le pase a los workers cada posición x del rectángulo a calcular, el worker calcule todos sus rectángulos asignados, sume sus superficies y devuelva un valor al nodo master.
  2. Que el nodo master divida la curva a asignar en tantas partes como workers disponibles tenga el cluster y les asigne una posición de inicio y final para que cada worker calcule con este método las superficies parciales y devuelva un valor al nodo master.

Veamos una implementación posible en C (con MPI) de la primer opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 100
#define MASTER 0
#define TAG_X 1
#define TAG_RESULT_PARCIAL 2

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, dest,i,workers;  
      float resultado,incremento,x,resultParcial;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 

      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    dest=1;
    x=XINICIAL;
    // Hacemos un Round Robin
    for (i=0;i<PASOS;i++)
    {
      MPI_Send(&x, 1, MPI_FLOAT, dest, TAG_X, MPI_COMM_WORLD); 
      dest++;
      if (dest==size) dest=1;
      x+=incremento;
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                          MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }

    printf("El resultado de es %f \n",resultado);

 }
 else
 {
    for (i=0; i<PASOS-(((int)(PASOS/workers))*workers)>=rank?
                                          (int)(PASOS/workers)+1:
                                          (int)(PASOS/workers); i++) 
    {
      MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_X, MPI_COMM_WORLD, 
                                                          &status); 
      resultParcial += incremento * (calcularFuncion(x)+
                            calcularFuncion(x+incremento))/(float)2;

    }
    MPI_Send(&resultParcial,1, MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL,
                                                    MPI_COMM_WORLD);
      }      
}

float calcularFuncion(float x)
{
  return x*x;
}

Veamos una implementación posible en C (con MPI) de la segunda opción:

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

#define XINICIAL 0
#define XFINAL 10
#define PASOS 1000
#define MASTER 0
#define TAG_INICIO 1
#define TAG_FIN 2
#define TAG_RESULT_PARCIAL 3

float calcularFuncion(float x);

int main (int argc, char *argv[])
{
      int rank, value, size, i,workers;  
      float resultado,incremento,x,resultParcial,xfinal;
      MPI_Status status; 

      incremento = ((float)XFINAL-(float)XINICIAL)/(float)PASOS;

      MPI_Init( &argc, &argv );

      MPI_Comm_rank( MPI_COMM_WORLD, &rank );
      MPI_Comm_size( MPI_COMM_WORLD, &size ); 
      resultParcial=0;
      workers=size-1;

      if (size<2)
      {
    printf("No se puede procesar con %i nodos\n",size);
    return 0;
      }

      if (rank==MASTER)
      {
    for (i=MASTER+1;i<size;i++)
    {
      x=XINICIAL+(i-1)*XFINAL/(size-1);
      MPI_Send(&x, 1, MPI_FLOAT, i, TAG_INICIO, MPI_COMM_WORLD); 
      xfinal=XINICIAL+i*XFINAL/(size-1);
      MPI_Send(&xfinal, 1, MPI_FLOAT, i, TAG_FIN, MPI_COMM_WORLD); 
    }
    for (i=MASTER+1;i<size;i++)
    {
      MPI_Recv(&resultParcial, 1, MPI_FLOAT, i, TAG_RESULT_PARCIAL, 
                                            MPI_COMM_WORLD, &status); 
      resultado +=resultParcial;
    }
    printf("El resultado de es %f \n",resultado);    
}
else
{
     MPI_Recv(&x, 1, MPI_FLOAT, MASTER, TAG_INICIO, MPI_COMM_WORLD, 
                                                          &status); 
     MPI_Recv(&xfinal, 1, MPI_FLOAT, MASTER, TAG_FIN, MPI_COMM_WORLD, 
                                                          &status); 
     while (x<xfinal)
     {
        resultParcial += incremento * (calcularFuncion(x)+
                           calcularFuncion(x+incremento))/(float)2;
        x+=incremento;
     }
     MPI_Send(&resultParcial,1,MPI_FLOAT, MASTER, TAG_RESULT_PARCIAL
                                                 , MPI_COMM_WORLD);
 }             
}

float calcularFuncion(float x)
{
  return x*x;
}

En términos de consumo de red, parece ser mejor la segunda opción.

A continuación vamos a ver un planteo de este mismo problema implementado con MapReduce de Hadoop en su versión 1. Por motivos de simplicidad no se utilizó el File System distribuido, que es una de las ventajas que tiene este enfoque:

La interfaz IFuncion es la responsable de darles una “cara” consumible a las funciones:

package ar.edu.rdipasquale;

public interface IFuncion {
    public abstract Double calcularY(Double x);
}

La Función x2 se implementa de la siguiente manera:

package ar.edu.rdipasquale;

public class FuncionXCuadrado implements IFuncion{

    public FuncionXCuadrado() {
    }

    @Override
    public Double calcularY(Double x)
    {
        return x*x;
    }
}

La clase que se encarga de configurar el job y ejecutarlo se llama Riemann. Por simplicidad, no utilizará el file system distribuido y generará un archivo con una clave y un valor. La clave será la función (el id de la función) a integrar y el valor será el comienzo del rectángulo en el eje x:

package ar.edu.rdipasquale;

import java.io.PrintWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Riemann extends Configured implements Tool {

    private static final Double XINICIAL=(double)0;
    private static final Double XFINAL=(double)10;
    private static final int PASOS=100;
    private static final int FUNCIONES_A_INTEGRAR=1;

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(Riemann.class);
          conf.setJobName("Riemann");      
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(DoubleWritable.class);

          Double x=XINICIAL;

          // Habría que aprovechar el HDFS en vez del local... 
          PrintWriter out = new PrintWriter("funcionesaintegrar.txt");
          for (int i=0;i<FUNCIONES_A_INTEGRAR;i++)
              for (int j=0;j<PASOS;j++)
              {
                  out.println(String.valueOf(i+1)+";"+
                                                   String.valueOf(x));
                  x=(j+1)*XFINAL/PASOS;
              }
          out.flush();
          out.close();

          conf.setMapperClass(MapRiemann.class);
          conf.setCombinerClass(ReduceRiemann.class);
          conf.setReducerClass(ReduceRiemann.class);

          conf.setInputFormat(TextInputFormat.class);
          conf.setOutputFormat(TextOutputFormat.class);

          FileInputFormat.setInputPaths(conf, 
                                  new Path("funcionesaintegrar.txt"));
          FileOutputFormat.setOutputPath(conf, new Path("salida"));

          conf.set("riemann.funcion", 
                               "ar.edu.rdipasquale.FuncionXCuadrado");
          conf.set("riemann.incremento", 
                             String.valueOf((XFINAL-XINICIAL)/PASOS));
          JobClient.runJob(conf);        
          return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Riemann(), 
                                                               args);
        System.exit(res);
    }
}

El Mapper será la Clase MapRiemann que tomará cada línea del archivo en cuestión y emitirá la superficie de cada rectángulo:

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapRiemann extends MapReduceBase implements 
                   Mapper<LongWritable, Text, Text, DoubleWritable> 
{
    private IFuncion funcion;
    private Double incremento;

    @Override
    public void configure(JobConf job) {
        try {
            funcion = (IFuncion)Class.forName(
                          job.get("riemann.funcion")).newInstance();
            incremento=Double.parseDouble(
                                     job.get("riemann.incremento"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable arg0, Text value,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line,";");
        String funcionAIntegrar=tokenizer.nextToken();
        Double valor=Double.parseDouble(tokenizer.nextToken());
        output.collect(new Text(funcionAIntegrar), 
               new DoubleWritable(incremento*
                           (funcion.calcularY(valor)+
                           funcion.calcularY(valor+incremento))/2));
    }
}

El Reducer está implementado en la clase ReduceRiemann y se encargará de sumar todas las superficies calculadas por los mappers (para cada función, obviamente, no mezclará peras con manzanas):

package ar.edu.rdipasquale;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ReduceRiemann extends MapReduceBase implements 
                  Reducer<Text, DoubleWritable, Text, DoubleWritable> 
{
    @Override
    public void reduce(Text key, Iterator<DoubleWritable> values,
            OutputCollector<Text, DoubleWritable> output, 
            Reporter arg3)
            throws IOException {

      double sum = 0;
      while (values.hasNext()) 
          sum += values.next().get();
      output.collect(key, new DoubleWritable(sum));                
    }   
}

Como combiner se utiliza la misma clase utilizada como reducer para poder optimizar la performance realizando algunas agregaciones locales, lo que puede observarse en la configuración del job en la clase Riemann.

Sería tonto hacer una comparativa de performance entre los dos enfoques, ya que son herramientas que se utilizan para fines distintos.

El nivel de performance en el cálculo en la plataforma C+MPI es probablemente inalcanzable.

La plataforma MapReduce (en Hadoop) se va a destacar en trabajos orientados a grandes volúmenes de datos, donde minimizará el costo de red frente a la plataforma MPI.

Por otro lado la plataforma MPI se aplica a lenguajes de programación general, y si bien MapReduce puede utilizarse en C++, Java, Python y otros, los algoritmos deben expresarse en términos de concatenación de Mappers y Reducers, lo cual agrega una complejidad, entre las que está la no existencia de un estado, es decir, que son implementaciones stateless (esto en algoritmos de grafos puede llevar a dolores de cabeza, o a soluciones por debajo del óptimo en términos de performance).

A favor del enfoque MapReduce vamos a decir que es tolerante a fallos, en una buena instalación, en un cluster lo suficientemente grande y bien administrado, se nos puede romper un nodo en la mitad del proceso y el mismo finalizará sin ningún problema. Eso es muy difícil de alcanzar con C-MPI. Asimismo el nivel de seguimiento de las tareas está administrado por la herramienta provista por Hadoop, de modo que el monitoreo ya viene de fábrica.

En un clúster heterogéneo, las soluciones que brindamos en C serían tan performantes como el nodo menos performante del cluster, ya que asignamos la carga de manera lineal. En cambio en MapReduce la carga es administrada por el framework, de modo que asignará a cada uno según sus necesidades y les pedirá según sus posibilidades (Ahí está el verdadero Socialismo del siglo XXI en los clusters de computadoras).

¿Y entonces, para qué GA con MapReduce?

MapReduce es un framework para procesamiento de grandes volúmenes de datos en paralelo. Google tiene una patente sobre el concepto en EEUU, aunque sería discutible, ya que está inspirado en las operaciones Map y Reduce de Lisp y otros lenguajes funcionales.

El enfoque llamado BigData (también acuñado por Google) habla de manejar grandes volúmenes de datos. Los grandes volúmenes de datos no asustan a los motores de bases de datos relacionales tradicionales (RDBMS), lo que si acaba por dejarlos inutilizados es la complejidad de las relaciones… Facebook no podría montar una red social en forma de grafo de orden 5000 (cantidad máxima de amigos) eficientemente si sólo dependiera de RDBMS.

MapReduce podría ser el nivel más bajo de lenguaje utilizado para explotar estas bases de datos. No es conveniente hacer comparaciones, como muchos autores han hecho con herramientas OLAP, ya que es lo mismo que comparar un Audi Quattro R8 con el termo Lumilagro con el que estoy cebando mate. No hay mejor, ni peor… No tienen nada que ver, no me imagino extrayendo agua del radiador del Audi para hacer mate ni viajar a 300km/h con mi termo (salvo en caída libre, lo dicho, tiene ventajas el termo).

Lo que si nos interesa es que podemos escribir software, demostrar su correctitud y completitud (si no usamos lo que se llama combiners) y ponerlo a ejecutar en forma paralela sin atender demasiado al mecanismo de paralelismo. Eso si, tenemos que pensar nuestros algoritmos en términos de pipelines de operaciones map y reduce… Si sirve de consuelo, peor les va a los muchachos de la computación cuántica…

Hay otros métodos de ejecución en paralelo que permiten un manejo “más” nativo de los algoritmos como tantas implementaciones MPI que existen, pero requieren un conocimiento más exhaustivo sobre la arquitectura en la que están corriendo, no son agnósticos de la plataforma.

Por otro lado, existen varias implementaciones de MapReduce (tanto comerciales como libres) y existen en el mundo centenares de clusters MapReduce en funcionamiento. De hecho, herramientas como Hadoop (la elegida para hacer mi implementación por simplicidad, documentación y licencia). Por tanto expresar nuestro software en términos de MapReduce, habilita que lo podamos correr en la nube (ya sea comercial o académica), o que construyamos un clúster a partir de hardware obsoleto. Incluso si queremos invertir en hardware, es claro que es mucho más barato comprar 100 dispositivos de X procesamiento que uno de 100X procesamiento. Incluso se están utilizando las placas de video para paralelizar operaciones (lo que entiendo que es explotable en Hadoop).

Sobre paralelismo en algoritmos genéticos (de ahora en más, PGA), hay ríos de tinta escritos, investigaciones e implementaciones desde hace 30 años. Sobre implementaciones con MapReduce, muy poco:

Genetic Algorithms by using MapReduce 

Fei Teng y Doga Tuncay de la Universidad de Indiana proponen implementar el problema OneMax (maximizar los 1 en una cadena de bits) sobre Hadoop y Twister demostrando mayor performance con la implementación de la iteratividad con Twister.

Parallelization of genetic algorithms using Hadoop Map-Reduce

Dino Kečo y Abdulhamit Subasi de la Universidad Burch de Sarajevo también abordan el problema de implementar una solución para OneMax en MapReduce (con Hadoop), focalizándose en cómo encajar el paralelismo que necesita la solución específica de este problema con GAs en el esquema de MapReduce.

An Extension of MapReduce for Parallelizing Genetic Algorithms

Chao Jin, Christian Vecchiola y Rajkumar Buyya de la Universidad de Melbourne utilizaron una grila de procesamiento Microsoft (con Aneka) para construir una extensión del modelo de procesamiento MapReduce para que encaje mejor con los problemas que hay para compatibilizar los PGA con MapReduce. Parece una buena solución que se aparta de la norma y requiere de la presencia de este nuevo producto en los clúster MapReduce en vez de las implementaciones standard.

Scaling Populations of a Genetic Algorithm for Job Shop Scheduling Problems using MapReduce

Di Wei Huang y Jimmy Lin de la Universidad de Maryland se propusieron demostrar que para solucionar Job Shop Scheduling Problem con GA conviene agrandar el tamaño de las poblaciones y para eso utilizaron MapReduce con excelentes resultados. La implementación fue específica e iterativa para este problema, por lo que no es muy general, pero la conclusión es asombrosa. Llegaron a operar con poblaciones de 107 individuos. E incluso concluyen que la cantidad de generaciones necesarias se disminuye notablemente con este enfoque.

A Parallel Genetic Algorithm Based on Hadoop MapReduce for the Automatic Generation of JUnit Test Suites

Linda Di Geronimo, Filomena Ferrucci, Alfonso Murolo y Federica Sarro de la Universidad de Salerno proponen utilizar PGA para generar suites de pruebas unitarias que maximicen ciertas métricas de pruebas unitarias (como branch coverage, code coverage y otras).

Scaling Simple and Compact Genetic Algorithms using MapReduce

Abhishek Verma, Xavier Llora, David E. Goldberg y Roy H. Campbell de la Universidad de Illinois proponen estudiar las perspectivas de escalabilidad que puede ofrecer la implementación de PGA con MapReduce. Para eso toman una implementación simple (la más simple posible) de algoritmo genético y realizan experimentos con hasta 108 variables.

Una introducción básica

La Programación Matemática como rama de la Matemática Aplicada ha estudiado (desde mediados del siglo XX) problemas de decisión en los que se deben determinar acciones que optimicen un determinado objetivo (optimización).
El elemento clave con el que se sirve la programación matemática para verificar el método científico es el modelo matemático. Los modelos matemáticos son abstracciones simplificadas de sistemas reales que destacan las relaciones e interacciones principales entre las entidades del sistema estudiado1. Para elaborar dichos modelos, la programación matemática se nutre de resultados del álgebra, del análisis matemático, del cálculo numérico, de la geometría, de la topología, etc. El proceso de modelización debe verificar el método científico tal como puede observarse en el siguiente diagrama:
metodocientifico Dentro de los modelos, los problemas de optimización que se intentan resolver suelen tener la siguiente forma: min\{f(x) : x \in S\} es decir, encontrar si existe un elemento (o todos los elementos) donde la función objetivo f alcance su valor mínimo (o máximo), siendo el conjunto S el de las soluciones factibles. Las búsquedas de dichas soluciones puede concluir con uno de los siguientes resultados:

  • Problema no factible: No hay ninguna solución factible, es decir, que el conjunto S=\{\emptyset\}
  • Problema no acotado: Cuando no se determina que siempre se encontrará una solución mejor, que haga tender a infinito (positivo o negativo) la función objetivo.
  • Problema con óptimo: Cuando se encuentra al menos una solución que satisface el la función objetivo siendo un mínimo (o máximo) global.

Según las características de las variables e inecuaciones (restricciones) de cada modelo, la programación matemática puede dividirse en varias ramas, como la programación lineal (cuando la función objetivo es lineal), y la no lineal. Por otro lado, cuando la variable a estudiar es entera2 (o natural) se habla de Programación Entera (lineal o no lineal). Otra rama de interés es la Optimización Combinatoria, que intenta resolver problemas caracterizados por tener un número finito (pero muy grande en general) de posibles soluciones.
Son muchas las herramientas y técnicas que asisten a la programación matemática para intentar abordar estos problemas.
Todo algoritmo utilizado para resolver un problema posee un orden de complejidad estudiado por la Teoría de la Complejidad Algorítmica. Este orden de complejidad estudia la mayor número de “pasos” que deberá ejecutar el algoritmo para resolver el problema en función del tamaño de la entrada de datos al algoritmo. Por tanto es importante no el hecho de conocer en sí la cantidad de pasos que utilizará para cada tamaño de instancia (o entrada), sino el orden de crecimiento de la cantidad de pasos que realizará el algoritmo. Este orden de crecimiento se describe con la llamada notación “O”. Por ejemplo, para un orden de crecimiento cuadrático, se denotará O(). El orden de complejidad nos dará una pauta de cuán tratable es un problema, para ilustrar las connotaciones del crecimiento del orden de complejidad se adjunta la siguiente tabla que muestra para distintos órdenes de complejidad, como crecen según el tamaño de la entrada al algoritmo:

cuadroorden

Si un problema admite una solución con un algoritmo de orden polinomial, se dice que pertenece a la clase P. Los problemas con un orden mayor al polinomial se conocen como NP (no polinomiales). Muchos de los problemas que estudia la programación matemática pertenecen a la clase NP3.
Los algoritmos genéticos (o de forma más general, la programación evolutiva) son algoritmos que se inspiran en la evolución biológica para buscar soluciones en espacios de estados. La gran mayoría de los problemas que busca solucionar la inteligencia artificial son problemas complejos de búsqueda en espacios de estados muy grandes. Si bien son algoritmos de búsqueda, se han utilizado (casi exclusivamente) en problemas de optimización combinatoria para problemas NP. Tienen la característica de ser altamente paralelizables4, lo que lo diferencia de muchos de los mecanismos de resolución clásicos de programación matemática, que esencialmente son soluciones secuenciales poco paralelizables.
MapReduce es un aporte5 introducido con la aparición de las bases de datos NoSQL y la necesidad de manejar grandes volúmenes de datos cada vez más complejos (con un orden mucho mayor de relaciones que las bases de datos más tradicionales). Este mecanismo de naturaleza funcional permite paralelizar problemas sobre distintos tipos de clusters estableciendo un marco de trabajo estándar para desarrollar trabajos altamente paralelizables.
Una vez introducidos los conceptos claves del trabajo, se pueden enumerar los objetivos del mismo:

  1. Construir una máquina o framework para correr algoritmos genéticos sobre MapReduce maximizando el nivel de paralelismo.
  2. Una vez logrado el primer objetivo, se procederá a estudiar la complejidad de los algoritmos dentro de este nuevo marco.
  3. Se estudiarán alternativas de paralelismo, contando con los desarrollos ya aportados sobre poblaciones.
  4. Se estudiarán las analogías (evaluadas y no evaluadas aún) con la biología, desde el punto de vista del alto paralelismo alcanzado.
  5. Se tratarán modelos de gran complejidad para optimizar la generación de energía eléctrica térmica e hídrica en la República Argentina.

Notas
1 Algunos de los modelos proceden directamente de la realidad, pero muchos otros son planteos de otras ramas de la ciencia, como la Teoría de Juegos, la teoría de grafos, la teoría de la decisión, la bioinformática, etc.
2 Ecuaciones deofánticas.
3 Uno de los problemas abiertos de la ciencia de la computación más interesantes es determinar si P = NP.
4 Esto quiere decir que pueden ser resueltos en paralelo de manera eficiente por varios procesadores al mismo tiempo.
5 Inicialmente concebido por Google.