Monday 12 November 2012

Пост 7. Map-only задания.

В данном посте хотел бы рассказать о таком явлении как map only задании. Начав изучать hadoop и MapReduce я все больше больше задумывался о конкретной реализации различных операций в рамках MapReduce парадигмы, пытался представить что же будет делать шаг Map, а  что Reduce. Задача перевода текста в верхний региср. Хорошо, на стадии Map, все вроде бы понятно, берем кажрое слово по аналогии с wordcount и возвращаем его в верхнем регистре. А что делает reduce? Ничего. Есть такие задания, где выход Map задания и есть конечный ответ. Reduce просто напросто нет.

Примером таких Map-only MarReduce job могут быть:
- Обработка изображений
- ETL (sqoop, например)
- Конвертация из одного формата в другой

Мир hadoop удивителен:)
Если у вас есть какие либо вопросы - не стесняйтесь их задавать!
Пост 6. Combiner. Или как уменьшить колличество пересылок по сети.

Наверное каждый, кто начинал разбираться с hadoop и начинал с примера подсчета слов, задавал себе и окружающим один и тот же вопрос: зачем пересылать втупую все пары по сети, когда более эфективно было бы сделать локальную свертку, а только потом переслать по сети агрегаты. Hadoop не дураки писали, соответственно такая возможность есть!
Называется она combiner.

Как видно на картинке выше с выхода map пары key - value попадают на combiner, где выполняется код reduce. В итоге процент пересылки по сети значительно сокращается. Ура!  Но как вы пониимаете далеко не всегда можно использоавть данную возможность. В контексте задач вычисления агрегатов - "золотая" возможность, о которой вы должны помнить.

Если есть вопросы - не стесняйтесь их задавать!
Пост 5. Что такое MapReduce advanced version. Практика.

Постом ранее я достаточно подробно описал что же такое MapReduce. Наверное своим описанием еще больше Вас запутал. Но ничего думаю практический пример расставит все на свои места. Итак, давайте посчитаем слова! Да, это тот самый wordcount, только взгляд на него будет изнутри.
Итак, нам дан текст:
Мой дядя самых честных правил,
Когда не в шутку занемог,
Он уважать себя заставил
И лучше выдумать не мог.


Найти: как часто каждое из слов встречается в тексте.
Запускаем программу, смотрим на картинку и осознаем что же происходит за сценой:



1) Шаг InputFormat :
InputFormat - TextInputFormat, потому что если в Java коде явно не указывается тип входных данных, берется TextInputFormat по умолчанию.
2) Шаг Split;
Далее входной файл разбивается на блоки данных, что бы впоследствии сассоциировать один блок с одним map. Так как текст у нас невелик - скорее всего он полностью поместиться в один блок.
3) Шаг RecordReader (RR):
после того, как мы поняли, что это текст, и что ключом, в случае текста, будет смещение, а значение - остальная строка, строчку за строчкой (пару за парой) передаем вот это вот дело

K0,V0, где K0 – смещение начала стороки относительно документа(потому что формат TextInputFormat)

(0 ; Мой дядя самых честных правил)
(31; Когда не в шутку занемог)
(54; Он уважать себя заставил)
(78; И лучше выдумать не мог) 


программе map. RecordReader - цикл, который будет вызван в данном случае 4 раза. После того как отработает последний map на выходе будет что то вроде:

4) Шаг Map
5) Шаг Partitioner (+ shuffle):
На этом шаге определяется куда какая пара будет переслана, на какой сервер, на какой reduce. Если в коде специальным образом не указан класс partitioner - идет разбиенеие на основе hash функции от K1, например от "выдумать" или "честных"...
6) Шаг Sort
После пересылки запускается шаг Sort, на котром происходит преобразоавние K1, V1 к структуре K1, list(V) и сортировка по ключу. В итоге получается:
7) Ну и после того как все данные будут отсортированы они передаются на вход reduce(Java класс, который пишется програмистом, как и Map класс), который выполняет преобразование K1, list(V) в результирующие K2,V2.
Получается вот так:


Собственно все. Надеюсь после этого поста все стало намного понятнее. Если нет - welcome, задавайте вопросы!


