openlvc / disco

Java library for Distributed Interactive Simulation
11 stars 3 forks source link

Add support for replaying DIS data from Wireshark recordings #72

Open michaelrfraser opened 2 weeks ago

michaelrfraser commented 2 weeks ago

This request is to expand Disco's distributor application by supporting replay from Wireshark recording files.

Wireshark is the de facto standard tool for network troubleshooting and has a low barrier to entry. As such, we receive quite a lot of Wireshark network recordings as attachments to support issues, which at present we cannot replay and analyze through our Disco toolchain.

I've attached a reference implementation that uses the Sombrero library to parse a pcap file and dissect packets on the UDP level. The datagram payload is then converted to a PDU through Disco's PDU factory and sent to the network through Disco's DisApplication class.

References:

michaelrfraser commented 2 weeks ago

Note: Java files cannot be attached to issues, so I will list the reference implementation code instead.

michaelrfraser commented 2 weeks ago

Reference implementation is as follows (requires Disco and Sombrero)

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;

import org.openlvc.disco.DiscoException;
import org.openlvc.disco.PduFactory;
import org.openlvc.disco.application.DisApplication;
import org.openlvc.disco.configuration.DiscoConfiguration;
import org.openlvc.disco.configuration.Flag;
import org.openlvc.disco.pdu.PDU;
import org.openlvc.sombrero.PcapException;
import org.openlvc.sombrero.block.EnhancedPacketBlock;
import org.openlvc.sombrero.block.IPcapBlock;
import org.openlvc.sombrero.interpreter.PacketInterpreter;
import org.openlvc.sombrero.reader.IPcapReader;

/**
 * An example of how to replay PDUs stored in a PCAP recording.
 * <p>
 * Specify the file to replay PDUs from with the <code>--file</code> command line argument.
 * <p>
 * You can optionally specify the port number that DIS traffic was recorded on through the 
 * <code>--dis.port</code> command line argument. Only UDP packets whose destination port 
 * matches this value are replayed; it defaults to {@value #DEFAULT_DISPORT}.
 * <p>
 * PDUs are re-sent with the same relative spacing they had in the recording: the first PDU is 
 * sent immediately, and each subsequent PDU is delayed by the difference between its recorded 
 * time stamp and that of the first PDU.
 */
public class PduReplayer
{
    //----------------------------------------------------------
    //                    STATIC VARIABLES
    //----------------------------------------------------------
    private static final int DEFAULT_DISPORT = 3000;

    /** Largest value that fits in the 16-bit UDP port field */
    private static final int MAX_PORT = 65535;

    //----------------------------------------------------------
    //                   INSTANCE VARIABLES
    //----------------------------------------------------------
    private int disPort;
    private File replayFile;

    private final String[] args;

    //----------------------------------------------------------
    //                      CONSTRUCTORS
    //----------------------------------------------------------
    /**
     * Creates a replayer for the given command line. The arguments are not parsed until
     * {@link #runMain()} is called.
     * 
     * @param args the raw command line arguments
     */
    public PduReplayer( String[] args )
    {
        this.args = args;
        this.disPort = DEFAULT_DISPORT;

        this.replayFile = null;
    }

    //----------------------------------------------------------
    //                    INSTANCE METHODS
    //----------------------------------------------------------
    /**
     * Parses the command line, starts a {@link DisApplication} and replays every PDU found in
     * the configured PCAP file through it, preserving the recorded inter-PDU timing.
     * <p>
     * Errors raised while reading or replaying the file are reported to standard error and end
     * the replay. If the thread is interrupted while waiting between PDUs the replay ends early
     * and the thread's interrupt status is restored before this method returns. The application
     * is stopped in every case.
     * 
     * @throws IllegalArgumentException if the command line contains an unknown argument, an
     *                                  argument with no value, or an invalid port
     * @throws IllegalStateException if no <code>--file</code> argument was given
     */
    public void runMain()
    {
        // Apply command line configuration values
        this.applyCommandLine();

        // Construct a DisApplication to manage the replay through
        DiscoConfiguration discoConfig = new DiscoConfiguration();
        DisApplication disAppl = new DisApplication( discoConfig );
        disAppl.start();

        // Remember an interrupt so the flag can be restored once shutdown has completed
        boolean interrupted = false;

        // Open the file
        System.out.println( "Opening file "+this.replayFile.getAbsolutePath() );
        try( FileInputStream in = new FileInputStream(this.replayFile) )
        {
            // These two values provide the baseline values for conversions between current 
            // wallclock time and the time the PDUs were recorded 
            Instant baseReplayTime = null;
            Instant baseSimTime = null;

            // Construct the PcapDisStreamer which sequentially reads through a PCAP file and
            // finds PDU records
            PcapDisStreamer disStreamer = new PcapDisStreamer( in, this.disPort );
            TimestampedPdu nextRecord = disStreamer.nextPdu();
            while( nextRecord != null )
            {
                PDU pdu = nextRecord.pdu();
                Instant nextSimTime = nextRecord.timestamp();
                if( baseSimTime != null )
                {
                    // Wait until the wallclock has advanced as far past the replay baseline
                    // as this record is past the first record in the recording
                    Duration deltaSim = Duration.between( baseSimTime, nextSimTime );
                    sleepUntil( baseReplayTime.plus(deltaSim) );
                }
                else
                {
                    // This is the first record to be played, so use its time stamp as our baseline
                    baseReplayTime = Instant.now();
                    baseSimTime = nextSimTime;
                }

                // Send the PDU to the network
                System.out.println( "Sending "+pdu );
                disAppl.sendRaw( pdu );

                // Get the next PDU to process
                nextRecord = disStreamer.nextPdu();
            }
        }
        catch( InterruptedException ie )
        {
            // Treat an interrupt as a request to end the replay early
            interrupted = true;
            System.err.println( "Replay interrupted" );
        }
        catch( Exception e )
        {
            e.printStackTrace();
        }
        finally
        {
            try
            {
                // Always shut the application down, even if the replay failed part way through
                disAppl.stop();
            }
            finally
            {
                // Restore the flag only after stop() so that shutdown itself is not cut short
                if( interrupted )
                    Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks the calling thread until the wallclock reaches the given instant. Returns
     * immediately if that instant has already passed.
     * 
     * @param target the wallclock time to wait for
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private static void sleepUntil( Instant target ) throws InterruptedException
    {
        Duration remaining = Duration.between( Instant.now(), target );
        if( remaining.isPositive() )
            Thread.sleep( remaining );
    }

    /**
     * Reads <code>--dis.port</code> and <code>--file</code> from the command line into this
     * instance. Argument names are matched case-insensitively.
     * 
     * @throws IllegalArgumentException if an argument is not recognised, has no value following
     *                                  it, or the port value is not a valid port number
     * @throws IllegalStateException if no <code>--file</code> argument was given
     */
    private void applyCommandLine()
    {
        for( int i = 0; i < args.length; i++ )
        {
            String flag = args[i];
            if( flag.equalsIgnoreCase("--dis.port") )
                this.disPort = parsePort( valueFor(flag,++i) );
            else if( flag.equalsIgnoreCase("--file") )
                this.replayFile = new File( valueFor(flag,++i) );
            else
                throw new IllegalArgumentException( "Unknown argument: "+flag );
        }

        if( this.replayFile == null )
            throw new IllegalStateException( "Expected value for argument --file" );
    }

    /**
     * Fetches the value that follows a command line flag.
     * 
     * @param flag the flag being processed, used in the error message only
     * @param valueIndex the index in the argument array where the flag's value should be
     * @return the argument at <code>valueIndex</code>
     * @throws IllegalArgumentException if the flag was the last argument and so has no value
     */
    private String valueFor( String flag, int valueIndex )
    {
        if( valueIndex >= args.length )
            throw new IllegalArgumentException( "Expected value for argument "+flag );

        return args[valueIndex];
    }

    /**
     * Converts a command line value to a port number.
     * 
     * @param value the text given for <code>--dis.port</code>
     * @return the parsed port
     * @throws IllegalArgumentException if the value is not an integer between 0 and 
     *                                  {@value #MAX_PORT} inclusive
     */
    private static int parsePort( String value )
    {
        int port;
        try
        {
            port = Integer.parseInt( value );
        }
        catch( NumberFormatException nfe )
        {
            throw new IllegalArgumentException( "Invalid value for --dis.port: "+value, nfe );
        }

        // A value outside the UDP port range could never match a recorded packet
        if( port < 0 || port > MAX_PORT )
            throw new IllegalArgumentException( "Invalid value for --dis.port: "+value );

        return port;
    }

    ////////////////////////////////////////////////////////////////////////////////////////////
    /////////////////////////////// Accessor and Mutator Methods ///////////////////////////////
    ////////////////////////////////////////////////////////////////////////////////////////////

    //----------------------------------------------------------
    //                     STATIC METHODS
    //----------------------------------------------------------
    /**
     * Command line entry point. See the class documentation for the supported arguments.
     * 
     * @param args the command line arguments
     */
    public static void main( String[] args )
    {
        // Allow pass-through for PDUs that disco doesn't know how to process 
        DiscoConfiguration.set( Flag.Unparsed );

        new PduReplayer( args ).runMain();
    }

    /**
     * A Timestamp/PDU tuple
     */
    private record TimestampedPdu( Instant timestamp, PDU pdu ) {}

    /**
     * An indirection layer for managing references between parent code and lambdas.
     * <p>
     * Starts with a <code>null</code> value, which can be subsequently set by calling 
     * {@link #set(Object)}.
     * 
     * @param <T> the type of object to hold
     */
    private static class Holder<T>
    {
        private T value;

        public Holder()
        {
            this.value = null;
        }

        /** @return <code>true</code> if a non-null value is currently held */
        public boolean hasValue() { return this.value != null; }

        /** @param value the value to hold, replacing any previous one */
        public void set( T value ) { this.value = value; }

        /** @return the held value, or <code>null</code> if none has been set */
        public T get() { return this.value; }
    }

    /**
     * Helper class for reading DIS PDUs sequentially from a PCAP file
     */
    private static class PcapDisStreamer
    {
        private final IPcapReader parser;
        private final int disPort;
        private final PduFactory pduFactory;

        /**
         * @param in the stream to read PCAP data from; it is not closed by this class
         * @param disPort the UDP destination port that identifies DIS traffic in the recording
         * @throws IOException if there was an error reading the stream
         * @throws PcapException if a reader could not be created for the stream contents
         */
        public PcapDisStreamer( InputStream in, int disPort ) throws IOException, PcapException
        {
            this.parser = IPcapReader.createFor( in );
            this.disPort = disPort;
            this.pduFactory = new PduFactory();
        }

        /**
         * Reads blocks from the file until one yields a PDU. Blocks that are not 
         * {@link EnhancedPacketBlock}s, and packets that are not addressed to the configured 
         * port, are skipped.
         * 
         * @return the next {@link PDU} in the stream, along with the time stamp it was recorded,
         *         or <code>null</code> if the end of the file was reached first
         * 
         * @throws IOException if there was an error reading the stream
         * @throws PcapException if the file has invalid PCAP data
         * @throws DiscoException if there was an error constructing a PDU from the PCAP file data 
         */
        public TimestampedPdu nextPdu() throws IOException, PcapException, DiscoException
        {
            // The UDP hook below is a lambda, so it reports its result through this holder
            Holder<TimestampedPdu> collected = new Holder<>();

            IPcapBlock nextBlock = parser.nextBlock();
            while( nextBlock != null )
            {
                // Look out for EnhancedPacketBlocks
                if( nextBlock instanceof EnhancedPacketBlock packet )
                {
                    Instant timestamp = packet.getTimestamp();

                    // Construct packet interpreter with a UDP processor hook
                    PacketInterpreter interpreter = new PacketInterpreter();
                    interpreter.onUdp( (udpFrame) -> {

                        // Only process packets with the configured destination port
                        if( udpFrame.getDestPort() == disPort )
                        {
                            try
                            {
                                // If we can successfully construct a PDU from the UDP data, then
                                // set the collected reference accordingly
                                PDU pdu = pduFactory.create( udpFrame.getData() );
                                collected.set( new TimestampedPdu(timestamp, pdu) );
                            }
                            catch( Exception e )
                            {
                                // Surface the failure to the caller of nextPdu(), keeping the cause
                                throw new DiscoException( e );
                            }
                        }
                    } );

                    // Submit the packet for processing
                    interpreter.process( packet );
                }

                // If we managed to collect a value in this iteration, then we can break out
                // of the loop
                if( collected.hasValue() )
                    break;

                // Otherwise try the next block
                nextBlock = parser.nextBlock();
            }

            // Return the collected PDU, or null if we got the end of the file
            return collected.get();
        }
    }
}