Рубрики
Без рубрики

Сбор journald логов с помощью Logstash

В этом посте я хотел бы рассказать как с помощью Munin, Logstash и Kibana можно консолидировать и осуществлять мониторинг journald логов.

Вступление

Исследование логов, наверно самый универсальный способ узнать о неполадках в работе приложения и отправная точка с которой начинается любой «ремонт». Это простое действие не вызывает никаких проблем, если у нас имеется только один или два сервера. А если у нас десяток серверов с «размазанными» по ним приложениями? Тут уже не обойтись без средства консолидации, так как исследовать тысячи записей в логах десятка машин, постоянно «прыгая» с одной на другую, это далеко не самое интересное занятие.

Задача сбора логов

Итак, у нас имеется один или несколько серверов на linux, все логи на которых собираются systemd journald, и мы хотим воспользоваться преимуществами централизованного сбора логов и осуществлять их мониторинг на предмет ошибок. Для реализации данного решения, я предлагаю использовать следующие компоненты:просмотр логов собранных logstash в kibana
1) Logstash — средство для сбора и обработки логов
2) Elasticsearch — поисковый сервер и хранилище для собранных логов
3) Kibana — веб-интерфейс для Logstash

Про установку такой связки уже написан не один десяток инструкций, которые легко находятся по соответствующему запросу в гугле. Поэтому я не буду повторяться и далее мы будем считать что у нас все уже установлено.

Как удаленно получить логи из journald

К сожалению, в настоящее время journald не имеет функционала по отправке логов на удаленный сервер, поэтому нам придется как-то обходить этот недостаток.

Во-первых, можно написать свой скрипт, который будет отправлять все события и скопировать его на все свои сервера. Или же, использовать journal gatewayd демон, который идет вместе с systemd и позволяет читать лог удаленно.

Мы пойдем по второму пути и будем использовать journal gatewayd демон. Для этого на всех серверах выполняем:

systemctl start systemd-journal-gatewayd.socket
systemctl enable systemd-journal-gatewayd.socket

После этого мы можем удаленно получить лог сервера myserver в json формате с помощью вот такого HTTP запроса:

curl -f -s -H"Accept: application/json" "http://myserver:19531/entries"

Правда тут есть небольшая трудность, нам нужно получать не все записи, а только новые. Но и это не проблема, так как мы можем использовать курсор. Курсор это не что иное как указатель на определенную запись в journald. При запросе данных у gatewayd, с помощью HTTP заголовка header мы можем указать с какой записи мы хотим начать, сколько записей пропустить и сколько получить.

Таким образом, все что нам надо, это взять последний уже полученный курсор и запросить у удаленной машины все записи начиная с него. В этом случае наш запрос для получения лога с myserver примет такой вид:

curl -f -s -H"Accept: application/json" -H"Range: entries=$cursor" "http://myserver:19531/entries?boot"|tail -n+2

Параметр boot говорит о том что нас интересуют записи только с момента последнего старта системы.  А заголовок Range: entries=$cursor указывает что мы хотим начать с записи с курсором — $cursor. tail -n+2 поможет нам пропустить первую запись, так как она уже есть в нашем индексе.

Настраиваем Logstash

Конфигурация состоит из трех секций, которые говорят сами за себя и соответствуют порядку «прохождения» логов через Logstash:
— input, тут мы указываем плагины которыми мы хотим собирать записи;
filter, тут мы можем указать какие фильтры будут применяться к собранным записям;
output, тут мы указываем куда поместить отфильтрованные данные.

В нашем случае мы будем использовать exec для ввода, elasticsearch для вывода, mutate и date в качестве фильтров:
— exec, получает записи из стандартного вывода заданной команды;
mutate, поможет нам немного обрезать метку времени journald и получить unix timestamp в миллисекундах;
date, используя полученную выше метку времени установит  правильное время для полученной записи;
elasticseach, этот плагин, как понятно из названия, помещает обработанный лог в индекс elasticsearch.

Файл конфигурации может иметь любое имя и должен находиться в /etc/logstash/conf.d/

input {
  exec {
    type => "myserver"
    interval => 180
    codec => json_lines 
    command => 'cursor=`curl -f -s "http://elasticsearchserver:9200/logstash-$(date +%Y.%m.%d)/_search?q=type:myserver&pretty=true&size=1&sort=@timestamp:desc&fields=__CURSOR"|grep "__CURSOR"|sed "s/^.*\"\(s=[^\"]*\)\".*$/\1/"`;\
      [ $cursor"X" = "X" ]\
        &&curl -f -s -H"Accept: application/json" -H"Range: entries=:999999:1" "http://myserver:19531/entries?boot"\
        ||curl -f -s -H"Accept: application/json" -H"Range: entries=$cursor" "http://myserver:19531/entries?boot"|tail -n+2'
  }
}

filter {
  mutate {
    gsub => [ "__REALTIME_TIMESTAMP", ".{3,3}$", "" ]
  }
  date {
    timezone => "Europe/Moscow"
    match => [ "__REALTIME_TIMESTAMP", "UNIX_MS" ]
  }
}

output {
  elasticsearch {
    host => "elasticsearchserver"
  }
}

Не забываем вместо myserver и elasticsearchserver подставить имена своих серверов.

Также важно указать тип для каждого собираемого лога, например так — type => «myserver», в дальнейшем это позволит легко выбирать записи для определенного сервера.
interval => 180 — это периодичность с которой logstash будет запускать нашу команду.
codec => json_lines говорит о том что, входные данные будут идти в формате json и построчно. С этой опцией каждая json строка, еще на входе, будет преобразована в набор полей, по которым мы сможем фильтровать записи.

На официальном сайте можно подробно ознакомиться с тем какие плагины ввода/вывода, фильтры и параметры существуют в logstash.

Так как в секции input может быть несколько источников ввода, то нам надо только скопировать секцию exec для каждого компьютера с которого мы хотим собирать логи.

И напоследок, давайте поподробнее разберем команду которой мы получаем journald лог с myserver:

# получаем последний курсор для myserver из elasticsearch
cursor=`curl -f -s "http://elasticsearchserver:9200/logstash-$(date +%Y.%m.%d)/_search?q=type:myserver&pretty=true&size=1&sort=@timestamp:desc&fields=__CURSOR"|grep "__CURSOR"|sed "s/^.*\"\(s=[^\"]*\)\".*$/\1/"`;\
# тут "logstash-$(date +%Y.%m.%d)" это индекс в elasticsearch, который по-умолчанию использует logstash
# то есть каждый день у нас создается новый индекс, это кстати позволит нам более гибко управлять сохраненными данными

# проверяем что такой курсор есть
[ $cursor"X" = "X" ]\

    # если ничего не получили, значит у нас в базе нет логов с этого сервера
    # давайте начнем собирать с него логи и начнем с последней записи, которая на нем есть
    # запрашиваем у journald записи с момента последнего старта, и берем лишь одну пытаясь пропустить предыдущие 999999
    &&curl -f -s -H"Accept: application/json" -H"Range: entries=:999999:1" "http://myserver:19531/entries?boot"\

    # если же у нас курсор не пустой, то в базе уже есть логи с этого сервера
    # берем все записи начиная с записи с полученным курсором и при помощи tail пропускаем саму эту запись
    ||curl -f -s -H"Accept: application/json" -H"Range: entries=$cursor" "http://myserver:19531/entries?boot"|tail -n+2

 Вот и все!

Добавить комментарий

Ваш адрес email не будет опубликован.