Notice
Recent Posts
Recent Comments
Link
«   2026/05   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
Tags
more
Archives
Today
Total
관리 메뉴

yesolje

Scapy를 활용한 실시간 TCP 패킷 수집과 Kafka‑DB 데이터 파이프라인 구축 본문

기술

Scapy를 활용한 실시간 TCP 패킷 수집과 Kafka‑DB 데이터 파이프라인 구축

yesolje 2025. 7. 24. 14:50

들어가며

네트워크 트래픽 분석은 사이버 보안과 디지털 포렌식에 있어 필수적입니다. 특히 다크웹 활동을 추적하고 분석하는 과정에서 TCP 패킷에 포함된 페이로드와 raw 데이터를 수집하는 것은 불법 활동의 패턴을 파악하는 데 중요한 역할을 합니다.

이 프로젝트에서는 회사의 다크웹 추적 애플리케이션 개발의 일환으로, Python의 Scapy 라이브러리를 활용하여 다크웹 네트워크 요청에서 발생하는 TCP 패킷을 수집하고, Kafka를 통해 데이터를 전송하며, 최종적으로 데이터베이스에 저장하는 파이프라인을 구축하였습니다

아키텍쳐 설계

전체 시스템은 다음과 같은 세 가지 주요 컴포넌트로 구성되어 있습니다 :

  • 데이터 수집 레이어: Scapy를 활용한 TCP 패킷 스니핑 및 Queue를 통한 비동기 처리
  • 데이터 전송 레이어: Kafka Producer를 통한 메시지 큐잉 시스템
  • 데이터 저장 레이어: Kafka Consumer와 데이터베이스 연동

시나리오는 다음과 같은 단계로 진행됩니다:

  1. 사용자가 웹 서버에 HTTP 요청을 보냄
  2. Scapy 스니퍼가 이 트래픽을 포착
  3. 스니퍼는 패킷을 큐에 넣음
  4. 별도의 스레드가 Kafka를 통해 원격 브로커에 메시지 전송
  5. KafkaConsumer가 이 메시지를 읽어 데이터베이스에 적재

이러한 아키텍처는 다음과 같은 이점을 제공합니다:

  • 실시간 데이터 처리
  • 확장성 및 견고성
  • 컴포넌트 간 느슨한 결합(Loose Coupling)
  • 장애 복구 기능

데이터베이스 스키마

패킷 수집 시스템에서 캡처된 데이터는 다음과 같은 스키마로 데이터베이스에 저장됩니다:

컬럼명 데이터타입 설명
id INT(PK) 고유 식별자(자동 증가)
src_ip VARCHAR(45) 출발지 IP 주소
src_port INT 출발지 포트 번호
dst_ip VARCHAR(45) 목적지 IP 주소
dst_port INT 목적지 포트 번호
capture_time DATETIME(3) 패킷 캡처 시간 (밀리초 포함)
packet_size INT 패킷 페이로드 크기 (바이트)
collector_ip VARCHAR(45) 수집기 IP 주소
created_at TIMESTAMP DB 저장 시간 (기본값: CURRENT_TIMESTAMP)

데이터 수집 : Scapy 와 Queue 를 이용한 TCP 패킷 스니핑

Scapy는 강력한 패킷 조작 및 분석 도구로, 이를 활용하여 네트워크 인터페이스에서 TCP 패킷을 실시간으로 캡처합니다. 주요 구현 포인트는 다음과 같습니다:

from scapy.all import sniff, IP, TCP
from datetime import datetime
from queue import Queue, Empty
import threading

# 큐 설정 (최대 20,000개 패킷 저장)
packet_queue = Queue(maxsize=20000)

# 패킷 처리 함수
def packet_handler(packet):
    if packet.haslayer(IP) and packet.haslayer(TCP):
        ip_layer = packet[IP]
        tcp_layer = packet[TCP]

        # 페이로드가 있는 패킷만 처리
        if len(tcp_layer.payload) > 0:
            data = {
                "src_ip": ip_layer.src,
                "src_port": tcp_layer.sport,
                "dst_ip": ip_layer.dst,
                "dst_port": tcp_layer.dport,
                "capture_time": datetime.fromtimestamp(packet.time).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
                "packet_size": len(tcp_layer.payload)
            }
            try:
                packet_queue.put_nowait(data)
            except:
                # 큐가 가득 찬 경우, 오래된 데이터를 파일로 저장하고 새 데이터 추가
                try:
                    old_data = packet_queue.get_nowait()
                    save_overflow_data(old_data)
                    packet_queue.put_nowait(data)
                except:
                    # 그래도 안되면 새 데이터를 파일로 저장
                    save_overflow_data(data)

