Логи, собственно, уже собираем. Следующая задача их нужно каким-то образом обработать логи: подготовить и отправить в базу данных.

Чем обрабатывать?

В принципе вариантов на эту тему масса. Первый вариант — использовать готовое решение, второй вариант — писать свое. Собственно сложность состоит в том, что хоть и логи +/- похожи друг на друга, но все же имеют отличия. Возьмем первый вариант и попробуем его реализовать.

Для получения и парсинга логов, дума, можно взять fluentd. Продукт открытый, документация объемная, готовые плагины тоже есть. Попробуем разобраться.

Парсим логи

Для начала его нужно установить. Идем на официальную страницу и смотрим установку:


curl -fsSL https://toolbelt.treasuredata.com/sh/install-debian-bullseye-td-agent4.sh | sh

Вроде как все просто.

Следующий момент — взять файл с логами и обработать его. Для этого будем использовать уже готовую директорию с логами, которую получаем с других серверов.

Но перед парсингом нужно установить еще один плагин:


td-agent-gem install fluent-plugin-record-modifier --no-document

Этот плагин понадобится для удобной модификации записей. Теперь у нас практически все готово для работы. Приступим…

NGinx

Создаем файл /etc/td-agent/conf.d/nginx_site.conf:


  @type tail
  tag nginx.site
  path /var/log/10.x.x.x/nginx_snipchi.log
  pos_file /var/log/td-agent/nginx_snipchi.log.pos
  @log_level info
  @id nginx_site

    @type regexp
    expression ^(?[^ ]*\s+[^ ]*\s+[^ ]*)\s+(?[^ ]*) (?[^ ]*) (?[^ ]*) (?[^ ]*) (?[^ ]*) \[(?[^\]]*)\] "(?\S+)(?: +(?[^ ]*?) (?[^"]*)?)?" (?[^ ]*) (?[^ ]*)(?: "(?[^\"]*)" "(?[^\"]*)"(?:\s+(?[^ ]+))?)?$
    time_format %d/%b/%Y:%H:%M:%S %z

    @type record_modifier

        time ${time}

#
#    @type stdout
#

  @type http
  #open_timeout 2

  endpoint http://10.x.x.x:8123/?user=login&password=password&database=logs&query=INSERT%20INTO%20nginx%20FORMAT%20JSONEachRow

    @type json

  json_array true

    flush_interval 10s

В данном случае мы берем файл с диска и обрабатываем его построчно выделяя поля из записи с помощью регулярных выражений. Далее прогоняем через фильтр и уже в конце через директиву «match» собираем обработанные записи, накапливаем и отправляем уже в БД в формате JSON. Здесь я использую ClickHouse, но об это я напишу в следующей заметке. Здесь стоит запомнить лишь то, что fluent берет итоговый фвйл JSON и отправляет POST-запросом по протоколу http на указанный URI, а вот ClickHouse в свою очередь спокойно получает запрос и полученный JSON просто раскладывает по таблице.

Если присмотреться в раздел source, то в нашем случае важны следующие параметры:

@type — тип источника

tag — тэг для записей лога

path — путь к файлу лога

pos_file — что-то типа pipe-файла. Он нужен для fluent

@log_level — какие события отфильтровывать

Раздел parse в source объясняет источнику как распарсить полученную запись лога. Для этого используются следующие директивы:

@type — тип будущей записи

expression — регулярное выражение для парсинга записи и дальнейшего преобразования в JSON

time_format — формат времени чтобы система смогла определить формат и преобразовать строку даты/времени в числовое значение.

Еще стоит обрабтить внимание, что один из блоков match закомментирован. Его я использовал просто для отладки. В нем полученный результат записывается в лог самого fluent.

Squid

Создаем файл /etc/td-agent/conf.d/squid.conf:


  @type tail
  tag squid.access
  path /var/log/10.x.x.x/squid-access.log
  pos_file /var/log/td-agent/squid-access.log.pos
  @log_level info
  @id squid

    @type regexp
    expression ^(?[^ ]*\s+[^ ]*\s+[^ ]*)\s+(?[^ ]*)\s+(?[^ ]*)\s+(?[^\.]*)\.(?[^ ]*)\s+(?[^ ]*)\s+(?[^ ]*)\s+(?[^/]*)/(?[^ ]*)\s+(?[^ ]*)\s+(?[^ ]*)\s+(?[^ ]*)\s+(?[^ ]*)\s+(?[^/]*)/(?[^ ]*)\s+(?[^\s$]*)
    time_format %b %d %H:%M:%S %z

    @type record_modifier
    remove_keys dt_syslog, programm

        domain ${record['url']}

        key domain
        #expression (http(s)?://)?(?[^\s\t/:$]*).*
        expression (?http(s)?:\/\/)
        replace ""

        key domain
        expression (?:[^\/$]*)
        replace ""

        key domain
        expression \/.*$
        replace ""

#
#    @type stdout
#
#       @type json
#
#

  @type http
  open_timeout 2

  endpoint http://10.x.x.x:8123/?user=login&password=password&database=logs&query=INSERT%20INTO%20squid%20FORMAT%20JSONEachRow

    @type json

  json_array true

    flush_interval 10s

Эта настройка предназначена для парсинга логов прокси-сервера Squid. По сути в нем так же ничего сверхъестественного нет. Вся разница заключается в разделе source, а конкретней в expression.

Электронная почта

В моем случае используется Postfix. Вот у него логи немного сложнее, но не намного. Стоит проявить немного внимания и уситчивости и все получится.

Создаем файл /etc/td-agent/conf.d/postfix.conf:


  type tail
  path /var/log/10.x.x.x/postfix.log
  tag postfix.relay
  #format /^(?[^ ]+) (?[^ ]+) (?[^:]+): (?((?[^ :]+)[ :])? ?((to|from)=[^>]+)>)?.*)$/
  #format /(?[\w]+\s+[\d]+\s[\d:]+)\s+(?.+)/
  time_format %b %d %H:%M:%S
  #format none
  pos_file /var/log/td-agent/postfix-relay.log.pos

        @type regexp
        expression ^(?[^ ]*\s+[^ ]*\s+[^ ]*)\s+(?[^ ]*)\s+(?[^/]*)/(?[^\[]*)\[(?[^\]]*)\]:\s+(connect to (?[^\[]+)\[(?[^\]]+)\](:(?[^:]+):\s(?.+)?)?)?(connect\sfrom\s(?[^\[]+)\[(?[^\]]+)\])?(disconnect\sfrom\s(?[^\[]+)\[(?[^\]]+)\]\sehlo=(?[^ ]+)\sstarttls=(?[^ ]+)\smail=(?[^ ]+)\srcpt=(?[^ ]+)\sdata=(?[^ ]+)\squit=(?[^ ]+)\scommands=(?.+))?(lost connection after CONNECT from (?[^\[]+)\[(?[^\]]+))?((?[^:]+): (removed(?))?(client=(?[^\[]+)\[(?[^\]]+\]))?(message\-id=[^>]+))?)?(to=[^>]+)>, (orig_to=[^>]+)>, )?(relay=(?[^\[]+)\[(?[^\]]+)\](:(?\d+))?)?(,\sdelay=(?[^,]+))?(,\sdelays=(?[^,]+))?(,\sdsn=(?[^,]*))?(,\sstatus=(?[^ ]*)\s)?(?.+)?)?(from=[^>]+)>,\ssize=(?[^,]+),\snrcpt=(?[^ $]+)(\s(?.+))?)?
        time_format %b %d %H:%M:%S

    @type record_modifier

        time ${time}

