paullouisageneau / libdatachannel

C/C++ WebRTC network library featuring Data Channels, Media Transport, and WebSockets
https://libdatachannel.org/
Mozilla Public License 2.0

Receive video in "streamer" #706

Open sym19991125 opened 2 years ago

sym19991125 commented 2 years ago

When I use the "streamer" project, after I establish a P2P connection with others, I want to receive video streams sent by others on the C++ side, what should I do?

sym19991125 commented 2 years ago

Hello author, I also want to ask, when the Android side uses datachannel to send me data, how can I receive data that is not of type string? The Android side uses buffer to send and receive data. How can I receive the byte array or jpg or txt file sent by them?

paullouisageneau commented 2 years ago

When I use the "streamer" project, after I establish a P2P connection with others, I want to receive video streams sent by others on the C++ side, what should I do?

You can get the track on the receiving side with the onTrack callback; the track's onMessage callback then lets you receive the RTP stream as binary packets. You should first set an RtcpReceivingSession to handle RTCP for you (see the media-receiver example).

Hello author, I also want to ask, when the Android side uses datachannel to send me data, how can I receive data that is not of type string? The Android side uses buffer to send and receive data. How can I receive the byte array or jpg or txt file sent by them?

The onMessage callback of rtc::DataChannel lets you receive binary data as well as strings:

dc->onMessage([](std::variant<rtc::binary, rtc::string> message) {
    if (std::holds_alternative<rtc::binary>(message)) {
        // Get the binary data as a std::vector<std::byte> with std::get<rtc::binary>(message)
    } else {
        // It's a string
    }
});
sym19991125 commented 2 years ago

Thank you very much for your answer, I still have some questions for you. Is the onTrack function a function of PeerConnection? Can I achieve the function of receiving video stream according to my code below? [screenshot 1]

When using datachannel to receive data, I did the experiment according to your method, and the cout output cannot be achieved by the following two methods. What is the reason? The Android side uses the binary method to send byte[]. [screenshots 2 and 3]

paullouisageneau commented 2 years ago

Is the onTrack function a function of PeerConnection?

onTrack is a method of rtc::PeerConnection to which you need to pass a callback to get the tracks created remotely, for instance:

pc->onTrack([](shared_ptr<Track> track) {
    track->setMediaHandler([...]);
    track->onMessage([](auto message) {
        auto packet = std::get<rtc::binary>(message);
        // do something
    });
    myTrack = track; // You must store the track shared_ptr somewhere
});

When using datachannel to receive data, I did the experiment according to your method, and the cout output cannot be achieved by the following two methods. What is the reason? The Android side uses the binary method to send byte[].

You have to create the datachannel on one side and get it with the onDataChannel callback on the other side, is this what you do? In your code, it looks like the shared pointer dc is a local variable, if so the datachannel will be closed immediately when leaving the context. Also, please post complete code as actual text rather than snapshots.

sym19991125 commented 2 years ago

You have to create the datachannel on one side and get it with the onDataChannel callback on the other side, is this what you do? In your code, it looks like the shared pointer dc is a local variable, if so the datachannel will be closed immediately when leaving the context. Also, please post complete code as actual text rather than snapshots.

Here is my modification of your "streamer" project. When I call dc->onMessage directly in createPeerConnection, nothing happens, although dc->onOpen and dc->onClosed work. I also tried calling dc->onMessage inside pc->onDataChannel, but there the data channel was closed immediately and dc->onMessage never fired. Why is this? Here is my entire code:

#include "nlohmann/json.hpp"

#include "h264fileparser.hpp"
#include "opusfileparser.hpp"
#include "helpers.hpp"
#include "ArgParser.hpp"

using namespace rtc;
using namespace std;
using namespace std::chrono_literals;

using json = nlohmann::json;

template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }

/// all connected clients
unordered_map<string, shared_ptr<Client>> clients{};

/// Creates peer connection and client representation
/// @param config Configuration
/// @param wws Websocket for signaling
/// @param id Client ID
/// @returns Client
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id);

/// Creates stream
/// @param h264Samples Directory with H264 samples
/// @param fps Video FPS
/// @param opusSamples Directory with opus samples
/// @returns Stream object
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples);

/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video
void addToStream(shared_ptr<Client> client, bool isAddingVideo);

