Импорт Netflow статистики в базы данных #1

Сетевую статистику, собираемую по NetFlow, можно, а зачастую нужно, в каком-то виде иметь в базе данных, для построения отчетов. Вот как я это сделал с SQL Server первый раз.

Собираем статистику утилитой flow-capture пакета flow-tools (оригинальная версия - http://www.splintered.net/sw/flow-tools, более поздняя ветка - http://code.google.com/p/flow-tools).

При использовании MS SQL Server, нам потребуется сделать агрегацию и преобразовать данные в ASCII при помощи flow-report, доставить на сервер и загрузить при помощи BULK INSERT (это будет самый быстрый способ). Если нужны исходные данные, хотя я советую загружать их лишь по необходимости, их следует выгрузить опять же в ASCII при помощи flow-export и загрузить в СУБД тем же BULK INSERT.

В MS SQL сервере объявляем таблицы так:

--- для агрегированных данных
CREATE TABLE [dbo].[flow_report](
  [first] int NOT NULL,
  [last] int NOT NULL,
  [ip-source-address] char(15) NOT NULL,
  [ip-destination-address] char(15) NOT NULL,
  [flows] int NOT NULL,
  [octets] bigint NOT NULL,
  [packets] int NOT NULL,
  [duration] int NOT NULL,
  [avg-bps] numeric(16,6) DEFAULT NULL,
  [min-bps] numeric(16,6) DEFAULT NULL,
  [max-bps] numeric(16,6) DEFAULT NULL,
  [avg-pps] numeric(14,6) DEFAULT NULL,
  [min-pps] numeric(14,6) DEFAULT NULL,
  [max-pps] numeric(14,6) DEFAULT NULL,
)

-- для исходных данных
CREATE TABLE [dbo].[flow_export](
  [unix_secs] bigint NOT NULL,
  [sysuptime] bigint NOT NULL,
  [dpkts] int NOT NULL,
  [doctets] int NOT NULL,
  [first] bigint NOT NULL,
  [last] bigint NOT NULL,
  [srcaddr] char(15) NOT NULL,
  [dstaddr] char(15) NOT NULL,
  [srcport] int NOT NULL,
  [dstport] int NOT NULL,
  [prot] smallint NOT NULL
)

Размерности полей в таблице, теоретически, должно хватить при трафике до 10 Гб/сек на поток при агрегации до 30 минут.

Конфигурационный файл для фильтров пишем такой (вместо x.x.x.x/x пишем нужные сети):

filter-primitive our-networks
  type ip-address-prefix
  permit x.x.x.x/x
  permit x.x.x.x/x
  permit x.x.x.x/x

filter-definition our-destination
  match ip-destination-address our-networks

filter-definition our-source
  match ip-source-address our-networks

filter-definition our-traffic
  match ip-destination-address our-networks
  or
  match ip-source-address our-networks

Конфигурационный файл для отчетов пишем такой:

include-filter конфиг_фильтров
stat-report ourset_fullformat
  type ip-source/destination-address
  filter our-traffic
  output
    format ascii
    fields +first,+last,+octets,+packets,+flows,+pps,+bps

stat-report ourset_shortformat
  type ip-source/destination-address
  filter our-traffic
  output
    format ascii
    fields +first,+last,+octets,+packets,+flows

stat-definition by_day
  time-series 86400
  report ourset_fullformat

stat-definition by_hour
  time-series 3600
  report ourset_fullformat

stat-definition by_minute
  time-series 60
  report ourset_fullformat

stat-definition by_5min
  time-series 300
  report ourset_fullformat

stat-definition by_10min
  time-series 600
  report ourset_fullformat

stat-definition by_15min
  time-series 900
  report ourset_fullformat

stat-definition by_30min
  time-series 1800
  report ourset_fullformat

Выгрузка делается так:

flow-report -s конфиг_фильтров -S by_5min < файл_статистики | grep -v '#' > выходной_файл

У меня занимает до 2-х минут, если мой файл содержит данные собранные за 15 минут на канале > 100 Mbit/s: исходные данные ~ 18 млн. строк, на выходе - 4.5 млн.

Вырезать '#' приходится не только из первой строки, т.к. заголовок периодически повторяется (у меня раза три, думаю это ошибка flow-report).

Загрузка в SQL агрегированных данных так:

BULK INSERT [flow_report] FROM 'выходной_файл'
WITH (DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0A')

Занимает чуть меньше 3 минут (~ 4.5 млн. строк).

Если мне нужны исходные данные, можно выгрузить их, отфильтровав тем же фильтром:

flow-nfilter -f конфиг_фильтров -F our-traffic < файл_статистики | flow-export -f2 -m0x3831E5 > выходной_файл

Это занимает около 2-х минут (~18 млн. строк за 15 мин. в моем случае).

Загрузка в SQL исходных данных так:

BULK INSERT [flow_export] FROM 'выходной_файл'
WITH (DATAFILETYPE = 'char', FIELDTERMINATOR = ',', ROWTERMINATOR = '0x0A', FIRSTROW = 2, BATCHSIZE = 1000)

Это занимает обычно в четыре (как минимум в два) раза больше времени и объема, чем агрегированные по 5 минут данные. Т.е. в моем случае, 12 минут, что неприемлемо, т.к. едва успевает за скоростью сбора данных.

Надо сказать, что Windows Server с MS SQL в моем случае виртуализован на Oracle VM (xen) и на этом сервере одновременно работают ещё 7 ВМ. Если запустить MS SQL на физическом сервере, распределив журналы транзакций и файлы данных с целью оптимизации, то, возможно, я смогу загружать все сырые данные.

В следующих статьях я описал, как я сделал это лучше: