USNavalResearchLaboratory / norm

NACK-Oriented Reliable Multicast (NORM) implementation & tools (RFCs 5740, 5401)
https://www.nrl.navy.mil/Our-Work/Areas-of-Research/Information-Technology/NCS/NORM/
Other
96 stars 33 forks source link

normCast not support dir? #71

Open honglei opened 1 year ago

honglei commented 1 year ago

Command args: id 172042175 send E:\PythonPrj\testFiles100K addr 224.1.2.4/6003 txAddr 10.65.39.191/8002 segment 1400 block 128 parity 4 auto 1 rate 5000000 buffer 50000000 debug 3

stack:

    normCast.exe!NormCaster::EnqueueFileObject() Line 608   C++
    normCast.exe!NormCaster::SendFiles() Line 559   C++
    normCast.exe!main(int argc, char * * argv) Line 1578    C++

error:

normCast error: transmit file path ""  exceeds NORM segment size limit!
honglei commented 1 year ago

Caused by the following line:

https://github.com/USNavalResearchLaboratory/norm/blob/b896f5408fc98b1355041609f0a5d23ca46de2c0/examples/normCast.cpp#L475

bebopagogo commented 1 year ago

Setting tx_pending_path[0] = '\0' is used to indicate that there is no pending transmit file. The TxFilePending() method is checked before EnqueueFileObject() is called; when it returns false, EnqueueFileObject() should not be called. So I am not sure how this problem is occurring. I am investigating it further.

honglei commented 1 year ago

C++17's std::filesystem provides a much easier way to iterate over a directory:

#include <algorithm>
#include <cstdint>
#include <filesystem> // C++ 17
#include <iostream>
#include <string>
#include <system_error>

#include "normApi.h"

namespace fs = std::filesystem;

/// Enqueue the next regular file reachable from `itEntry` on `session`.
///
/// Non-regular entries (directories, etc.) are skipped. The file's path, with
/// back-slashes converted to forward slashes, is sent as the NORM object info.
/// A file for which NormFileEnqueue() fails is reported on stderr and skipped,
/// and the search continues with the next entry, so that a single bad file
/// does not leave the caller with nothing queued.
///
/// @param session  NORM session on which the file is enqueued.
/// @param itEntry  Directory iterator; on return it points just past the last
///                 entry that was examined.
/// @return true if a file was enqueued, false if the iterator was exhausted
///         without enqueuing anything.
bool fileEnqueue(NormSessionHandle session, fs::recursive_directory_iterator& itEntry)
{
    const fs::recursive_directory_iterator endOfDir;
    for (; itEntry != endOfDir; ++itEntry) {
        if (!itEntry->is_regular_file()) {
            continue;
        }
        const std::string filePathStr = itEntry->path().string();

        // The info string carries the path with '/' separators.
        std::string info = filePathStr;
        std::replace(info.begin(), info.end(), '\\', '/');

        const NormObjectHandle objHandle =
            NormFileEnqueue(session, filePathStr.c_str(), info.c_str(),
                            static_cast<unsigned int>(info.length()));
        if (objHandle != NORM_OBJECT_INVALID) {
            std::cout << "push:" << filePathStr << std::endl;
            ++itEntry;  // the next call starts at the following entry
            return true;
        }
        // Do not pretend the file was queued: report it and try the next one.
        std::cerr << "fileEnqueue: NormFileEnqueue() failed for \"" << filePathStr
                  << "\", skipping" << std::endl;
    }
    return false;
}