/// Start stream
void startStream();

/// Main dispatch queue
DispatchQueue MainThread("Main");

/// Audio and video stream
optional<shared_ptr<Stream>> avStream = nullopt;

const string defaultRootDirectory = "../../../examples/streamer/samples/";
const string defaultH264SamplesDirectory = defaultRootDirectory + "h264/";
string h264SamplesDirectory = defaultH264SamplesDirectory;
const string defaultOpusSamplesDirectory = defaultRootDirectory + "opus/";
string opusSamplesDirectory = defaultOpusSamplesDirectory;
// const string defaultIPAddress = "127.0.0.1";
// const uint16_t defaultPort = 8000;
const string defaultIPAddress = "julyx.cn";
const uint16_t defaultPort = 8765;
string ip_address = defaultIPAddress;
uint16_t port = defaultPort;

/// Incoming message handler for websocket
/// @param message Incoming message
/// @param config Configuration
/// @param ws Websocket
bool answerflag = false;
string sdp_sum="";
int cntt=0;
string candidate_sum="";
string candidate_sum1="";
string candi1="";
void wsOnMessage(json message, Configuration config, shared_ptr<WebSocket> ws) {

    string id = "android1";

    string type = message["eventName"];

    if (type == "__login_success"){
        cout<<"__login_success\n";
        json data = {
            {"roomSize", "2"},
            {"userID", "linux"},
            {"room", "a0f9b8b1-9bfe-4c53-9960-c6ff53f4175d1657986623231"}
        };
        json message = {
            {"data", data},
            {"eventName", "__create"}
        };
        ws->send(message.dump());
    } else if (type == "__peers"){
        cout<<"___peers\n";
        json data = {
            {"userList", "android1"},
            {"inviteID", "linux"},
            {"audioOnly", false},
            {"room", "a0f9b8b1-9bfe-4c53-9960-c6ff53f4175d1657986623231"}
        };
        json message = {
            {"data", data},
            {"eventName", "__invite"}
        };
        ws->send(message.dump());
    } else if (type == "__new_peer") {
        cout<<"___new_peer\n";
        id = "android1";
        shared_ptr<Client> c = createPeerConnection(config, make_weak_ptr(ws), id);
        clients.emplace(id, c);
    } else if (type == "__answer") {
        auto it = message.find("data");
        json data = it->get<json>();
        cout<<"___answer\n";
        string sdp = data["sdp"].get<string>();
        shared_ptr<Client> c;
        auto pc = clients.at(id)->peerConnection;
        auto description = Description(sdp, "answer");
        pc->setRemoteDescription(description);
    } else if (type == "__ice_candidate") {
        auto it = message.find("data");
        json data = it->get<json>();
        cout<<"___ice_candidate\n";
        // Look the client up before use: clients.at(id) would throw if it is missing
        if (auto jt = clients.find(id); jt != clients.end()) {
            auto pc = jt->second->peerConnection;
            string candidate = data["candidate"].get<string>();
            pc->addRemoteCandidate(candidate);
        }
    }
}
int main(int argc, char **argv) try {
    bool enableDebugLogs = false;
    bool printHelp = false;
    int c = 0;
    auto parser = ArgParser({{"a", "audio"}, {"b", "video"}, {"d", "ip"}, {"p","port"}}, {{"h", "help"}, {"v", "verbose"}});
    auto parsingResult = parser.parse(argc, argv, [](string key, string value) {
        if (key == "audio") {
            opusSamplesDirectory = value + "/";
        } else if (key == "video") {
            h264SamplesDirectory = value + "/";
        } else if (key == "ip") {
            ip_address = value;
        } else if (key == "port") {
            port = atoi(value.data());
        } else {
            cerr << "Invalid option --" << key << " with value " << value << endl;
            return false;
        }
        return true;
    }, [&enableDebugLogs, &printHelp](string flag){
        if (flag == "verbose") {
            enableDebugLogs = true;
        } else if (flag == "help") {
            printHelp = true;
        } else {
            cerr << "Invalid flag --" << flag << endl;
            return false;
        }
        return true;
    });
    if (!parsingResult) {
        return 1;
    }

    if (printHelp) {
        cout << "usage: stream-h264 [-a opus_samples_folder] [-b h264_samples_folder] [-d ip_address] [-p port] [-v] [-h]" << endl
        << "Arguments:" << endl
        << "\t -a " << "Directory with opus samples (default: " << defaultOpusSamplesDirectory << ")." << endl
        << "\t -b " << "Directory with H264 samples (default: " << defaultH264SamplesDirectory << ")." << endl
        << "\t -d " << "Signaling server IP address (default: " << defaultIPAddress << ")." << endl
        << "\t -p " << "Signaling server port (default: " << defaultPort << ")." << endl
        << "\t -v " << "Enable debug logs." << endl
        << "\t -h " << "Print this help and exit." << endl;
        return 0;
    }
    if (enableDebugLogs) {
        InitLogger(LogLevel::Debug);
    }

    Configuration config;
    string stunServer = "stun:stun.l.google.com:19302";
    string stunServer1 = "stun:42.192.40.58:3478?transport=udp";
    string turnServer = "turn:ddssingsong:123456@42.192.40.58:3478?transport=udp";
    string turnServer1 = "turn:ddssingsong:123456@42.192.40.58:3478?transport=tcp";
    cout << "Stun server is " << stunServer1 << endl;
    config.iceServers.emplace_back("stun:stun.ageneau.net:3478");
    config.iceServers.emplace_back("turn:datachannel_test:14018314739877@stun.ageneau.net:3478");
    config.iceServers.emplace_back(stunServer);
    config.iceServers.emplace_back(stunServer1);
    config.iceServers.emplace_back(turnServer);
    config.iceServers.emplace_back(turnServer1);
    config.disableAutoNegotiation = true;

    string localId = "server";
    cout << "The local ID is: " << localId << endl;

    auto ws = make_shared<WebSocket>();

    ws->onOpen([]() { cout << "WebSocket connected, signaling ready" << endl; });

    ws->onClosed([]() { cout << "WebSocket closed" << endl; });

    ws->onError([](const string &error) { cout << "WebSocket failed: " << error << endl; });

    ws->onMessage([&](variant<binary, string> data) {
        if (!holds_alternative<string>(data))
            return;

        json message = json::parse(get<string>(data));
        MainThread.dispatch([message, config, ws]() {
            wsOnMessage(message, config, ws);
        });
    });

    const string url = "ws://" + ip_address + ":" + to_string(port) + "/ws/" + "linux" + "/1";
    cout << "Url is " << url << endl;
    ws->open(url);

    cout << "Waiting for signaling to be connected..." << endl;
    while (!ws->isOpen()) {
        if (ws->isClosed())
            return 1;
        this_thread::sleep_for(100ms);
    }

    while (true) {
        string id;
        cout << "Enter to exit" << endl;
        cin >> id;
        cin.ignore();
        cout << "exiting" << endl;
        break;
    }

    cout << "Cleaning up..." << endl;

    return 0;

} catch (const std::exception &e) {
    std::cout << "Error: " << e.what() << std::endl;
    return -1;
}

shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto video = Description::Video(cname);
    video.addH264Codec(payloadType);
    video.addSSRC(ssrc, cname, msid, cname);
    auto track = pc->addTrack(video);
    // create RTP configuration
    auto rtpConfig = make_shared<RtpPacketizationConfig>(ssrc, cname, payloadType, H264RtpPacketizer::defaultClockRate);
    // create packetizer
    auto packetizer = make_shared<H264RtpPacketizer>(H264RtpPacketizer::Separator::Length, rtpConfig);
    // create H264 handler
    auto h264Handler = make_shared<H264PacketizationHandler>(packetizer);
    // add RTCP SR handler
    auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
    h264Handler->addToChain(srReporter); 
    // add RTCP NACK handler
    auto nackResponder = make_shared<RtcpNackResponder>(); 
    h264Handler->addToChain(nackResponder);
    // set handler
    track->setMediaHandler(h264Handler); 
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, srReporter);
    return trackData;
}

shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto audio = Description::Audio(cname);
    audio.addOpusCodec(payloadType);
    audio.addSSRC(ssrc, cname, msid, cname);
    auto track = pc->addTrack(audio);
    // create RTP configuration
    auto rtpConfig = make_shared<RtpPacketizationConfig>(ssrc, cname, payloadType, OpusRtpPacketizer::defaultClockRate);
    // create packetizer
    auto packetizer = make_shared<OpusRtpPacketizer>(rtpConfig);
    // create opus handler
    auto opusHandler = make_shared<OpusPacketizationHandler>(packetizer);
    // add RTCP SR handler
    auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
    opusHandler->addToChain(srReporter);
    // add RTCP NACK handler
    auto nackResponder = make_shared<RtcpNackResponder>();
    opusHandler->addToChain(nackResponder);
    // set handler
    track->setMediaHandler(opusHandler);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, srReporter);
    return trackData;
}

