Tibetan | Software Engineer | Application Dev
Network Traffic Monitoring and Analysis Tool
Network Traffic Monitoring and Analysis Tool

Network Traffic Monitoring and Analysis Tool

import socket
import struct
import time
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict

class Packet:
def __init__(self, timestamp, source_ip, dest_ip, protocol, length):
self.timestamp = timestamp
self.source_ip = source_ip
self.dest_ip = dest_ip
self.protocol = protocol
self.length = length

def __str__(self):
return f”{self.timestamp}: {self.source_ip} -> {self.dest_ip} [{self.protocol}] {self.length} bytes”

class NetworkMonitor:
def __init__(self, interface=’eth0′):
self.interface = interface
self.packets = []
self.protocol_counts = defaultdict(int)
self.total_bytes = 0

def start_monitoring(self):
print(f”Starting packet capture on {self.interface}…”)
with socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3)) as sock:
while True:
raw_data, _ = sock.recvfrom(65535)
self.process_packet(raw_data)

def process_packet(self, raw_data):
# Unpack Ethernet header
eth_length = 14
eth_header = raw_data[:eth_length]
eth = struct.unpack(‘!6s6sH’, eth_header)

# Get protocol
protocol = socket.ntohs(eth[2])
if protocol == 8: # IP protocol
self.process_ip_packet(raw_data[eth_length:])

def process_ip_packet(self, ip_data):
# Unpack IP header
ip_header = ip_data[:20]
iph = struct.unpack(‘!BBHHHBBH4s4s’, ip_header)

version_ihl = iph[0]
ihl = version_ihl & 0xF
ip_header_length = ihl * 4

# Extract source and destination IP addresses
source_ip = socket.inet_ntoa(iph[8])
dest_ip = socket.inet_ntoa(iph[9])

# Extract protocol and packet length
protocol = iph[6]
packet_length = len(ip_data)

packet = Packet(time.time(), source_ip, dest_ip, protocol, packet_length)
self.packets.append(packet)
self.protocol_counts[protocol] += 1
self.total_bytes += packet_length

print(packet)

def analyze_traffic(self):
print(“\nTraffic Analysis Report:”)
print(f”Total Packets Captured: {len(self.packets)}”)
print(f”Total Bytes Transferred: {self.total_bytes} bytes”)
print(“Protocol Distribution:”)
for proto, count in self.protocol_counts.items():
print(f”Protocol {proto}: {count} packets”)

def visualize_traffic(self):
# Plotting protocol distribution
labels = list(self.protocol_counts.keys())
sizes = list(self.protocol_counts.values())

plt.figure(figsize=(10, 6))
plt.bar(labels, sizes, color=’skyblue’)
plt.title(‘Network Protocol Distribution’)
plt.xlabel(‘Protocol Number’)
plt.ylabel(‘Packet Count’)
plt.xticks(rotation=45)
plt.grid(axis=’y’)
plt.tight_layout()
plt.show()

if __name__ == “__main__”:
monitor = NetworkMonitor(interface=’eth0′) # Adjust interface as needed
try:
monitor.start_monitoring()
except KeyboardInterrupt:
monitor.analyze_traffic()
monitor.visualize_traffic()

Leave a Reply

Your email address will not be published. Required fields are marked *

0
    0
    Your Cart
    Your cart is emptyReturn to Shop
      Calculate Shipping
      Apply Coupon