/// Create a transmit-only NORM session, start it as a sender and enqueue the
/// first file found through `itEntry`.
///
/// @param normInstance  NORM API instance that owns the session.
/// @param itEntry       Directory iterator supplying the files to send.
/// @param addr          Session (destination) address passed to NormCreateSession().
/// @param port          Session port passed to NormCreateSession().
/// @param localAddr     Local address the transmit socket is bound to.
/// @param localID       NORM node id of this sender.
/// @return the started session, or NORM_SESSION_INVALID if the session could
///         not be created or the sender could not be started.
NormSessionHandle create_Session(NormInstanceHandle normInstance, fs::recursive_directory_iterator& itEntry, const char* addr, uint16_t port, const char* localAddr, NormNodeId localID) {
    const NormSessionHandle session = NormCreateSession(normInstance, addr, port, localID);
    if (session == NORM_SESSION_INVALID) {
        std::cerr << "create_Session: NormCreateSession() failed for " << addr << "/" << port << std::endl;
        return NORM_SESSION_INVALID;
    }

    if (!NormSetTxPort(session, 0, true, localAddr)) {
        std::cerr << "create_Session: NormSetTxPort() failed for local address " << localAddr << std::endl;
    }
    NormSetTxOnly(session, true);
    NormSetCongestionControl(session, true);
    //NormSetTxRateBounds

    // Use the randomly generated id for this sender instance instead of a
    // fixed literal (the value was previously computed but never used).
    const NormSessionId instanceId = NormGetRandomSessionId();

    // 100 MiB buffer space, 1400-byte segments, 128 segments per block, no parity.
    if (!NormStartSender(session, instanceId, 100 * 1024 * 1024, 1400, 128, 0)) {
        std::cerr << "create_Session: NormStartSender() failed for " << addr << "/" << port << std::endl;
        NormDestroySession(session);
        return NORM_SESSION_INVALID;
    }

    // Prime the transmit queue with the first file.
    fileEnqueue(session, itEntry);
    return session;
}

int main()
{
    NormInstanceHandle normInstance = NormCreateInstance();
    NormSetDebugLevel(3);
    uint16_t port = 6003;
    int localID = 191;
    auto localAddr = "10.65.39.191";

    auto addr = "224.1.2.3";
    const char* dirPath = "E:\\PythonPrj\\sendFiles\\ch1"; //Change this to real dir
    auto itEntry = fs::recursive_directory_iterator(dirPath);

    auto session1 = create_Session(normInstance, itEntry, addr, port, localAddr, localID);

    auto addr2 = "224.1.2.4";
    const char* dirPath2 = "E:\\PythonPrj\\sendFiles\\ch2"; //Change this to real dir
    auto itEntry2 = fs::recursive_directory_iterator(dirPath2);

    auto session2 = create_Session(normInstance, itEntry2, addr2, port, localAddr, localID);

    char buffer[512];
    bool success = false;
    bool keepGoing = true;
    int count1 = 0;
    int count2 = 0;
    NormEvent theEvent =NormEvent();
    while (keepGoing)
    {

        auto session = theEvent.session;
        if (!NormGetNextEvent(normInstance, &theEvent)) continue;
        switch (theEvent.type)
        {
        case NORM_TX_QUEUE_EMPTY:
            if (theEvent.session == session1)
            {
                fileEnqueue(session, itEntry);
            }
            else if (theEvent.session == session2)
            {
                fileEnqueue(session, itEntry2);
            };
            break;

        case NORM_TX_OBJECT_PURGED:

            success = NormFileGetName(theEvent.object, buffer, sizeof(buffer));
            if (success) {
                fs::remove(fs::path(buffer) ); //remove file
            }
            else 
             {
                std::cout << "xxx"<<std::endl;
            }
            if (theEvent.session == session1) {
                ++count1;
            }
            else if (theEvent.session == session2)
            {
                ++count2;
            }
            std::cerr << "count1:" << count1 << " count2:" << count2 << std::endl;
            if (theEvent.session == session1)
            {
                fileEnqueue(session, itEntry);
            }
            else if (theEvent.session == session2)
            {
                fileEnqueue(session, itEntry2);
            };
            break;

        //case NORM_TX_FLUSH_COMPLETED:
        //    std::cerr << "normFileSend: NORM_TX_FLUSH_COMPLETED event ...\n";
        //    keepGoing = false;  // our file has been sent (we think)
        //    break;

        default:
            std::cerr <<"type:"<< theEvent.type<<std::endl;
        }  // end switch(theEvent.type)
    }  // end while (NormGetNextEvent())

    NormStopSender(session1);
    NormDestroySession(session1);
    NormStopSender(session2);
    NormDestroySession(session2);
    NormDestroyInstance(normInstance);
    //listFiles("E:\\PythonPrj\\NORM\\norm159\\examples");
    return 0;
}