// Create and setup a PeerConnection
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                                weak_ptr<WebSocket> wws,
                                                string id) {
    auto pc = make_shared<PeerConnection>(config);
    auto client = make_shared<Client>(pc);

    pc->onStateChange([id](PeerConnection::State state) {
        cout << "State: " << state << endl;
        if (state == PeerConnection::State::Disconnected ||
            state == PeerConnection::State::Failed ||
            state == PeerConnection::State::Closed) {
            // remove disconnected client
            MainThread.dispatch([id]() {
                clients.erase(id);
            });
        }
    });

    pc->onGatheringStateChange(
        [wpc = make_weak_ptr(pc), id, wws](PeerConnection::GatheringState state) {
        cout << "Gathering State: " << state << endl;
        if (state == PeerConnection::GatheringState::Complete) {
            if(auto pc = wpc.lock()) {
                auto description = pc->localDescription();
                //sdp
                string sdp = string(description.value());
                int pos1 = sdp.find("a=candidate");
                int pos2 = sdp.find("a=end-of-candidates");

                string sdp1 = sdp.substr(0,pos1);
                string sdp2 = sdp.substr(pos2);
                sdp1 = sdp1+sdp2;

                json data = {
                    {"userID", "android1"},
                    {"fromID", "linux"},
                    {"sdp", sdp1}
                };
                json message = {
                    {"data", data},
                    {"eventName", "__offer"}
                };
                // Gathering complete, send offer
                if (auto ws = wws.lock()) {
                    ws->send(message.dump());
                }
                string candidate = sdp.substr(pos1,pos2-pos1);
                string candidate1 = candidate;
                while(candidate!="")
                {
                    candidate = candidate.substr(2);
                    int p = candidate.find("a=");
                    cout<<endl<<"P:"<<p<<endl;
                    if(p==-1)
                    {
                        int rn=candidate.find("\r\n");
                        candidate=candidate.substr(0,rn);
                        data = {
                        {"candidate", candidate},
                        {"id", "0"},
                        {"label", 0},
                        {"userID", "android1"},
                        {"fromID", "linux"}
                        };

                        message = {
                            {"data", data},
                            {"eventName", "__ice_candidate"}
                        };
                        if (auto ws = wws.lock()) {
                            ws->send(message.dump());
                        }
                        break;
                    }
                    else
                    {
                        string candidate_str = candidate.substr(0,p);
                        candidate = candidate.substr(p);
                        int rn=candidate_str.find("\r\n");
                        candidate_str=candidate_str.substr(0,rn);

                        data = {
                        {"candidate", candidate_str},
                        {"id", "0"},
                        {"label", 0},
                        {"userID", "android1"},
                        {"fromID", "linux"}
                        };

                        message = {
                            {"data", data},
                            {"eventName", "__ice_candidate"}
                        };
                        if (auto ws = wws.lock()) {
                            ws->send(message.dump());
                        }
                    }
                }
            }
        }
    });

    client->video = addVideo(pc, 102, 1, "video-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, true);
            }
        });
        cout << "Video from " << id << " opened" << endl;
    });

    client->audio = addAudio(pc, 111, 2, "audio-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, false);
            }
        });
        cout << "Audio from " << id << " opened" << endl;
    });

    // auto dc = pc->createDataChannel("ping-pong");
    const std::string label = "test";

    auto dc = pc->createDataChannel(label);

    dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
        std::cout << "DataChannel from " << id << " open" << std::endl;
        if (auto dc = wdc.lock())
            dc->send("Hello android");
    });

    dc->onClosed([id]() { std::cout << "DataChannel from " << id << " closed" << std::endl; });

    dc->onMessage([id, wdc = make_weak_ptr(dc)](auto data) {
        cout<<"wo jin onmessage le !\n";
        // data holds either std::string or rtc::binary
        if (std::holds_alternative<std::string>(data))
            std::cout << "Message from " << id << " received: " << std::get<std::string>(data)
                      << std::endl;
        else
            std::cout << "Binary message from " << id
                      << " received, size=" << std::get<rtc::binary>(data).size() << std::endl;
    }); 

    pc->onDataChannel([id](shared_ptr<rtc::DataChannel> dc) {
        std::cout << "DataChannel from " << id << " received with label \"" << dc->label() << "\""
                  << std::endl;
        dc->send("Hello world");
        dc->onOpen([wdc = make_weak_ptr(dc)]() {
            if (auto dc = wdc.lock()){
                dc->send("Hello from linux");

            }

        });
        dc->onClosed([id]() { std::cout << "DataChannel from " << id << " closed" << std::endl; });

        dc->onMessage([id](auto data) {
            cout<<"\nhen buhao\n";
            // data holds either std::string or rtc::binary
            if (std::holds_alternative<std::string>(data))
                std::cout << "Message from " << id << " received: " << std::get<std::string>(data)
                          << std::endl;
            else
                std::cout << "Binary message from " << id
                          << " received, size=" << std::get<rtc::binary>(data).size() << std::endl;
        });

        // dataChannelMap.emplace(id, dc);
    });  

    client->dataChannel = dc;

    pc->setLocalDescription();
    return client;
}

