Я продолжаю описывать принцип работы MapReduce заданий. Еще раз напоминаю, что hive - это целиком и полнстью MapReduce, просто описываете вы его языком более верхнего уровня, нежели java. Давайте посмотрем более детально, что же происходит, когда мы вновь пытаемся посчитать слова в "Евгение Онегине".
Как было сказано раньше, блоки бьются строго по размеру, никакой дополнительной логики нет.
Допустим строка "Когда не в шутку занемог" была разбита на 2 части, которые ушли в соответствующие блоки. В первый блок: "Когда не в шутку занем", а "ог" во второй.

Определив размеры входных порций (размер блока или параметр mapred.max.split.size)
Блоки располагаются на двух разных серверах, когда RecordReader читает следующую строчку, в случае если он не видит символа перевода строки - подтягивает недостающий элемент из соседнего блока - пересылка по сети. Здесь, кстати говоря, очень важна низколатентная сеть.
Дальше начинается процесс Map схема изображенная выше - слишком поверхностная, давайте рассмотрим более подробную:

1)Map пишет выход в некий буфер (io.sort.mb по умолчанию 100 Мб). На вход получает (k0, v0) на выходе (k1, v1) - пишутся в этот самый буфер.
2) Когда буфер заполнен на io.sort.spill.percent (по умолчанию 0.8) в бэкграунде начинает происходить spill (сброс данных на диск), при этом Map продолжает писать в буфер, НО если буфер будет заполнен полностью – map заморозит работу до конца spill. Spill пишет на локальный диск (mapred.local.dir). Во время spill происходит:
- Разделение на reduce порции (partitioner) и запись в отдельные файлы (partition file).
- Сортировка по ключу каждой reduce порции.
- Опционально. Combiner – “локальный reduce”.
Если параметр mapred.compress.map.output = true, на диск пишутся компрессированные данные. Компрессионный кодек:mapred.map.output.compression.codec.
3) После того как map записывает последний spill, начинается объединение и сортировка каждого partition файла в один (в io.sort.factor потоков, по умолчанию один). Идеальный вариант: один map – один spill. Если partition файлов больше чем min.num.spills.for.combine (по умолчанию 3) – запускается еще раз combiner.
4) После окончания map стартует copy фаза reduce - начинает забирать свою порцию данных в mapred.reduce.parallel.copies потоков (по умолчанию 5 на каждый reduce) в JVM память reduce. Это т буфер регулируется параметром mapred.job.shuffle.input.buffer.percent
5) После того как отработал последний map начинается сортировка-слияние map-ов в один файл. Количество файлов (которое определяется количеством map) участвующих в одной итерации равно io.sort.factor (10 по умолчанию).
Например было 40 map, тогда sort phase выполнится в 5 итераций (4 по 10 + одна – финальная сортировка-слияние). В идеале количество map должно стремиться к io.sort.factor. Попутно происходит преобразование пар (K1,V1) в (K1,(ist(V))
6) Reduce программа написанная пользователем. Преобразование (K1,V1) в (K1,(ist(V)) в (K2,V2)
Все вышеописанное позволяет понять способы разгона и оптимизации MapReduce заданий. Я намерено не консолидировал их в таблицу стандартных советов, что бы читатель смог подумать, поэксперементировать, самостоятельно сделать выводы о значениях некоторых параметров.
Добрый день, Mijatovic.
ReplyDeleteМного вопросов по статье:
1. в Hadoop есть namenood - Она одна (есть еще резервная). Так вот - правильно ли я понимаю, что inputformat и split делает именно она ?
2. когда происходит RR чтение другой строки и видит что часть строки надо подтягивать с другого сервера (другой Node) то этот процесс у вас никак не прокомментировать. Хотелось бы подробнее понять что в таком случае происходит и как этого избежать. Т.к. опять сеть является слабым звеном.
Большое спасибо за статью.
С уважением,
Николай.
Вопросы это хорошо:)
ReplyDeleteОтветы:
1)
> в Hadoop есть namenood - Она одна (есть еще резервная)
начиная с версии CDH4 есть такая фича как NameNode High Availability (о ней я планирую написать позже), когда у меня есть "боевая" NN и Standbay. До CDH4 была немного другая модель (не было HA, была только Secondary Name Node, которая не выполняла роль standby).
Но это только предыстория:) На самом деле NN никак не задействована в split - это выполняется на DN (локально). Все выше описанное происходит на DN
2) Идет запрос на другой сервер (IPC). Происходит тогда когда строка попала в 2 разных блока. Избежать наверное никак нельзя - hadoop пилит файл строго по блокам. Использование низколатентных сетей (Infiniband) сильно снижает масштабы бедствия.