Tuesday, 20 August 2013

Пост 15. Power of Hadoop. Обработка полуструктурированных данных.

Плавно перетекая из поста в пост я хотел бы еще раз подчеркнуть одно из основных достоинств Hadoop - обработку полуструктурированных данных. Итак, начну с места в карьер. Как говорится лучше один раз увидеть:)
Дано: заархивированный JSON файл примерно вот такой:

[oracle@localhost applog]$ zcat movieapp_30months.log.gz|head
{"custId":1314864,"movieId":null,"genreId":null,"time":"2010-01-01:00:00:52","recommended":null,"activity":8}
{"custId":1071688,"movieId":null,"genreId":null,"time":"2010-01-01:00:03:29","recommended":null,"activity":8}
{"custId":1126945,"movieId":null,"genreId":null,"time":"2010-01-01:00:04:39","recommended":null,"activity":8}
{"custId":1237919,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:22","recommended":null,"activity":8}
{"custId":1283601,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:27","recommended":null,"activity":8}
{"custId":1178337,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:43","recommended":null,"activity":8}
{"custId":1133759,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:49","recommended":null,"activity":8}
{"custId":1086779,"movieId":null,"genreId":null,"time":"2010-01-01:00:05:51","recommended":null,"activity":8}
{"custId":1071688,"movieId":null,"genreId":null,"time":"2010-01-01:00:06:16","recommended":null,"activity":8}
{"custId":1048842,"movieId":null,"genreId":null,"time":"2010-01-01:00:06:42","recommended":null,"activity":8}
[oracle@localhost applog]$ 

Создаем директорию и закидываем туда файл:
[oracle@localhost applog]$ hadoop fs -mkdir /tmp/json_files/

[oracle@localhost applog]$ hadoop fs -put movieapp_30months.log.gz /tmp/json_files/

Теперь открываем hive и создаем мапирование на эту дирректорию:
[oracle@localhost applog]$ hive
hive>  CREATE EXTERNAL TABLE movieapp_log_json(
                                                    custId INT,
                                                    movieId INT,
                                                    genreId INT,
                                                    time STRING,
                                                    recommended STRING,
                                                    activity INT,
                                                    rating INT,
                                                    price FLOAT
                                   )
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
LOCATION '/tmp/json_files/';

Наслаждаемся запросами к этой таблице без дополнительных выкрутасов:

hive> SELECT * FROM movieapp_log_json limit 3;                                                                            
OK
1185972 NULL    NULL    2012-07-01:00:00:07     NULL    8       NULL    NULL
1354924 1948    9       2012-07-01:00:00:22     N       7       NULL    NULL
1083711 NULL    NULL    2012-07-01:00:00:26     NULL    9       NULL    NULL
Time taken: 6.731 seconds
hive> 

Сразу можно использовать аналитические запросы:

hive> SELECT MIN(time), MAX(time) FROM movieapp_log_json;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Starting Job = job_201308200352_0001, Tracking URL = http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201308200352_0001
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_201308200352_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2013-08-20 04:18:48,305 Stage-1 map = 0%,  reduce = 0%
2013-08-20 04:19:06,605 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:07,633 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:08,653 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:09,669 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:10,688 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:11,706 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:12,724 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:13,744 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 12.41 sec
2013-08-20 04:19:14,770 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:15,795 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:16,816 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:17,845 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:18,868 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
2013-08-20 04:19:19,898 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 16.07 sec
MapReduce Total cumulative CPU time: 16 seconds 70 msec
Ended Job = job_201308200352_0001
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 16.07 sec   HDFS Read: 2448984 HDFS Write: 40 SUCCESS
Total MapReduce CPU Time Spent: 16 seconds 70 msec
OK
2012-07-01:00:00:07     2012-10-01:03:19:24
Time taken: 47.629 seconds
hive> 

Получаем разархивация, парсинг и аналитика "в один проход". Ключевым в этом объявлении таблицы являтся 
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
Здесь я говорю что за данные у меня лежат (в каком формате). Если у нас что то экзотическое - не вопрос, кладем на Hadoop что то экзотическое и описываем рядом Java класс который будет парсить это.

Что у нас происходит на стадии Map и Reduce (на самом деле некоторое время назад я это уже описывал здесь, но повтороенье мать ученья):

1) Допустим у нас есть файл:

{column1:333, column2wqer,   column3: 56}
{column1: 2, column2:     wqer,   column3: 56}
{column1: 334, column2: wqert,  column3: 54}
{column1: 335, column2: wqerty, column3: 59}
….

2) Мы хотим обработать его запросом
select sum(column1), column2 from my_table
where
column3>55
group by column2
3) MapReduce происходит согласно картинке ниже:

На стадии Map происходит парсинг JSON строки, отсечение ненужных колонок (колонки сolumn3  в запросе нет и фильтрация по горизонтали, согласно фильтру column3 >55)
Вот такая вот магия полуструктурированных данных. Если у кого будут вопросы - с удовольствием на них отвечу!

3 comments:

  1. Не очень понятно насчёт JSON SerDe, его надо отдельно ставить отсюда ( https://code.google.com/p/hive-json-serde/ ) или он входит в какие-то дистрибутивы?

    ReplyDelete
  2. Hi kalimba!

    тестировали на CDH 4.2 (Cloudera Distribution Hadoop). Там он уже был. На самом деле не суть, если чего что нет, пишите/скачиваете и добавляете в distributed cache в случае MapReduce програмы, либо простой командой add jar 'path_to_jar' в случае hive (что на самом деле тождественно эта команда загоняет jar в distributed cache).

    ReplyDelete