
softflowd and external ELK-stack

Posted: August 12th, 2017, 4:55 pm
by ummeegge
Hi all,
I wanted to document my first working steps (possibly only for myself) towards a lightweight but comprehensive way to monitor IPFire network traffic. The capture itself is done on IPFire via softflowd --> ... softflowd/ , while a second machine in the local network provides a flexible data structure and some nice visualization via the ELK stack. The advantage? IPFire does not need to hold and process all that data. Only one additional package (softflowd) is needed on IPFire, and the rest can be done externally.

How it works:
- softflowd captures the data on IPFire and delivers it (currently unencrypted) to another local machine, in my case a Linux Mint 18.2 system. softflowd is configured via its initscript (found under /etc/rc.d/init.d/softflowd after installing softflowd), which by the way is available in those packages --> ... softflowd/ . Since you can start multiple instances of softflowd (my first one works with NFsen), I used the "## Set 2 start" section, which looks like this:

Code: Select all

DESC="softflowd flow collector";
DAEMON=$(which ${NAME});

## Set 1 start
ARGS1="-n ${HOST1}:${PORT1} -i ${IFACE1} -t ${MLIFE1} -v ${VER1}";
## Set 1 stop

## Set 2 start
ARGS2="-n ${HOST2}:${PORT2} -i ${IFACE2} -t ${MLIFE2} -v ${VER2}";
## Set 2 stop

. /etc/sysconfig/rc
. ${rc_functions}

case "${1}" in
	start)
		boot_mesg "Starting ${DESC}... ";
		${DAEMON} ${ARGS1};
		${DAEMON} ${ARGS2};
		;;

	stop)
		boot_mesg "Stopping ${DESC}";
		killproc ${DAEMON};
		;;
esac

(Only the '## Set 2' section and the '${DAEMON} ${ARGS2};' line are relevant here.) With these entries the collected data is sent to my local machine on UDP port 9995.
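
Before wiring up Logstash it can help to confirm that the export datagrams actually arrive on the collector machine. The following is only a minimal sketch in Python, assuming softflowd exports NetFlow version 5 (the index name logstash_netflow5 hints at this, but the value of ${VER2} is not shown above). The names parse_v5_header and listen are made up for this illustration and are not part of softflowd or Logstash.

```python
import socket
import struct

# Fixed 24-byte NetFlow v5 header in network byte order:
# version, count, sys_uptime, unix_secs, unix_nsecs,
# flow_sequence, engine_type, engine_id, sampling field.
V5_HEADER = struct.Struct("!HHIIIIBBH")


def parse_v5_header(datagram: bytes) -> dict:
    """Decode the header of one NetFlow v5 export datagram."""
    if len(datagram) < V5_HEADER.size:
        raise ValueError("datagram shorter than a NetFlow v5 header")
    (version, count, sys_uptime, unix_secs, _unix_nsecs,
     flow_sequence, engine_type, engine_id, _sampling) = V5_HEADER.unpack_from(datagram)
    if version != 5:
        # Assumption: only v5 is handled in this sketch.
        raise ValueError(f"not a NetFlow v5 datagram (version={version})")
    return {
        "version": version,
        "flow_records": count,
        "sys_uptime_ms": sys_uptime,
        "unix_secs": unix_secs,
        "flow_seq_num": flow_sequence,
        "engine_type": engine_type,
        "engine_id": engine_id,
    }


def listen(port: int = 9995, max_datagrams: int = 5) -> None:
    """Print sender and header of the first few datagrams on the UDP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(("0.0.0.0", port))
        for _ in range(max_datagrams):
            data, (sender, _src_port) = sock.recvfrom(65535)
            print(sender, parse_v5_header(data))
```

Calling listen() on the collector should print a few headers once softflowd has expired some flows. Logstash has to be stopped while doing so, since both would bind the same UDP port.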

There, the data is picked up by
- "Logstash" with the following configuration file:

Code: Select all

# Netflow receiver
input {
  udp {
    port => 9995
    type => netflow
    codec => netflow
  }
}

# Netflow sender
output {
        if ( [type] == "netflow" ) {
                elasticsearch {
                        hosts => "localhost:9200"
                        index => "logstash_netflow5"
                }
        } else {
                elasticsearch {
                        hosts => "localhost:9200"
                        index => "logstash_netflow5"
                }
        }
}

which uses the netflow codec and sends its output to the search and analytics engine
- "Elasticsearch", where the following index was created on the external machine via curl with a JSON body, which looks like this:

Code: Select all

curl -XPUT localhost:9200/logstash_netflow5 -d '{
    "template" : "logstash_netflow5-*",
    "settings": {
      "index.refresh_interval": "5s"
    },
    "mappings" : {
      "_default_" : {
        "_all" : {"enabled" : false},
        "properties" : {
          "@version": { "index": "analyzed", "type": "integer" },
          "@timestamp": { "index": "analyzed", "type": "date" },
          "netflow": {
            "dynamic": true,
            "type": "object",
            "properties": {
              "version": { "index": "analyzed", "type": "integer" },
              "flow_seq_num": { "index": "not_analyzed", "type": "long" },
              "engine_type": { "index": "not_analyzed", "type": "integer" },
              "engine_id": { "index": "not_analyzed", "type": "integer" },
              "sampling_algorithm": { "index": "not_analyzed", "type": "integer" },
              "sampling_interval": { "index": "not_analyzed", "type": "integer" },
              "flow_records": { "index": "not_analyzed", "type": "integer" },
              "ipv4_src_addr": { "index": "analyzed", "type": "ip" },
              "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
              "ipv4_next_hop": { "index": "analyzed", "type": "ip" },
              "input_snmp": { "index": "not_analyzed", "type": "long" },
              "output_snmp": { "index": "not_analyzed", "type": "long" },
              "in_pkts": { "index": "analyzed", "type": "long" },
              "in_bytes": { "index": "analyzed", "type": "long" },
              "first_switched": { "index": "not_analyzed", "type": "date" },
              "last_switched": { "index": "not_analyzed", "type": "date" },
              "l4_src_port": { "index": "analyzed", "type": "long" },
              "l4_dst_port": { "index": "analyzed", "type": "long" },
              "tcp_flags": { "index": "analyzed", "type": "integer" },
              "protocol": { "index": "analyzed", "type": "integer" },
              "src_tos": { "index": "analyzed", "type": "integer" },
              "src_as": { "index": "analyzed", "type": "integer" },
              "dst_as": { "index": "analyzed", "type": "integer" },
              "src_mask": { "index": "analyzed", "type": "integer" },
              "dst_mask": { "index": "analyzed", "type": "integer" }
            }
          }
        }
      }
    }
}'
This was enough to get the first data displayed by

- "Kibana". Kibana's configuration file (/etc/kibana/kibana.yml) needs only small modifications; the diff looks like this:

Code: Select all

--- kibana.yml.orig	2017-08-12 18:35:34.816000000 +0200
+++ kibana.yml	2017-08-12 10:45:25.596000000 +0200
@@ -1,10 +1,10 @@
 # Kibana is served by a back end server. This setting specifies the port to use.
-#server.port: 5601
+server.port: 5601
 
 # Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
 # The default is 'localhost', which usually means remote machines will not be able to connect.
 # To allow connections from remote users, set this parameter to a non-loopback address.
-#server.host: "localhost"
+server.host: ""
 
 # Enables you to specify a path to mount Kibana at if you are running behind a proxy. This only affects
 # the URLs generated by Kibana, your proxy is expected to remove the basePath value before forwarding requests

After opening Kibana via its URL, I had to define an index pattern for the index previously added in Elasticsearch, which I did with "logstash_*".
You can also check whether the index has been applied with a curl command like this:

Code: Select all

curl 'localhost:9200/_cat/indices?v'
whose output can look like this:

Code: Select all

health status index             uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   customer          ASq0X3FHRfWUtYOq0N_yyg   5   1          0            0       955b           955b
yellow open   .kibana           jJcy4qhkRTGOv0Qf-xXgwQ   1   1          2            1     15.1kb         15.1kb
yellow open   logstash_netflow5 ifH3iw5DTfOUwVIT5qE9Cw   5   1       7659            0      1.7mb          1.7mb
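
To watch whether flows keep arriving, the document count of the index can also be read out programmatically. This is only a rough sketch in Python, assuming Elasticsearch listens on localhost:9200 as in the curl command above. The helper names fetch_cat_indices, parse_cat_indices and doc_count are hypothetical and exist only for this example.

```python
from urllib.request import urlopen


def fetch_cat_indices(base_url: str = "http://localhost:9200") -> str:
    """Fetch the plain-text table that `_cat/indices?v` returns."""
    with urlopen(f"{base_url}/_cat/indices?v") as response:
        return response.read().decode("utf-8")


def parse_cat_indices(text: str) -> dict:
    """Map each index name to a dict of its columns, keyed by header name."""
    lines = [line for line in text.splitlines() if line.strip()]
    if not lines:
        return {}
    header = lines[0].split()
    rows = {}
    for line in lines[1:]:
        # Columns are whitespace separated; pair them with the header names.
        row = dict(zip(header, line.split()))
        rows[row["index"]] = row
    return rows


def doc_count(text: str, index: str) -> int:
    """Return docs.count for one index; raises KeyError if it is absent."""
    return int(parse_cat_indices(text)[index]["docs.count"])
```

For example, doc_count(fetch_cat_indices(), "logstash_netflow5") called twice a few minutes apart should return a growing number while softflowd is exporting.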
So after a little while the following results appear:

Let's start making some nice dashboards.

Surely more/better/nicer things can be done with this. Even though it is my first try with this solution, it makes a good impression on me, and as always, other users with ideas for further development, interest or criticism on this topic are welcome... Possibly to be continued :) ?! Let's see...

P.S. I know Big Data is evil, but it is better to know how it works ::)



Some helpful docs (thanks to the people :-) ) can be found here -->
- ... t/deb.html
- ... stash.html
- ... -analytics
- ... k-openwrt/
- ... untu-14-04
- and many more which I can't count/remember.