Импорт статистики NetFlow в базы данных #2

Я более тщательно проработал решение под MS SQL, начатое в предыдущем посте. Итак, чтобы загружать в БД MS SQL статистику NetFlow, собираемую пакетом flow-tools, нужно сделать следующее:

  1. Установить flow-tools и сконфигурировать flow-report (см. предыдущий пост).
  2. Организовать доставку CSV-отчётов flow-report на сервер СУБД MS SQL (в примере ниже — в каталог E:\NetFlow\, имена файлов вида report.*.csv).
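  3. Создать в БД описанные ниже объекты (каждый CREATE FUNCTION и CREATE PROCEDURE должен выполняться отдельным пакетом, например с разделителем GO).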

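Функции для хранения IPv4-адресов в целочисленном виде. Тип int знаковый, а адрес — беззнаковое 32-битное число, поэтому при преобразовании адреса в число вычитается 2^31 (2147483648), а при обратном преобразовании прибавляется: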

-- Converts an IPv4 address stored as a signed int back to dotted-quad text.
-- Addresses are stored shifted by -2^31 (see dbo.ip_to_int) so that the whole
-- unsigned range 0.0.0.0 .. 255.255.255.255 fits into int's
-- -2147483648 .. 2147483647.
-- @ip     : encoded address; NULL yields NULL.
-- Returns : dotted-quad string, e.g. '10.0.0.1' for -1979711487.
CREATE FUNCTION [dbo].[int_to_ip] (@ip int)
RETURNS varchar(15)
AS
BEGIN
    -- Undo the offset; bigint is required because the unsigned value can
    -- exceed the int range.
    DECLARE @u bigint = CAST(@ip AS bigint) + CAST(2147483648 AS bigint);

    RETURN CAST((@u / 16777216) % 256 AS varchar(3)) + '.'
         + CAST((@u / 65536) % 256 AS varchar(3)) + '.'
         + CAST((@u / 256) % 256 AS varchar(3)) + '.'
         + CAST(@u % 256 AS varchar(3));
END

-- Converts a dotted-quad IPv4 string to a signed int for compact storage.
-- The unsigned 32-bit value is shifted by -2^31 so that
-- 0.0.0.0 .. 255.255.255.255 maps onto -2147483648 .. 2147483647;
-- dbo.int_to_ip performs the reverse conversion.
-- @ip     : address such as '10.0.0.1'.
-- Returns : encoded address, or NULL when @ip is NULL, does not have exactly
--           four parts, or has an octet outside 0..255.
--           Non-numeric parts still raise a conversion error.
CREATE FUNCTION [dbo].[ip_to_int] (@ip varchar(15))
RETURNS int
AS
BEGIN
    -- PARSENAME splits on '.' and numbers the parts from the right
    -- (1 = last octet); it returns NULL for anything other than 1-4 parts.
    DECLARE @octet3 bigint = PARSENAME(@ip, 4),
            @octet2 bigint = PARSENAME(@ip, 3),
            @octet1 bigint = PARSENAME(@ip, 2),
            @octet0 bigint = PARSENAME(@ip, 1);

    -- Reject octets that would otherwise alias another address or overflow int.
    IF @octet3 NOT BETWEEN 0 AND 255 OR @octet2 NOT BETWEEN 0 AND 255
       OR @octet1 NOT BETWEEN 0 AND 255 OR @octet0 NOT BETWEEN 0 AND 255
        RETURN NULL;

    RETURN CAST(@octet3 * 16777216 + @octet2 * 65536 + @octet1 * 256 + @octet0
                - CAST(2147483648 AS bigint) AS int);
END

Далее — таблицы и хранимая процедура для импорта и агрегации данных, а также для учёта импортированных файлов:

-- Registry of report files found in the import directory and their import
-- progress, maintained by dbo.GetFlowReports.
--   [filename] : bare file name as listed by `dir /b`. Unique, because the
--                import procedure claims a file by name and expects exactly
--                one matching row.
--   [status]   : 0 = pending, 1 = importing, 2 = loaded from file,
--                3 = aggregated into tbl_summarized.
--   [imported] : time the file reached status 3; NULL until then.
CREATE TABLE [dbo].[tbl_files](
 [filename] [nchar](35) NOT NULL
  CONSTRAINT [PK_tbl_files] PRIMARY KEY,
 [status] [tinyint] NOT NULL
  CONSTRAINT [DF_tbl_files_status] DEFAULT ((0))
  CONSTRAINT [CK_tbl_files_status] CHECK ([status] BETWEEN 0 AND 3),
 [imported] [datetime2] NULL
)

-- Per-address traffic totals written by dbo.GetFlowReports: one row per
-- (address, first, last) combination for each imported file.
-- Schema-qualified to match [dbo].[tbl_files] instead of depending on the
-- creating user's default schema.
CREATE TABLE [dbo].[tbl_summarized](
 -- IPv4 address in the dbo.ip_to_int encoding.
 [ip] int NOT NULL,
 -- Interval bounds of the source records; smalldatetime keeps minute precision.
 [first] smalldatetime NOT NULL,
 [last] smalldatetime NOT NULL,
 -- Totals for records where [ip] was the source address.
 [src-flows] int NOT NULL,
 [src-octets] bigint NOT NULL,
 [src-packets] int NOT NULL,
 -- Totals for records where [ip] was the destination address.
 [dst-flows] int NOT NULL,
 [dst-octets] bigint NOT NULL,
 [dst-packets] int NOT NULL
)

-- Supports lookups and range filters by address.
CREATE NONCLUSTERED INDEX [IX_tbl_summarized_ip]
ON [dbo].[tbl_summarized]([ip])

-- Imports flow-report CSV summaries into [tbl_summarized].
--
--   1. Lists <@dir>report.*.csv via xp_cmdshell and registers names not yet
--      known in [tbl_files] (status 0 = pending).
--   2. For each pending file:
--      a. Claims it (status 1). Files already claimed by another run are skipped.
--      b. BULK INSERTs it into a temp table and marks it loaded (status 2).
--      c. In one transaction, aggregates per address/interval into
--         [tbl_summarized] and marks it aggregated (status 3, [imported] set).
--   3. On any error, rolls back the file's transaction, returns the file to
--      status 0 so a later run retries it, PRINTs the error, and continues
--      with the next file.
--
-- Requires xp_cmdshell to be enabled. Must not be called inside an outer
-- transaction: the error path issues ROLLBACK, which would also undo the
-- caller's work.
CREATE PROCEDURE [dbo].[GetFlowReports]
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE
        @dir      NVARCHAR(200) = N'E:\NetFlow\',  -- import directory, with trailing '\'
        @cmd      NVARCHAR(4000),
        @file     NCHAR(35),
        @filepath NVARCHAR(255),
        @query    NVARCHAR(max);

    -- 1. Register new report files.
    -- xp_cmdshell returns one nvarchar(255) row per output line. That includes
    -- NULL rows and, when nothing matches or the directory is missing, messages
    -- such as 'File Not Found'. Only names matching the report pattern are kept.
    CREATE TABLE #dir([filename] NVARCHAR(255) NULL);
    SET @cmd = N'dir "' + @dir + N'report.*.csv" /b';
    INSERT INTO #dir ([filename]) EXEC xp_cmdshell @cmd;

    INSERT INTO [tbl_files] ([filename])
        SELECT d.[filename]
        FROM #dir AS d
        WHERE d.[filename] LIKE N'report.%.csv'
          AND NOT EXISTS (SELECT * FROM [tbl_files] AS f
                          WHERE f.[filename] = d.[filename]);
    DROP TABLE #dir;

    -- Staging table in the column order of the flow-report CSV output.
    CREATE TABLE #flow_report(
        [first] [int] NOT NULL,
        [last] [int] NOT NULL,
        [ip-source-address] [char](15) NOT NULL,
        [ip-destination-address] [char](15) NOT NULL,
        [flows] [int] NOT NULL,
        [octets] [bigint] NOT NULL,
        [packets] [int] NOT NULL,
        [duration] [int] NOT NULL,
        [avg-bps] [numeric](16, 6) NULL,
        [min-bps] [numeric](16, 6) NULL,
        [max-bps] [numeric](16, 6) NULL,
        [avg-pps] [numeric](14, 6) NULL,
        [min-pps] [numeric](14, 6) NULL,
        [max-pps] [numeric](14, 6) NULL
    );

    -- LOCAL: does not leak past the procedure. STATIC: iterates over a snapshot,
    -- since the loop changes [status] on the same rows it selects.
    DECLARE fc CURSOR LOCAL FORWARD_ONLY STATIC READ_ONLY FOR
        SELECT [filename] FROM [tbl_files] WHERE [status] = 0;
    OPEN fc;

    WHILE 1 = 1
    BEGIN
        -- Fetch at the top of the loop so CONTINUE cannot skip it. Fetching
        -- at the bottom made CONTINUE spin forever on the same file.
        FETCH NEXT FROM fc INTO @file;
        IF @@FETCH_STATUS <> 0 BREAK;

        -- Discard rows left by the previous file, including a partial load.
        TRUNCATE TABLE #flow_report;

        BEGIN TRY
            -- 2a. Claim the file. No row updated means another run took it.
            UPDATE [tbl_files] SET [status] = 1 WHERE [filename] = @file AND [status] = 0;
            IF @@ROWCOUNT <> 1 CONTINUE;

            -- 2b. BULK INSERT does not accept a variable file name, so the path is
            -- spliced into dynamic SQL. Strip the nchar padding and double any
            -- quotes so the name cannot break out of the string literal.
            SET @filepath = @dir + RTRIM(@file);
            SET @query = N'BULK INSERT #flow_report FROM '''
                + REPLACE(@filepath, N'''', N'''''')
                + N''' WITH (DATAFILETYPE = ''char'', FIELDTERMINATOR = '','', ROWTERMINATOR = ''0x0A'')';
            EXEC sp_executesql @query;

            UPDATE [tbl_files] SET [status] = 2 WHERE [filename] = @file;

            -- 2c. Aggregate and mark the file done atomically.
            BEGIN TRANSACTION;

                INSERT INTO [tbl_summarized]
                    ([ip], [first], [last],
                     [src-flows], [src-octets], [src-packets],
                     [dst-flows], [dst-octets], [dst-packets])
                SELECT
                    dbo.ip_to_int(s.[ip]) AS [ip],
                    -- [first]/[last] are seconds since 1970-01-01 in the CSV.
                    DATEADD(ss, s.[first], '1970-01-01') AS [first],
                    DATEADD(ss, s.[last], '1970-01-01') AS [last],
                    SUM(s.[src-flows]) AS [src-flows],
                    SUM(s.[src-octets]) AS [src-octets],
                    SUM(s.[src-packets]) AS [src-packets],
                    SUM(s.[dst-flows]) AS [dst-flows],
                    SUM(s.[dst-octets]) AS [dst-octets],
                    SUM(s.[dst-packets]) AS [dst-packets]
                FROM (
                    -- Totals where the address is the source.
                    SELECT
                        [ip-source-address] AS [ip],
                        [first],
                        [last],
                        SUM([flows]) AS [src-flows],
                        SUM([octets]) AS [src-octets],
                        SUM([packets]) AS [src-packets],
                        0 AS [dst-flows],
                        0 AS [dst-octets],
                        0 AS [dst-packets]
                    FROM #flow_report
                    GROUP BY [first], [last], [ip-source-address]
                    -- UNION ALL: the branches are summed below. They can only
                    -- coincide on all-zero rows, so deduplication adds nothing
                    -- but a sort.
                    UNION ALL
                    -- Totals where the address is the destination.
                    SELECT
                        [ip-destination-address] AS [ip],
                        [first],
                        [last],
                        0 AS [src-flows],
                        0 AS [src-octets],
                        0 AS [src-packets],
                        SUM([flows]) AS [dst-flows],
                        SUM([octets]) AS [dst-octets],
                        SUM([packets]) AS [dst-packets]
                    FROM #flow_report
                    GROUP BY [first], [last], [ip-destination-address]
                ) AS s
                -- Optional: keep only your own networks by uncommenting and filling
                -- in the ranges. s.[ip] is still dotted-quad text, so convert it
                -- before comparing with the int bounds.
                -- WHERE
                --     dbo.ip_to_int(s.[ip]) BETWEEN dbo.ip_to_int('..') AND dbo.ip_to_int('..') OR
                --     dbo.ip_to_int(s.[ip]) BETWEEN dbo.ip_to_int('..') AND dbo.ip_to_int('..')
                GROUP BY s.[ip], s.[first], s.[last];

                UPDATE [tbl_files] SET [status] = 3, [imported] = GETDATE() WHERE [filename] = @file;

            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            -- Undo a partially applied aggregation first. The transaction may
            -- also be uncommittable, which would make the UPDATE below fail.
            IF XACT_STATE() <> 0 ROLLBACK TRANSACTION;

            -- Return the file to the queue so a later run retries it.
            UPDATE [tbl_files] SET [status] = 0 WHERE [filename] = @file AND [status] <> 3;

            -- Report the failure without aborting the remaining files.
            PRINT N'GetFlowReports: ' + RTRIM(@file) + N': ' + ERROR_MESSAGE();
        END CATCH
    END

    CLOSE fc;
    DEALLOCATE fc;
    DROP TABLE #flow_report;
END

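Предполагается, что [dbo].[GetFlowReports] будет запускаться по расписанию (например, заданием SQL Server Agent). Процедура использует xp_cmdshell, который по умолчанию отключён, — его нужно включить через sp_configure.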

Правда, потом я всё переделал на PostgreSQL, об этом в следующей статье.