/// Create stream
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples) {
    // video source
    auto video = make_shared<H264FileParser>(h264Samples, fps, true);
    // audio source
    auto audio = make_shared<OPUSFileParser>(opusSamples, true);

    auto stream = make_shared<Stream>(video, audio);
    // set callback responsible for sample sending
    stream->onSample([ws = make_weak_ptr(stream)](Stream::StreamSourceType type, uint64_t sampleTime, rtc::binary sample) {
        // cout<<"\n``````````````onSample````````````````\n";
        vector<ClientTrack> tracks{};
        string streamType = type == Stream::StreamSourceType::Video ? "video" : "audio";
        // get track for given type
        function<optional<shared_ptr<ClientTrackData>> (shared_ptr<Client>)> getTrackData = [type](shared_ptr<Client> client) {
            return type == Stream::StreamSourceType::Video ? client->video : client->audio;
        };
        // get all clients with Ready state
        for(auto id_client: clients) {
            auto id = id_client.first;
            auto client = id_client.second;
            auto optTrackData = getTrackData(client);
            if (client->getState() == Client::State::Ready && optTrackData.has_value()) {
                auto trackData = optTrackData.value();
                tracks.push_back(ClientTrack(id, trackData));
            }
        }
        if (!tracks.empty()) {
            for (auto clientTrack: tracks) {
                auto client = clientTrack.id;
                auto trackData = clientTrack.trackData;
                // sample time is in us, we need to convert it to seconds
                auto elapsedSeconds = double(sampleTime) / (1000 * 1000);
                auto rtpConfig = trackData->sender->rtpConfig;
                // get elapsed time in clock rate
                uint32_t elapsedTimestamp = rtpConfig->secondsToTimestamp(elapsedSeconds);

                // set new timestamp
                rtpConfig->timestamp = rtpConfig->startTimestamp + elapsedTimestamp;

                // get elapsed time in clock rate from last RTCP sender report
                auto reportElapsedTimestamp = rtpConfig->timestamp - trackData->sender->previousReportedTimestamp;
                // check if last report was at least 1 second ago
                if (rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
                    trackData->sender->setNeedsToReport();
                }
                // cout << "Sending " << streamType << " sample with size: " << to_string(sample.size()) << " to " << client << endl;
                bool send = false;
                try {
                    // send sample
                    send = trackData->track->send(sample);
                } catch (...) {
                    send = false;
                }
                if (!send) {
                    cerr << "Unable to send "<< streamType << " packet" << endl;
                    break;
                }
            }
        }
        MainThread.dispatch([ws]() {
            if (clients.empty()) {
                // we have no clients, stop the stream
                if (auto stream = ws.lock()) {
                    stream->stop();
                }
            }
        });
    });
    //receive audio
    string id = "android1";

    auto pc = clients.at(id)->peerConnection;

    pc->onTrack([](shared_ptr<Track> track){
        auto session = std::make_shared<rtc::RtcpReceivingSession>();
        track->setMediaHandler(session);
        track->onMessage([session](auto message) {
            auto packet = std::get<rtc::binary>(message);
             cout<<"\n````````````````receive``````````````````````````\n";
            });

    });
    return stream;
}

