Thursday 29 August 2013


Пост 19. Pushdown запросов в Hadoop из Oracle RDBMS или как заставить hadoop  выполнять тяжелые запросы на своей стороне.

Доброго времени суток!
В предыдущем посте я описывал способ, с помощью которого можно прочитать данные, лежащие на HDFS, через внешнюю таблицу Oracle. Важно понимать, что в этом случае hadoop используется только как хранилище фалов (HDFS) и мы никак не задействуем мощь MapReduce.
То есть если мы пытаемся сделать join двух таблиц, он будет выполняться на стороне СУБД (поток данных будет благополучно передан на головы базы в PGA, где и будет происходить объединение). А что же делать, если мне хочется сделать JOIN двух таблиц на стороне Hadoop (использовать всю мощь MapReduce). Для этого надо сделать несколько шаманских действий:
       1)      Создать shell скрипт (назовем его 7.hive_pushdown.sh), который положим на сервер СУБД:

#!/bin/bash
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/lib/hive/lib/*
export ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
export ORACLE_SID=orcl
source /home/oracle/.bash_profile
export BASH=/bin/bash
HADOOP_CLASSPATH=':/u01/app/oracle/product/11.2.0/db_home1/jdbc/lib/ojdbc6.jar:/u01/app/oracle/product/11.2.0/db/jdbc/lib/ojdbc6.jar:/usr/lib/hive/lib/hive-metastore-0.10.0-cdh4.3.0.jar:/usr/lib/hive/lib/hive-metastore-0.10.0-cdh4.3.0.jar:/u01/app/oracle/product/11.2.0/db/jlib/oraclepki.jar:/usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar:/usr/lib/hive/lib/libthrift-0.9.0.jar:/usr/lib/hive/lib/libfb303-0.9.0.jar:/usr/lib/hive/lib/*:/usr/lib/hadoop/lib/*:/usr/lib/hadoop/lib/*'
export HADOOP_CLASSPATH=/u01/connectors/oraloader/jlib/*:/usr/lib/hive/conf:/u01/connectors/orahdfs/jlib/*:/u01/app/oracle/product/11.2.0/dbhome_1/jdbc/lib/*:/u01/nosql/kv-2.0.26/lib/kvstore.jar:/usr/lib/hive/lib/*:/usr/lib/hive/lib/*
export HADOOP_CONF_DIR=/home/oracle/hadoop-conf
export JAVA_HOME=/usr/java/latest/
export NLS_LANG=AMERICAN_AMERICA.AL32UTF8
export PATH=/u01/app/oracle/product/11.2.0/dbhome_1/bin:/usr/kerberos/bin:/usr/local/bin:/bin:/usr/bin:/home/oracle/bin
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/lib/hive/lib/*
export OSCH_HOME=/u01/connectors/orahdfs

/usr/bin/hive -e 'drop table new.test_results';
str1="create table new.test_results as ";
str2=$*;
str3=$str1$str2
echo $str3
/usr/bin/hive -e "$str3";
sqlplus -s demouser/welcome1 <<+EOF
drop table demouser.test_results;
+EOF
/usr/bin/hadoop jar $OSCH_HOME/jlib/orahdfs.jar \
       oracle.hadoop.exttab.ExternalTable \
       -conf /home/oracle/scripts/conf/7.create_table_from_hive_push.xml \
       -createTable

На вход этой программе подается SQL который надо выполнить. Скрипт выполняет его (предварительно удалив временную таблицу hive, если она осталась с прошлого раза):

/usr/bin/hive -e 'drop table new.test_results';
str1="create table new.test_results as ";
str2=$*;
str3=$str1$str2
echo $str3
/usr/bin/hive -e "$str3";

После чего лезет в СУБД Oracle и удаляет таблицу результатов (так же с прошлого раза):

sqlplus -s demouser/welcome1 <<+EOF
drop table demouser.test_results;
+EOF

После чего создает внешнюю таблицу в Oracle c по шаблону только что созданной  hive таблицы:

/usr/bin/hadoop jar $OSCH_HOME/jlib/orahdfs.jar \
       oracle.hadoop.exttab.ExternalTable \
       -conf /home/oracle/scripts/conf/7.create_table_from_hive_push.xml \
       -createTable

Конфиг (точнее пример), необходимый OSCH прилагается наже:

<?xml version="1.0"?>
 <configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://localhost:9083</value>
  </property>
    <property>
      <name>oracle.hadoop.exttab.tableName</name>
      <value>test_results</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.locationFileCount</name>
      <value>4</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.hive.databaseName</name>
      <value>new</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.sourceType</name>
      <value>hive</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.hive.tableName</name>
      <value>test_results</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.defaultDirectory</name>
      <value>HADOOP_DIR</value>
    </property>
    <property>
      <name>oracle.hadoop.connection.user</name>
      <value>demouser</value>
    </property>
    <property>
      <name>oracle.hadoop.connection.wallet_location</name>
      <value>/u01/app/oracle/product/11.2.0/dbhome_1/network/admin/</value>
    </property>

    <property>
      <name>oracle.hadoop.connection.tnsEntryName</name>
      <value>orcl</value>
    </property>

    <property>
      <name>oracle.hadoop.connection.tns_admin</name>
      <value>/u01/app/oracle/product/11.2.0/dbhome_1/network/admin/</value>
    </property>

</configuration>

Это делается один раз. Потом используется.

     2)      Создать в СУБД Oracle Java процедуру для вызова внешних процедур операционной системы (тоже делается только один раз ):

create or replace and compile java source named "OsUtils" as

import java.io.IOException;
import java.io.InputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

class StreamGobbler
  extends Thread {

  InputStream is;
  String type;

  StreamGobbler (InputStream is, String type) {
    this.is = is;
    this.type = type;
  }

  public void run () {
    try {
      InputStreamReader isr = new InputStreamReader(is);
      BufferedReader br = new BufferedReader(isr);
      String line;
      while ((line = br.readLine()) != null) {
        OsUtils.result.append(line).append("\n");
      }
    } catch (IOException ioe) {
      ioe.printStackTrace();
    }
  }
}

public class OsUtils {

  public static StringBuffer result;

  public static int runCommand (String command)
    throws Throwable {

    result = new StringBuffer();

    Runtime rt = Runtime.getRuntime();
    Process proc = rt.exec(command);
    // Any error message?
    StreamGobbler errorGobbler = new StreamGobbler(proc.getErrorStream(), "ERROR");
    // Any output?
    StreamGobbler outputGobbler = new StreamGobbler(proc.getInputStream(), "OUTPUT");
    // Kick them off.
    errorGobbler.start();
    outputGobbler.start();
    // Any error?
    proc.waitFor();

    System.out.println(result.toString());
    return proc.exitValue();
  }
}
/

create or replace function run_os_command (p_cmd in varchar2) return number as
  language java name 'OsUtils.runCommand (java.lang.String) return int';
/

Назначим права:

exec dbms_java.grant_permission('DEMOUSER', 'SYS:java.lang.RuntimePermission', 'writeFileDescriptor', '' );
/
exec dbms_java.grant_permission('DEMOUSER', 'SYS:java.lang.RuntimePermission', 'readFileDescriptor', '' );
/
exec dbms_java.grant_permission('DEMOUSER', 'SYS:java.io.FilePermission', '<<ALL FILES>>', 'execute' );
/
commit;
/
exit;

     3)      Используем предыдущие 2 механизма для вызова ad-hoc запроса и выполнения его на стороне Hadoop (вызываем из базы данных Oracle):

set serveroutput on;
declare
i number;
v_sql varchar2(256):='select max(a_num) from new.tab_on_hadoop';
str   varchar2(1024);
begin   
                str:='/home/oracle/scripts /7.hive_pushdown.sh '||v_sql;
                dbms_output.enable(100000);
                i:=run_os_command(str);
                dbms_output.put_line(i);
end;
/
select * from  "DEMOUSER"."TEST_RESULTS";
/
exit;

Вот как то так. Описанный выше метод не истина в последней инстанции это только один из способов совместного применения СУБД и Hadoop. Минусы описанного выше способа: при каждом запросе удаляется и создается таблица в Oracle и Hadoop (а это изменение в словаре данных, что далеко не бесплатно). Вторым минусом является однопоточность данного решения. Только один пользователь может выполнять такой запрос. По поводу удаления если у вас предполагается один и тот же запрос, который будет многократно выполняться, можно использовать Oracle temporary table и каждый раз делать туда вставку.

Все вышенаписанное лишь выглядит страшно. Попробуйте и вы сами поймете весь профит который можно получить от «слонов». Если есть вопросы – welcome!

Tuesday 27 August 2013

Пост 18. Hadoop + RDBMS. Как прочитать данные из Hadoop через базу данных Oracle RDBMS.
OSCH.

"А зачем нужна дорога, если через неё нельзя перевести бабку" (С). 
Hadoop как много в этом звуке для седрца моего слилось... Но не будем забывать, что существуют конкретные технологии, которые сильны на определенных операциях. На других операциях они могут проигрывать. Слон не исключение. Так он откровенно проигрывает на большом колличестве низколатентных одновременных операций, которые по сути своей индекс ориентированы.
Что же делать? Использовать более подходящие технологии, например Oracle RDBMS. Для перегонки данных из hadoop в Oracle RDBMS существует множество способов. Один из них я опишу сегодня. Итак, название этому способу OSCH (Oracle SQL Connector for HDFS). Взять его можно здесь. Замечательная документация располагается здесь. Что же такое OSCH и когда его надо использовать? 

Oracle SQL Connector для HDFS является высокоскоростным коннектором для доступа к данным HDFS непосредственно из Oracle Database. 

Oracle SQL  onnector for HDFS позволяет осуществлять импорт данных из HDFS в любое время, по запросу приложений работающих с базой данных Oracle. Такой подход позволяет использовать стандартный SQL для доступа к данным HDFS, осуществлять связь с транзакционными таблицами Oracle DB. 

При доступе к HDFS осуществляется балансировка нагрузки между узалми. Данные хранящиеся на HDFS должны содержать файлы с разделителями или oracle data pump файлы созданные с помощью Oracle Loader for Hadoop.


В основу этого прложен механизм внешних таблиз Oracle (External table). Этот механизм позволяет читать плоские файлы с разделителями через SQL интерфейс. В версии 11.2 появилась новая фича - препроцессор, т.е. я могу выполнить некую команду перед чтением данных. Например, если у меня лежит zip файл и  мне его надо загрузить в СУБД (insert ... select ), я могу в качестве пропроцессора объявить unzip, а в качестве аргумента зипованный файл.Файл будет раззипован и поток данных пойдет в PGA. Этот механизм и положен в основу OSCH. Пропроцессором является некая программа, написанная Oracle которая читает файл (объявленный в location) и гонит поток данных в PGA. Итак, что нам необходимо для использования этого коннктора.
1) Подготовить среду на серере СУБД
2) С помощью инструментов OSCH создать внешнюю таблицу
3) Используя SQL интерфейс получить доступ к данным.

Установка:
[oracle@db11203x64 ~]$ sudo mkdir /u01/connectors
[oracle@db11203x64 ~]$ sudo chown oracle:dba /u01/connectors
[oracle@db11203x64 ~]$ cp oraosch-2.1.0.zip /u01/connectors
[oracle@db11203x64 ~]$ cd /u01/connectors
[oracle@db11203x64 connectors]$ unzip oraosch-2.1.0.zip
[oracle@db11203x64 connectors]$ unzip orahdfs-2.1.0.zip
[oracle@db11203x64 orahdfs-2.1.0]$ export OSCH_HOME=/u01/connectors/orahdfs-2.1.0
[oracle@db11203x64 orahdfs-2.1.0]$ echo "export OSCH_HOME=/u01/connectors/orahdfs-2.1.0" >> ~/.bash_profile
add OSCH_HOME=/u01/connectors/orahdfs-2.1.0 in /u01/connectors/orahdfs-2.1.0/bin/hdfs_stream
[root@db11203x64 ~]# yum install hadoop-client.noarch
[oracle@db11203x64 osch]$ export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/u01/app/oracle/product/11.2.0/db/jdbc/lib/ojdbc6.jar
[oracle@db11203x64 osch]$ echo "export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/u01/app/oracle/product/11.2.0/db/jdbc/lib/ojdbc6.jar" >> ~/.bash_profile
[root@db11203x64 ~]# yum install -y hive
[root@db11203x64 ~]# yum install -y hive-server
[oracle@db11203x64 osch]$ export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/lib/hive/lib/*
Создаю пользователя и наделяю его правами (это опционально, можно использовать существующего пользователя):
SQL> CREATE USER demouser IDENTIFIED BY welcome1
  2  DEFAULT tablespace users
  3  TEMPORARY tablespace temp
  4  quota unlimited ON users;
GRANT CONNECT, resource TO demouser;
GRANT create procedure TO demouser;
GRANT create session TO demouser;
GRANT create table TO demouser;
grant select any dictionary to demouser;
GRANT DEBUG CONNECT SESSION TO demouser;

[oracle@db11203x64 osch]$ mkdir /home/oracle/sh/osch/oracle_dir/
[oracle@db11203x64 osch]$ sqlplus / as sysdba
...
CREATE DIRECTORY HADOOP_DIR AS '/home/oracle/sh/osch/oracle_dir/'
GRANT WRITE ON DIRECTORY hadoop_dir TO demouser;
GRANT READ ON DIRECTORY hadoop_dir TO demouser;

CREATE DIRECTORY OSCH_BIN_PATH AS '/u01/connectors/orahdfs-2.1.0/bin/';
GRANT WRITE ON DIRECTORY OSCH_BIN_PATH TO demouser;
GRANT READ ON DIRECTORY OSCH_BIN_PATH TO demouser;
GRANT EXECUTE ON DIRECTORY OSCH_BIN_PATH TO demouser;

Сгенерим файлик и закинем его на HDFS:

for i in `seq 1 100`; do echo 13`< /dev/urandom tr -dc 0-9 | head -c${1:-8};echo;` \
, 985`< /dev/urandom tr -dc 0-9 | head -c${1:-3};echo;`65` < /dev/urandom tr -dc 0-9 | head -c${1:-2};echo;` \
, 985`< /dev/urandom tr -dc 0-9 | head -c${1:-3};echo;`65` < /dev/urandom tr -dc 0-9 | head -c${1:-2};echo;`\
, 25001`< /dev/urandom tr -dc 0-9 | head -c${1:-8};echo;` \
, 3511`< /dev/urandom tr -dc 0-9 | head -c${1:-8};echo;` \
,  `< /dev/urandom tr -dc 0-9 | head -c${1:-2};echo;`,  `< /dev/urandom tr -dc 1-3 | head -c${1:-2};echo;` >> /tmp/test.cdr; done;
[oracle@db11203x64 osch]$ hadoop fs -mkdir /tmp/cdr_store/
[oracle@db11203x64 osch]$ hadoop fs -put /tmp/test.cdr /tmp/cdr_store/

Послле этого можно созать конфиг для OSCH:
<?xml version="1.0"?>
 <configuration>
    <property>
      <name>oracle.hadoop.exttab.tableName</name>
      <value>CDR_ON_HADOOP</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.locationFileCount</name>
      <value>4</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.dataPaths</name>
      <value>/tmp/cdr_store/*</value>
    </property>
    <property>
      <name>oracle.hadoop.exttabl.fieldTerminator</name>
      <value>,</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.defaultDirectory</name>
      <value>HADOOP_DIR</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.columnNames</name>
      <value>unixtime,a_num,b_num,imsi,imei,lac,rac</value>
    </property>
    <property>
      <name>oracle.hadoop.exttab.sourceType</name>
      <value>text</value>
    </property>
    <property>
      <name>oracle.hadoop.connection.url</name>
      <value>jdbc:oracle:thin:@//db11203x64:1521/orcl</value>
    </property>
    <property>
      <name>oracle.hadoop.connection.user</name>
      <value>demouser</value>
    </property>
</configuration>

И скрипт создания таблицы:
[oracle@db11203x64 osch]$ cat create_table.sh
#!/bin/bash
export OSCH_HOME="/u01/connectors/orahdfs-2.1.0"
hadoop jar $OSCH_HOME/jlib/orahdfs.jar \
       oracle.hadoop.exttab.ExternalTable \
       -conf create_table_conf.xml \
       -createTable

Запускаем create_table.sh и есть у нас счастье!
После создания таблицы можно проверить права следующей командой:
[oracle@db11203x64 ~]$ /u01/connectors/orahdfs-2.1.0/bin/hdfs_stream /home/oracle/sh/osch/oracle_dir/osch-20130717031307-3212-4
У меня был вывод на экран данных из фалов:
1384679623 , 9856406598 , 9850086592, 2500177363357 , 351133071222 , 35, 22
1390537385 , 9857676598 , 9858736550, 2500100777055 , 351184607283 , 82, 23
1335107726 , 9853106532 , 9852956508, 2500127223291 , 351145561629 , 57, 22

Если будут вопросы - не стесняйтесь задавать!

Tuesday 20 August 2013

Пост 17. Насколько быстр слон. Hadoop performance.

Доброго времени суток!
Достаточно много я описывал все прелести Hadoop и возможно у вас уже возникло желание попробовать его под конкретный проект и тут встает вопрос о том на сколько производетелен слон. Для прояснения сложившейся ситуации расскажу о своих тестированиях Hadoop.
Последноего по-возрослому я тестировал только на оракловой железке Big Data Appliance x2-2.
Рис1. Oracle Big Data Appliance.

Это Hadoop шкаф предыдущего поколения (сейчас железки помощнее), состояший из 18 серверов. Каждый сервер представлял из себя следующее:
48 GB RAM, 12 core CPU, 12x3TB Disks.
+ волшебная сеть infiniband (40 Гб/сек при очень низкой латентности).

Тест1. SQL запросы вида 
select function1(col1), function2(col2), col3 from table
group by col3 

Около 4 минут на сканирование терабайта данны (сжатых).

select function1(col1), function2(col2), col3 from table
where
....
group by col3 

Где условие where отсекало половину данных.
Около 3 минут на сканирование терабайта данны (сжатых).

Это скорее нижняя граница. Верхняя граница это около 2 мин на TB данных, но это при определенных условиях. 4 минуты, это можно сказать гарантированный результат.

Тест2. 
Сортировка 10 Тб данных 2 часа 35 мин
=> 4 Гб/сек
WordCount – подсчет уникальных слов в тексте
10 Тб – 3 часа 23 мин
=> 3 Гб/сек

Тест3. Кейс банка, описанного в предыдущем посте


Быстро это или медленно... все в этом мире относительно:) Если есть желние протестировать свою систему на мощном Hadoop кластере - пишите!

Пост 16. Потоковая загрузка данных в Hadoop. Flume.

Доброго времени суток!
Сегодня хотел бы рассказать об еще одном способе загрзки данных в Hadoop с помощью утилиты flume. Мы уже знаем что помимо этого способа существуют как минимум еще 2:
- использование hadoop fs API (на сервере источнике данных надо набрать hadoop fs -put ...)
- использование sqoop (выгрузки из реляционных баз данных)

Итак, сегодня я опишу третий способ загрузки данных в Hadoop - flume.Предназначение - потокоавя загрузка данных в hadoop.
У Apache flume есть явное преимущество перед многими open source продуктами - хорошая документация.
Итак, логическая архитектура до безобразия проста:

Рис1. Архитектура Flume.

Есть источник данных, канал и сток.

Источником данных может являться:
- Avro клиент. Указываем port который слушаем и ловим avro stream
- Thrift киент. Полностью аналогично предыдущему пункту.
- Exec. На вход подается вывод некой команды. Например, можно написать tail -F /var/log/secure
и все новые строки которые будут появляться в этом файле, будут попадать в канал (некая прослушка файла).
- JMS очередь. Указываются явки/пароли на JMS очередь и вот оно счастье.
- SpoolDir. Очень вкусная штука позволяет смотреть на дирректорию куда валятся логи. Как только суммарный размер файлов или колличество строчек превышают некий порог (либо срабатывает тайм-аут), файл попадает в исток. Хорошо для аггрегации множества мелких файлов (например, у нас приходят файлы по 100 КБ, которые мы хотим агрегировать до размера блока HDFS - не вопрос, ставим прослушку на дирректорию и дело в шляпе!)
- Прослушка netcat на определенном порту.

Канал, это некий промежуточный буффер куда накапливаются данные перед сливом в сток. На практике я ничего кроме memory не использовал, так что описывать не буду (если что есть документация). 

Истоком в моей практике так же было только HDFS.

Ниже привожу несколько конфигов, в виде примера:

1) Прослушка файла и запись его в HDFS

# sources, sinks and channels in the agent
movieagent.sources=logFile
movieagent.channels=memoryChannel
movieagent.sinks=hdfs-sink

# source properties
movieagent.sources.logFile.type=exec
movieagent.sources.logFile.command=tail -F /u01/Middleware/logs/activity.out

# sink properties
movieagent.sinks.hdfs-sink.type=hdfs
movieagent.sinks.hdfs-sink.hdfs.path=hdfs://localhost.localdomain/user/oracle/moviework/applog
movieagent.sinks.hdfs-sink.hdfs.filePrefix=streamed-movieapp-
movieagent.sinks.hdfs-sink.hdfs.fileType=DataStream
movieagent.sinks.hdfs-sink.hdfs.writeFormat=Text
movieagent.sinks.hdfs-sink.hdfs.rollInterval=30
movieagent.sinks.hdfs-sink.hdfs.rollSize= 1024
movieagent.sinks.hdfs-sink.hdfs.rollCount= 100

# flowh
movieagent.channels.memoryChannel.type=memory
movieagent.channels.memoryChannel.capacity=100000
movieagent.sources.logFile.channels=memoryChannel
movieagent.sinks.hdfs-sink.channel=memoryChannel

2) Прослушка дирректории, осуществление ротации фалов и запись их в HDFS:
# sources, sinks and channels in the agent
movieagent.sources=logFile
movieagent.channels=memoryChannel
movieagent.sinks=hdfs-sink

# source properties
movieagent.sources.logFile.type=spooldir
movieagent.sources.logFile.spoolDir=/tmp/spooldir_test
movieagent.sources.logFile.spoolDir.deletePolicy=immediate
movieagent.sources.logFile.fileHeader=true
movieagent.sources.logFile.batchSize=100

# sink properties
movieagent.sinks.hdfs-sink.type=hdfs
movieagent.sinks.hdfs-sink.hdfs.path=hdfs://localhost.localdomain/user/oracle/moviework/applog
movieagent.sinks.hdfs-sink.hdfs.filePrefix=streamed-movieapp-
movieagent.sinks.hdfs-sink.hdfs.writeFormat=Text
movieagent.sinks.hdfs-sink.hdfs.appendTimeout = 1000
movieagent.sinks.hdfs-sink.hdfs.rollSize = 2097152
movieagent.sinks.hdfs-sink.hdfs.rollCount = 600000
movieagent.sinks.hdfs-sink.hdfs.batchSize = 100
movieagent.sinks.hdfs-sink.hdfs.txnEventMax = 100000
movieagent.sinks.hdfs-sink.hdfs.threadsPoolSize= 100
movieagent.sinks.hdfs-sink.hdfs.rollInterval = 60
movieagent.sinks.hdfs-sink.hdfs.fileType = DataStream
movieagent.sinks.hdfs-sink.hdfs.file.writeFormat= Text

# flow
movieagent.channels.memoryChannel.type=memory
movieagent.sources.logFile.channels=memoryChannel
movieagent.sinks.hdfs-sink.channel=memoryChannel

Запустить это великолепие можно через красивый интерфейс cloudera, а можно через linux консоль:
$ flume-ng agent -n movieagent  -c conf -f /var/run/cloudera-scm-agent/process/301-flume-AGENT/flume.conf

Не так flume страшен, как может показаться на превый взгляд. Очень милая утилита для загрузки данных, если исходный набор файлов не отчечает требованиям hadoop  (файл должен быть больших размеров). Пробуйте и если будут какие либо вопросы - пишите!
Пост 15. Power of Hadoop. Обработка полуструктурированных данных.

Плавно перетекая из поста в пост я хотел бы еще раз подчеркнуть одно из основных достоинств Hadoop - обработку полуструктурированных данных. Итак, начну с места в карьер. Как говорится лучше один раз увидеть:)
Дано: заархивированный JSON файл примерно вот такой:

[oracle@localhost applog]$ zcat movieapp_30months.log.gz|head
{"custId":1314864,"movieId":null,"genreId":null,"time":"2010-01-01:00:00:52","recommended":null,"activity":8}
{"custId":1071688,"movieId":null,"genreId":null,"time":"2010-01-01:00:03:29","recommended":null,"activity":8}
{"custId":1126945,"movieId":null,"genreId":null,"time":"2010-01-01:00:04:39","recommended":null,"activity":8}
{"custId":1237919,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:22","recommended":null,"activity":8}
{"custId":1283601,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:27","recommended":null,"activity":8}
{"custId":1178337,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:43","recommended":null,"activity":8}
{"custId":1133759,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:49","recommended":null,"activity":8}
{"custId":1086779,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:51","recommended":null,"activity":8}
{"custId":1071688,"movieId":null,"genreId":null,"time":"2010-01-01:00:06:16","recommended":null,"activity":8}
{"custId":1048842,"movieId":null,"genreId":null,"time":"2010-01-01:00:06:42","recommended":null,"activity":8}
[oracle@localhost applog]$ 

Создаем директорию и закидываем туда файл:
[oracle@localhost applog]$ hadoop fs -mkdir /tmp/json_files/

[oracle@localhost applog]$ hadoop fs -put movieapp_30months.log.gz /tmp/json_files/

Теперь открываем hive и создаем мапирование на эту дирректорию:
[oracle@localhost applog]$ hive
hive>  CREATE EXTERNAL TABLE movieapp_log_json(
                                                    custId INT,
                                                    movieId INT,
                                                    genreId INT,
                                                    time STRING,
                                                    recommended STRING,
                                                    activity INT,
                                                    rating INT,
                                                    price FLOAT
                                   )
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
LOCATION '/tmp/json_files/';

Наслаждаемся запросами к этой таблице без дополнительных выкрутасов:

hive> SELECT * FROM movieapp_log_json limit 3;                                                                            
OK
1185972 NULL    NULL    2012-07-01:00:00:07     NULL    8       NULL    NULL
1354924 1948    9       2012-07-01:00:00:22     N       7       NULL    NULL
1083711 NULL    NULL    2012-07-01:00:00:26     NULL    9       NULL    NULL
Time taken: 6.731 seconds
hive> 

Сразу можно использовать аналитические запросы:

hive> SELECT MIN(time), MAX(time) FROM movieapp_log_json;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Starting Job = job_201308200352_0001, Tracking URL = http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201308200352_0001
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_201308200352_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2013-08-20 04:18:48,305 Stage-1 map = 0%,  reduce = 0%
2013-08-20 04:19:06,605 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:07,633 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:08,653 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:09,669 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:10,688 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:11,706 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:12,724 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:13,744 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:14,770 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:15,795 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:16,816 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:17,845 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:18,868 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:19,898 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
MapReduce Total cumulative CPU time: 16 seconds 70 msec
Ended Job = job_201308200352_0001
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 16.07 sec   HDFS Read: 2448984 HDFS Write: 40 SUCCESS
Total MapReduce CPU Time Spent: 16 seconds 70 msec
OK
2012-07-01:00:00:07     2012-10-01:03:19:24
Time taken: 47.629 seconds
hive> 

Получаем разархивация, парсинг и аналитика "в один проход". Ключевым в этом объявлении таблицы являтся 
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
Здесь я говорю что за данные у меня лежат (в каком формате). Если у нас что то экзотическое - не вопрос, кладем на Hadoop что то экзотическое и описываем рядом Java класс который будет парсить это.

Что у нас происходит на стадии Map и Reduce (на самом деле некоторое время назад я это уже описывал здесь, но повтороенье мать ученья):

1) Допустим у нас есть файл:

{column1:333, column2wqer,   column3: 56}
{column1: 2, column2:     wqer,   column3: 56}
{column1: 334, column2: wqert,  column3: 54}
{column1: 335, column2: wqerty, column3: 59}
….

2) Мы хотим обработать его запросом
select sum(column1), column2 from my_table
where
column3>55
group by column2
3) MapReduce происходит согласно картинке ниже:

На стадии Map происходит парсинг JSON строки, отсечение ненужных колонок (колонки сolumn3  в запросе нет и фильтрация по горизонтали, согласно фильтру column3 >55)
Вот такая вот магия полуструктурированных данных. Если у кого будут вопросы - с удовольствием на них отвечу!

Friday 16 August 2013

Пост 14. Hadoop на службе банка.

Доброго времени суток!
Не так давно я столкнулся с одним интересным кейсом в одном из банком.
У банка были данные, весьма интересного формата - набор файлов по несколько сотен мегабайт каждый без единого переноса строки, колонки разделены по фиксированному смещению. Драма ситуации в том, что формат файла постоянно меняется.


Рис 1. Формат входных файлов

Что требовалось? Распарсить файл взять из 700 колонок 50, загрузить их в СУБД, доступ к которой имел ряд аналитиков. Традиционными средствами эта задача могла бы быть решена следующим образом:













                             Рис 2. Решение задачи традиционными средствами.

Да, еще я совсем забыл упомянуть, что размер исходных фалов (множества) - несколько терабайт. Ну и конечно же аппетиты аналитиков постоянно росли и они хотели все новые и новые колонки в свою аналитическую тулзовину и конечно же хотели ретроспективу по новым величинам, которые находились в уже разобранных и удаленных файлах.
Коллабс... Тут на помощь пришел Hadoop.

Была придумана следующая схема:


Рис 3. Схема преобразования данных

Были взяты 2 оракловые железки Big Data Appliance (Hadoop Cluster на 18 серверов) и ExaData (high performance СУБД Oracle).

1) MapReduce програма читала исходные файлы и по конфигу преобразовывала их в полуструктурированный формат (для простоты изложения пусть это будет JSON).
2) Над полуструктурированным форматом создавалась таблица HIVE. Выполнялся агрегационный запрос, который преподготавливал некую витрину данных, которая загружалась потом в Oracle (с помощью Oracle Loader for Hadoop).
Если аналитики хотели произвольных (ad-hoc запросов, они могли напрямую обратиться к hive, где в их распоряжении были все 700 колонок)

Так банк построил свое маленькое счастье с помощью Hadoop. Будут вопросы - задавайте, с удовольствием отвечу!