Wednesday 27 March 2013

Пост 10. Сортировка в hadoop. Синтаксис hive.

Итак, сегодня хотел бы рассказать об основных алгоритмах MapReduce и как они могут быть имплементированы в рамках данной парадигмы.
Сортировка – идея проста: на шаге map идет простая разметка, например на вход подается
(K0,V0) – (0, word1)
На выходы это преобразуется в
(K1,V1) – (word1,1)
Стоит заметить, что 0 во входной паре и 1 в выходной  - удовлетворение требования.
Далее идет стадия partition – на которой определяется на какой редьюсер пойдет выходная пара (если редьюсер один – то все становится просто)
                Вся магия сортировки в Hadoop – выбираем функцию partitioning такую, что если k1<k2, то и partitioning(k1) <= partitioning(k2).
Reduce не делает ничего. Framework уже все отсортироавл, прежде чем подавать это на reduce.

Давайте рассмотрим пример.
Создаем файл 12345 куда вбиваем алфавит:
A
B
C
….
Размножаем его:
for i in `seq 1 100000`; do cat 12345 >> /tmp/alpha.txt; done;
Загружаем в HDFS с размером блока 1 Мб
hadoop fs -D dfs.blocksize=1048576 -put /tmp/alpha.txt /tmp/
Проверяем количество блоков:
hadoop fsck /tmp/alpha.txt -locations -blocks –files
Создаем в HIVE  табличку с одной колонкой (я назвал ее sort_test)
Пишем запрос:
create table sort_result as
select col_0 from sort_test
sort by col_0
До того как его запустить выставим параметр
mapred.max.split.size=1048576
Иначе не будет нескольких map  - все свалится в один (с этим параметром у меня получилось 5 map), количество reduce тоже выставляем равным mapred.reduce.tasks=5
(! Сейчас я только имитирую большое количество данных и серверов, поэтому выставляю эти параметры).
После того как запрос отработает можно посмотреть что получилось:
[oracle@localhost ~]$ hadoop fs -ls /user/beeswax/warehouse/sort_result/
Found 5 items
-rw-r--r--   1 oracle hadoop     878714 2013-03-24 12:46 /user/beeswax/warehouse/sort_result/000000_0
-rw-r--r--   1 oracle hadoop     879928 2013-03-24 12:46 /user/beeswax/warehouse/sort_result/000001_0
-rw-r--r--   1 oracle hadoop     878716 2013-03-24 12:46 /user/beeswax/warehouse/sort_result/000002_0
-rw-r--r--   1 oracle hadoop     879966 2013-03-24 12:46 /user/beeswax/warehouse/sort_result/000003_0
-rw-r--r--   1 oracle hadoop     882676 2013-03-24 12:47 /user/beeswax/warehouse/sort_result/000004_0
Вытянемэто на локальную машину
hadoop fs -get /user/beeswax/warehouse/sort_result/*
И увидим 5 практически идентичных файлов.
Ну вот как то так оно все сортируется. Из hadoop все это можно вытащить в виде одного файла
hadoop fs -getmerge /user/beeswax/warehouse/sort_result/* big_file
но не стоит обольщаться – вы получите один большой файл вида:
a
z
a
z
То есть финальной сортировки не будет.

Если все таки хочется получить на выходе один файл в отсортированном порядке – можно указать вместо sort by конструкцию order by
create table sort_result as
select col_0 from sort_test
order by col_0

Tuesday 19 March 2013

Пост 9. Hive и MapReduce. Агрегаты.

Итак, если вы внимательно прочитали все мои предыдцщие посты - вы теперь знаете достаточно, что бы воспринять Hive как MapReduce инструмент. Только задания Map и Reduce вы пишете не на Java, а на HQL (язык весьма напоминающий SQL, но там нет некоторых конструкций - одиночной вставки, update...). На самом деле MapReduce вы "пишете в двух метах" - при создании таблицы и при написании select.
Рассмотрим пример. У Вас есть плоский файл (csv), который лежит в HDFS. Плоский файл вида:
col1, col2, col3, col4
........
Лежит допустим где то в /user/examples/csv (естественно в hdfs).
Описываем его следующим DDL:
CREATE EXTERNAL TABLE `simplest_csv`
(
  `a` string ,
  `b` string ,
  `c` string ,
  `d` string )
ROW FORMAT   DELIMITED
    FIELDS TERMINATED BY ','
  STORED AS TextFile LOCATION "/user/examples/csv"

Возьмен конкретный пример файла, который поместим в HDFS:
333, wqer, 56, 88m
2, wqer, 56, 88m
334, wqert, 54, 88w
335, wqerty, 59, 88e

Напишем запрос к этиму файлу

select sum(a),b from simplest_csv
group by b

Что будет происходить на фазах Map и Reduce?
На вход будет подаваться строка, RR будет преобразовывать ее в пару ключ-значение (ключом будет являться смещением строки относительно начала файла). Внутри Map будет происходить преобразование к новой паре ключ-значение: будут "викинуты" не перечисленые колонки (так что перечисляйте только то что надо перечислить!) колонка (колонки) по которой идет группировка - будет ключом, та котонка которую надо суммировать - значением. Новая пара KV идет на Reduce (как и многие другие). Но перед пересылкой на Reduce в Hive всегда на агрегационных функцияю применяется combiner. Что делает reduce думаю объяснять не надо.
Описав этот процесс думаю так же бессмысленно говорить о том что подобные запросы линейно масштабируемы - я думаю все и так все поняли:)

Окей, с csv все вроде бы ясно. А как быть с полуструктурированными даными? Допустим с JSON? Все опять предельно просто и алгоритмично! 
Возьмем файл данных 
{column1:333, column2: wqer, column3: 56}
{column1: 2, column2: wqer, column3: 56}
{column1: 334, column2: wqert, column3: 54}
{column1: 335, column2: wqerty, column3: 59}

и положим его в HDFS
[root@cdh ~]# sudo -u hdfs hadoop fs -mkdir /user/examples/json
[root@cdh ~]# sudo -u hdfs hadoop fs -put /tmp/1234 /user/examples/json/

Создаем таблицу:
CREATE EXTERNAL TABLE simplest_json
(column1 string,column2 string,column3 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
LOCATION '/user/examples/json/'

И пишем запрос. Запрос к слову ничем не отличается от предыдущего:
select sum(column1),column2 from simplest_json
group by column2

И что же происходит? Практически то же самое что и в случае CSV, только на стадии Map происходит еще и парсинг строки (разбор JSON).


Просто магия какая то:)
Подытожим: 

1) Map:
- Перекомбинирование пар Key-Value
- Парсинг строчки
- Фильтрация по условию where
- Combiner (локальный reduce)
- Применение функций к некоторым
- Локальная свертка по колонкам если это необходимо

2) Reduce:
- свертка
Вывоы:
- Перечисляйте только нужные колонки (сократит выход map)
- Используйте где это возможно условие where (сократит выход map)

Sunday 17 March 2013


Пост 8. MapReduce для взрослых. Performance tuning.

Я продолжаю описывать принцип работы MapReduce заданий. Еще раз напоминаю, что hive - это целиком и полнстью MapReduce, просто описываете вы его языком более верхнего уровня, нежели java. Давайте посмотрем более детально, что же происходит, когда мы вновь пытаемся посчитать слова в "Евгение Онегине".
Как было сказано раньше, блоки бьются строго по размеру, никакой дополнительной логики нет.
Допустим строка "Когда не в шутку занемог" была разбита на 2 части, которые ушли в соответствующие блоки. В первый блок: "Когда не в шутку занем", а "ог" во второй.


Определив размеры входных порций (размер блока или параметр mapred.max.split.size)

Блоки располагаются на двух разных серверах, когда RecordReader читает следующую строчку, в случае если он не видит символа перевода строки - подтягивает недостающий элемент из соседнего блока - пересылка по сети. Здесь, кстати говоря, очень важна низколатентная сеть.

Дальше начинается процесс Map схема изображенная выше - слишком поверхностная, давайте рассмотрим более подробную:

1)Map пишет выход в некий буфер (io.sort.mb по умолчанию 100 Мб). На вход получает (k0, v0) на выходе (k1, v1) - пишутся в этот самый буфер.
2) Когда буфер заполнен на io.sort.spill.percent (по умолчанию 0.8) в бэкграунде начинает происходить spill (сброс данных на диск), при этом Map продолжает писать в буфер, НО если буфер будет заполнен полностью – map заморозит работу до конца spill. Spill пишет на локальный диск (mapred.local.dir). Во время spill происходит:

- Разделение на reduce порции (partitioner) и запись в отдельные файлы (partition file).
- Сортировка по ключу каждой reduce порции.
- Опционально. Combiner – “локальный reduce”.
Если параметр mapred.compress.map.output = true, на диск пишутся компрессированные данные. Компрессионный кодек:mapred.map.output.compression.codec.

3) После того как map записывает последний spill, начинается объединение и сортировка каждого partition файла в один (в io.sort.factor потоков, по умолчанию один). Идеальный вариант: один map – один spill. Если partition файлов больше чем min.num.spills.for.combine (по умолчанию 3) – запускается еще раз combiner.

4) После окончания map стартует copy фаза reduce - начинает забирать свою порцию данных в mapred.reduce.parallel.copies потоков (по умолчанию 5 на каждый reduce) в JVM память reduce. Это т буфер регулируется параметром mapred.job.shuffle.input.buffer.percent

5) После того как отработал последний map начинается сортировка-слияние map-ов в один файл. Количество файлов (которое определяется количеством map) участвующих в одной итерации равно io.sort.factor (10 по умолчанию).

Например было 40 map, тогда sort phase выполнится в 5 итераций (4 по 10 + одна – финальная сортировка-слияние). В идеале количество map должно стремиться к io.sort.factor. Попутно происходит преобразование пар (K1,V1) в (K1,(ist(V))

6) Reduce программа написанная пользователем. Преобразование (K1,V1) в (K1,(ist(V)) в (K2,V2)
Все вышеописанное позволяет понять способы разгона и оптимизации MapReduce заданий. Я намерено не консолидировал их в таблицу стандартных советов, что бы читатель смог подумать, поэксперементировать, самостоятельно сделать выводы о значениях некоторых параметров.