facebook / proxygen

A collection of C++ HTTP libraries including an easy to use HTTP server.

Correct way to implement middleware in proxygen #508

Closed orange-puff closed 3 months ago

orange-puff commented 3 months ago

Here is the flow I would like requests to go through:

client -> DecompressionMiddleware -> AppHandler -> CompressionFilter ->
^---------------------------------------------------------------------------------|

I see that Filter.h exists and a CompressionFilter is even provided, which does exactly what I want. But I am not seeing an example of how to implement middleware, that is, a component that receives the request first, potentially modifies it, and then passes it to the app handler.

Should middleware implement RequestHandler, or should it also be a Filter?

What I currently have is

options.handlerFactories = proxygen::RequestHandlerChain()
                                   .addThen<middleware::RequestDecompressionMiddlewareFactory>()
                                   .addThen<server::HandlerFactory>(std::move(handlerFactory))
                                   .build();

class HandlerFactory : public proxygen::RequestHandlerFactory {
public:

    void onServerStart(folly::EventBase* eventBase) noexcept override {}

    void onServerStop() noexcept override {}

    proxygen::RequestHandler* onRequest(proxygen::RequestHandler* requestHandler, proxygen::HTTPMessage* msg) noexcept override
    {
        const auto& path = msg->getPath();

        if (path == "/myapppath") {
            return new (std::nothrow) AppHandler();
        }
        return new (std::nothrow) NotFoundHandler();
    }
};

class RequestDecompressionMiddlewareFactory : public proxygen::RequestHandlerFactory {
    void onServerStart(folly::EventBase* eventBase) noexcept override {}

    void onServerStop() noexcept override {}

    proxygen::RequestHandler* onRequest(proxygen::RequestHandler* nextHandler, proxygen::HTTPMessage* msg) noexcept override
    {
        return new (std::nothrow) RequestDecompressionMiddleware(nextHandler);
    }
};

// here is how the RequestDecompressionMiddleware::onRequest method looks
void RequestDecompressionMiddleware::onRequest(std::unique_ptr<proxygen::HTTPMessage> headers) noexcept
{
    std::string contentEncoding = headers->getHeaders().getSingleOrEmpty(kContentEncodingHeader);
    if (contentEncoding == kGzip) {
        codecType_ = folly::io::CodecType::GZIP;
    } else if (contentEncoding == kZstd) {
        codecType_ = folly::io::CodecType::ZSTD;
    }
    // Pass the request headers to the next handler
    nextHandler_->onRequest(std::move(headers));
}

but what I found is

  virtual void setResponseHandler(ResponseHandler* handler) noexcept {
    downstream_ = CHECK_NOTNULL(handler);
  }

is only called on the RequestDecompressionMiddleware, whereas the handler that my HandlerFactory instantiates has a null downstream_.
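
The symptom above can be reproduced in a small model. This is only a sketch: `ToyResponseHandler`, `ToyRequestHandler`, `NonForwardingWrapper`, `ForwardingWrapper`, and `innerHasDownstream` are hypothetical stand-ins written for illustration, not proxygen classes. It assumes, as observed above, that the server sets the response handler only on the outermost handler, so the inner handler gets a downstream only if the wrapper passes one along.

```cpp
#include <cassert>

// Hypothetical stand-ins for illustration only; these are NOT proxygen types.
struct ToyResponseHandler {
  virtual ~ToyResponseHandler() = default;
};

struct ToyRequestHandler {
  virtual ~ToyRequestHandler() = default;
  // Mirrors the shape of the setter quoted above.
  virtual void setResponseHandler(ToyResponseHandler* handler) {
    assert(handler != nullptr);
    downstream_ = handler;
  }
  ToyResponseHandler* downstream_{nullptr};
};

// A wrapper that stores its own downstream but never tells the inner handler.
struct NonForwardingWrapper : ToyRequestHandler {
  explicit NonForwardingWrapper(ToyRequestHandler* next) : next_(next) {}
  ToyRequestHandler* next_;
};

// A wrapper that sits on both paths: it is a request handler for the server
// and a response handler for the inner handler.
struct ForwardingWrapper : ToyRequestHandler, ToyResponseHandler {
  explicit ForwardingWrapper(ToyRequestHandler* next) : next_(next) {}
  void setResponseHandler(ToyResponseHandler* handler) override {
    ToyRequestHandler::setResponseHandler(handler);
    next_->setResponseHandler(this);  // the inner handler now writes to us
  }
  ToyRequestHandler* next_;
};

// Returns true if the innermost handler ends up with a non-null downstream.
template <typename Wrapper>
bool innerHasDownstream() {
  ToyResponseHandler server;  // plays the role of the server-side writer
  ToyRequestHandler app;      // plays the role of the app handler
  Wrapper wrapper(&app);
  wrapper.setResponseHandler(&server);  // only the outermost handler is set
  return app.downstream_ != nullptr;
}
```

In this model `innerHasDownstream<NonForwardingWrapper>()` is false and `innerHasDownstream<ForwardingWrapper>()` is true, which suggests that a wrapper needs to insert itself into the response path as well as the request path.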

orange-puff commented 3 months ago

After reading more code, I believe a filter is the correct way to implement middleware.

client -> DecompressionMiddleware -> AppHandler -> CompressionFilter ->
^---------------------------------------------------------------------------------|

This seems impossible to achieve with how proxygen works, but something very similar can be achieved. Suppose DecompressionMiddleware is a filter, rather than a handler. The RequestHandlerChain should be built like this:

.addThen<CompressionFilterFactory>()
.addThen<DecompressionMiddlewareFactory>()
.addThen<AppHandlerFactory>()

The flow is:

client HTTP request -> CompressionFilter (does nothing with the request, just forwards it upstream) -> DecompressionMiddleware (decompresses the request body if the Content-Encoding header is set) -> AppHandler

AppHandler runs the business logic, then writes the response.

AppHandler HTTP response -> DecompressionMiddleware (does nothing, just writes downstream) -> CompressionFilter (compresses if the necessary headers and size requirements are met) -> client
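
The ordering described above can be checked with a toy model. This is a sketch under stated assumptions, not proxygen code: `ToyChain`, `trace`, and `makeIssueChain` are hypothetical names, and the model only encodes the claim that filters see the request in the order they were added and the response in the reverse order.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model of a filter chain; NOT proxygen code.
struct ToyChain {
  std::vector<std::string> filters;  // outermost first, in addThen order
  std::string app;                   // innermost handler

  // Returns the sequence of stages that one request/response pair visits.
  std::vector<std::string> trace() const {
    std::vector<std::string> out;
    out.push_back("client");
    // Request path: outermost filter first.
    for (const auto& f : filters) {
      out.push_back(f + " (request)");
    }
    out.push_back(app);
    // Response path: the same filters in reverse order.
    for (auto it = filters.rbegin(); it != filters.rend(); ++it) {
      out.push_back(*it + " (response)");
    }
    out.push_back("client");
    return out;
  }
};

// Builds the chain from this comment: compression outermost, then
// decompression, then the app handler.
ToyChain makeIssueChain() {
  ToyChain chain;
  chain.filters.push_back("CompressionFilter");
  chain.filters.push_back("DecompressionMiddleware");
  chain.app = "AppHandler";
  return chain;
}
```

Calling `makeIssueChain().trace()` yields seven stages: client, CompressionFilter (request), DecompressionMiddleware (request), AppHandler, DecompressionMiddleware (response), CompressionFilter (response), client. Adding the compression filter first is what makes it the last stage the response passes through.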

Given all of this, would a PR that adds DecompressionFilter be accepted? It would be a lot of copy-pasted code and does not seem to require anything that the CompressionFilter does not already do, and it seems like a nice complement to the already supplied CompressionFilter.

afrind commented 3 months ago

As long as the class name doesn't contain the word middleware, we'll accept a PR for a DecompressionFilter :D

I found a non-open-source component we have that is like the decompression filter you are referring to, but it is used for decompressing responses rather than requests (though it's pretty symmetric), and implements HTTPMessageFilter rather than Filter. I removed the non-OSS dependencies and put it here, if it helps at all. Some of the logic (e.g. adding the chunked header) might not be right if used when decompressing requests prior to app handling.

class DecompressionFilter : public HTTPMessageFilter {
 public:
  DecompressionFilter()
      : decompress_(false), is1_0_(false) {
  }

  void onHeadersComplete(std::unique_ptr<HTTPMessage> msg) noexcept override {
    initializeFromHTTPMessage(msg.get());
    nextOnHeadersComplete(std::move(msg));
  }

  bool initializeFromHTTPMessage(HTTPMessage* msg) {
    HTTPHeaders& headers = msg->getHeaders();
    auto encoding = headers.getSingleOrEmpty(HTTP_HEADER_CONTENT_ENCODING);
    if (encoding == "gzip") {
      decompressor_.init(CompressionType::GZIP);
      decompress_ = true;
    } else if (encoding == "deflate") {
      decompressor_.init(CompressionType::DEFLATE);
      decompress_ = true;
    }

    is1_0_ = msg->isHTTP1_0();

    if (decompress_ && decompressor_.hasError()) {
      decompress_ = false;
    }

    if (decompress_) {
      // remove the header that marks the response as 'gzip' encoded
      headers.remove(HTTP_HEADER_CONTENT_ENCODING);
      // remove the content length, if present
      headers.remove(HTTP_HEADER_CONTENT_LENGTH);
      if (is1_0_) {
        // we can't predict the inflated size, so signal the end of the
        // response by Connection: close
        headers.set(HTTP_HEADER_CONNECTION, "close");
        msg->setIsChunked(false);
      } else {
        // send a chunked response since we can't predict the inflated size
        headers.set(HTTP_HEADER_TRANSFER_ENCODING, "chunked");
        msg->setIsChunked(true);
      }
    }
    return decompress_;
  }

  void onBody(std::unique_ptr<folly::IOBuf> chain) noexcept override {
    if (!chain) {
      return;
    }

    if (!decompress_) {
      nextOnBody(std::move(chain));
      return;
    }

    if (!chain->empty()) {
      comprBodyLen_ += chain->computeChainDataLength();
      auto decomprBody = decompressor_.decompress(chain.get());
      if (decompressor_.hasError()) {
        // If we get screwed up streaming, there's literally nothing we can
        // do about it, as we've already sent the headers.  We're just
        // going to have to serve them some gobbledygook
        // (http://en.wikipedia.org/wiki/Gobbledygook)
        XLOG(ERR) << "Error decompressing the response:";
        HTTPException ex(HTTPException::Direction::INGRESS,
                         "Unable to decompress the response");
        ex.setProxygenError(kErrorBadDecompress);
        nextOnError(ex);
        return;
      }
      // only forward output when decompression actually produced data
      size_t size = decomprBody->computeChainDataLength();
      if (size > 0) {
        if (is1_0_) {
          nextOnBody(std::move(decomprBody));
        } else {
          DestructorCheck::Safety safety(*this);
          nextOnChunkHeader(size);
          if (!safety.destroyed()) {
            nextOnBody(std::move(decomprBody));
          }
          if (!safety.destroyed()) {
            nextOnChunkComplete();
          }
        }
      }
    }
  }

  void onChunkHeader(size_t length) noexcept override {
    if (!decompress_) {
      nextOnChunkHeader(length);
    }
  }

  void onChunkComplete() noexcept override {
    if (!decompress_) {
      nextOnChunkComplete();
    }
  }

  void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept override {
    if (!decompress_) {
      nextOnTrailers(std::move(trailers));
      return;
    }
    trailers_ = std::move(trailers);
  }

  void onEOM() noexcept override {
    if (!decompress_) {
      nextOnEOM();
      return;
    }

    DestructorCheck::Safety safety(*this);
    if (trailers_) {
      nextOnTrailers(std::move(trailers_));
    }
    if (!safety.destroyed()) {
      nextOnEOM();
    }
  }

  const std::string& getFilterName() const noexcept override {
    return kDecompressionFilterName;
  }

  std::unique_ptr<HTTPMessageFilter> clone() noexcept override {
    return std::unique_ptr<DecompressionFilter>(new DecompressionFilter(*this));
  }

  bool allowDSR() const noexcept override {
    // DecompressionFilter is set when the client didn't provide the header
    // Accept-Encoding: gzip. Allowing DSR as BigCache rejects delegations for
    // compressed cached objects.
    return true;
  }

 private:
  explicit DecompressionFilter(const DecompressionFilter& that)
      : decompress_(that.decompress_), is1_0_(that.is1_0_) {
  }

  size_t comprBodyLen_{0};
  std::unique_ptr<HTTPHeaders> trailers_;
  ZlibStreamDecompressor decompressor_;
  bool decompress_ : 1;
  bool is1_0_ : 1;
};
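
The header bookkeeping in `initializeFromHTTPMessage` can be tried in isolation. The sketch below is an assumption-laden model, not proxygen code: `ToyHeaders` is a plain `std::map` standing in for `HTTPHeaders` (so header names are case-sensitive here, unlike real HTTP), and `rewriteForDecompression` is a hypothetical helper. It only mirrors the decisions made in the class above and does no actual decompression.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for HTTPHeaders; NOT a proxygen type.
using ToyHeaders = std::map<std::string, std::string>;

// Mirrors the header decisions of the filter above: returns true if the body
// would be decompressed, and rewrites the headers accordingly.
bool rewriteForDecompression(ToyHeaders& headers, bool isHttp10) {
  auto it = headers.find("Content-Encoding");
  if (it == headers.end() ||
      (it->second != "gzip" && it->second != "deflate")) {
    return false;  // unknown or absent encoding: leave the headers untouched
  }
  // The forwarded body will no longer be encoded, and its length is unknown.
  headers.erase("Content-Encoding");
  headers.erase("Content-Length");
  if (isHttp10) {
    // HTTP/1.0 has no chunking, so the end of the body is the connection close.
    headers["Connection"] = "close";
  } else {
    headers["Transfer-Encoding"] = "chunked";
  }
  return true;
}
```

For a request-side filter, the chunked and `Connection: close` branches are the part most likely to need rethinking, as noted above, since the app handler rather than a client consumes the decompressed body.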

orange-puff commented 3 months ago

Thanks for the response. Feel free to close this issue. I have my own custom decompression filter working exactly as expected.