Monday, December 10, 2018

sFlow to JSON

The latest version of sflowtool can convert sFlow datagrams into JSON, making it easy to write scripts to process the standard sFlow telemetry streaming from devices in the network.

Download and compile the latest version of sflowtool:
# Fetch the sflowtool sources from GitHub
git clone https://github.com/sflow/sflowtool.git
cd sflowtool/
# NOTE(review): boot.sh presumably generates the configure script - confirm against the repository
./boot.sh 
./configure 
# Compile, then install system-wide (the install step needs root, hence sudo)
make
sudo make install
The -J option formats the JSON output to be human readable:
$ sflowtool -J
{
 "datagramSourceIP":"10.0.0.162",
 "datagramSize":"396",
 "unixSecondsUTC":"1544241239",
 "localtime":"2018-12-07T19:53:59-0800",
 "datagramVersion":"5",
 "agentSubId":"0",
 "agent":"10.0.0.231",
 "packetSequenceNo":"1068783",
 "sysUpTime":"1338417874",
 "samplesInPacket":"2",
 "samples":[
  {
   "sampleType_tag":"0:2",
   "sampleType":"COUNTERSSAMPLE",
   "sampleSequenceNo":"148239",
   "sourceId":"0:3",
   "elements":[
    {
     "counterBlock_tag":"0:1",
     "ifIndex":"3",
     "networkType":"6",
     "ifSpeed":"1000000000",
     "ifDirection":"1",
     "ifStatus":"3",
     "ifInOctets":"4162076356",
     "ifInUcastPkts":"16312256",
     "ifInMulticastPkts":"187789",
     "ifInBroadcastPkts":"2566",
     "ifInDiscards":"0",
     "ifInErrors":"0",
     "ifInUnknownProtos":"0",
     "ifOutOctets":"2115351089",
     "ifOutUcastPkts":"7087570",
     "ifOutMulticastPkts":"4453258",
     "ifOutBroadcastPkts":"6141715",
     "ifOutDiscards":"0",
     "ifOutErrors":"0",
     "ifPromiscuousMode":"0"
    },
    {
     "counterBlock_tag":"0:2",
     "dot3StatsAlignmentErrors":"0",
     "dot3StatsFCSErrors":"0",
     "dot3StatsSingleCollisionFrames":"0",
     "dot3StatsMultipleCollisionFrames":"0",
     "dot3StatsSQETestErrors":"0",
     "dot3StatsDeferredTransmissions":"0",
     "dot3StatsLateCollisions":"0",
     "dot3StatsExcessiveCollisions":"0",
     "dot3StatsInternalMacTransmitErrors":"0",
     "dot3StatsCarrierSenseErrors":"0",
     "dot3StatsFrameTooLongs":"0",
     "dot3StatsInternalMacReceiveErrors":"0",
     "dot3StatsSymbolErrors":"0"
    }
   ]
  },
  {
   "sampleType_tag":"0:1",
   "sampleType":"FLOWSAMPLE",
   "sampleSequenceNo":"11791",
   "sourceId":"0:3",
   "meanSkipCount":"2000",
   "samplePool":"34185160",
   "dropEvents":"0",
   "inputPort":"3",
   "outputPort":"10",
   "elements":[
    {
     "flowBlock_tag":"0:1",
     "flowSampleType":"HEADER",
     "headerProtocol":"1",
     "sampledPacketSize":"102",
     "strippedBytes":"0",
     "headerLen":"104",
     "headerBytes":"0C-AE-4E-98-0B-89-05-B6-D8-D9-A2-66-80-00-54-00-00-45-08-12-04-00-04-10-4A-FB-A0-00-00-BC-A0-00-00-EF-80-00-DE-B1-E7-26-00-20-75-04-B0-C5-00-00-00-00-96-01-20-00-00-00-00-00-01-11-21-31-41-51-61-71-81-91-A1-B1-C1-D1-E1-F1-02-12-22-32-42-52-62-72-82-92-A2-B2-C2-D2-E2-F2-03-13-23-33-43-53-63-73-1A-1D-4D-76-00-00",
     "dstMAC":"0cae4e980b89",
     "srcMAC":"05b6d8d9a266",
     "IPSize":"88",
     "ip.tot_len":"84",
     "srcIP":"10.0.0.203",
     "dstIP":"10.0.0.254",
     "IPProtocol":"1",
     "IPTOS":"0",
     "IPTTL":"64",
     "IPID":"8576",
     "ICMPType":"8",
     "ICMPCode":"0"
    },
    {
     "flowBlock_tag":"0:1001",
     "extendedType":"SWITCH",
     "in_vlan":"1",
     "in_priority":"0",
     "out_vlan":"1",
     "out_priority":"0"
    }
   ]
  }
 ]
}
The output shows the JSON representation of a single sFlow datagram containing one counter sample and one flow sample.

