Wave-Net / wavenet-backend

packet capture logic using scapy library
0 stars 0 forks source link

sniff 스레드 종료시키기 #39

Closed zhy2on closed 4 months ago

zhy2on commented 4 months ago

현재 scapy의 sniff 함수가 동기 함수이기 때문에 이를 비동기로 실행시키기 위해 스레드를 사용해서 실행중임.

filter_expr = f"ip and (ip src {self.iot.ip} or ip dst {self.iot.ip})"
sniff(prn=self._packet_callback,
              filter=filter_expr,
              stop_filter=lambda p: self.stop_event.is_set(),
              store=False)

이런식으로 IoT의 ip에 맞는 패킷들만 sniff를 하도록 filter를 사용해주고, stop_filter를 지정해서

def stop(self):
        if self.thread:
            self.reset()
            self.stop_event.set()
            print('끝내자')
            self.thread.join()
            print('스레드 끝')
            self.thread = None
            self.stop_event.clear()

self.stop_event.set(), self.thread.join()으로 sniff함수를 종료시키고, 스레드가 잘 종료되게 한 다음 thread.join 을 했었음.

이 때 문제 발생.

만약 filter를 만족시키는 패킷 전송이 아예 없는 경우, sniff 함수가 실행 되지도 않고 == blocking 상태이고 그래서 stop_filter 조건을 만족시켜도 sniff 함수가 종료도 안 되고 스레드가 종료도 안 됨. 왜냐면 sniff 함수가 실행조차 안 된 상태여서 stop_filter를 확인하는 로직도 실행이 안 됨.

결국 thread.join에서 blocking 발생.

이걸 어떻게 할 거냐~~ 로직 수정 필요

zhy2on commented 4 months ago

해결!!!

def _sniff(self, websocket, loop):
        filter_expr = f"ip and (ip src {self.iot.ip} or ip dst {self.iot.ip})"
        while not self.stop_event.is_set():
            sniff(prn=lambda packet: self._packet_callback(packet, websocket, loop),
                  filter=filter_expr,
                  timeout=1, store=False)

콜백함수를 안 쓸 이유가 없었다 그냥 콜백함수를 쓴 상태에서 타임아웃을 걸어두고 while 루프를 돌면 되는 거였던 것!

그리고 콜백 함수에 websocket, 메인스레드의 event loop를 전달해주고 asyncio.run_coroutine_threadsafe로 websocket.send를 해주면 된다!

def _send_packet_info(self, packet_info, websocket, loop):
        try:
            asyncio.run_coroutine_threadsafe(
                websocket.send(json.dumps(packet_info)), loop)
        except Exception as e:
            print(f"Error sending packet info: {e}")

def _packet_callback(self, packet, websocket, loop):
        handler = self._get_packet_handler(packet)
        if handler is None:
            return
        self._update_stat_info(
            packet[IP].src, packet[IP].dst, packet[MQTT].len)
        packet_info = self._make_packet_info(handler, packet)
        self._send_packet_info(packet_info, websocket, loop)

이전 삽질

다시 원점

아 클로드 믿지 말기;;;; 내가 원래 생각하던 게 맞았다. stop_filter 상관 없이 timeout 시간이 지나면 sniff가 강제 종료 된다. 대체 해결법이 뭘까

그냥 직접 loop 만들기

def _sniff(self):
        filter_expr = f"ip and (ip src {self.iot.ip} or ip dst {self.iot.ip})"
        while not self.stop_event.is_set():
            packets = sniff(count=1, filter=filter_expr,
                            timeout=0.1)
            if self.stop_event.is_set():
                break
            for packet in packets:
                self._packet_callback(packet)

sniff 함수에 timeout 옵션을 사용하는 대신, 직접 패킷 캡처 루프를 구현. 이 방식은 이전에 언급했던 와이어샤크의 접근 방식과 유사. 이러면 무한 블락킹 문제 해결 된다..~~


이전 삽질

결론 timeout 사용하기 -> X 클로드 믿지 말기

애초에 필터를 지정하고 뭐고 다 떠나서 sniff에서 패킷이동이 없을 때 blocking이 되는 상황을 어떻게 처리할지가 문제였음

결론 == timeout을 이용한다! ---> NONONONONONO 아님 timeout 설정하면 stop_filter 상관 없이 강제 종료 된다. 이럼 안 됨

해결과정

처음에 클로드가 timeout 사용하라 했을 땐 timeout을 설정하면 stop_filter가 false인 상태여도 타임아웃 시간을 넘기는 상태로 블락킹이 지속되면 무조건 종료가 되는 줄 알았음.

image image image image

근데 타임아웃을 걸어도 stop_filter가 false이면 계속 실행된 채로 있는 거고 stop_filter가 true이면 종료되는 거였다! 스레드를 강제종료 시키는 게 아니라 무한 블락킹 상태가 될 수 있는 sniff를 timeout을 걸어서 stop_filter가 true인 경우에는 무한 블락킹에서 빠져 나올 수 있게 해줘야 하는 거였음. 그럼 스레드가 안전하게 종료되고 강제 종료를 할 필요도 없음.

image image