Sunday, 17 March 2013


Пост 8. MapReduce для взрослых. Performance tuning.

Я продолжаю описывать принцип работы MapReduce заданий. Еще раз напоминаю, что hive - это целиком и полнстью MapReduce, просто описываете вы его языком более верхнего уровня, нежели java. Давайте посмотрем более детально, что же происходит, когда мы вновь пытаемся посчитать слова в "Евгение Онегине".
Как было сказано раньше, блоки бьются строго по размеру, никакой дополнительной логики нет.
Допустим строка "Когда не в шутку занемог" была разбита на 2 части, которые ушли в соответствующие блоки. В первый блок: "Когда не в шутку занем", а "ог" во второй.


Определив размеры входных порций (размер блока или параметр mapred.max.split.size)

Блоки располагаются на двух разных серверах, когда RecordReader читает следующую строчку, в случае если он не видит символа перевода строки - подтягивает недостающий элемент из соседнего блока - пересылка по сети. Здесь, кстати говоря, очень важна низколатентная сеть.

Дальше начинается процесс Map схема изображенная выше - слишком поверхностная, давайте рассмотрим более подробную:

1)Map пишет выход в некий буфер (io.sort.mb по умолчанию 100 Мб). На вход получает (k0, v0) на выходе (k1, v1) - пишутся в этот самый буфер.
2) Когда буфер заполнен на io.sort.spill.percent (по умолчанию 0.8) в бэкграунде начинает происходить spill (сброс данных на диск), при этом Map продолжает писать в буфер, НО если буфер будет заполнен полностью – map заморозит работу до конца spill. Spill пишет на локальный диск (mapred.local.dir). Во время spill происходит:

- Разделение на reduce порции (partitioner) и запись в отдельные файлы (partition file).
- Сортировка по ключу каждой reduce порции.
- Опционально. Combiner – “локальный reduce”.
Если параметр mapred.compress.map.output = true, на диск пишутся компрессированные данные. Компрессионный кодек:mapred.map.output.compression.codec.

3) После того как map записывает последний spill, начинается объединение и сортировка каждого partition файла в один (в io.sort.factor потоков, по умолчанию один). Идеальный вариант: один map – один spill. Если partition файлов больше чем min.num.spills.for.combine (по умолчанию 3) – запускается еще раз combiner.

4) После окончания map стартует copy фаза reduce - начинает забирать свою порцию данных в mapred.reduce.parallel.copies потоков (по умолчанию 5 на каждый reduce) в JVM память reduce. Это т буфер регулируется параметром mapred.job.shuffle.input.buffer.percent

5) После того как отработал последний map начинается сортировка-слияние map-ов в один файл. Количество файлов (которое определяется количеством map) участвующих в одной итерации равно io.sort.factor (10 по умолчанию).

Например было 40 map, тогда sort phase выполнится в 5 итераций (4 по 10 + одна – финальная сортировка-слияние). В идеале количество map должно стремиться к io.sort.factor. Попутно происходит преобразование пар (K1,V1) в (K1,(ist(V))

6) Reduce программа написанная пользователем. Преобразование (K1,V1) в (K1,(ist(V)) в (K2,V2)
Все вышеописанное позволяет понять способы разгона и оптимизации MapReduce заданий. Я намерено не консолидировал их в таблицу стандартных советов, что бы читатель смог подумать, поэксперементировать, самостоятельно сделать выводы о значениях некоторых параметров.

2 comments:

  1. Добрый день, Mijatovic.

    Много вопросов по статье:
    1. в Hadoop есть namenood - Она одна (есть еще резервная). Так вот - правильно ли я понимаю, что inputformat и split делает именно она ?
    2. когда происходит RR чтение другой строки и видит что часть строки надо подтягивать с другого сервера (другой Node) то этот процесс у вас никак не прокомментировать. Хотелось бы подробнее понять что в таком случае происходит и как этого избежать. Т.к. опять сеть является слабым звеном.

    Большое спасибо за статью.

    С уважением,
    Николай.

    ReplyDelete
  2. Вопросы это хорошо:)
    Ответы:
    1)
    > в Hadoop есть namenood - Она одна (есть еще резервная)
    начиная с версии CDH4 есть такая фича как NameNode High Availability (о ней я планирую написать позже), когда у меня есть "боевая" NN и Standbay. До CDH4 была немного другая модель (не было HA, была только Secondary Name Node, которая не выполняла роль standby).

    Но это только предыстория:) На самом деле NN никак не задействована в split - это выполняется на DN (локально). Все выше описанное происходит на DN

    2) Идет запрос на другой сервер (IPC). Происходит тогда когда строка попала в 2 разных блока. Избежать наверное никак нельзя - hadoop пилит файл строго по блокам. Использование низколатентных сетей (Infiniband) сильно снижает масштабы бедствия.

    ReplyDelete