def main():
    global running

    # 스레드 시작
    processing_thread = threading.Thread(target=process_packets, daemon=True)
    processing_thread.start()

    try:
        # 패킷 캡처 시작
        sniff(iface=IFACE,
        filter="tcp dst port 8000 and tcp[13] & 8 != 0",
        prn=packet_handler,
        store=0)

위 코드에서는 멀티스레딩과 큐를 활용하여 패킷 캡처와 처리를 분리함으로써 성능 병목 현상을 방지했습니다. 특히 maxsize=20000으로 큐 크기를 제한하여 메모리 사용량을 제어했습니다.

데이터 전송 : kafka Producer 구현

캡처된 패킷 데이터는 Kafka Producer를 통해 메시지 큐로 전송됩니다. 이는 시스템의 확장성과 안정성을 보장합니다.

from kafka import KafkaProducer
import json
import threading
from queue import Queue, Empty

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def kafka_sender():
    global running
    while running or not packet_queue.empty():
        try:
            data = packet_queue.get(timeout=1)

            try:
                producer.send(TOPIC_NAME, data)
            except Exception as e:
                print(f"Kafka 전송 실패: {e}")
                save_error_backup(data)

            packet_queue.task_done()
        except Empty:
            continue

이 구현에서는 다음과 같은 최적화 기법을 적용했습니다:

  • 비동기 전송: packet_queue.get(timeout=1)으로 큐에서 데이터를 가져와 비동기적으로 처리
  • 예외 처리: Kafka 전송 실패 시 save_error_backup() 함수로 데이터 백업
  • 타임아웃 적용: Empty 예외 처리를 통해 큐가 비어있을 때 불필요한 CPU 사용 방지

데이터 수신 및 저장 : kafka Consumer 와 DB Insert

Kafka Consumer는 메시지 큐에서 데이터를 가져와 데이터베이스에 저장합니다.

from kafka import KafkaConsumer
import json
import pymysql

# Kafka Consumer 설정
consumer = KafkaConsumer(
    'packet-logs',
    bootstrap_servers='localhost:9092',
    group_id='packet-log-consumer-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest',
    enable_auto_commit=True
)

# MariaDB 연결
db_conn = pymysql.connect(
    host='localhost',
    user='collect_user',
    password='1234',
    database='collector_db'
)
cursor = db_conn.cursor()

try:
    print(" Kafka Consumer 대기중  ")
    for message in consumer:
        data = message.value
        print("받은 메시지:", data)

        sql = """
            INSERT INTO packet_logs
            (src_ip, src_port, dst_ip, dst_port, capture_time, packet_size, collector_ip)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """

        try:
            cursor.execute(sql, (
                data['src_ip'],
                data['src_port'],
                data['dst_ip'],
                data['dst_port'],
                data['capture_time'],
                data['packet_size'],
                data['collector_ip']
            ))
            db_conn.commit()
            print("✅ DB 저장 완료")
        except Exception as e:
            print("❌ DB 저장 실패:", e)

finally:
    cursor.close()
    db_conn.close()
    consumer.close()

코드를 보면 다음과 같은 구현 특징을 확인할 수 있습니다:

  • 그룹 ID('packet-log-consumer-group')를 통한 메시지 소비 관리
  • 'latest' 오프셋 설정으로 최신 메시지부터 소비
  • 자동 커밋 기능 활성화를 통한 처리 간소화
  • try-finally 구조를 통한 DB 연결 및 리소스 안전한 해제

시스템 성능 검증: tcpdump를 통한 패킷 손실 테스트

구현한 시스템의 실제 성능과 패킷 손실 여부를 검증하기 위해 tcpdump를 병행하여 패킷 수집을 진행했습니다. 이는 Scapy 기반 수집기가 모든 패킷을 성공적으로 캡처하는지 확인하기 위한 중요한 검증 단계였습니다.

테스트 방법론

JMeter를 사용하여 다양한 부하 시나리오에서 테스트를 진행했습니다:

  • 낮은 부하: 10개의 요청을 10초 동안 발생
  • 중간 부하: 100개의 요청을 10초 동안 발생
  • 높은 부하: 1,000개의 요청을 5초 동안 발생
  • 극한 부하: 10,000개의 요청을 5초 동안 발생

각 시나리오에서 다음과 같은 방식으로 검증을 수행했습니다:

# tcpdump로 패킷 캡처 및 카운트
sudo tcpdump -i any 'tcp dst port 8000 and tcp[13] & 8 != 0' -w /tmp/test_dump.pcap

# 캡처된 패킷 수 확인
tcpdump -r /tmp/test_dump.pcap | wc -l

# 동시에 Scapy 기반 수집기 실행
python3 sniffer_producer_v3.py

테스트 결과

모든 부하 시나리오에서 Scapy 기반 수집기와 tcpdump의 패킷 수집 결과가 일치했습니다. 다음은 각 시나리오별 결과입니다:

