A lo largo del libro asumimos que son personas las que interactúan con nuestro sistema. El usuario solicita o inserta un dato, y el sistema, con suerte, responde a esa petición. Pero ese no es el único tipo de sistema que existe:
Services (online systems): El servicio espera por un request, y responde a este lo más rápido posible.
Batch Processing Systems (offline systems): El sistema toma una gran cantidad de datos de entrada y por medio de jobs la procesa para generar un resultado.
Stream processing Systems (near-real-time systems): Construido sobre batch processing, pero que requiere una respuesta casi inmediata del sistema.
En este capítulo estudiaremos como MapReduce nos ayuda a construir aplicaciones confiables, escalables y mantenibles, aún cuando este esté decayendo en importancia hoy en día.
Pero primero veremos las herramientas estándar de Unix, ya que su filosofía se soporta sobre sistemas distribuidos a gran escala y heterogéneos.
Batch Processing with Unix Tools
Usaremos como ejemplo un sistema que guarda logs en un archivo, donde cada línea es un log, y a su vez cada línea contiene información relevante.
Simple Log Analysis
Para hacer análisis de logs podemos usar los comandos que nos brinda Unix como awksort o unique entre otros usándolos de manera encadenada (pipes).
Podríamos hacerlo también usando algún lenguaje de programación como Ruby, usando Hash tables para este cometido.
La diferencia entre usar un método u otro va depender de varios factores, como la capacidad de procesamiento de la máquina, o la cantidad de datos del log etc etc.
The Unix Philosophy
Según Doug McIlroy, el creado de los Unix Pipes, es descrita como:
Cada programa debe hacer una sola cosa, y hacerla bien; se debe esperar que la salida de un programa, pueda ser la entrada para otro; crear pequeños programas que se puedan probar rápidamente; tener grandes herramientas que permitan desarrollar grandes cosas.
Para que un programa pueda usar de entrada la salida de otro programa, la entrada y la salida deben tener algo en común, llamado interfaz o file descriptor. Esta, por lo general, es un archivo de texto plano.
Unix separa muy bien la forma en que los programas operan de la forma en que se conectan con otros programas, haciendo el desacoplamiento una gran característica.
Otra característica importante de Unix es la inmutabilidad, permitiendo al usuario experimentar sin dañar nada.
MapReduce and Distributed Filesystems
MapReduce es similar a lo que se ha explicado de las características de Unix (desacoplado, se puede encadenar, trabajar sobre archivos etc etc), pero en máquinas distribuidas. Usa su propio File System llamado HDFS.
MapReduce Job Execution
MapReduce funciona de la siguiente manera:
Lee los archivos de entrada
El Map recorre las líneas y las setea en un key value
Ordena los registros
El Reduce ejecuta alguna función a cada línea
MapReduce permite ejecutar código paralelamente sin necesidad de escribir código para controlar ese paralelismo. Cada registro del archivo es independiente, por lo que el registro de un Map Task lo puede procesar otro Reducer Task
A MapReduce no se le pueden configurar workflows, pero se puede configurar las carpetas destino, para que un proceso de MapReduce tome el resultado de otro proceso.
Reduce-Side Joins and Grouping
Como se observó en capítulos pasados, un join generalmente busca complementar datos de una tabla, con datos de otra tabla.
Cuando se habla de joins en el contexto del batch processing, significa resolver todas las ocurrencias de alguna asociación dentro del dataset.
Por ejemplo tenemos logs de actividades de usuarios, pero esos logs solo contiene los ids de estos. No podemos consultar los ids en una BD en otro server porque eso impactaría demasiado en el performance. Lo que se hace es tener los logs en el HDFS y replicar en el mismo HDFS por medio de un ETL, los usuarios de dicha DB.
MapReduce puede usar un algoritmo llamado sort-merge join, donde por un lado, se tienen los records ordenados por id, y por otro, se tienen la correspondencia de esos ids, los cuales son unidos al final.
Además de hacer joins, MapReduce permite hacer agrupaciones (GROUP BY), las cuales pueden ser usadas para contar eventos, sumar cantidades, seleccionar ciertos registros que cumplan con alguna condición. Un ejemplo de esto son las acciones que hace un usuario en una sesión y determinar si cierta configuración permite que el usuario compre más en un sitio web.
Todo esto conlleva que se pueda generar hot spots en los reducers, por lo que existen algoritmos para esto, los cuáles consisten en repartir el trabajo en los hot spots en varios reducers.
Map-Side Joins
En la sección anterior vimos que es posible hacer joins y agrupaciones usando los reducers. El problema con esto es que esto es bastante costoso (afecta el rendimiento), y necesitamos saber la estructura de nuestros datos.
Es posible hacer algunas asunciones sobre los datos para realizar los joins desde el lado de los Maps. Esto hace que el proceso sea mucho más rápido y que lo único que se hace es, tomar datos del DFS y escribir la salida en el mismo.
Una técnica llamada Broadcast hash joins, hace que los joins puedan estar en memoria, así el Map puede acceder a estos directamente. Esto es usado cuando los joins son un data set pequeño, de lo contrario no podría subirse a memoria.
Incluso si se sabe que los datos en los Maps están bien particionados, se podrían subir a memoria ciertos joins, que son usados para esos Maps. Así se ahorra uso de memoria. A esto se le conoce Partitioned hash joins.
The Output of Batch Workflows
Generalmente el resultado final de un procesamiento en batch no es un reporte, sino un tipo diferente de estructura.
Uno de los resultados son los índices de búsquedas. Google usó en sus inicios MapReduce para esto.
Otro uso importante para el procesamiento en batch son archivos para bases de datos key-value. Estos pueden ser usados para construir sistemas de Machine Learning como clasificadores o sistemas de recomendación.
Como se observa la filosofía de Unix también es aplicada los procesos en batch:
Inmutabilidad (los archivos no se modifican)
Separación de responsabilidades (cada job de MapReduce separan la lógica de la conexión)
Reintento de tareas
Comparing Hadoop to Distributed Databases
Como hemos visto, Hadoop es una versión distribuida de Unix. Pero la idea de MapReduce no nació ahí. Antes teníamos implementaciones llamadas Bases de Datos MPP (Massively Parallel Processing).
Estas 2 implementaciones se diferencian en los tipos de almacenamiento usados. Mientras que Hadoop guarda todo en archivos, que van desde logs hasta imágenes, las MPP guardan los datos estructuradamente.
Esto hace que Hadoop sea usado como ETL para MPP.
MPP es un monolito de muchas piezas unidas, por lo que se desempeña muy bien para propósitos generales. Se puede usar SQL y herramientas de análisis de datos. Por otro lado, a pesar que MapReduce no se desempeña tan bien como MPP, se puede aplicar código a los datos, por lo que se pueden hacer cosas más específicas, como modelos de ML, sistemas de recomendación etc etc.
Las MPP tratan de mantener la mayor parte de la data en memoria, para evitar los problemas de performance que causan ir a disco por la información. Si un query falla, es posible que se vuelva a reintentar según alguna parametrización, pero por lo general no se hace causando que al usuario no se le muestren resultados. Mientras que MapReduce fue diseñado para ser tolerante a fallos, por lo que, se pueden re ejecutar los jobs cada vez que sea necesario, además consume muy pocos recursos.
Beyond MapReduce
MapReduce se hizo muy popular en la década del 2000, pero es apenas uno entre los muchos modelos de programación para sistemas distribuidos.
Materialization of Intermediate State
Como vimos anteriormente para, para crear un workflow en MapReduce, el archivo salida de un Job, se configura para que sea la entrada de otro en un lugar diferente en el HDFS. A esto se le conoce como estado intermedio (Intermediate state). El proceso de escribir ese estado en el directorio se le conoce como materialización.
Esto tiene algunas desventajas vs los pipes de Unix. Por ejemplo un job solo puede empezar una vez el job que deja el archivo finalice. Por lo que un problema en performance en el job1 puede afectar al job2. Otro problema es que materializar estados intermedios significa que todos esos archivos se replicarán en todos los nodos que usen el HDFS.
Con el fin de reducir dichas desventajas, se crearon herramientas sobre MapReduce, como lo son Spark Tez y flink. Convirtiéndolo en un flujo donde, la entrada de un proceso es la salida de otro.
Esto trae consigo varias ventajas como
Ahorro computacional, ya que en muchas ocasiones no es necesario hacer un ordenamiento
Muchas veces los estados intermedios no se copian en el HDFS, ya que no es necesario, por ende este no se replica en todos los nodos.
Graphs and Iterative Processing
El procesamiento en batch de datos en grafos es muy útil a la hora de crear modelos de ML de recomendación o sistemas de ranking.
MapReduce como tal no soporta iteraciones (es necesario para reprocesar nodos en cada una de sus ramas). Por lo que se podría hacer que un re-scheduler podría programar nuevamente el job para ejecutar nuevamente.
Como una optimización de dichas iteraciones, existen varios algoritmos basados en un Paper de Google (Google Pregel) el cual permite que se puedan procesar los nodos, y guardar un estado en cada uno, permitiendo reprocesamiento en caso que algo falle. Esto se hace por medio de mensajes, donde cada nodo envía un mensaje a su nodo adyacente.
La ejecución distribuida del procesamiento de grafos, generalmente es lenta, debido a la comunicación entre los nodos. Cuando son muchos nodos, y estos están en diferentes máquinas, el overhead de los mensajes entre nodos puede ralentizar bastante el procesamiento. La mejora de performance de este tipo de procesamiento es un área que aún está en desarrollo
High-Level APIs and Languages
Existen muchos productos que tienen implementado MapReduce por debajo como lo son, Spark, Hive, Pig, Cascading, entre otros.
Estas implementaciones usan un query language declarativo, lo que trae consigo muchas ventajas. Cada uno elige la mejor manera (algoritmo) de seleccionar los datos; además el código es más fácil de mover entre plataformas.
En los últimos años han ido saliendo implementaciones más específicas también, para poder implementar código más especializado usado para modelos de ML (Spark, Flink, MADlib)
Chapter 10 Batch Processing
A lo largo del libro asumimos que son personas las que interactúan con nuestro sistema. El usuario solicita o inserta un dato, y el sistema, con suerte, responde a esa petición. Pero ese no es el único tipo de sistema que existe:
Services (online systems): El servicio espera por un request, y responde a este lo más rápido posible. Batch Processing Systems (offline systems): El sistema toma una gran cantidad de datos de entrada y por medio de jobs la procesa para generar un resultado. Stream processing Systems (near-real-time systems): Construido sobre batch processing, pero que requiere una respuesta casi inmediata del sistema.
En este capítulo estudiaremos como MapReduce nos ayuda a construir aplicaciones confiables, escalables y mantenibles, aún cuando este esté decayendo en importancia hoy en día.
Pero primero veremos las herramientas estándar de Unix, ya que su filosofía se soporta sobre sistemas distribuidos a gran escala y heterogéneos.
Batch Processing with Unix Tools
Usaremos como ejemplo un sistema que guarda logs en un archivo, donde cada línea es un log, y a su vez cada línea contiene información relevante.
Simple Log Analysis
Para hacer análisis de logs podemos usar los comandos que nos brinda Unix como
awk
sort
ounique
entre otros usándolos de manera encadenada (pipes). Podríamos hacerlo también usando algún lenguaje de programación como Ruby, usando Hash tables para este cometido.La diferencia entre usar un método u otro va depender de varios factores, como la capacidad de procesamiento de la máquina, o la cantidad de datos del log etc etc.
The Unix Philosophy
Según Doug McIlroy, el creado de los Unix Pipes, es descrita como: Cada programa debe hacer una sola cosa, y hacerla bien; se debe esperar que la salida de un programa, pueda ser la entrada para otro; crear pequeños programas que se puedan probar rápidamente; tener grandes herramientas que permitan desarrollar grandes cosas.
Para que un programa pueda usar de entrada la salida de otro programa, la entrada y la salida deben tener algo en común, llamado interfaz o file descriptor. Esta, por lo general, es un archivo de texto plano.
Unix separa muy bien la forma en que los programas operan de la forma en que se conectan con otros programas, haciendo el desacoplamiento una gran característica.
Otra característica importante de Unix es la inmutabilidad, permitiendo al usuario experimentar sin dañar nada.
MapReduce and Distributed Filesystems
MapReduce es similar a lo que se ha explicado de las características de Unix (desacoplado, se puede encadenar, trabajar sobre archivos etc etc), pero en máquinas distribuidas. Usa su propio File System llamado HDFS.
MapReduce Job Execution
MapReduce funciona de la siguiente manera: Lee los archivos de entrada El Map recorre las líneas y las setea en un key value Ordena los registros El Reduce ejecuta alguna función a cada línea
MapReduce permite ejecutar código paralelamente sin necesidad de escribir código para controlar ese paralelismo. Cada registro del archivo es independiente, por lo que el registro de un Map Task lo puede procesar otro Reducer Task
A MapReduce no se le pueden configurar workflows, pero se puede configurar las carpetas destino, para que un proceso de MapReduce tome el resultado de otro proceso.
Reduce-Side Joins and Grouping
Como se observó en capítulos pasados, un join generalmente busca complementar datos de una tabla, con datos de otra tabla.
Cuando se habla de joins en el contexto del batch processing, significa resolver todas las ocurrencias de alguna asociación dentro del dataset.
Por ejemplo tenemos logs de actividades de usuarios, pero esos logs solo contiene los ids de estos. No podemos consultar los ids en una BD en otro server porque eso impactaría demasiado en el performance. Lo que se hace es tener los logs en el HDFS y replicar en el mismo HDFS por medio de un ETL, los usuarios de dicha DB.
MapReduce puede usar un algoritmo llamado sort-merge join, donde por un lado, se tienen los records ordenados por id, y por otro, se tienen la correspondencia de esos ids, los cuales son unidos al final.
Además de hacer joins, MapReduce permite hacer agrupaciones (GROUP BY), las cuales pueden ser usadas para contar eventos, sumar cantidades, seleccionar ciertos registros que cumplan con alguna condición. Un ejemplo de esto son las acciones que hace un usuario en una sesión y determinar si cierta configuración permite que el usuario compre más en un sitio web.
Todo esto conlleva que se pueda generar hot spots en los reducers, por lo que existen algoritmos para esto, los cuáles consisten en repartir el trabajo en los hot spots en varios reducers.
Map-Side Joins
En la sección anterior vimos que es posible hacer joins y agrupaciones usando los reducers. El problema con esto es que esto es bastante costoso (afecta el rendimiento), y necesitamos saber la estructura de nuestros datos.
Es posible hacer algunas asunciones sobre los datos para realizar los joins desde el lado de los Maps. Esto hace que el proceso sea mucho más rápido y que lo único que se hace es, tomar datos del DFS y escribir la salida en el mismo.
Una técnica llamada Broadcast hash joins, hace que los joins puedan estar en memoria, así el Map puede acceder a estos directamente. Esto es usado cuando los joins son un data set pequeño, de lo contrario no podría subirse a memoria.
Incluso si se sabe que los datos en los Maps están bien particionados, se podrían subir a memoria ciertos joins, que son usados para esos Maps. Así se ahorra uso de memoria. A esto se le conoce Partitioned hash joins.
The Output of Batch Workflows
Generalmente el resultado final de un procesamiento en batch no es un reporte, sino un tipo diferente de estructura.
Uno de los resultados son los índices de búsquedas. Google usó en sus inicios MapReduce para esto.
Otro uso importante para el procesamiento en batch son archivos para bases de datos key-value. Estos pueden ser usados para construir sistemas de Machine Learning como clasificadores o sistemas de recomendación.
Como se observa la filosofía de Unix también es aplicada los procesos en batch: Inmutabilidad (los archivos no se modifican) Separación de responsabilidades (cada job de MapReduce separan la lógica de la conexión) Reintento de tareas
Comparing Hadoop to Distributed Databases
Como hemos visto, Hadoop es una versión distribuida de Unix. Pero la idea de MapReduce no nació ahí. Antes teníamos implementaciones llamadas Bases de Datos MPP (Massively Parallel Processing).
Estas 2 implementaciones se diferencian en los tipos de almacenamiento usados. Mientras que Hadoop guarda todo en archivos, que van desde logs hasta imágenes, las MPP guardan los datos estructuradamente.
Esto hace que Hadoop sea usado como ETL para MPP.
MPP es un monolito de muchas piezas unidas, por lo que se desempeña muy bien para propósitos generales. Se puede usar SQL y herramientas de análisis de datos. Por otro lado, a pesar que MapReduce no se desempeña tan bien como MPP, se puede aplicar código a los datos, por lo que se pueden hacer cosas más específicas, como modelos de ML, sistemas de recomendación etc etc.
Las MPP tratan de mantener la mayor parte de la data en memoria, para evitar los problemas de performance que causan ir a disco por la información. Si un query falla, es posible que se vuelva a reintentar según alguna parametrización, pero por lo general no se hace causando que al usuario no se le muestren resultados. Mientras que MapReduce fue diseñado para ser tolerante a fallos, por lo que, se pueden re ejecutar los jobs cada vez que sea necesario, además consume muy pocos recursos.
Beyond MapReduce
MapReduce se hizo muy popular en la década del 2000, pero es apenas uno entre los muchos modelos de programación para sistemas distribuidos.
Materialization of Intermediate State
Como vimos anteriormente para, para crear un workflow en MapReduce, el archivo salida de un Job, se configura para que sea la entrada de otro en un lugar diferente en el HDFS. A esto se le conoce como estado intermedio (Intermediate state). El proceso de escribir ese estado en el directorio se le conoce como materialización.
Esto tiene algunas desventajas vs los pipes de Unix. Por ejemplo un job solo puede empezar una vez el job que deja el archivo finalice. Por lo que un problema en performance en el job1 puede afectar al job2. Otro problema es que materializar estados intermedios significa que todos esos archivos se replicarán en todos los nodos que usen el HDFS.
Con el fin de reducir dichas desventajas, se crearon herramientas sobre MapReduce, como lo son Spark Tez y flink. Convirtiéndolo en un flujo donde, la entrada de un proceso es la salida de otro.
Esto trae consigo varias ventajas como Ahorro computacional, ya que en muchas ocasiones no es necesario hacer un ordenamiento Muchas veces los estados intermedios no se copian en el HDFS, ya que no es necesario, por ende este no se replica en todos los nodos.
Graphs and Iterative Processing
El procesamiento en batch de datos en grafos es muy útil a la hora de crear modelos de ML de recomendación o sistemas de ranking.
MapReduce como tal no soporta iteraciones (es necesario para reprocesar nodos en cada una de sus ramas). Por lo que se podría hacer que un re-scheduler podría programar nuevamente el job para ejecutar nuevamente.
Como una optimización de dichas iteraciones, existen varios algoritmos basados en un Paper de Google (Google Pregel) el cual permite que se puedan procesar los nodos, y guardar un estado en cada uno, permitiendo reprocesamiento en caso que algo falle. Esto se hace por medio de mensajes, donde cada nodo envía un mensaje a su nodo adyacente. La ejecución distribuida del procesamiento de grafos, generalmente es lenta, debido a la comunicación entre los nodos. Cuando son muchos nodos, y estos están en diferentes máquinas, el overhead de los mensajes entre nodos puede ralentizar bastante el procesamiento. La mejora de performance de este tipo de procesamiento es un área que aún está en desarrollo
High-Level APIs and Languages
Existen muchos productos que tienen implementado MapReduce por debajo como lo son, Spark, Hive, Pig, Cascading, entre otros.
Estas implementaciones usan un query language declarativo, lo que trae consigo muchas ventajas. Cada uno elige la mejor manera (algoritmo) de seleccionar los datos; además el código es más fácil de mover entre plataformas.
En los últimos años han ido saliendo implementaciones más específicas también, para poder implementar código más especializado usado para modelos de ML (Spark, Flink, MADlib)