BelledonneCommunications / flexisip

Linphone.org mirror for flexisip (git://git.linphone.org/flexisip.git)
http://flexisip.org
GNU Affero General Public License v3.0
140 stars 68 forks source link

Sip Agent Blocking when APNS tls connecting #113

Open simon6329 opened 3 years ago

simon6329 commented 3 years ago

Sometimes, the TLS connecting of APNS (http2) take time for 1 minute or more. in applepush.cc line 319 mConn->connect(); // sometimes need 1 minute or more.

In this case, the SIP agent will block and can not handle any sip messages until connect() completes. I think it is caused by is same process with SIP agent.

Any solution?

simon6329 commented 3 years ago

my solutions

///////////// AppleClient::AppleClient(su_root_t &root, std::unique_ptr &&conn, std::string name) : mRoot{root}, mIdleTimer{&root, sIdleTimeout * 1000}, mConn{std::move(conn)} ,mThread(), mThreadRunning(false), mThreadWaiting(true) { ostringstream os{}; //os << "AppleClient[" << this << "]"; os << "AppleClient[" << name << "]"; mLogPrefix = os.str(); SLOGD << mLogPrefix << ": constructing AppleClient with TlsConnection[" << mConn.get() << "]";

//
if (mThreadRunning) {
    mThreadRunning = false;
    mMutex.lock();
    if (mThreadWaiting) mCondVar.notify_one();
    mMutex.unlock();
    mThread.join();
}

}

AppleClient::~AppleClient() { if (mThreadRunning) { mThreadRunning = false; mMutex.lock(); if (mThreadWaiting) mCondVar.notify_one(); mMutex.unlock(); mThread.join(); } }

void AppleClient::run() { std::unique_lock lock(mMutex);

SLOGE << mLogPrefix << ": mThread start running";

// create a own root        
sroot = su_root_create(NULL);
// create a timer for idle handle
mIdleTimer2 = make_unique<sofiasip::Timer>(sroot, sIdleTimeout * 1000);

lock.unlock();

while (mThreadRunning) {

    if (!mPendingPNRs.empty()) {

        if (mState != State::Connected) {
            //SLOGE << mLogPrefix << ": service not connected";
            if (mState == State::Disconnected) {
                SLOGE << mLogPrefix << ": do connect";
                connect();
            }
            // waiting event and 100ms timeout
            su_root_step(sroot, 100);
            //std::this_thread::sleep_for(std::chrono::seconds(1));
            continue;
        }

        //SLOGE << mLogPrefix << ": run send ";
        lock.lock();
        sendAllPendingPNRs();
        lock.unlock();
        SLOGE << mLogPrefix << ": run send done";

    }
    su_root_step(sroot, 10);
    /*else {
        mThreadWaiting = true;
        mCondVar.wait(lock);
        mThreadWaiting = false;
    }
    */
}

}

bool AppleClient::sendPush(const std::shared_ptr &req) {

if (!mThreadRunning) {
    // start thread only when we have at least one push to send
    SLOGE << mLogPrefix << ": mThread not runing, create it";
    mThreadRunning = true;
    mThreadWaiting = false;
    mThread = std::thread(&AppleClient::run, this); 
}

auto appleReq = dynamic_pointer_cast<AppleRequest>(req);

//SLOGE << mLogPrefix << ": new push ready to queue !!!";
mMutex.lock();
mPendingPNRs.emplace(move(appleReq));
if (mThreadWaiting) mCondVar.notify_one();
mMutex.unlock();
SLOGE << mLogPrefix << ": new push ready to queue";

return true;

}

/* bool AppleClient::sendPush(const std::shared_ptr &req) { auto appleReq = dynamic_pointer_cast(req); mPendingPNRs.emplace(move(appleReq));

SLOGE << mLogPrefix << ": new push ready to send !!!";

if (mState != State::Connected) {
    SLOGE << mLogPrefix << ": service != connected";
    if (mState == State::Disconnected) connect();
    return true;
}

return sendAllPendingPNRs();

} */

