Испытания способа агрегации в MS SQL

В предыдущем посте я писал о том, как импортировать NetFlow статистику в MS SQL. Произвел замеры скорости решения на наших данных. Мы пропускаем за 15 минут от 6 до 18 миллионов потоков (трафик в несколько сотен Mbit/s).

Что получилось с этим решением:

Не удается включить фильтр по сетям (чтоб оставить только наши IP адреса) при суммировании данных из #flow_report в tbl_summarized. При выполнении GetFlowReports с соответствующими условиями в WHERE происходит ошибка 3930, полагаю из-за слишком больших объемов (у меня SQL Server 2008 Standard на 5 Гб ОЗУ).

Если индекс по IP в tbl_summarized (это результирующая таблица, из которой будут брать данные пользователи) выключен, то GetFlowReports отрабатывает 32 15-минутных интервала (8 часов статистики) за 1 час 16 минут, объем в БД 2.8 Гб. Добавим сюда 1 мин. на работу flow-report на подготовку каждого файла и ещё 1 минуту на доставку файла на MS SQL сервер. Итого - 140 мин.

Коэффициент (время обработки/время сбора) в этом случае: 0,29.

Если индекс по IP в tbl_summarized включен скорость импорта слишком стремительно падает с увеличением импортированного объема. Если импорт первого 15 минутного интервала занимает 2 минуты, то импорт 32 интервала уже 10 минут и т.д. в прогрессии. В результате, GetFlowReports отрабатывает те же 32 интервала за 3 часа 9 минут. Т.е. общий итог - 253 мин.

Коэффициент (время обработки/время сбора): 0,53.

Теперь надо сказать, почему результат этой работы использовать нельзя:

Выполнение любого запроса по tbl_summarized, содержащей данные 32 интервалов, где условие IP BETWEEN .. AND .. - около 14-17 минут без особой зависимости от индекса. Скорее даже с обратной зависимостью.

Проблема, полагаю, в том, что данные остаются не отфильтрованными по нашим сетям, т.е. мы имеем множество лишних записей в индексе. Те же самые 32 15-ти минутных интервала дают 49 987 505 строк (это одна запись в час по каждому уникальному IP встреченному в нашем трафике), из которых нам нужны только 38400 (это наши 4800 IP адресов * 8 часов).

Выполнить фильтрацию по tbl_summarized и переложить куда-нибудь в tbl_summarized_filtered за приемлемое время так же не удалось.

В итоге, если хотя бы по 10 % адресов кто-то захочет получить отчет, на отчеты уйдет 120 часов.

Способ агрегации в PostgreSQL