#
#    @type stdout
#
#       @type json
#
#

  @type http
  #open_timeout 2

  endpoint http://10.x.x.x:8123/?user=login&password=password&database=logs&query=INSERT%20INTO%20postfix%20FORMAT%20JSONEachRow

    @type json

  json_array true

    flush_interval 10s

Собственно вся загвоздка в том, что по сути Postfix имеет многострочные логи, а если быть точнее, то логи однострочные, но форматы строк все разные (ну или почти). Но это тоже решается.

Немного велосипеда

Если смотреть в документации fluent, то можно обнаружить, что формат парсинга для того же Nginx уже есть, но он не будет работать, так как после приема логов через syslog в результирующий файл попадает еще несколько дополнительных полей в начало записи. Так что пришлось немного переписать. А вот как это поправить в rsyslog — я, честно, не разобрался.

Кроме всего прочего, судя по документации fluent умеет самостоятельно принимать логи в формате syslog и можно было бы отказаться от серверного rsyslog, но мы так прикинули, что пусть уж лучше будет что-то стандартное, тем более событий не так много, а временно хранящиеся файлы на диске в течении какого-то времени, как бы, хороший тон.

Работай!

После написания правил обработки данных их неплохо было бы включить, иначе работать просто не будут. Для этого редктируем файл /etc/td-agent/td-agent.conf:


@include conf.d/nginx_site.conf
@include conf.d/postfix.conf
@include conf.d/squid.conf

После можно перезапустить агента:


systemctl restart td-agent

Замечание

При написании правил порядок разделов имеет значение. Если сначала написать **** а потом ****, то работать не будет. соответственно и обработчики, например ****, тоже должны идти последовательно в том порядке, который того требует обработка данных.

Результат

Сейчас есть обработчик на одном сервере, который собирает события откуда требуется, обрабатывает и отправляет их в базу. Теперь нужно решить вопрос о хранении.