from scapy.all import sniff
import threading
def packet_callback(packet):
if CoAP in packet:
print("packet captured:", packet.summary())
def start_sniff():
# Here, specify the loopback interface for your OS; e.g., 'lo0' for macOS, 'lo' for Linux
sniff(prn=packet_callback, iface='lo0', store=False)
if __name__ == "__main__":
sniff_thread = threading.Thread(target=start_sniff)
sniff_thread.start()
sniff_thread.join() # 이 라인을 추가하여 스레드가 종료될 때까지 기다립니다.
위 코드처럼 localhost에서 지나가는 모든 패킷을 검출하려고 시도하였고 성공적으로 검출되고 있음
아래는 패킷 생성 코드
import asyncio
import aiocoap.resource as resource
import aiocoap
class BasicResource(resource.Resource):
async def render_post(self, request):
# 요청 받은 데이터 출력
print('POST payload: %s' % request.payload.decode('utf-8'))
# 요청에 대한 응답 메시지
return aiocoap.Message(code=aiocoap.CONTENT, payload=b"Received")
async def init_coap_server():
# 리소스 설정
root = resource.Site()
root.add_resource(['resource'], BasicResource())
# CoAP 서버 컨텍스트 생성
context = await aiocoap.Context.create_server_context(root, bind=('localhost', 5683))
print("CoAP server is running")
await asyncio.get_event_loop().create_future() # 서버가 계속 실행되도록 유지
async def coap_client():
context = await aiocoap.Context.create_client_context()
await asyncio.sleep(1) # 서버가 준비될 때까지 기다림
while True:
# CoAP 메시지 생성 (POST 요청)
request = aiocoap.Message(code=aiocoap.POST, payload=b"My data", uri="coap://localhost/resource")
# 요청을 서버에 보내고 응답을 기다림
response = await context.request(request).response
print('Client received: %s' % response.payload.decode('utf-8'))
await asyncio.sleep(1) # 다음 요청 전에 잠시 대기
async def main():
# 서버와 클라이언트를 동시에 실행
server = asyncio.create_task(init_coap_server())
client = asyncio.create_task(coap_client())
await asyncio.gather(server, client) # 두 태스크가 완료될 때까지 기다림
if __name__ == "__main__":
asyncio.run(main())
하지만 sniffer에서 시도 시 'find CoAP Packet' 문이 출력되지 않고있음
위 코드처럼 localhost에서 지나가는 모든 패킷을 검출하려고 시도하였고 성공적으로 검출되고 있음
아래는 패킷 생성 코드
하지만 sniffer에서 시도 시 'find CoAP Packet' 문이 출력되지 않고있음