Sunday 11 November 2012

Пост 4. Что такое MapReduce advanced version. Теория.

Полная(почти полная) схема MapReduce приведена ниже. Давайте я прокомментирую каждую фазу.


Итак, когда запускается MR задание, первым "в бой" вступает шаг InputFormat, на этом этапе hadoop понимает что за формат файла и подготавливает входные файлы к разбиению. Далее идет непосредственное разбиене файла на части, которые булут обработаны (как правило на блоки Hadoop), после этого на каждом вычислительном узле запускается столько map заданий, сколько блоков находится на нем. Затем каждый блок начинает считывать строку за строкой (пары key - value ) и передавать классу map (программе) пару за парой. Шаг map принимает  пару (K0, V0) на вход, что то с ней делает и выдает (K1, V1) на выходе. Далее идет детерминация - на какой редьюсер (какую ноду) отправить пару (K1, V1). По уполчанию для этого используется hash функция от K1, но можно написать свой класс, который будет определять алгоритм пересылки на редьюсеры. После этого идет пересылка по сети (шаг shuffle). И наконец... нет не reduce, перед ним есть еще один шаг - sort. На этом шаге идет преобразование пары (K1, V1) в структуру (K1, List(V) ), при этом все пары сортируются по ключу. Это очень важное свойство, которое используется во многих алгоритмах MapReduce. И наконец Reduce, Java класс, написанный пользователем, который преобразует структуру  (K1, List(V) ) в результирующую (K2, V2)!

Запутались?
Вот вам шпаргалка - описание потоков KeyValue

В следуюшем посте мы разберем практический пример и я очень надеюсь, что все станет ясно.
А пока прочитайте еще раз этот пост. Очень важно иметь четкие представления о том как же все таки работает MapReduce, для эффективного его использования!
Пост 3. MapReduce на пальцах.

Для того что бы начать описывать принцип работы Hive, я бы хотел сначала рассказать  о некоторых базовых понятиях hadoop. Итак, сегодня я хотел бы попытаться объяснить что же такое MapReduce. Классическое объяснение - можно посмотреть здесь. Но мне кажется, что это объяснение "на пальцах" не очень "пальцатое", то есть - нет какого-то четкого клеше, которое сформируется в голове у начинающего програмиста. Допустим у нас есть 5 корзинок(серверов), в каждой из которой лежат яблоки, груши, апельсины.


Задача - посчитать сколько у меня яблок, груш и апельсинов. В контексте MapReduce данная задача будет разбита на 2 фазы. Фаза Map - просто выделяем в каждой корзинке яблоки, груши и апльсины. Берем фрукт и говорим, что это.



После этого "выделения" начинается пересылка фруктов по сети. Все яблоки перемещаются в одну кучку (сервер), та же судьба ждет и груши с аппельсинами.


Да, это именно пересылка по сети. Сеть при определенной нагрузке - потенциальнео "слабое звено" кластера. Но впрочем в ряде случаев столь большой overhead можно обойти, но об это позже. После того как пересылка завершена - начинается стадия reduce - просто подсчитываем элемнты в каждой из кучек (один, два, три...)


Вот собственно и все. Это был самый вехний уровень абстракции. В следующем посте я хочу подробно рассказать о том как происходит процесс MapReduce. Да, маленькое замечание - весь этот процесс происходит на одних и тех же 5 серверах!
Если у Вас есть вопросы - не стесняйтесь их задавать!

Wednesday 7 November 2012

Пост 2. Sqoop импорт из реляционной базы данных в Hadoop.

Доброго времени суток! В своем первом посте я высказал мою точку зрения относительно Hadoop и Hive, теперь предлагаю перейти от философии к более конкретным вещам. Hive,как правило, работает с структурированными данными (таблицами). Зачастую данные попадают в Hive из реляционной базы данных (что то вроде архива, обеспечение жизненного цикла хранилища). Каким образом я могу это сделать? Голь на выдумку хитра, и существует множество способов проделать эту операцию. Я Вам расскажу об инструменте, предлагаемом cloudera – sqoop. Немного теории, что же умеет импортировать sqoop и какие вещи необходимо о нем знать:

1) Атомарной единицей импорта может быть:
- Партиция таблицы (поддерживает условие where)
- Одна таблица RDBMS
- Вся RDBMS
2) Sqoop умеет делать все это со всеми JDBC совместимыми базами. 
3) Импортирует из базы в HDFS ввиде плоских файлов (CSV) либо во внутреннем формате Hadoop Sequence File.
4) Поддерживает инкрементальный импорт данных

Можно долго еще описывать теоретическую часть, но в контексте sqoop будет полезнее посмотреть практический пример. Давайте импортируем данные из Oracle RDBMS.
Процесс импорта будет выглядеть примерно вот так:
Из реляционной базы данные попадают в hadoop, a далее файлы перемещаются внутри hadoop в дирректорию hive
Схема стенда в моем случае была следующей:

 Hadoop Client - отдельно выделенная машина с экземпляром ПО hadoop, установленном на нем Cloudera manager, Hue, Hive, Sqoop. Никаких вычислительных задач и задач хранения эта нода не выполняет (ни DataNode, TackTracker процессов там не запущено). С этой машины в дальнейшем я буду работать с этой машины с кластером Hadoop (9 серверов) и сервером баз данных Oracle RDBMS.  Давайте импортируем таблицу из базы данных Oracle в Hadoop. Для примера я взял стандартную схему scott и стандартную таблицу emp.
1) Устанавливаем на машине «Hadoop client» sqoop.

[root@cdh ~]# yum install sqoop

2) Шаг второй, опциональный – проверка JDBC соединения c базой данных. 
Для этого нам понадобится написать маленькую Java программку

[root@cdh ~]# cat OracleJDBC.java
import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.SQLException;
public class OracleJDBC {
        public static void main(String[] argv) {
                System.out.println("-------- Oracle JDBC Connection Testing ------");
                try {
                        Class.forName("oracle.jdbc.driver.OracleDriver");
                } catch (ClassNotFoundException e) {
                        System.out.println("Where is your Oracle JDBC Driver?");
                        e.printStackTrace();
                        return;
                }
                System.out.println("Oracle JDBC Driver Registered!");
                Connection connection = null;
                try {
                        connection = DriverManager.getConnection(
                                        "jdbc:oracle:thin:@db11203x64:1521:orcl", "scott",tiger");
                } catch (SQLException e) {
                        System.out.println("Connection Failed! Check output console");
                        e.printStackTrace();
                        return;
                }
                if (connection != null) {
                        System.out.println("You made it, take control your database now!");
                } else {
                        System.out.println("Failed to make connection!");
                }
        }
}
[root@cdh oracle]# javac OracleJDBC.java
[root@cdh oracle]# java -classpath /home/oracle/ojdbc6.jar:/home/oracle/ OracleJDBC
-------- Oracle JDBC Connection Testing ------
Oracle JDBC Driver Registered!
You made it, take control your database now!
[root@cdh oracle]#
Ура! Коннект есть!

3) На всех серверах кластера в /etc/hosts/ прописываем сервер базы данных (в моем случае это было echo >> “192.168.1.108 db11203x64”). 
Прим.: для подобных вещей удобно использовать утилиту pdsh

4) Импортируем из базы данные Oracle в Hadoop
[root@cdh oracle]# sqoop import --connect jdbc:oracle:thin:@//db11203x64:1521/orcl --username scott -password tiger --table emp  --target-dir /tmp/oracle_dump --columns "EMPNO,DEPTNO" -m 1
,где db11203x64 – имя сервера баз данных, 1521 – порт прослушки listener, orcl – SID Oracle RDBMS

5) Просмотрим результат
[root@cdh oracle]# hadoop fs -ls /tmp/oracle_dump
Found 3 items
-rw-r--r--   3 root hdfs          0 2012-11-02 09:05 /tmp/oracle_dump/_SUCCESS
drwxrwxrwt   - root hdfs          0 2012-11-02 09:04 /tmp/oracle_dump/_logs
-rw-r--r--   3 root hdfs        112 2012-11-02 09:04 /tmp/oracle_d
[root@cdh oracle]# hadoop fs -cat /tmp/oracle_dump/part-m-00000
12/11/02 10:22:08 INFO util.NativeCodeLoader: Loaded the native-hadoop library
7369,20
7499,30
7521,30
7566,20
7654,30
7698,30

