Импорт Netflow статистики в базы данных #3
Испытания способа агрегации в MS SQL
В предыдущих двух постах я писал о том, как импортировать NetFlow статистику в MS SQL:
Произвел замеры скорости решения на наших данных. Мы пропускаем за 15 минут от 6 до 18 миллионов потоков (трафик в несколько сотен Mbit/s).
Что получилось с этим решением:
Не удается включить фильтр по сетям (чтоб оставить только наши IP адреса) при суммировании данных из #flow_report в tbl_summarized. При выполнении GetFlowReports с соответствующими условиями в WHERE происходит ошибка 3930, полагаю из-за слишком больших объемов (у меня SQL Server 2008 Standard на 5 Гб ОЗУ).
Если индекс по IP в tbl_summarized (это результирующая таблица, из которой будут брать данные пользователи) выключен, то GetFlowReports отрабатывает 32 15-минутных интервала (8 часов статистики) за 1 час 16 минут, объем в БД 2.8 Гб. Добавим сюда 1 мин. на работу flow-report на подготовку каждого файла и ещё 1 минуту на доставку файла на MS SQL сервер. Итого - 140 мин.
Коэффициент (время обработки/время сбора) в этом случае: 0,29.
Если индекс по IP в tbl_summarized включен скорость импорта слишком стремительно падает с увеличением импортированного объема. Если импорт первого 15 минутного интервала занимает 2 минуты, то импорт 32 интервала уже 10 минут и т.д. в прогрессии. В результате, GetFlowReports отрабатывает те же 32 интервала за 3 часа 9 минут. Т.е. общий итог - 253 мин.
Коэффициент (время обработки/время сбора): 0,53.
Теперь надо сказать, почему результат этой работы использовать нельзя:
Выполнение любого запроса по tbl_summarized, содержащей данные 32 интервалов, где условие IP BETWEEN .. AND .. - около 14-17 минут без особой зависимости от индекса. Скорее даже с обратной зависимостью.
Проблема, полагаю, в том, что данные остаются не отфильтрованными по нашим сетям, т.е. мы имеем множество лишних записей в индексе. Те же самые 32 15-ти минутных интервала дают 49 987 505 строк (это одна запись в час по каждому уникальному IP встреченному в нашем трафике), из которых нам нужны только 38400 (это наши 4800 IP адресов * 8 часов).
Выполнить фильтрацию по tbl_summarized и переложить куда-нибудь в tbl_summarized_filtered за приемлемое время так же не удалось.
Если по 1% адресов будут запрошены отчеты, на них уйдет 12 часов.
Способ агрегации в PostgreSQL
Пришлось применить другое решение. Я стряхнул пыль с утилиты flow-pgdb, которая была написана мною для экспорта статистики NetFlow в СУБД PostgreSQL три года назад. Кое что исправил, улучшил и документировал. Я пошел таким путем — экспортировать и суммировать статистику в PostgreSQL, а затем забирать в MS SQL результат.
Оказалось, что мы убираем минуту на подготовку данных в ASCII и минуту доставку файла на MS SQL сервер, вместо этого мы за 45 сек. загружаем 12 млн. строк сразу в БД PostgreSQL. На агрегацию (см. ниже) уходит ещё 45 секунд. Итого — 8 млн. строк в минуту.
Почему так быстро? Благодаря бинарной массовой вставке (кстати сетевой, идущей через удалённое подключение к БД) и ещё потому, что в PostgreSQL есть специальный тип данных — IP адрес, плюс операции проверки вхождения адреса в сеть, что делает очень быстрым работу выражений вида WHERE dstaddr << 'ваша сеть/маска'.
Кроме того, утилитой обеспечивается целостность данных, в частности, строится учет обработанных файлов (если файл докладывается в хранилище, он экспортируется следующим запуском утилиты), при этом, гарантируется отсутствие дублирования и полнота данных экспортированных из каждого отдельно взятого файла благодаря транзакционности.
Коэффициент время обработки / время сбора (в моем случае): 0,1. Если не использовать временные таблицы (ключ -x), он быстро ухудшается, по мере увеличения объема уже обработанных данных. Поэтому, используем flow-pgdb с ключом -x.
Что мы получаем помимо скорости:
- Агрегацию или подготовку отчетов удобно настраивать, т.к. ваша функция это SQL выражение, оперировать которым проще чем flow-filter.
- Решена проблема контроля за целостностью данных.
Решение можно дешево масштабировать, обрабатывая статистику с разных маршрутизаторов на разных серверах PostgreSQL и загружая в MS только итоги.
Вот как это делается:
1) Качаете и компилируете flow-pgdb (http://sourceforge.net/projects/flow-pgdb/).
Дистрибутив содержит библиотеки необходимые для работы с файлами данных flow-tools (от оригинальной 66-й версии http://www.splintered.net/sw/flow-tools), но может потребоваться установить библиотеки PosgreSQL, Zlib и компилятор.
Далее (RedHat Enterprise Linux, Oracle Enterprise Linux или CentOS):
yum install postgresql-libs postgresql-devel gcc
tar -zxvf flow-pgdb-0.73.tgz
cd flow-pgdb-0.73
./configure
make install
2) Создаете следующее в БД:
CREATE LANGUAGE 'plpgsql';
-- Tables
CREATE TABLE files (
filename varchar(256) NOT NULL UNIQUE,
export_begin timestamp with time zone DEFAULT NULL,
export_finish timestamp with time zone DEFAULT NULL,
capture_start timestamp with time zone NOT NULL,
capture_end timestamp with time zone NOT NULL,
flows_count integer NOT NULL DEFAULT 0,
flows_lost integer NOT NULL DEFAULT 0,
flows_corrupt integer NOT NULL DEFAULT 0,
flows_exported integer NOT NULL DEFAULT 0
) WITHOUT OIDS;
CREATE TABLE data (
first integer NOT NULL,
last integer NOT NULL,
srcaddr inet NOT NULL,
dstaddr inet NOT NULL,
doctets bigint NOT NULL,
dpkts bigint NOT NULL
) WITHOUT OIDS;
CREATE TABLE aggregation_step1 (
h timestamp without time zone,
addr inet,
out_octets bigint,
out_pkts bigint,
out_flows bigint,
in_octets bigint,
in_pkts bigint,
in_flows bigint
)
WITHOUT OIDS;
CREATE TABLE data_aggregated (
exported2mssql timestamp with time zone,
h timestamp without time zone,
addr inet,
out_octets bigint,
out_pkts bigint,
out_flows bigint,
in_octets bigint,
in_pkts bigint,
in_flows bigint
) WITHOUT OIDS;
-- Aggregation step 1 function (run it as -c argument of flow-pgdb)
CREATE OR REPLACE FUNCTION aggregation_step1(varchar(255))
RETURNS VOID
AS $$
DECLARE
BEGIN
-- aggregate our IP by destination
EXECUTE '
INSERT INTO aggregation_step1 (h, addr, in_octets, in_pkts, in_flows)
SELECT date_trunc(''hour'',(date ''1970-01-01''+first*interval ''1 second'')) AS h, dstaddr AS addr, sum(doctets) AS in_octets, sum(dpkts) AS in_pkts, count(*) AS in_flows
FROM "'||$1||'"
WHERE dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска''
GROUP BY h, dstaddr;';
-- aggregate our IP by source
EXECUTE '
INSERT INTO aggregation_step1 (h, addr, out_octets, out_pkts, out_flows)
SELECT date_trunc(''hour'',(date ''1970-01-01''+first*interval ''1 second'')) AS h, srcaddr AS addr, sum(doctets) AS out_octets, sum(dpkts) AS out_pkts, count(*) AS out_flows
FROM "'||$1||'"
WHERE srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска''
GROUP BY h, srcaddr;';
END;
$$ LANGUAGE 'plpgsql' SECURITY INVOKER;
-- Aggregation step 2 (run it with "now() - interval '1 hour'" argument to work on prev. hour. !wrap in transaction!)
CREATE OR REPLACE FUNCTION aggregation_step2(timestamp with time zone)
RETURNS VOID AS $$
-- join 'out' and 'in' data
INSERT INTO data_aggregated (h, addr, out_octets, out_pkts, out_flows, in_octets, in_pkts, in_flows)
SELECT h, addr, SUM(out_octets), SUM(out_pkts), SUM(out_flows), SUM(in_octets), SUM(in_pkts), SUM(in_flows)
FROM aggregation_step1
WHERE h < $1 AT TIME ZONE 'UTC'
GROUP BY h, addr;
DELETE FROM aggregation_step1 WHERE h < $1 AT TIME ZONE 'UTC';
$$ LANGUAGE SQL SECURITY INVOKER;
Схема агрегации в моем случае двухшаговая:
- команда aggregation_step1() выполняется сразу после импорта файла данных.
- команда aggregation_step2() должна быть выполнена когда час уже будет полностью обработан aggregation_step1(). Мы используем 15 мин. интервалы и aggregation_step2() завершает агрегацию объединяя записи в пределах одного часа.
3) Ставите в крон следующее:
flow-pgdb -x -u DB_LOGIN:DB_PASSWORD:DB_HOST:5432:DB_NAME -w /<папка со статистикой>/ -c "SELECT aggregation_step1('$');"
4) Так же ставите в расписание запуск процедуры aggregation_step2(). Оберните запуск в транзакцию!
5) Настраиваете перенос данных в MS SQL его средствами.
Итоговые данные в PostgreSQL в таблице data_aggregated имеют время в UTC.