void AppleClient::connect() { if (mState != State::Disconnected) { throw BadStateError(mState); } setState(State::Connecting);

try {
    SLOGE << mLogPrefix << ": tls connect start";
    mConn->connect();
    if (!mConn->isConnected()) throw runtime_error{"TLS connection failed"};

    SLOGE << mLogPrefix << ": tls connected";

    auto sendCb = [](nghttp2_session *session, const uint8_t *data, size_t length, int flags, void *user_data) noexcept {
        auto thiz = static_cast<AppleClient *>(user_data);
        return thiz->send(*session, data, length);
    };
    auto recvCb = [](nghttp2_session *session, uint8_t *buf, size_t length, int flags, void *user_data) noexcept {
        auto thiz = static_cast<AppleClient *>(user_data);
        return thiz->recv(*session, buf, length);
    };
    auto frameSentCb = [](nghttp2_session *session, const nghttp2_frame *frame, void *user_data) noexcept {
        auto thiz = static_cast<AppleClient *>(user_data);
        thiz->onFrameSent(*session, *frame);
        return 0;
    };
    auto frameRecvCb = [](nghttp2_session *session, const nghttp2_frame *frame, void *user_data) noexcept {
        auto thiz = static_cast<AppleClient *>(user_data);
        thiz->onFrameRecv(*session, *frame);
        return 0;
    };
    auto onHeaderRecvCb = [](nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t namelen,
                            const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data) noexcept {
        auto thiz = static_cast<AppleClient *>(user_data);
        string nameStr{reinterpret_cast<const char *>(name), namelen};
        string valueStr{reinterpret_cast<const char *>(value), valuelen};
        thiz->onHeaderRecv(*session, *frame, nameStr, valueStr, flags);
        return 0;
    };
    auto onDataChunkRecvCb = [](nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *user_data) noexcept {
        auto thiz = static_cast<AppleClient *>(user_data);
        thiz->onDataReceived(*session, flags, stream_id, data, len);
        return 0;
    };
    auto onStreamClosedCb = [](nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data) noexcept {
        auto thiz = static_cast<AppleClient *>(user_data);
        thiz->onStreamClosed(*session, stream_id, error_code);
        return 0;
    };

    nghttp2_session_callbacks *cbs;
    nghttp2_session_callbacks_new(&cbs);
    nghttp2_session_callbacks_set_send_callback(cbs, sendCb);
    nghttp2_session_callbacks_set_recv_callback(cbs, recvCb);
    nghttp2_session_callbacks_set_on_frame_send_callback(cbs, frameSentCb);
    nghttp2_session_callbacks_set_on_frame_recv_callback(cbs, frameRecvCb);
    nghttp2_session_callbacks_set_on_header_callback(cbs, onHeaderRecvCb);
    nghttp2_session_callbacks_set_on_data_chunk_recv_callback(cbs, onDataChunkRecvCb);
    nghttp2_session_callbacks_set_on_stream_close_callback(cbs, onStreamClosedCb);;

    unique_ptr<nghttp2_session_callbacks, void(*)(nghttp2_session_callbacks *)> cbsPtr{cbs, nghttp2_session_callbacks_del};

    nghttp2_session *session;
    nghttp2_session_client_new(&session, cbs, this);
    NgHttp2SessionPtr httpSession{session};

    int status;
    if ((status = nghttp2_submit_settings(session, NGHTTP2_FLAG_NONE, nullptr, 0)) < 0) {
        throw runtime_error{"submitting settings failed [status=" + to_string(status) + "]"};
    }
    if ((status = nghttp2_session_send(session)) < 0) {
        throw runtime_error{"sending SETTINGS frame failed [status=" + to_string(status) + "]"};
    }

    mHttpSession = move(httpSession);

    if(0 != su_wait_create(&mPollInWait, mConn->getFd(), SU_WAIT_IN))       {
        SLOGE << mLogPrefix << ": su_wait_create fail";
        return;
    }

    //su_root_register(&mRoot, &mPollInWait, onPollInCb, this, su_pri_normal);
    su_root_register(sroot, &mPollInWait, onPollInCb, this, su_pri_normal);

    //resetIdleTimer();
    resetIdleTimer2();

} catch (const runtime_error &e) {
    SLOGE << mLogPrefix << ": " << e.what();
    disconnect();
}

//SLOGE << mLogPrefix << ": connecting end";

}

