[Python] 패킷 필터 도구 코드 해설

Posted by nkjok
2025. 5. 27. 12:31 낙서장[1]/94. Python
반응형

[소스코드]

import socket
import struct

def capture_packets(interface):
    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
    sock.bind((interface, 0))

    while True:
        packet = sock.recvfrom(65565)[0]

        eth_length = 14
        eth_header = packet[:eth_length]
        eth = struct.unpack('!6s6sH', eth_header)
        eth_protocol = socket.ntohs(eth[2])

        if eth_protocol == 8:  # IP 패킷
            ip_header = packet[eth_length:eth_length+20]
            iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
            version_ihl = iph[0]
            ihl = version_ihl & 0xF
            iph_length = ihl * 4
            protocol = iph[6]
            s_addr = socket.inet_ntoa(iph[8])
            d_addr = socket.inet_ntoa(iph[9])

            print(f'[IP] From: {s_addr} To: {d_addr} Protocol: {protocol}')

            if protocol == 6:  # TCP
                t = eth_length + iph_length
                tcp_header = packet[t:t+20]
                tcph = struct.unpack('!HHLLBBHHH', tcp_header)
                source_port = tcph[0]
                dest_port = tcph[1]
                sequence = tcph[2]
                acknowledgment = tcph[3]
                doff_reserved = tcph[4]
                tcph_length = doff_reserved >> 4

                print(f'[TCP] Src Port: {source_port} Dst Port: {dest_port} Seq: {sequence} Ack: {acknowledgment}')

                h_size = eth_length + iph_length + tcph_length * 4
                data = packet[h_size:]
                print(f'[Data] {data[:64]}...')  # 처음 64바이트만 출력

            elif protocol == 17:  # UDP
                u = eth_length + iph_length
                udp_header = packet[u:u+8]
                udph = struct.unpack('!HHHH', udp_header)
                source_port = udph[0]
                dest_port = udph[1]
                length = udph[2]

                print(f'[UDP] Src Port: {source_port} Dst Port: {dest_port} Length: {length}')

                h_size = eth_length + iph_length + 8
                data = packet[h_size:]
                print(f'[Data] {data[:64]}...')  # 처음 64바이트만 출력

if __name__ == "__main__":
    capture_packets("eth0")  # 실제 사용하는 인터페이스 이름으로 변경

 

 1. 모듈 임포트

import socket
import struct
  • socket: 네트워크 통신을 위한 저수준 소켓 API를 제공합니다.
  • struct: 바이트 데이터를 구조체로 변환하거나 그 반대로 변환할 때 사용합니다 (예: 패킷 헤더 파싱).

 2. 패킷 캡처 함수 정의

def capture_packets(interface):
  • interface: 예를 들어 "eth0" 또는 "wlan0" 같은 네트워크 인터페이스 이름을 인자로 받습니다.

 3. RAW 소켓 생성 및 바인딩

    sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
    sock.bind((interface, 0))
  • AF_PACKET: 이더넷 프레임을 직접 캡처할 수 있는 소켓 타입 (Linux 전용).
  • SOCK_RAW: 원시 소켓으로, 모든 패킷을 캡처할 수 있습니다.
  • 0x0003: 모든 프로토콜을 캡처하겠다는 의미 (ETH_P_ALL).
  • bind: 특정 인터페이스에 소켓을 바인딩합니다.

 4. 무한 루프에서 패킷 수신

    while True:
        packet = sock.recvfrom(65565)[0]
  • recvfrom: 최대 65565 바이트까지 수신.
  • [0]: 실제 패킷 데이터만 추출 (두 번째 값은 송신자 주소).

 5. 이더넷 헤더 파싱

        eth_length = 14
        eth_header = packet[:eth_length]
        eth = struct.unpack('!6s6sH', eth_header)
        eth_protocol = socket.ntohs(eth[2])
  • 이더넷 헤더는 14바이트: 목적지 MAC(6) + 출발지 MAC(6) + 프로토콜(2)
  • !6s6sH: 네트워크 바이트 순서로 6바이트 문자열 2개와 2바이트 정수
  • eth_protocol: 상위 계층 프로토콜 (예: 0x0800은 IP)

 6. IP 패킷인지 확인

        if eth_protocol == 8:  # IP 패킷
  • 0x0800은 IP 프로토콜을 의미합니다. 8은 ntohs(0x0800)의 결과입니다.

 7. IP 헤더 파싱

            ip_header = packet[eth_length:eth_length+20]
            iph = struct.unpack('!BBHHHBBH4s4s', ip_header)
  • IP 헤더는 최소 20바이트.
  • iph: IP 헤더 필드들을 튜플로 파싱.
            version_ihl = iph[0]
            ihl = version_ihl & 0xF
            iph_length = ihl * 4
            protocol = iph[6]
            s_addr = socket.inet_ntoa(iph[8])
            d_addr = socket.inet_ntoa(iph[9])
  • ihl: IP 헤더 길이 (단위: 4바이트)
  • protocol: 상위 계층 프로토콜 (TCP=6, UDP=17 등)
  • s_addr, d_addr: 출발지/목적지 IP 주소

 8. TCP 패킷 처리

            if protocol == 6:  # TCP
  • TCP 프로토콜이면...
                t = eth_length + iph_length
                tcp_header = packet[t:t+20]
                tcph = struct.unpack('!HHLLBBHHH', tcp_header)
  • TCP 헤더는 최소 20바이트.
  • tcph: TCP 헤더 필드들을 파싱.
                source_port = tcph[0]
                dest_port = tcph[1]
                sequence = tcph[2]
                acknowledgment = tcph[3]
                doff_reserved = tcph[4]
                tcph_length = doff_reserved >> 4
  • tcph_length: TCP 헤더 길이 (단위: 4바이트)
                h_size = eth_length + iph_length + tcph_length * 4
                data = packet[h_size:]
                print(f'[Data] {data[:64]}...')  # 처음 64바이트만 출력
  • 실제 데이터(payload)를 추출하고 처음 64바이트만 출력

 9. UDP 패킷 처리

            elif protocol == 17:  # UDP
  • UDP 프로토콜이면...
                u = eth_length + iph_length
                udp_header = packet[u:u+8]
                udph = struct.unpack('!HHHH', udp_header)
  • UDP 헤더는 8바이트
                source_port = udph[0]
                dest_port = udph[1]
                length = udph[2]
  • 포트 번호와 길이 출력
                h_size = eth_length + iph_length + 8
                data = packet[h_size:]
                print(f'[Data] {data[:64]}...')  # 처음 64바이트만 출력
  • UDP 데이터도 처음 64바이트만 출력

 요약

이 코드는 다음을 수행합니다:

  1. 네트워크 인터페이스에서 모든 패킷을 캡처
  2. 이더넷 → IP → TCP/UDP 헤더를 파싱
  3. 출발지/목적지 IP, 포트, 데이터 일부를 출력
    출력결과

 

반응형

'낙서장[1] > 94. Python' 카테고리의 다른 글

[Python] 함수 def 사용방법  (0) 2025.02.09