Closed wrachwal closed 4 years ago
Thank you for the patch!
I have tested using data extracted from the following PCAPs, but I did it with Elixir :( I can paste the code here if you wish.
Sure, that would be great!
I don't know how to do it nicely in Erlang, though. Sorry.
No problem. Some of the tests use pcap files for the test data. If the license allows it, these PCAP files could be added (maybe not, project seems to be licensed under GPL3). I'll see if I can get a similar packet capture.
Here you are: the test module, a small `Pkt` module that extracts the record definitions from the .hrl files into their Elixir counterparts, and a simple stream that reads packets from a PCAP file.
defmodule IPFragTest do
  @moduledoc """
  Exercises `:pkt` decoding of fragmented UDP datagrams carried over IPv4 and
  IPv6.

  Each test reads a run of frames from a capture file, checks the header stack
  produced by `:pkt.decapsulate/2` and `:pkt.decode/2`, concatenates the
  fragment payloads, and verifies the result against the length and checksum
  stored in the UDP header of the first fragment.
  """

  use ExUnit.Case
  require Pkt

  @pcap_ipv4 "#{__DIR__}/ipv4-udp-fragmented.pcap" # frame (4..6) ~> 7
  @pcap_ipv6 "#{__DIR__}/ipv6-udp-fragmented.pcap" # frame (3..5) ~> 6

  # IANA-assigned IP protocol number for UDP. Kept as a literal because the
  # Pkt helper module only provides record macros, not protocol lookups.
  @ipproto_udp 17

  # Size in bytes of the fixed UDP header, which is counted in the UDP length field.
  @udp_header_size 8

  # Test.Pcap.stream("test/ipv4-udp-fragmented.pcap") |> Enum.slice(3, 4) |> Enum.map(fn {p, d} -> :pkt.decode(p, d) end)
  # Test.Pcap.stream("test/ipv6-udp-fragmented.pcap") |> Enum.slice(2, 4) |> Enum.map(fn {p, d} -> :pkt.decode(p, d) end)

  # Runs :pkt.decapsulate/2 on one {proto, data} frame and returns the
  # resulting list, asserting that a list was produced.
  defp decapsulate_pkt({proto, data}) do
    pkt = :pkt.decapsulate(proto, data)
    assert is_list(pkt)
    pkt
  end

  # Runs :pkt.decode/2 on one {proto, data} frame and returns
  # {headers, payload}, asserting that decoding succeeded.
  defp decode_pkt({proto, data}) do
    assert {:ok, {headers, payload}} = :pkt.decode(proto, data)
    {headers, payload}
  end

  # Concatenates the payloads of decoded fragments, in order, into one binary.
  defp reassemble_payload(decoded_frags) do
    decoded_frags
    |> Enum.map(&elem(&1, 1))
    |> IO.iodata_to_binary()
  end

  test "assemble ipv4 fragments" do
    frags = Test.Pcap.stream(@pcap_ipv4) |> Enum.slice(3, 4)
    assert [{:ipv4, _} | _] = frags

    assert [
             [Pkt.ipv4(), Pkt.udp(), <<_::binary>>],
             [Pkt.ipv4(), <<_::binary>>] | _
           ] = Enum.map(frags, &decapsulate_pkt/1)

    frags = Enum.map(frags, &decode_pkt/1)

    # udp header in 1st fragment
    assert [{[Pkt.ipv4() = ipv4, Pkt.udp() = udp], _} | _] = frags

    payload = reassemble_payload(frags)

    assert @udp_header_size + byte_size(payload) == Pkt.udp(udp, :ulen)
    assert :pkt.makesum([ipv4, Pkt.udp(udp, sum: 0), payload]) == Pkt.udp(udp, :sum)
  end

  test "assemble ipv6 fragments" do
    frags = Test.Pcap.stream(@pcap_ipv6) |> Enum.slice(2, 4)
    assert [{:ipv6, _} | _] = frags

    assert [
             [Pkt.ipv6(), Pkt.ipv6_fragment(), Pkt.udp(), <<_::binary>>],
             [Pkt.ipv6(), Pkt.ipv6_fragment(), <<_::binary>>] | _
           ] = Enum.map(frags, &decapsulate_pkt/1)

    frags = Enum.map(frags, &decode_pkt/1)

    # udp header in 1st fragment
    assert [{[Pkt.ipv6() = ipv6, Pkt.ipv6_fragment(), Pkt.udp() = udp], _} | _] = frags

    payload = reassemble_payload(frags)

    # Rewrite the IPv6 header as if the datagram had never been fragmented, so
    # that the checksum is computed over the reassembled packet.
    ipv6 =
      Pkt.ipv6(ipv6,
        len: @udp_header_size + byte_size(payload), # udp header + total payload
        next: @ipproto_udp # change to udp (was: ipv6_fragment)
      )

    assert @udp_header_size + byte_size(payload) == Pkt.udp(udp, :ulen)
    assert :pkt.makesum([ipv6, Pkt.udp(udp, sum: 0), payload]) == Pkt.udp(udp, :sum)
  end
end
defmodule Pkt do
  @moduledoc """
  Elixir record macros generated from the Erlang record definitions found in
  the `pkt` library's `.hrl` header files.
  """

  require Record

  # Preprocessor macro values passed to Record.extract/2 when reading the headers.
  @macros [IPPROTO_TCP: 6, IPPROTO_NONE: 59]

  # Record name => base name of the header under pkt/include that is read for it.
  @extract [
    ipv4: "pkt_ipv4",
    ipv6: "pkt_ipv6",
    ipv6_fragment: "pkt_ipv6",
    udp: "pkt_udp"
  ]

  # Define one public record macro per entry listed in @extract.
  for {record_name, header} <- @extract do
    hrl_path = "pkt/include/#{header}.hrl"
    fields = Record.extract(record_name, from_lib: hrl_path, macros: @macros)
    Record.defrecord(record_name, fields)
  end
end
defmodule Test.Pcap do
  @moduledoc """
  Minimal reader that exposes the packets of a pcap capture file as a lazy
  stream.

  Only captures whose link type is Ethernet and whose header fields are in the
  reading machine's native byte order are accepted.
  """

  # https://wiki.wireshark.org/Development/LibpcapFileFormat
  # http://www.tcpdump.org/linktypes.html
  # https://en.wikipedia.org/wiki/EtherType

  require Record

  @linktype_ethernet 1

  Record.defrecordp :state, [
    device: nil,
    network: nil,
    snaplen: nil
  ]

  @doc """
  Returns a lazy stream of `{proto, packet}` tuples, one per record in `file`.

  `proto` is the result of `:pkt.ether_type/1` applied to the frame's EtherType
  and `packet` is the binary that follows the 14-byte Ethernet header.

  The file is opened when enumeration starts and closed when it finishes.
  Raises if the file cannot be opened, if it does not start with a pcap header
  in native byte order, or if its link type is not Ethernet.
  """
  def stream(file) do
    Stream.resource(fn -> init_pcap(file) end, &next_pcap/1, &after_pcap/1)
  end

  # Opens `file`, validates the 24-byte global header and returns the reader
  # state. Stream.resource/3 does not run the cleanup function when the start
  # function raises, so the device is closed here before every raise.
  defp init_pcap(file) do
    device = File.open!(file, [:read, :binary])

    case IO.binread(device, 24) do
      <<0xA1B2C3D4::native-32, # magic
        _major::native-16,
        _minor::native-16,
        _thiszone::native-signed-32,
        _sigfigs::native-32,
        snaplen::native-32,
        network::native-32>> ->
        if network != @linktype_ethernet do
          File.close(device)
          raise("#{file}: unsupported linktype=#{network}, expected ethernet(#{@linktype_ethernet})")
        end

        state(device: device, network: network, snaplen: snaplen)

      _other ->
        File.close(device)
        raise("#{file}: not a pcap file in native byte order")
    end
  end

  # Reads the next 16-byte record header plus its frame and emits one
  # {proto, packet} element; halts the stream at end of file.
  defp next_pcap(state(device: device) = state) do
    case IO.binread(device, 16) do
      <<_ts_sec::native-32, _ts_usec::native-32, incl_len::native-32, orig_len::native-32>> ->
        # incl_len is the number of bytes stored in the file for this record and
        # orig_len is the length of the packet on the wire. A capture truncated
        # by snaplen has incl_len < orig_len, so never take more than was stored.
        frame_len = min(incl_len, orig_len)
        <<ether::binary-size(frame_len), _pad::binary>> = IO.binread(device, incl_len)

        # Skip destination and source MAC addresses (6 + 6 bytes), then read the
        # big-endian EtherType.
        <<_::binary-12, type::16, packet::binary>> = ether
        proto = :pkt.ether_type(type) # e.g. 0x0800 ~> :ipv4, 0x86DD ~> :ipv6
        {[{proto, packet}], state}

      :eof ->
        {:halt, state}
    end
  end

  # Closes the capture file once the stream is done.
  defp after_pcap(state(device: device)) do
    File.close(device)
  end
end
When a long IP packet is fragmented, the upper-layer header is present only in the first fragment, and the subsequent fragments carry successive pieces of the payload directly following the `ipv4` or `ipv6_fragment` header.
I have tested using data extracted from the following PCAPs, but I did it with Elixir :( I can paste the code here if you wish. I don't know how to do it nicely in Erlang, though. Sorry.