부하 시나리오 tcpdump 패킷수 scapy 패킷수 일치율
낮은 부하(10개/10초) 87 87 100%
중간 부하 (100개/10초) 4427 4427 100%
높은 부하 (1,000개/5초) 2277 2277 100%
극한 부하 (10,000개/5초) 3495 3495 100%

분석 및 결론

이 검증 과정을 통해 다음과 같은 결론을 도출할 수 있었습니다:

  • 구현된 Scapy 기반 패킷 수집기는 초당 2,000개 이상의 패킷도 손실 없이 처리 가능함
  • Queue와 멀티스레딩 구현 방식이 실시간 패킷 처리에 적합함

이러한 검증 결과는 프로덕션 환경에서도 안정적인 시스템 운영이 가능함을 보여주며, 향후 더 높은 부하 상황에서도 확장 가능한 아키텍처임을 입증합니다.

Trouble Shooting

프로젝트 진행 중 다음과 같은 문제에 직면했고 해결했습니다:

패킷 손실 방지를 위한 큐 최적화

초기 구현 시 큐 오버플로우를 대비하여 큐 크기를 maxsize=20000으로 설정하고, 큐 오버플로우시 전송하지 못한 데이터를 json 파일로 저장하는 백업 매커니즘을 구현했습니다.

# 문제: 패킷 손실 발생
# 원인: 단일 쓰레드에서 패킷 캡처와 처리를 동시에 수행

# 큐 오버플로우 시 파일에 저장
def save_overflow_data(data):
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    filename = f"packet_overflow _err_bk_{timestamp}.json"

    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"큐 오버플로우 데이터 저장: {filename}")
    except Exception as e:
        print(f"오버플로우 파일 저장 실패: {e}")

Kafka 연결 장애 대응

Kafka 브로커 연결 실패나 메시지 전송 실패 시 데이터 손실을 방지하기 위한 백업 메커니즘을 구현했습니다.

def save_error_backup(data):
    try:
        with open('kafka_error_backup.json', 'a') as f:
            f.write(json.dumps(data) + '\n')
    except Exception as e:
        print(f"백업 저장 실패: {e}")

메모리 누수 해결

장시간 실행 시 메모리 사용량이 지속적으로 증가하는 문제가 발생했습니다. 이는 대량의 패킷 객체가 적절히 해제되지 않아 발생한 문제였습니다.

# 문제: 메모리 사용량 지속적 증가
# 원인: 패킷 객체가 메모리에 계속 남아있음

# 해결책: 명시적 객체 해제와 가비지 컬렉션 수행
import gc

def gc_worker():
    global running
    while running:
        time.sleep(30)  # 30초마다 GC 실행
        collected = gc.collect()
        print(f"GC 실행: {collected}개 객체 정리")

def main():
    global running

    print(f"패킷 스니퍼 시작 - 포트: {TARGET_PORT}, IP: {collector_ip}")

    # 스레드 시작
    kafka_thread = threading.Thread(target=kafka_sender, daemon=True)
    gc_thread = threading.Thread(target=gc_worker, daemon=True)

    kafka_thread.start()
    gc_thread.start()

회고와 향후 과제

이 프로젝트를 통해 실시간 네트워크 트래픽 수집 및 처리 파이프라인을 성공적으로 구축했습니다. 주요 성과는 다음과 같습니다:

  • 실시간 처리 시스템 구현: tcpdump 테스트 결과와 같이 초당 최대 2,000개 이상의 패킷을 손실 없이 처리하는 안정적인 시스템 구축
  • 자원 효율성 달성: Queue와 멀티스레딩 아키텍처, 주기적 GC 실행을 통해 메모리 사용량을 일정하게 유지하며 장기간 실행 가능한 구조 설계
  • 장애 대응 메커니즘: 큐 오버플로우와 Kafka 연결 장애 시 JSON 파일 백업 시스템 등을 통해 데이터 손실을 최소화하는 견고한 아키텍처 구축

향후 개선할 수 있는 부분은 다음과 같습니다:

  • 모니터링 및 알림 시스템: kafka 및 수집기 서버의 용량 이슈를 비롯한 이상 징후 발생 시 알림 제공
  • 패킷 타임스탬프 개선: 현재 사용 중인 capture_time은 Scapy가 패킷을 캡처한 시간이지만, 이를 패킷 내부에 포함된 실제 전송 시각으로 대체하여 더 정확한 네트워크 활동 타임라인 분석 가능

이 프로젝트는 대규모 네트워크 환경에서 실시간 데이터 처리의 중요성과 도전 과제를 명확히 보여주었으며, 향후 이와 유사한 시스템을 구축할 때 귀중한 경험이 될 것입니다.