Пришлось применить другое решение. Я стряхнул пыль с утилиты flow-pgdb (http://sourceforge.net/projects/flow-pgdb/), которая была написана мною для экспорта статистики NetFlow в СУБД PostgreSQL три года назад. Кое что исправил, улучшил и документировал. Я пошел таким путем — экспортировать и суммировать статистику в PostgreSQL, а затем забирать в MS SQL результат.

Оказалось, что мы убираем минуту на подготовку данных в ASCII и минуту доставку файла на MS SQL сервер, вместо этого мы за 45 сек. загружаем 12 млн. строк сразу в БД PostgreSQL. На агрегацию (см. ниже) уходит ещё 45 секунд. Итого - 8 млн. строк в минуту.

Почему так быстро? Благодаря бинарной массовой вставке (кстати сетевой — идущей через подключение к БД) и ещё потому, что в PostgreSQL есть специальный тип данных - IP адрес, плюс операции проверки вхождения адреса в сеть, что делает очень быстрым работу выражений вида WHERE dstaddr << 'ваша сеть/маска'.

Кроме того, утилитой обеспечивается целостность данных, в частности, строится учет обработанных файлов (если файл докладывается в хранилище - он экспортируется следующим запуском утилиты), при этом, гарантируется отсутствие дублирования и полнота данных экспортированных из каждого отдельно взятого файла благодаря транзакционности.

Коэффициент (время обработки/время сбора) в моем случае: 0,1. Если не использовать временные таблицы (ключ -x), он быстро ухудшается, по мере увеличения объема уже обработанных данных. Поэтому, используем flow-pgdb с ключом -x.

Что мы получаем помимо скорости:

  1. Агрегацию или подготовку отчетов удобно настраивать, т.к. ваша функция это SQL выражения, оперировать которыми проще чем flow-filter.
  2. Решена проблема контроля за целостностью данных.

Решение можно дешево масштабировать, обрабатывая статистику с разных маршрутизаторов на разных серверах PostgreSQL и загружая в MS только итоги.

Вот как это делается:

1) Качаете и компилируете flow-pgdb (http://sourceforge.net/projects/flow-pgdb/).

Дистрибутив содержит библиотеки необходимые для работы с файлами данных flow-tools (от оригинальной 66-й версии http://www.splintered.net/sw/flow-tools), но может потребоваться установить библиотеки PosgreSQL, Zlib и компилятор.

Вот, что нужно сделать на RedHat Enterprise Linux, Oracle Enterprise Linux или CentOS:

yum install postgresql-libs postgresql-devel gcc
tar -zxvf flow-pgdb-0.73.tgz
cd flow-pgdb-0.73
./configure
make install

2) Создаете следующее в БД:

CREATE LANGUAGE 'plpgsql';

-- Tables
CREATE TABLE files (
filename varchar(256) NOT NULL UNIQUE,
export_begin timestamp with time zone DEFAULT NULL,
export_finish timestamp with time zone DEFAULT NULL,
capture_start timestamp with time zone NOT NULL,
capture_end timestamp with time zone NOT NULL,
flows_count integer NOT NULL DEFAULT 0,
flows_lost integer NOT NULL DEFAULT 0,
flows_corrupt integer NOT NULL DEFAULT 0,
flows_exported integer NOT NULL DEFAULT 0
) WITHOUT OIDS;

CREATE TABLE data (
first integer NOT NULL,
last integer NOT NULL,
srcaddr inet NOT NULL,
dstaddr inet NOT NULL,
doctets bigint NOT NULL,
dpkts bigint NOT NULL
) WITHOUT OIDS;

CREATE TABLE aggregation_step1 (
h timestamp without time zone,
addr inet,
out_octets bigint,
out_pkts bigint,
out_flows bigint,
in_octets bigint,
in_pkts bigint,
in_flows bigint
)
WITHOUT OIDS;

CREATE TABLE data_aggregated (
exported2mssql timestamp with time zone,
h timestamp without time zone,
addr inet,
out_octets bigint,
out_pkts bigint,
out_flows bigint,
in_octets bigint,
in_pkts bigint,
in_flows bigint
) WITHOUT OIDS;

-- Aggregation step 1 function (run it as -c argument of flow-pgdb)

CREATE OR REPLACE FUNCTION aggregation_step1(varchar(255))
RETURNS VOID
AS $$
DECLARE
BEGIN

-- aggregate our IP by destination
EXECUTE '
INSERT INTO aggregation_step1 (h, addr, in_octets, in_pkts, in_flows)
SELECT date_trunc(''hour'',(date ''1970-01-01''+first*interval ''1 second'')) AS h, dstaddr AS addr, sum(doctets) AS in_octets, sum(dpkts) AS in_pkts, count(*) AS in_flows
FROM "'||$1||'"
WHERE dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска'' OR
dstaddr << ''ваша сеть/маска''
GROUP BY h, dstaddr;';

-- aggregate our IP by source
EXECUTE '
INSERT INTO aggregation_step1 (h, addr, out_octets, out_pkts, out_flows)
SELECT date_trunc(''hour'',(date ''1970-01-01''+first*interval ''1 second'')) AS h, srcaddr AS addr, sum(doctets) AS out_octets, sum(dpkts) AS out_pkts, count(*) AS out_flows
FROM "'||$1||'"
WHERE srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска'' OR
srcaddr << ''ваша сеть/маска''
GROUP BY h, srcaddr;';

END;
$$ LANGUAGE 'plpgsql' SECURITY INVOKER;

-- Aggregation step 2 (run it with "now() - interval '1 hour'" argument to work on prev. hour. !wrap in transaction!)

CREATE OR REPLACE FUNCTION aggregation_step2(timestamp with time zone)
RETURNS VOID AS $$
-- join 'out' and 'in' data
INSERT INTO data_aggregated (h, addr, out_octets, out_pkts, out_flows, in_octets, in_pkts, in_flows)
SELECT h, addr, SUM(out_octets), SUM(out_pkts), SUM(out_flows), SUM(in_octets), SUM(in_pkts), SUM(in_flows)
FROM aggregation_step1
WHERE h < $1 AT TIME ZONE 'UTC'
GROUP BY h, addr;

DELETE FROM aggregation_step1 WHERE h < $1 AT TIME ZONE 'UTC';
$$ LANGUAGE SQL SECURITY INVOKER;

Схема агрегации в моем случае двухшаговая:

  1. команда aggregation_step1() выполняется сразу после импорта файла данных.
  2. команда aggregation_step2() должна быть выполнена когда час уже будет полностью обработан aggregation_step1(). Мы используем 15 мин. интервалы и aggregation_step2() завершает агрегацию объединяя записи в пределах одного часа.

3) Ставите в крон следующее:

flow-pgdb -x -u DB_LOGIN:DB_PASSWORD:DB_HOST:5432:DB_NAME -w /<папка со статистикой>/ -c "SELECT aggregation_step1('$');"

4) Так же ставите в расписание запуск процедуры aggregation_step2(). Оберните запуск в транзакцию!

5) Настраиваете перенос данных в MS SQL его средствами.

Итоговые данные в PostgreSQL в таблице data_aggregated имеют время в UTC.