6) Ну а теперь напрямую в hive
[root@cdh oracle]# sqoop import --connect jdbc:oracle:thin:@//db11203x64:1521/orcl --username scott -password tiger --table emp --columns "EMPNO,DEPTNO" --hive-import --warehouse-dir /user/beeswax/warehouse --fields-terminated-by ',' --split-by EMPNO --hive-table emp -m 1Импорт «напрямую» подразумевает добавление мета информации в базу данных Hive.

7)  Давайте импортируем еще одну таблицу - побольше. Сначала мы ее создадим.
На сервере базы данных:
[oracle@db11203x64 ~]$ sqlplus scott/tiger@orcl
SQL*Plus: Release 11.2.0.3.0 Production on Wed Nov 7 02:31:52 2012
Copyright (c) 1982, 2011, Oracle.  All rights reserved.
Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.3.0 - 64bit Production
With the Partitioning, Automatic Storage Management, OLAP, Data Mining
and Real Application Testing options
SQL> create table test_for_dump(a number,b number,c varchar2(256));
SQL> begin
  for i in 1..10000000 loop
  insert into test_for_dump values(i,i*8,'ramdom string');
  end loop;
  commit;
  end;
/
SQL> select bytes/(1024*1024) from user_segments where segment_name='TEST_FOR_DUMP';

BYTES/(1024*1024)
-----------------
              341
Ну вот уже не плохо таблица 341 Мб.
На сервере клиента hadoop
[root@cdh oracle]# sqoop import --connect jdbc:oracle:thin:@//db11203x64:1521/orcl --username scott -password tiger --table test_for_dump --columns "A,B,C" --hive-import --warehouse-dir /user/beeswax/warehouse --fields-terminated-by ',' --split-by A --hive-table sqoop_test  -m 1
Время выполнения - 1min43sec. Как бы мне ускорить этот поцесс? Распараллелить! Меняем в настройках   -m 1 на   -m 9 это означает, что к базе данных будут подключаться не один мэпер (сервер), а 9 и в параллель тащить данные от туда.

[root@cdh oracle]# sqoop import --connect jdbc:oracle:thin:@//db11203x64:1521/orcl --username scott -password tiger --table test_for_dump --columns "A,B,C" --hive-import --warehouse-dir /user/beeswax/warehouse --fields-terminated-by ',' --split-by A --hive-table sqoop_test  -m 9

Время выполнения - 31sec, намного лучше. Как правило при импорте данных на большой кластер Hadoop узким местом становится дисковая подсистема баз данных.

8) Ну и напоследок давайте рассмотрим инкрементальную загрузку данных (догрузку)
на сервере баз данных:
SQL>  insert into test_for_dump values(10000001,1,'');
1 row created.
SQL> commit;
Commit complete.

На клиенте Hadoop
[12:28]hdfs@cdh:~$ sqoop import --connect jdbc:oracle:thin:@//db11203x64:1521/orcl --username scott -password tiger --table test_for_dump --columns "A,B,C" --hive-import --warehouse-dir /user/beeswax/warehouse --fields-terminated-by ',' --split-by A --hive-table sqoop_test -m 9 --check-column A --incremental append --last-value 10000000

Догрузилась одна строка. Ура!
До новых постов!
Если У вас есть вопросы - не стесняйтесь их задавать!

Tuesday 30 October 2012

Пост 1. Взгляд автора на Hadoop и Hive