The -j option formats the JSON output as a single line per datagram, making the output easy to parse in scripts. For example, the following Python script, flow.py, runs sflowtool and parses the JSON output:
#!/usr/bin/env python

import subprocess
from json import loads

# Start sflowtool; with -j it writes one JSON-encoded sFlow datagram per line.
# stderr is deliberately not redirected into stdout, so diagnostics printed by
# sflowtool reach the terminal instead of being fed to the JSON parser below.
p = subprocess.Popen(
  ['/usr/local/bin/sflowtool','-j'],
  stdout=subprocess.PIPE
)
# The pipe yields bytes, so the end-of-stream sentinel must be b'' (on
# Python 2, b'' and '' are the same value); with '' the loop would never
# terminate on Python 3.
for line in iter(p.stdout.readline, b''):
  if not line.strip():
    # ignore blank lines rather than handing them to the JSON parser
    continue
  datagram = loads(line.decode('utf-8'))
  localtime = datagram["localtime"]
  for sample in datagram["samples"]:
    if sample["sampleType"] != "FLOWSAMPLE":
      continue
    for element in sample["elements"]:
      # flowBlock_tag 0:1 identifies the sampled packet header record
      if element["flowBlock_tag"] != "0:1":
        continue
      try:
        src = element["srcIP"]
        dst = element["dstIP"]
        pktsize = element["sampledPacketSize"]
      except KeyError:
        # skip header records that carry no srcIP/dstIP/size fields
        continue
      # print() with a single pre-formatted string works on Python 2 and 3
      print("%s %s %s %s" % (localtime,src,dst,pktsize))
Running the script prints flow records showing time, source address, destination address and sampled packet size in bytes:
$ ./flow.py 
2018-12-07T20:53:06-0800 10.0.0.70 10.0.0.238 110
2018-12-07T20:53:06-0800 10.0.0.70 10.0.0.238 70
2018-12-07T20:53:06-0800 10.0.0.70 10.0.0.238 70
2018-12-07T20:53:06-0800 10.0.0.238 10.0.0.70 90
The script can easily be modified to add additional fields, push data into an SIEM tool (e.g. Logstash), push counter data into a time series database (e.g. InfluxDB), or perform additional analysis in Python. For example, the following script builds on the example, downloading the Emerging Threats compromised address list and logging any flows that match the list:
#!/usr/bin/env python

import subprocess
from json import loads
from requests import get

# Download the Emerging Threats compromised address list. Fail loudly on an
# HTTP error rather than silently building the set from an error page, and
# bound the request so a stalled server cannot hang the script forever.
r = get(
  'https://rules.emergingthreats.net/blockrules/compromised-ips.txt',
  timeout=30
)
r.raise_for_status()
blacklist = set()
# r.text is decoded text, so the entries compare equal to the address strings
# produced by the JSON parser on both Python 2 and Python 3 (iter_lines()
# yields bytes on Python 3, which would never match).
for entry in r.text.splitlines():
  entry = entry.strip()
  # keep only address entries: drop blank lines and '#' comment lines
  if entry and not entry.startswith('#'):
    blacklist.add(entry)

# Start sflowtool; with -j it writes one JSON-encoded sFlow datagram per line.
# stderr is deliberately not redirected into stdout, so diagnostics printed by
# sflowtool reach the terminal instead of being fed to the JSON parser below.
p = subprocess.Popen(
  ['/usr/local/bin/sflowtool','-j'],
  stdout=subprocess.PIPE
)
# The pipe yields bytes, so the end-of-stream sentinel must be b'' (on
# Python 2, b'' and '' are the same value); with '' the loop would never
# terminate on Python 3.
for line in iter(p.stdout.readline, b''):
  if not line.strip():
    # ignore blank lines rather than handing them to the JSON parser
    continue
  datagram = loads(line.decode('utf-8'))
  localtime = datagram["localtime"]
  for sample in datagram["samples"]:
    if sample["sampleType"] != "FLOWSAMPLE":
      continue
    for element in sample["elements"]:
      # flowBlock_tag 0:1 identifies the sampled packet header record
      if element["flowBlock_tag"] != "0:1":
        continue
      try:
        src = element["srcIP"]
        dst = element["dstIP"]
      except KeyError:
        # skip header records that carry no srcIP/dstIP fields
        continue
      # log the flow when either endpoint is on the compromised list
      if src in blacklist or dst in blacklist:
        print("%s %s %s" % (localtime,src,dst))
The open source Host sFlow agent provides a convenient means of experimenting with sFlow if you don't have access to network devices. The Host sFlow agent is also a simple way to gather real-time telemetry from public cloud virtual machine instances where access to the physical network infrastructure is not permitted.

Finally, for advanced sFlow analytics, try sFlow-RT, a real-time analytics engine that exposes a REST API.