void AppleClient::disconnect() { SLOGD << mLogPrefix << ": disconnecting from APNS"; if (mState == State::Disconnected) return; //su_root_unregister(&mRoot, &mPollInWait, onPollInCb, this); su_root_unregister(sroot, &mPollInWait, onPollInCb, this); mHttpSession.reset(); mConn->disconnect(); setState(State::Disconnected); mLastSID = -1; mPNRs.clear(); }

bool AppleClient::sendAllPendingPNRs() { auto pnrSent = false;

//if(!mPendingPNRs.empty())
//  SLOGE << mLogPrefix << ": sendAllPendingPNRs";

while (!mPendingPNRs.empty()) {
    auto appleReq = move(mPendingPNRs.front());
    mPendingPNRs.pop();
    auto host = mConn->getPort() == "443"
        ? mConn->getHost()
        : mConn->getHost() + ":" + mConn->getPort();
    auto path = string{"/3/device/"} + appleReq->getDeviceToken();
    auto topicLen = appleReq->getAppIdentifier().rfind(".");
    auto apnsTopic = appleReq->getAppIdentifier().substr(0, topicLen);

    // Check whether the appId is compatible with the payload type
    auto endsWithVoip = StringUtils::endsWith(apnsTopic, ".voip");
    if ((appleReq->mPayloadType == ApplePushType::Pushkit && !endsWithVoip)
            || (appleReq->mPayloadType != ApplePushType::Pushkit && endsWithVoip)) {
        SLOGE << mLogPrefix << ": apns-topic [" << apnsTopic << "] not compatible with payload type ["
            << toString(appleReq->mPayloadType) << "]. Aborting";
        continue;
    }

    HeaderStore hStore{};
    hStore.add( ":method"         , "POST"     );
    hStore.add( ":scheme"         , "https"    );
    hStore.add( ":path"           , move(path) );
    hStore.add( "host"            , move(host) );
    hStore.add( "apns-expiration" , "0"        );
    hStore.add( "apns-topic"      , apnsTopic  );
    auto hList = hStore.makeHeaderList();

    DataProvider dataProv{appleReq->getData()};
    auto streamId = nghttp2_submit_request(mHttpSession.get(), nullptr, hList.data(), hList.size(), dataProv.getCStruct(), nullptr);
    if (streamId < 0) {
        SLOGE << mLogPrefix << ": push request submit failed. reason=[" << nghttp2_strerror(streamId) << "]";
        continue;
    }
    auto logPrefix = string{mLogPrefix} + "[" + to_string(streamId) + "]";

    ostringstream msg{};
    msg << logPrefix << ": sending PNR " << appleReq << ":\n"
        << hStore.toString() << endl;
    msg.write(appleReq->getData().data(), appleReq->getData().size());
    //SLOGE << msg.str();

    auto status = nghttp2_session_send(mHttpSession.get());
    if (status < 0) {
        SLOGE << logPrefix << ": push request sending failed. reason=[" << nghttp2_strerror(status) << "]";
        continue;
    }

    mPNRs.emplace(streamId, PnrContext{*this, appleReq, sPnrTimeout});
    appleReq->setState(Request::State::InProgress);

    pnrSent = true;
}

//if (pnrSent) resetIdleTimer();
if (pnrSent) resetIdleTimer2();
return true;

}

void AppleClient::processGoAway() { SLOGD << mLogPrefix << ": closing connection after receiving GOAWAY frame. Last processed stream is [" << mLastSID << "]";

// move back all the non-treated PNRs into the pending queue
for (auto it = mPNRs.begin(); it != mPNRs.end(); it = mPNRs.erase(it)) {
    const auto &sid = it->first;
    auto &request = it->second.getPnr();
    if (sid > mLastSID) {
        SLOGD << mLogPrefix << ": PNR " << request  << " will be sent on next connection";
        request->setState(Request::State::NotSubmitted);
        mPendingPNRs.emplace(move(request));
    }
}

// disconnect and connect again if there still are PNRs to process
disconnect();
if (!mPendingPNRs.empty()) {
    SLOGD << mLogPrefix << ": PNRs are waiting. Connecting to server again";
    connect();
}

}

void AppleClient::setState(State state) noexcept { if (mState == state) return; SLOGD << mLogPrefix << ": switching state from [" << mState << "] to [" << state << "]"; mState = state; }

ssize_t AppleClient::send(nghttp2_session &session, const uint8_t *data, size_t length) noexcept { length = min(length, size_t(numeric_limits::max())); auto nwritten = mConn->write(data, int(length)); if (nwritten < 0) { SLOGE << mLogPrefix << ": error while writting into socket[" << nwritten << "]"; return NGHTTP2_ERR_CALLBACK_FAILURE; } if (nwritten == 0 && length > 0) return NGHTTP2_ERR_WOULDBLOCK; return nwritten; }

ssize_t AppleClient::recv(nghttp2_session &session, uint8_t *data, size_t length) noexcept { length = min(length, size_t(numeric_limits::max())); auto nread = mConn->read(data, length); if (nread < 0) { SLOGE << mLogPrefix << ": error while reading socket. " << strerror(errno); return NGHTTP2_ERR_CALLBACK_FAILURE; } if (nread == 0 && length > 0) return NGHTTP2_ERR_WOULDBLOCK; return nread; }

void AppleClient::onFrameSent(nghttp2_session &session, const nghttp2_frame &frame) noexcept { // SLOGD << mLogPrefix << "[" << frame.hd.stream_id << "]: frame sent (" << frame.hd.length << "B):\n" << frame; }

void AppleClient::onFrameRecv(nghttp2_session &session, const nghttp2_frame &frame) noexcept { // SLOGD << mLogPrefix << "[" << frame.hd.stream_id << "]: frame received (" << frame.hd.length << "B):\n" << frame; switch (frame.hd.type) { case NGHTTP2_SETTINGS: if (mState == State::Connecting && (frame.hd.flags & NGHTTP2_FLAG_ACK) == 0) { SLOGD << mLogPrefix << ": server settings received"; setState(State::Connected); SLOGE << mLogPrefix << ": APNS Connected"; //sendAllPendingPNRs(); //su_root_break(sroot); } break; case NGHTTP2_GOAWAY: { ostringstream msg{}; msg << mLogPrefix << ": GOAWAY frame received, errorCode=[" << frame.goaway.error_code << "], lastStreamId=[" << frame.goaway.last_stream_id << "]:"; if (frame.goaway.opaque_data_len > 0) { msg << endl; msg.write(reinterpret_cast<const char *>(frame.goaway.opaque_data), frame.goaway.opaque_data_len); } else { msg << " "; } SLOGD << msg.str(); SLOGD << "Scheduling connection closing"; mLastSID = frame.goaway.last_stream_id; break; } } }

void AppleClient::onHeaderRecv(nghttp2_session &session, const nghttp2_frame &frame, const std::string &name, const std::string &value, uint8_t flags) noexcept { const auto &streamId = frame.hd.stream_id; auto logPrefix = string{mLogPrefix} + "[" + to_string(streamId) + "]"; SLOGD << logPrefix << ": receiving HTTP2 header [" << name << " = " << value << "]"; if (name == ":status") { AppleRequest *pnr = nullptr; try { pnr = mPNRs.at(streamId).getPnr().get(); } catch (const logic_error &) { SLOGE << logPrefix << ": receiving header for an unknown stream. Just ignoring"; return; } try { pnr->mStatusCode = stoi(value); } catch (const logic_error &e) { SLOGE << logPrefix << ": error while parsing status code[" << value << "]: " << e.what(); return; } if (pnr->mStatusCode == 200) { pnr->setState(Request::State::Successful); SLOGD << logPrefix << ": PNR " << pnr << " succeeded"; } else { pnr->setState(Request::State::Failed); SLOGD << logPrefix << ": PNR " << pnr << " failed"; } } }

void AppleClient::onDataReceived(nghttp2_session &session, uint8_t flags, int32_t streamId, const uint8_t data, size_t datalen) noexcept { ostringstream msg{}; msg << mLogPrefix << "[" << streamId << "]"; msg << ": " << datalen << "B of data received on stream[" << streamId << "]:\n"; msg.write(reinterpret_cast<const char >(data), datalen); SLOGD << msg.str(); }

int AppleClient::onPollInCb(su_root_magic_t , su_wait_t , su_wakeup_arg_t arg) noexcept { auto thiz = static_cast<AppleClient >(arg); //SLOGE << thiz->mLogPrefix << ": onPollCb"; //return 0; auto status = nghttp2_session_recv(thiz->mHttpSession.get()); if (status < 0) { SLOGE << thiz->mLogPrefix << ": error while receiving HTTP2 data[" << nghttp2_strerror(status) << "]. Disconnecting"; thiz->disconnect(); return 0; } if (thiz->mLastSID >= 0) thiz->processGoAway(); return 0; }

void AppleClient::onStreamClosed(nghttp2_session &session, int32_t stream_id, uint32_t error_code) noexcept { auto logPrefix = mLogPrefix + "[" + to_string(stream_id) + "]"; SLOGD << logPrefix << ": stream closed with error code [" << error_code << "]"; auto it = mPNRs.find(stream_id); if (it != mPNRs.cend()) { SLOGD << logPrefix << ": end of PNR " << it->second.getPnr(); mPNRs.erase(it); } }

void AppleClient::onConnectionIdle() noexcept { SLOGE << mLogPrefix << ": connection is idle" << " (go reconnect)"; disconnect(); // always connect connect(); }

const char *Http2Tools::frameTypeToString(uint8_t frameType) noexcept { switch(frameType) { case NGHTTP2_DATA: return "DATA"; case NGHTTP2_HEADERS: return "HEADERS"; case NGHTTP2_PRIORITY: return "PRIORITY"; case NGHTTP2_RST_STREAM: return "RST_STREAM"; case NGHTTP2_SETTINGS: return "SETTINGS"; case NGHTTP2_PUSH_PROMISE: return "PUSH_PROMISE"; case NGHTTP2_PING: return "PING"; case NGHTTP2_GOAWAY: return "GOAWAY"; case NGHTTP2_WINDOW_UPDATE: return "WINDOW_UPDATE"; case NGHTTP2_CONTINUATION: return "CONTINUATION";

if NGHTTP2_VERSION_NUM >= 0x010a00 // v1.10.0

    case NGHTTP2_ALTSVC:        return "ALTSVC";

endif

if NGHTTP2_VERSION_NUM >= 0x012100 // v1.33.0

    case NGHTTP2_ORIGIN:        return "ORIGIN";

endif

}
return "UNKNOWN";

}

std::string Http2Tools::printFlags(uint8_t flags) noexcept { array<const char *, 4> flagsAsStr{};

auto len = 0;
if (flags & NGHTTP2_FLAG_END_STREAM) flagsAsStr.at(len++) = "END_STREAM";
if (flags & NGHTTP2_FLAG_END_HEADERS) flagsAsStr.at(len++) = "END_HEADERS";
if (flags & NGHTTP2_FLAG_ACK) flagsAsStr.at(len++) = "ACK";
if (flags & NGHTTP2_FLAG_PADDED) flagsAsStr.at(len++) = "PADDED";

string res{};
for (auto i = 0; i < len; ++i) {
    if (i != 0) res += " | ";
    res += flagsAsStr.at(i);
}
return res;

}

} // end of pushnotification namespace } // end of flexisip namespace

using namespace flexisip::pushnotification;

std::ostream &operator<<(std::ostream &os, const nghttp2_frame &frame) noexcept { os << Http2Tools::frameTypeToString(frame.hd.type) << endl; os << "streamId: " << frame.hd.stream_id << endl; os << hex << showbase; os << "flags: " << int(frame.hd.flags) << " [" << Http2Tools::printFlags(frame.hd.flags) << "]" << endl; os << dec << noshowbase; switch (frame.hd.type) { case NGHTTP2_HEADERS: os << endl; if (frame.headers.nvlen > 0) { for (unsigned i = 0; i < frame.headers.nvlen; ++i) { const auto &nva = frame.headers.nva[i]; os << nva.name << ": " << nva.value << endl; } } else { os << "" << endl; } break; case NGHTTP2_RST_STREAM: { const auto &error_code = frame.rst_stream.error_code; os << "errorCode: " << error_code;

if NGHTTP2_VERSION_NUM >= 0x010900 // v1.9.0

        os << "[" << nghttp2_http2_strerror(error_code) << "]";

endif

        os << endl;
        break;
    }
    case NGHTTP2_SETTINGS:
        os << endl;
        if (frame.settings.niv > 0) {
            for (unsigned i = 0; i < frame.settings.niv; ++i) {
                const auto &iv = frame.settings.iv[i];
                os << iv.settings_id << " : " << iv.value << endl;
            }
        } else {
            os << "<empty>" << endl;
        }
        break;
    case NGHTTP2_GOAWAY:
        os << "lastStreamId: " << frame.goaway.last_stream_id << endl;
        os << "errorCode: " << frame.goaway.error_code << endl;
        os << endl;
        if (frame.goaway.opaque_data) {
            os.write(reinterpret_cast<char *>(frame.goaway.opaque_data), frame.goaway.opaque_data_len);
            os << endl;
        } else {
            os << "<empty>" << endl;
        }
        break;
    default:
        break;
};
return os;

}

std::ostream &operator<<(std::ostream &os, flexisip::pushnotification::AppleClient::State state) noexcept { switch (state) { case AppleClient::State::Disconnected: return os << "Disconnected"; case AppleClient::State::Connecting: return os << "Connecting"; case AppleClient::State::Connected: return os << "Connected"; }; return os << "Unknown"; }

simon6329 commented 3 years ago

my solutions

applepush.cc.txt

Omid-Mohajerani commented 3 years ago

Hi @simon6329

Does Flexi SIP can send VoIP Push for IOS 14 ? I have linphone that registered in Asterisk and I want to send push for my IOS devices. Can I use Flexi SIP and is it working with the latest IOS ? Thanks