This is the last post in our series on using Python for network analysis.
In the previous posts, we used Scapy, Plotly, Pandas and more to analyze and visualize network data. All of those examples used data from packet capture (PCAP) files.
In this post, we’ll cover how to observe (sniff) packets in real time and plot them using Matplotlib.
Requirements
This was tested on macOS (OS X) and should also work on Linux. You will need to install Scapy and Matplotlib:
#> pip3 install scapy matplotlib
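Before going further, it can help to confirm that both packages are importable from the interpreter you plan to run the script with. The following is a minimal sketch using only the standard library; the helper name missing_modules is hypothetical and not part of Scapy or Matplotlib.

```python
from importlib.util import find_spec

def missing_modules(names):
    """Return the module names from `names` that cannot be found for import."""
    return [name for name in names if find_spec(name) is None]

if __name__ == "__main__":
    # "scapy" and "matplotlib" are the top-level modules the pip packages install.
    missing = missing_modules(["scapy", "matplotlib"])
    if missing:
        print("Missing modules: {}".format(", ".join(missing)))
    else:
        print("All required modules are available.")
```

If a module is reported missing, the likely cause is that pip3 installed into a different Python than the one running the check.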
Imports
To run in real time, you will import Scapy and Matplotlib. We’ll also import argparse to take the interface as a command-line option, and getuid from os to check whether we are root and can snoop on the specified interface:
from scapy.all import *
import matplotlib.pyplot as plt
import argparse
from os import getuid
Arguments
The user will supply the interface, and we will allow another option to only capture X packets before exiting:
parser = argparse.ArgumentParser(description='Live Traffic Examiner')
parser.add_argument('interface', help="Network interface", type=str)
parser.add_argument('--count', help="Capture X packets and exit", type=int)
args=parser.parse_args()
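To see what the parser produces without touching the network, you can hand parse_args an explicit argument list. This is a small sketch; build_parser is a hypothetical wrapper around the same two add_argument calls, and the sample command lines are made up.

```python
import argparse

def build_parser():
    """Build the same parser the script uses."""
    parser = argparse.ArgumentParser(description='Live Traffic Examiner')
    parser.add_argument('interface', help="Network interface", type=str)
    parser.add_argument('--count', help="Capture X packets and exit", type=int)
    return parser

# Parse a hypothetical command line: realtime.py en0 --count 200
args = build_parser().parse_args(['en0', '--count', '200'])
print(args.interface, args.count)  # en0 200

# Without --count, args.count is None, which the script treats as "no limit"
unlimited = build_parser().parse_args(['en0'])
print(unlimited.count)  # None
```

Note that --count 0 is also falsy, so a script that tests the value with a plain if would treat it as unlimited too.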
User Check
As mentioned above, we want to see if the user can access the interface. Usually only root can do that, so if we are not root we print a warning and try a test capture of one packet to see if it works:
#Check to see if we are root, otherwise Scapy might not be able to listen
if getuid() != 0:
    print("Warning: Not running as root, packet listening may not work.")
    try:
        print("--Trying to listen on {}".format(args.interface))
        #Test capture of a single packet
        sniff(iface=args.interface,count=1)
        print("--Success!")
    except Exception:
        print("--Failed!\nError: Unable to sniff packets, try using sudo.")
        quit()
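The same check can be wrapped in a function, which makes it possible to try out without root or a live interface. This is a sketch under assumptions: can_capture is a hypothetical name, and sniff_fn stands in for Scapy's sniff (anything that accepts iface= and count= will do). Which exception Scapy raises on a permission failure can vary by platform, so the sketch catches Exception broadly.

```python
import os

def can_capture(interface, sniff_fn, uid=None):
    """Return True if we are root or a one-packet test capture succeeds.

    sniff_fn is expected to accept iface= and count= keyword arguments,
    as Scapy's sniff() does.
    """
    if uid is None:
        uid = os.getuid()
    if uid == 0:
        return True
    try:
        sniff_fn(iface=interface, count=1)
    except Exception:
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt and SystemExit still propagate.
        return False
    return True
```

In the script this could be called as can_capture(args.interface, sniff), exiting with a message when it returns False.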
Building the plot
To start building the plot, we add a title and axis labels. We also call plt.ion(), which turns on interactive mode so the plot can be updated while the script keeps running:
#Interactive Mode
plt.ion()
#Labels
plt.ylabel("Bytes")
plt.xlabel("Count")
plt.title("Real Time Network Traffic")
plt.tight_layout()
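The loop later in this post calls plt.plot on the whole list for every packet, which adds a new line to the axes each time. One alternative, sketched here as an assumption rather than what the script does, is to create a single line up front and update its data. The names setup_plot and redraw are hypothetical; Line2D.set_data, Axes.relim and Axes.autoscale_view are standard Matplotlib methods.

```python
def setup_plot():
    """Create the labelled figure and return (ax, line) for later updates."""
    import matplotlib.pyplot as plt  # imported here so redraw() can be used alone
    plt.ion()
    fig, ax = plt.subplots()
    ax.set_ylabel("Bytes")
    ax.set_xlabel("Count")
    ax.set_title("Real Time Network Traffic")
    (line,) = ax.plot([], [])  # one empty line that we keep updating
    fig.tight_layout()
    return ax, line

def redraw(ax, line, sizes):
    """Point the existing line at the current data and rescale the axes."""
    line.set_data(list(range(len(sizes))), list(sizes))
    ax.relim()
    ax.autoscale_view()
```

With this approach, each pass through the loop would call redraw(ax, line, yData) followed by plt.pause(0.1) instead of plt.plot(yData).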
Loop
The script loops, listening for one packet at a time. Here, we set up an empty list to hold the byte data and a counter for the packets captured:
#Empty list to hold bytes
yData=[]
i=0
#Listen indefinitely, or until we reach count
while True:
    #Listen for 1 packet
    for pkt in sniff(iface=args.interface,count=1):
The important part of the code above is count=1, which tells Scapy to return after sniffing a single packet. With that packet in hand, the code enters a try/except block. The try/except lets the user press CTRL-C to exit; without it, the program is hard to stop cleanly:
        try:
            if IP in pkt:
                yData.append(pkt[IP].len)
                plt.plot(yData)
                #Pause and draw
                plt.pause(0.1)
                i+=1
                if args.count:
                    if i >= args.count:
                        quit()
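The code above appends the packet’s size (pkt[IP].len, the total length field from the IP header) to the list, plots the list, and draws the update with plt.pause. Packets without an IP layer are skipped. The except clause follows: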
        except KeyboardInterrupt:
            print("Captured {} packets on interface {} ".format(i, args.interface))
            quit()
Last, the except clause catches CTRL-C, prints the number of packets captured, and exits.
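The loop above calls sniff once per packet. Scapy's sniff also accepts a prn callback that it invokes for each packet, and store=False so that packets are not kept in memory. The following is a sketch of that alternative, not the approach this post uses; make_handler and capture are hypothetical names, and the Scapy import sits inside capture so the handler can be exercised without Scapy installed.

```python
def make_handler(sizes, ip_layer):
    """Return a callback that appends each IP packet's length to `sizes`."""
    def handle(pkt):
        # Skip packets that have no IP layer (ARP, for example)
        if ip_layer in pkt:
            sizes.append(pkt[ip_layer].len)
    return handle

def capture(interface, count=0):
    """Sniff on `interface` and return the list of IP packet sizes.

    In Scapy, count=0 means no limit.
    """
    from scapy.all import sniff, IP  # requires Scapy and capture privileges
    sizes = []
    sniff(iface=interface, count=count, prn=make_handler(sizes, IP), store=False)
    return sizes
```

This version only collects sizes, so plotting would have to happen inside the handler or after capture returns. Note also that count here limits all packets sniffed, not just the IP ones.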
Usage
#> ./realtime.py -h
usage: realtime.py [-h] [--count COUNT] interface

Live Traffic Examiner

positional arguments:
  interface      Network interface to listen on i.e. en0

optional arguments:
  -h, --help     show this help message and exit
  --count COUNT  Capture X packets and exit
#> sudo ./realtime.py en0 --count 200
Password:
Capturing 200 packets on interface en0
For your reference, here is the full code:
#!/usr/bin/env python3
from scapy.all import *
import matplotlib.pyplot as plt
import argparse
from os import getuid

parser = argparse.ArgumentParser(description='Live Traffic Examiner')
parser.add_argument('interface', help="Network interface to listen on i.e. en0", type=str)
parser.add_argument('--count', help="Capture X packets and exit", type=int)
args=parser.parse_args()

#Check to see if we are root, otherwise Scapy might not be able to listen
if getuid() != 0:
    print("Warning: Not running as root, packet listening may not work.")
    try:
        print("--Trying to listen on {}".format(args.interface))
        sniff(iface=args.interface,count=1)
        print("--Success!")
    except Exception:
        print("--Failed!\nError: Unable to sniff packets, try again using sudo.")
        quit()

if args.count:
    print("Capturing {} packets on interface {} ".format(args.count, args.interface))
else:
    print("Capturing unlimited packets on interface {} \n--Press CTRL-C to exit".format(args.interface))

#Interactive Mode
plt.ion()

#Labels
plt.ylabel("Bytes")
plt.xlabel("Count")
plt.title("Real time Network Traffic")
plt.tight_layout()

#Empty list to hold bytes
yData=[]
i=0

#Listen indefinitely, or until we reach count
while True:
    #Listen for 1 packet
    for pkt in sniff(iface=args.interface,count=1):
        try:
            if IP in pkt:
                yData.append(pkt[IP].len)
                plt.plot(yData)
                #Pause and draw
                plt.pause(0.1)
                i+=1
                if args.count:
                    if i >= args.count:
                        quit()
        except KeyboardInterrupt:
            print("Captured {} packets on interface {} ".format(i, args.interface))
            quit()
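The script keeps every packet size in yData, so a long capture grows the list (and the plot) without bound. If only recent traffic matters, one option is a fixed-size window. This sketch uses collections.deque from the standard library; the window size and the sample sizes are made-up values for illustration.

```python
from collections import deque

WINDOW = 5  # hypothetical window size; pick what suits your display

recent_sizes = deque(maxlen=WINDOW)

# Made-up packet sizes standing in for pkt[IP].len values
for size in [60, 1500, 52, 40, 576, 1200, 64]:
    recent_sizes.append(size)  # the oldest value drops off once the window is full

print(list(recent_sizes))  # [52, 40, 576, 1200, 64]
```

In the script, yData would become such a deque, and the plotting call would receive list(yData) so that only the last WINDOW packets are drawn.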
Visualizing network data in real time with Python takeaway
I hope this series on analyzing network traffic with Python has helped you. You can take most of the code in the examples and customize it to your specific needs. I have a feeling that once you master this you’ll use Wireshark less and less.