Импорт Netflow статистики в базы данных #2
Я более тщательно проработал решение под MS SQL, начатое в предыдущем посте. Итак, если хотим загружать статистику, собираемую пакетом flow-tools по NetFlow, в БД MS SQL, нужно сделать следующее:
- Установить flow-tools и сконфигурировать flow-report (см. предыдущий пост).
- Организовать доставку на сервер СУБД MS SQL.
- В БД создать нижеописанные вещи.
Для хранения IPv4 адресов в целочисленном виде:
CREATE FUNCTION [dbo].[int_to_ip] @ip int
RETURNS varchar(15)
AS
BEGIN
DECLARE @octet0 varchar(3)
DECLARE @octet1 varchar(3)
DECLARE @octet2 varchar(3)
DECLARE @octet3 varchar(3)
DECLARE @bigint bigint;
SET @bigint = @ip+2147483648;
SET @octet3 = (@bigint / power(2, 24)) % 256
SET @octet2 = (@bigint / power(2, 16)) % 256
SET @octet1 = (@bigint / power(2, 8)) % 256
SET @octet0 = @bigint % 256
RETURN @octet3 + '.' + @octet2 + '.' + @octet1 + '.' + @octet0
END
CREATE FUNCTION [dbo].[ip_to_int] @ip varchar(15)
RETURNS int
AS
BEGIN
DECLARE @octet0 bigint
DECLARE @octet1 bigint
DECLARE @octet2 bigint
DECLARE @octet3 bigint
SET @octet3 = ParseName(@ip, 4)
SET @octet2 = ParseName(@ip, 3)
SET @octet1 = ParseName(@ip, 2)
SET @octet0 = ParseName(@ip, 1)
RETURN (@octet3 * power(2, 24) + @octet2 * power(2, 16) + @octet1 * power(2, 8) + @octet0)-2147483648;
END
Далее, таблицы и функция для импорта и агрегации данных и учета импортированных файлов:
CREATE TABLE [dbo].[tbl_files](
[filename] [nchar](35) NULL,
[status] [tinyint] NOT NULL,
[imported] [datetime2] NULL
)
ALTER TABLE [dbo].[tbl_files] ADD CONSTRAINT [DF_Table_1_imported] DEFAULT ((0)) FOR [status]
CREATE TABLE [tbl_summarized](
[ip] int NOT NULL,
[first] smalldatetime NOT NULL,
[last] smalldatetime NOT NULL,
[src-flows] int NOT NULL,
[src-octets] bigint NOT NULL,
[src-packets] int NOT NULL,
[dst-flows] int NOT NULL,
[dst-octets] bigint NOT NULL,
[dst-packets] int NOT NULL
)
CREATE NONCLUSTERED INDEX IX_tbl_summarized_ip
ON [tbl_summarized](ip)
CREATE PROCEDURE [dbo].[GetFlowReports]
AS
BEGIN
SET NOCOUNT ON;
DECLARE
@file NCHAR(35),
@filepath NVARCHAR(255),
@query NVARCHAR(max);
-- Take file list
CREATE TABLE #dir([filename] nchar(35));
INSERT INTO #dir EXEC xp_cmdshell 'dir E:\NetFlow\report.*.csv /b';
INSERT INTO [tbl_files] ([filename])
SELECT * FROM #dir WHERE [filename] IS NOT NULL AND
NOT EXISTS (SELECT * FROM [tbl_files] AS f
WHERE f.[filename] = #dir.[filename]);
DROP TABLE #dir;
-- Temp table for import data
CREATE TABLE #flow_report(
[first] [int] NOT NULL,
[last] [int] NOT NULL,
[ip-source-address] [char](15) NOT NULL,
[ip-destination-address] [char](15) NOT NULL,
[flows] [int] NOT NULL,
[octets] [bigint] NOT NULL,
[packets] [int] NOT NULL,
[duration] [int] NOT NULL,
[avg-bps] [numeric](16, 6) NULL,
[min-bps] [numeric](16, 6) NULL,
[max-bps] [numeric](16, 6) NULL,
[avg-pps] [numeric](14, 6) NULL,
[min-pps] [numeric](14, 6) NULL,
[max-pps] [numeric](14, 6) NULL
);
DECLARE fc CURSOR FOR SELECT [filename] FROM [tbl_files]
WHERE [status] = 0;
OPEN fc;
FETCH NEXT FROM fc INTO @file;
WHILE @@FETCH_STATUS = 0
BEGIN
BEGIN TRY
-- Mark file as currently importing
UPDATE [tbl_files] SET [status]=1 WHERE [filename] = @file AND [status] = 0;
-- If something wrong (maybe other process is trying to import file), skip
IF @@ROWCOUNT != 1 CONTINUE;
-- Begin import from file
SET @filepath = 'E:\NetFlow\'+@file;
SET @query = N'BULK INSERT #flow_report FROM '''+@filepath+''' WITH (DATAFILETYPE = ''char'', FIELDTERMINATOR = '','', ROWTERMINATOR = ''0x0A'')';
EXEC sp_executesql @query;
-- Mark file as imported
UPDATE [tbl_files] SET [status]=2 WHERE [filename] = @file;
-- Aggregation
BEGIN TRAN
-- Do aggregation by source
INSERT INTO [tbl_summarized]
SELECT
dbo.ip_to_int([ip]),
DATEADD(ss,[first],'1970-01-01') AS [first],
DATEADD(ss,[last],'1970-01-01') AS [last],
SUM([src-flows]) AS [src-flows],
SUM([src-octets]) AS [src-octets],
SUM([src-packets]) AS [src-packets],
SUM([dst-flows]) AS [dst-flows],
SUM([dst-octets]) AS [dst-octets],
SUM([dst-packets]) AS [dst-packets]
FROM (
SELECT
[ip-source-address] AS [ip],
[first],
[last],
SUM([flows]) AS [src-flows],
SUM([octets]) AS [src-octets],
SUM([packets]) AS [src-packets],
0 AS [dst-flows],
0 AS [dst-octets],
0 AS [dst-packets]
FROM #flow_report
GROUP BY [first], [last], [ip-source-address]
UNION
SELECT
[ip-destination-address] AS [ip],
[first],
[last],
0 AS [src-flows],
0 AS [src-octect],
0 AS [src-packets],
SUM([flows]) AS [dst-flows],
SUM([octets]) AS [dst-octets],
SUM([packets]) AS [dst-packets]
FROM #flow_report
GROUP BY [first], [last], [ip-destination-address]
) AS s
-- WHERE
-- [ip] BETWEEN dbo.ip_to_int('..') AND dbo.ip_to_int('..') OR
-- [ip] BETWEEN dbo.ip_to_int('..') AND dbo.ip_to_int('..') OR
-- [ip] BETWEEN dbo.ip_to_int('..') AND dbo.ip_to_int('..') OR
-- [ip] BETWEEN dbo.ip_to_int('..') AND dbo.ip_to_int('..')
GROUP BY [ip], [first], [last];
-- mark file as aggregated
UPDATE [tbl_files] SET [status]=3, [imported]=GETDATE() WHERE [filename] = @file;
COMMIT TRAN
END TRY
BEGIN CATCH
-- If import fails, make file available to next import
UPDATE [tbl_files] SET [status]=0 WHERE [filename] = @file AND [status] != 3;
END CATCH
DELETE FROM #flow_report;
FETCH NEXT FROM fc INTO @file;
END
DROP TABLE #flow_report;
END
Предполагается, что [dbo].[GetFlowReports] будет запускаться по расписанию.
Правда, потом я всё переделал на PostgreSQL, об этом в следующей статье.