/// Start stream
void startStream() {
    shared_ptr<Stream> stream;
    if (avStream.has_value()) {
        stream = avStream.value();
        if (stream->isRunning) {
            // stream is already running
            return;
        }
    } else {
        stream = createStream(h264SamplesDirectory, 30, opusSamplesDirectory);
        avStream = stream;
    }
    stream->start();
}

/// Send previous key frame so browser can show something to user
/// @param stream Stream
/// @param video Video track data
void sendInitialNalus(shared_ptr<Stream> stream, shared_ptr<ClientTrackData> video) {
    auto h264 = dynamic_cast<H264FileParser *>(stream->video.get());
    auto initialNalus = h264->initialNALUS();

    // send previous NALU key frame so users don't have to wait to see stream works
    if (!initialNalus.empty()) {
        const double frameDuration_s = double(h264->sampleDuration_us) / (1000 * 1000);
        const uint32_t frameTimestampDuration = video->sender->rtpConfig->secondsToTimestamp(frameDuration_s);
        video->sender->rtpConfig->timestamp = video->sender->rtpConfig->startTimestamp - frameTimestampDuration * 2;
        video->track->send(initialNalus);
        video->sender->rtpConfig->timestamp += frameTimestampDuration;
        // Send initial NAL units again to start stream in firefox browser
        video->track->send(initialNalus);
    }
}

/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video
void addToStream(shared_ptr<Client> client, bool isAddingVideo) {
    if (client->getState() == Client::State::Waiting) {
        client->setState(isAddingVideo ? Client::State::WaitingForAudio : Client::State::WaitingForVideo);
    } else if ((client->getState() == Client::State::WaitingForAudio && !isAddingVideo)
               || (client->getState() == Client::State::WaitingForVideo && isAddingVideo)) {

        // Audio and video tracks are collected now
        assert(client->video.has_value() && client->audio.has_value());

        auto video = client->video.value();
        auto audio = client->audio.value();

        auto currentTime_us = double(currentTimeInMicroSeconds());
        auto currentTime_s = currentTime_us / (1000 * 1000);

        // set start time of stream
        video->sender->rtpConfig->setStartTime(currentTime_s, RtpPacketizationConfig::EpochStart::T1970);
        audio->sender->rtpConfig->setStartTime(currentTime_s, RtpPacketizationConfig::EpochStart::T1970);

        // start stat recording of RTCP SR
        video->sender->startRecording();
        audio->sender->startRecording();

        if (avStream.has_value()) {
            sendInitialNalus(avStream.value(), video);
        }

        client->setState(Client::State::Ready);
    }
    if (client->getState() == Client::State::Ready) {
        startStream();
    }
}
sym19991125 commented 2 years ago

    //receive audio
    string id = "android1";

    auto pc = clients.at(id)->peerConnection;

    pc->onTrack([](shared_ptr<Track> track){
        auto session = std::make_shared<rtc::RtcpReceivingSession>();
        track->setMediaHandler(session);
        track->onMessage([session](auto message) {
            auto packet = std::get<rtc::binary>(message);
             cout<<"\n````````````````receive``````````````````````````\n";
            });

    });

I used onTrack to get the video stream as you described, but it doesn't work. What is my problem? This code is in createStream.

paullouisageneau commented 2 years ago

Like I said earlier, you must store the shared_ptr you get in onDataChannel and onTrack, otherwise the data channel or the track will be closed immediately when leaving the callback context as the last reference is destroyed.

sym19991125 commented 2 years ago

Like I said earlier, you must store the shared_ptr you get in onDataChannel and onTrack, otherwise the data channel or the track will be closed immediately when leaving the callback context as the last reference is destroyed.

Thank you very much for your help; this project helped a lot with my graduation project. I do have this problem in onDataChannel, but the dc in createPeerConnection persists the whole time. As with the "media-receiver" example in the project, when I send "Ping" to the web page, it replies with "Pong" and onMessage is called. But when I send a message to Android and it sends a message back, onMessage is never called. This is not caused by the data channel closing, so what is the reason?