Итак, наконец-таки я начал делать то что хотел сделать очень давно – завел блог в котором планирую рассказывать о Hadoop и Hive (мне как человеку из мира баз данных hive ближе всего в контексте высокоуровневых языков). Сразу хочу спозиционировать данный ресурс, как описывающий «внутренний мир» hive, мне не очень хочется рассказывать о том как писать SQL  HQL, обо всем этом можно прочитать здесь, я планирую описать операции с  хранилищем с точки зрения Map Reduce и HDFS (то есть hadoop). Откуда он взялся и почему в последнее время набирает такую популярность? Существует много мнений озвучу свое. Не так давно наткнулся на замечательную статью на хабре, если вкратце там идет описание о том что раньше в ВУЗах (американских) информатике учили лучше, программировали на С, было множество фундаментальных курсов, но расплатой за это было то что из 100 человек до финиша доходило 20, остальных отчисляли. Когда же ИТ перестало быть «глупым» увлечением фанатиков, а стало частью бизнеса (или отдельным бизнесом), на выпускников подобных специальностей упало пристальное внимание множества фирм. Но людей было мало, по крайней мере меньше чем требовалось, и коммерческие организации завыли, что некому писать очередную бухгалтерскую программу. Учебные заведения сопротивлялись, но в итоге пошли «в ногу со временим» и убрали ряд фундаментальных курсов, заменили С на Java и.т.п. Тем самым вырос поток, выпускаемый из альма-матер в мир денег и бухгалтерских приложений (до конца доживало 90 из 100). Отсюда получили большое количество «быдло программистов», которые начали плодить глючные приложения, прочитав что то вроде «Java для начинающих», отсюда и миф о том что Java простой и плохой язык. Скажу сразу я не большой спец по Java (я один из быдло программистов, но прогающий для души, а не для предприятия), но весьма уважаемые мною люди неоднократно говорили, что Java не «медленный язык» на котором ничего толкового написать нельзя, а мощный инструмент которым мало кто умеет правильно пользоваться. Но впрочем что я о Java да о Java? Давайте поговорим о том как масштабировать приложения. Начинается все с него, с ноутбука. После разработки приложение переносится на сервер, допустим стареньки и слабенький. Потом после того как нагрузка выросла – переносим на более или менее приличный сервер наше приложение. Нагрузка выросла еще больше... что делать? Дальше можно купить за большие деньги дорогой сервер (масштабирование по вертикали), либо рядышком поставить такой же сервер и распределить нагрузку между ними (масштабирование по горизонтали). Но приложение в случае горизонтального масштабирования должно знать о том, что оно распределенное. Тут на помощь приходит MPI (или PVM). Если вы когда-либо использовали MPI, вы понимаете, что нередко большой проект становился линиями Манергейма. Писать под много серверов сложно (чем больше серверов, тем сложнее программа). Это первый момент. Второй момент – это не эластичность программы. Есть программа написана под 3 сервера, она будет работать на 3 серверах, для того что бы распараллелить на 4 надо переписывать программу (еще один проект со своими рисками). Даже если вы все же решились запустить MPI проект стоит иметь ввиду, то что программист будет стоить очень и очень хороших денег. Альтернативы? Альтернатива – MapReduce. MapReduce – принцип распределенных приложений (одно приложение, запущенное на множестве серверов). Плюсы: легкая масштабируемость – пишем код, запускаем на 16 серверах. Выросли из 16 серваков, доставили еще 8, запускаем туже самую программу на 24 и она работает на всех машинах кластера (а не на 16 как это было бы в случае MPI). Второй плюс то что в имплементации hadoop разрабатывать достаточно просто,ну а hive еще больше упрощает процесс разработки. Третий и наверное основной плюс – масштабируемость на большое количество вычислительных узлов в yahoo,например, стоит кластер около 40 000 нод, сколько у гугла даже боюсь представить. Ура! Ну а минусы? Чудес не бывает – MapReduce медленнее MPI. Как правило, мы платим производительностью. При прочих равных MapReduce,скорее всего, будет работать медленнее MPI.
Итак, все таки что же такое Hadoop?
Hadoop = MapReduce + HDFS
HDFS () – распределенная файловая система. Одна логическая файловая система на множестве серверов.
MapReduce – распределенные вычисления над HDFS.
Ну и естественно, hadoop это Sharing Nothing система. Код доставляется к данным.
А что же такое Hive? Hive – некий переводчик с HQL (язык похожий на SQL) в MapReduce. Когда вы пишете HQL, hive переводит его в MapReduce и выполняет (соответственно используя всю мощь параллельных вычислений).
Вот как то так. Далее я планирую рассказывать о том как именно выполняется HQL и о том какие особенности разработки под Hive существуют. Не стесняйтесь задавать вопросы, если таковые появятся!