Пост 16. Потоковая загрузка данных в Hadoop. Flume.
Доброго времени суток!
Сегодня хотел бы рассказать об еще одном способе загрзки данных в Hadoop с помощью утилиты flume. Мы уже знаем что помимо этого способа существуют как минимум еще 2:
- использование hadoop fs API (на сервере источнике данных надо набрать hadoop fs -put ...)
- использование sqoop (выгрузки из реляционных баз данных)
Итак, сегодня я опишу третий способ загрузки данных в Hadoop - flume.Предназначение - потокоавя загрузка данных в hadoop.
У Apache flume есть явное преимущество перед многими open source продуктами - хорошая документация.
Итак, логическая архитектура до безобразия проста:
Рис1. Архитектура Flume.
Источником данных может являться:
- Avro клиент. Указываем port который слушаем и ловим avro stream
- Thrift киент. Полностью аналогично предыдущему пункту.
- Exec. На вход подается вывод некой команды. Например, можно написать tail -F /var/log/secure
и все новые строки которые будут появляться в этом файле, будут попадать в канал (некая прослушка файла).
- JMS очередь. Указываются явки/пароли на JMS очередь и вот оно счастье.
- SpoolDir. Очень вкусная штука позволяет смотреть на дирректорию куда валятся логи. Как только суммарный размер файлов или колличество строчек превышают некий порог (либо срабатывает тайм-аут), файл попадает в исток. Хорошо для аггрегации множества мелких файлов (например, у нас приходят файлы по 100 КБ, которые мы хотим агрегировать до размера блока HDFS - не вопрос, ставим прослушку на дирректорию и дело в шляпе!)
- Прослушка netcat на определенном порту.
Канал, это некий промежуточный буффер куда накапливаются данные перед сливом в сток. На практике я ничего кроме memory не использовал, так что описывать не буду (если что есть документация).
Истоком в моей практике так же было только HDFS.
Ниже привожу несколько конфигов, в виде примера:
1) Прослушка файла и запись его в HDFS
# sources, sinks and channels in the agent
movieagent.sources=logFile
movieagent.channels=memoryChannel
movieagent.sinks=hdfs-sink
# source properties
movieagent.sources.logFile.type=exec
movieagent.sources.logFile.command=tail -F
/u01/Middleware/logs/activity.out
# sink properties
movieagent.sinks.hdfs-sink.type=hdfs
movieagent.sinks.hdfs-sink.hdfs.path=hdfs://localhost.localdomain/user/oracle/moviework/applog
movieagent.sinks.hdfs-sink.hdfs.filePrefix=streamed-movieapp-
movieagent.sinks.hdfs-sink.hdfs.fileType=DataStream
movieagent.sinks.hdfs-sink.hdfs.writeFormat=Text
movieagent.sinks.hdfs-sink.hdfs.rollInterval=30
movieagent.sinks.hdfs-sink.hdfs.rollSize= 1024
movieagent.sinks.hdfs-sink.hdfs.rollCount= 100
# flowh
movieagent.channels.memoryChannel.type=memory
movieagent.channels.memoryChannel.capacity=100000
movieagent.sources.logFile.channels=memoryChannel
movieagent.sinks.hdfs-sink.channel=memoryChannel
2) Прослушка дирректории, осуществление ротации фалов и запись их в HDFS:
# sources, sinks and channels in the agent
movieagent.sources=logFile
movieagent.channels=memoryChannel
movieagent.sinks=hdfs-sink
# source properties
movieagent.sources.logFile.type=spooldir
movieagent.sources.logFile.spoolDir=/tmp/spooldir_test
movieagent.sources.logFile.spoolDir.deletePolicy=immediate
movieagent.sources.logFile.fileHeader=true
movieagent.sources.logFile.batchSize=100
# sink properties
movieagent.sinks.hdfs-sink.type=hdfs
movieagent.sinks.hdfs-sink.hdfs.path=hdfs://localhost.localdomain/user/oracle/moviework/applog
movieagent.sinks.hdfs-sink.hdfs.filePrefix=streamed-movieapp-
movieagent.sinks.hdfs-sink.hdfs.writeFormat=Text
movieagent.sinks.hdfs-sink.hdfs.appendTimeout = 1000
movieagent.sinks.hdfs-sink.hdfs.rollSize = 2097152
movieagent.sinks.hdfs-sink.hdfs.rollCount = 600000
movieagent.sinks.hdfs-sink.hdfs.batchSize = 100
movieagent.sinks.hdfs-sink.hdfs.txnEventMax = 100000
movieagent.sinks.hdfs-sink.hdfs.threadsPoolSize= 100
movieagent.sinks.hdfs-sink.hdfs.rollInterval = 60
movieagent.sinks.hdfs-sink.hdfs.fileType = DataStream
movieagent.sinks.hdfs-sink.hdfs.file.writeFormat= Text
# flow
movieagent.channels.memoryChannel.type=memory
movieagent.sources.logFile.channels=memoryChannel
movieagent.sinks.hdfs-sink.channel=memoryChannel
Запустить это великолепие можно через красивый интерфейс cloudera, а можно через linux консоль:
$ flume-ng agent -n movieagent -c conf -f /var/run/cloudera-scm-agent/process/301-flume-AGENT/flume.conf
Не так flume страшен, как может показаться на превый взгляд. Очень милая утилита для загрузки данных, если исходный набор файлов не отчечает требованиям hadoop (файл должен быть больших размеров). Пробуйте и если будут какие либо вопросы - пишите!
Вот чего я не пойму, если мне надо несоклько потоков организовать, то они все в одном конгфиге?
ReplyDeleteможна вопросик
ReplyDeletehadoop hortonworks install развернут на отдельном компе
как мне настроить Flume чтоб он читал файли с моего компа?
также интересует вариант чтения файлов из фтп.
заранее спасибо.
С новим годом!!!