paullouisageneau commented 2 years ago

How do you open the data channel on Android side?

sym19991125 commented 2 years ago

This is the code for datachannel on the Android side:

public Peer(PeerConnectionFactory factory, List<PeerConnection.IceServer> list, String userId, IPeerEvent event) {
    mFactory = factory;
    mIceLis = list;
    mEvent = event;
    mUserId = userId;//B.id
    queuedRemoteCandidates = new ArrayList<>();
    this.pc = createPeerConnection();

    DataChannel.Init dcInit = new DataChannel.Init();
    dcInit.id = 1;
    dc = pc.createDataChannel("1", dcInit);
    dc.registerObserver(mDataChannelObserver);
}

@Override
public void onDataChannel(DataChannel dataChannel) {
    Log.d(TAG, "onDataChannel");
    dataChannel.registerObserver(mDataChannelObserver);
}
private DataChannel.Observer mDataChannelObserver = new DataChannel.Observer() {
    boolean isHeader = true;
    String suffix = null;
    int fileLength = 0;
    long currentLength = 0;
    boolean isFinish = false;
    List<byte[]> queue = new ArrayList<>();
    @Override
    public void onBufferedAmountChange(long l) {
    }
    @Override
    public void onStateChange() {
        Log.d(TAG, "onDataChannel onStateChange:" + dc.state());
    }
    @Override
    public void onMessage(DataChannel.Buffer buffer) {
        Log.d(TAG, "onDataChannel onMessage : " + buffer);
        ByteBuffer data = buffer.data;
        byte[] bytes = new byte[data.capacity()];
        data.get(bytes);
        String msg = new String(bytes);
        //FTODO  ---------------------------------
        Log.e(TAG, "msg============> : " + msg);
    }
};

// send message
public void sendDataChannelMessage(String message) {
    byte[] msg = message.getBytes();
    DataChannel.Buffer buffer = new DataChannel.Buffer(ByteBuffer.wrap(msg), false);
    dc.send(buffer);
    Log.d(TAG, "send-success!!");
}
paullouisageneau commented 2 years ago

When is sendDataChannelMessage() called, and on which data channel? There is one data channel created on each side, so you will end up with two data channels. It looks like on Android side, sendDataChannelMessage() is called on dc, created with createDataChannel(). If so, it won't work, since this data channel will be closed immediately because of the problem I keep mentioning to you: on the other side you don't store the shared_ptr obtained in onDataChannel.

I think it would be good to clean up your code and remove useless stuff, in particular, in such a simple use case you should call createDataChannel only on one side and use only onDataChannel on the other.

sym19991125 commented 2 years ago

Hey Paul. First of all, thank you very much for your help, I do have many questions to trouble you. I want to receive video data sent by Android side on Linux side. According to your explanation, I used onTrack and saved the track, but I found that the onTrack function did not work when I used it, because my first cout did not output. Hope you can point out my mistakes and give guidance.

// Create and setup a PeerConnection
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                                weak_ptr<WebSocket> wws,
                                                string id) {
    auto pc = make_shared<PeerConnection>(config);
    auto client = make_shared<Client>(pc);
    //omission

    //receive audio
    shared_ptr<Track> myTrack;
    pc->onTrack([&myTrack](shared_ptr<Track> track){
        cout<<"\n````````````````pc->onTrack``````````````````````````\n";
        auto session = std::make_shared<rtc::RtcpReceivingSession>();
        track->setMediaHandler(session);
        track->onOpen([]() { cout << "track already,reveive!!!!!" << endl; });
        track->onMessage([session](auto message) {
            // auto packet = std::get<rtc::binary>(message);
             cout<<"\n````````````````receive``````````````````````````\n";
            });
        std::atomic_store(&myTrack, track);
    });
    pc->setLocalDescription();
    return client;
}
sym19991125 commented 2 years ago

I have another question. In "streamer", the data types that datachannel->onMessage can receive are binary and string. If the data sent by the Android side is a buffer class defined as follows, how can I receive such data?

public static class Buffer{
    public final ByteBuffer data;
    public final boolean binary;

    @CalledByNative("Buffer")
    public Buffer(ByteBuffer data, boolean binary){
        this.data = data;
        this.binary = binary;
    }
}