zhangjun / my_notes


triton inference server #15

Open zhangjun opened 10 months ago

zhangjun commented 10 months ago

server

Call chain for a gRPC inference request: InferHandler->Start() => Process() => StartNewRequest(), Execute()
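
A rough sketch of how that chain maps onto the async-gRPC completion-queue pattern. This is my own simplification, not the actual Triton classes: InferHandlerSketch and RequestState are invented names, and the real handler's service registration and inference call are only indicated in comments.

#include <grpcpp/grpcpp.h>

#include <thread>

// Per-request state, used as the completion-queue tag.
struct RequestState {
  enum Step { START, COMPLETE } step = START;
};

class InferHandlerSketch {
 public:
  explicit InferHandlerSketch(grpc::ServerCompletionQueue* cq) : cq_(cq) {}

  ~InferHandlerSketch()
  {
    cq_->Shutdown();
    if (worker_.joinable()) {
      worker_.join();
    }
  }

  // Start() launches a thread that drains the completion queue.
  void Start()
  {
    worker_ = std::thread([this] {
      StartNewRequest();  // register the first asynchronous RPC
      void* tag = nullptr;
      bool ok = false;
      while (cq_->Next(&tag, &ok)) {  // block until an event completes
        auto* state = static_cast<RequestState*>(tag);
        if (!Process(state, ok)) {
          delete state;  // request finished; release its state
        }
      }
    });
  }

 private:
  void StartNewRequest()
  {
    // The real handler posts service_->RequestModelInfer(...) here with a
    // fresh state object as the tag; omitted in this sketch.
  }

  // Process() advances one request's state machine; returns false when done.
  bool Process(RequestState* state, bool ok)
  {
    if (!ok) {
      return false;
    }
    if (state->step == RequestState::START) {
      StartNewRequest();  // immediately accept the next request
      Execute(state);     // hand this one off to the inference core
      state->step = RequestState::COMPLETE;
      return true;
    }
    return false;  // response has been written; this state is finished
  }

  void Execute(RequestState* /*state*/)
  {
    // The real handler builds a TRITONSERVER_InferenceRequest and calls
    // TRITONSERVER_ServerInferAsync(); omitted in this sketch.
  }

  grpc::ServerCompletionQueue* cq_;
  std::thread worker_;
};

The actual constructor that wires up these handlers: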


// server/src/grpc/grpc_server.cc
Server::Server(/* parameters elided: tritonserver, trace_manager, shm_manager, options, ... */)
{
  // A common Handler for other non-inference requests
  common_handler_.reset(new CommonHandler(
      "CommonHandler", tritonserver_, shm_manager_, trace_manager_, &service_,
      &health_service_, common_cq_.get(), restricted_keys));

  for (int i = 0; i < REGISTER_GRPC_INFER_THREAD_COUNT; ++i) {
    model_infer_handlers_.emplace_back(new ModelInferHandler(
        "ModelInferHandler", tritonserver_, trace_manager_, shm_manager_,
        &service_, model_infer_cq_.get(),
        options.infer_allocation_pool_size_ /* max_state_bucket_count */,
        options.infer_compression_level_, restricted_kv,
        options.forward_header_pattern_));
  }

  // Handler for streaming inference requests. Keeps one handler for streaming
  // to avoid possible concurrent writes, which are not allowed.
  model_stream_infer_handlers_.emplace_back(new ModelStreamInferHandler(
      "ModelStreamInferHandler", tritonserver_, trace_manager_, shm_manager_,
      &service_, model_stream_infer_cq_.get(),
      options.infer_allocation_pool_size_ /* max_state_bucket_count */,
      options.infer_compression_level_, restricted_kv,
      options.forward_header_pattern_));
}

// server/src/main.cc (wraps triton::server::grpc::Server from src/grpc/grpc_server.cc)
TRITONSERVER_Error*
StartGrpcService(
    std::unique_ptr<triton::server::grpc::Server>* service,
    const std::shared_ptr<TRITONSERVER_Server>& server,
    triton::server::TraceManager* trace_manager,
    const std::shared_ptr<triton::server::SharedMemoryManager>& shm_manager)
{
  TRITONSERVER_Error* err = triton::server::grpc::Server::Create(
      server, trace_manager, shm_manager, g_triton_params.grpc_options_,
      service);
  if (err == nullptr) {
    err = (*service)->Start();
  }

  if (err != nullptr) {
    service->reset();
  }

  return err;
}

// server/src/main.cc (wraps triton::server::HTTPAPIServer from src/http_server.cc)
TRITONSERVER_Error*
StartHttpService(
    std::unique_ptr<triton::server::HTTPServer>* service,
    const std::shared_ptr<TRITONSERVER_Server>& server,
    triton::server::TraceManager* trace_manager,
    const std::shared_ptr<triton::server::SharedMemoryManager>& shm_manager)
{
  TRITONSERVER_Error* err = triton::server::HTTPAPIServer::Create(
      server, trace_manager, shm_manager, g_triton_params.http_port_,
      g_triton_params.reuse_http_port_, g_triton_params.http_address_,
      g_triton_params.http_forward_header_pattern_,
      g_triton_params.http_thread_cnt_, service);
  if (err == nullptr) {
    err = (*service)->Start();
  }

  if (err != nullptr) {
    service->reset();
  }

  return err;
}

TRITONSERVER_Error*
StartMetricsService(
    std::unique_ptr<triton::server::HTTPServer>* service,
    const std::shared_ptr<TRITONSERVER_Server>& server)
{
  TRITONSERVER_Error* err = triton::server::HTTPMetricsServer::Create(
      server, g_triton_params.metrics_port_, g_triton_params.metrics_address_,
      1 /* HTTP thread count */, service);
  if (err == nullptr) {
    err = (*service)->Start();
  }
  if (err != nullptr) {
    service->reset();
  }

  return err;
}

bool
StartEndpoints(
    const std::shared_ptr<TRITONSERVER_Server>& server,
    triton::server::TraceManager* trace_manager,
    const std::shared_ptr<triton::server::SharedMemoryManager>& shm_manager)
{

#ifdef TRITON_ENABLE_GRPC
  // Enable GRPC endpoints if requested...
  if (g_triton_params.allow_grpc_) {
    TRITONSERVER_Error* err =
        StartGrpcService(&g_grpc_service, server, trace_manager, shm_manager);
    if (err != nullptr) {
      LOG_TRITONSERVER_ERROR(err, "failed to start GRPC service");
      return false;
    }
  }
#endif  // TRITON_ENABLE_GRPC

#ifdef TRITON_ENABLE_HTTP
  // Enable HTTP endpoints if requested...
  if (g_triton_params.allow_http_) {
    TRITONSERVER_Error* err =
        StartHttpService(&g_http_service, server, trace_manager, shm_manager);
    if (err != nullptr) {
      LOG_TRITONSERVER_ERROR(err, "failed to start HTTP service");
      return false;
    }
  }
#endif  // TRITON_ENABLE_HTTP

#ifdef TRITON_ENABLE_METRICS
  // Enable metrics endpoint if requested...
  if (g_triton_params.allow_metrics_) {
    TRITONSERVER_Error* err = StartMetricsService(&g_metrics_service, server);
    if (err != nullptr) {
      LOG_TRITONSERVER_ERROR(err, "failed to start Metrics service");
      return false;
    }
  }
#endif  // TRITON_ENABLE_METRICS

  return true;
}

bool
StopEndpoints()
{
  bool ret = true;

#ifdef TRITON_ENABLE_HTTP
  if (g_http_service) {
    TRITONSERVER_Error* err = g_http_service->Stop();
    if (err != nullptr) {
      LOG_TRITONSERVER_ERROR(err, "failed to stop HTTP service");
      ret = false;
    }

    g_http_service.reset();
  }
#endif  // TRITON_ENABLE_HTTP

#ifdef TRITON_ENABLE_GRPC
  if (g_grpc_service) {
    TRITONSERVER_Error* err = g_grpc_service->Stop();
    if (err != nullptr) {
      LOG_TRITONSERVER_ERROR(err, "failed to stop GRPC service");
      ret = false;
    }

    g_grpc_service.reset();
  }
#endif  // TRITON_ENABLE_GRPC

#ifdef TRITON_ENABLE_METRICS
  if (g_metrics_service) {
    TRITONSERVER_Error* err = g_metrics_service->Stop();
    if (err != nullptr) {
      LOG_TRITONSERVER_ERROR(err, "failed to stop Metrics service");
      ret = false;
    }

    g_metrics_service.reset();
  }
#endif  // TRITON_ENABLE_METRICS

  return ret;
}

// Tracing setup/teardown are stubbed out in these notes; the full
// implementations live in server/src/main.cc.
bool
StartTracing(triton::server::TraceManager** trace_manager)
{
  *trace_manager = nullptr;  // no tracing configured in this walkthrough
  return true;
}

bool
StopTracing(triton::server::TraceManager** trace_manager)
{
  // Nothing to tear down when tracing was never started.
  return true;
}

#include "http_server.h"
#include "grpc/grpc_server.h"

std::unique_ptr<triton::server::HTTPServer> g_http_service;
std::unique_ptr<triton::server::grpc::Server> g_grpc_service;
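
The main() below relies on a FAIL_IF_ERR / FAIL convenience pair plus a model repository path and verbosity level that the note never defines. A hedged reconstruction, modeled on the helper macros in Triton's C-API examples (the path and level values are placeholders, not defaults):

#include <cstdlib>
#include <iostream>
#include <string>

#include "triton/core/tritonserver.h"

// Print a message and abort.
#define FAIL(MSG)                                 \
  do {                                            \
    std::cerr << "error: " << (MSG) << std::endl; \
    exit(1);                                      \
  } while (false)

// Abort with the error's code string and message if the call failed.
#define FAIL_IF_ERR(X, MSG)                                          \
  do {                                                               \
    TRITONSERVER_Error* err__ = (X);                                 \
    if (err__ != nullptr) {                                          \
      std::cerr << "error: " << (MSG) << ": "                        \
                << TRITONSERVER_ErrorCodeString(err__) << " - "      \
                << TRITONSERVER_ErrorMessage(err__) << std::endl;    \
      TRITONSERVER_ErrorDelete(err__);                               \
      exit(1);                                                       \
    }                                                                \
  } while (false)

// Placeholder values for names main() reads but never defines.
static std::string model_repository_path = "/path/to/model_repository";
static int verbose_level = 0;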

int main() {
  // Trace manager.
  triton::server::TraceManager* trace_manager;

  // Manager for shared memory blocks.
  auto shm_manager = std::make_shared<triton::server::SharedMemoryManager>();

  // ServerOptions
  // Create the option setting to use when creating the inference
  // server object.
  TRITONSERVER_ServerOptions* server_options = nullptr;
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsNew(&server_options),
      "creating server options");
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsSetModelRepositoryPath(
          server_options, model_repository_path.c_str()),
      "setting model repository path");
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsSetLogVerbose(server_options, verbose_level),
      "setting verbose logging level");
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsSetBackendDirectory(
          server_options, "/opt/tritonserver/backends"),
      "setting backend directory");
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsSetRepoAgentDirectory(
          server_options, "/opt/tritonserver/repoagents"),
      "setting repository agent directory");
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsSetStrictModelConfig(server_options, true),
      "setting strict model configuration");
#ifdef TRITON_ENABLE_GPU
  double min_compute_capability = TRITON_MIN_COMPUTE_CAPABILITY;
#else
  double min_compute_capability = 0;
#endif  // TRITON_ENABLE_GPU
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsSetMinSupportedComputeCapability(
          server_options, min_compute_capability),
      "setting minimum supported CUDA compute capability");

  TRITONSERVER_Server* server_ptr = nullptr;
  FAIL_IF_ERR(
      TRITONSERVER_ServerNew(&server_ptr, server_options),
      "creating server object");
  FAIL_IF_ERR(
      TRITONSERVER_ServerOptionsDelete(server_options),
      "deleting server options");

  // Use a shared_ptr to manage the lifetime of the server object.
  std::shared_ptr<TRITONSERVER_Server> server(
      server_ptr, TRITONSERVER_ServerDelete);

  // Configure and start tracing if specified on the command line.
  if (!StartTracing(&trace_manager)) {
    exit(1);
  }
  // Start the HTTP, GRPC, and metrics endpoints.
  if (!StartEndpoints(server, trace_manager, shm_manager)) {
    exit(1);
  }

  // Wait until the server is both live and ready. The server will not
  // appear "ready" until all models are loaded and ready to receive
  // inference requests.
  size_t health_iters = 0;
  while (true) {
    bool live, ready;
    FAIL_IF_ERR(
        TRITONSERVER_ServerIsLive(server.get(), &live),
        "unable to get server liveness");
    FAIL_IF_ERR(
        TRITONSERVER_ServerIsReady(server.get(), &ready),
        "unable to get server readiness");
    std::cout << "Server Health: live " << live << ", ready " << ready
              << std::endl;
    if (live && ready) {
      break;
    }

    if (++health_iters >= 10) {
      FAIL("failed to find healthy inference server");
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(500));
  }

  // Stop tracing and the HTTP, GRPC, and metrics endpoints.
  StopEndpoints();
  StopTracing(&trace_manager);
  return 0;
}
zhangjun commented 10 months ago

core

tritonserver C API (core/include/triton/core/tritonserver.h)

/// Create a new inference request object.
///
/// \param inference_request Returns the new request object.
/// \param server the inference server object.
/// \param model_name The name of the model to use for the request.
/// \param model_version The version of the model to use for the
/// request. If -1 then the server will choose a version based on the
/// model's policy.
/// \return a TRITONSERVER_Error indicating success or failure.
struct TRITONSERVER_Error*
TRITONSERVER_InferenceRequestNew(
    struct TRITONSERVER_InferenceRequest** inference_request,
    struct TRITONSERVER_Server* server, const char* model_name,
    const int64_t model_version);

/// Delete an inference request object.
///
/// \param inference_request The request object.
/// \return a TRITONSERVER_Error indicating success or failure.
struct TRITONSERVER_Error*
TRITONSERVER_InferenceRequestDelete(
    struct TRITONSERVER_InferenceRequest* inference_request);

/// Get the ID for a request. The returned ID is owned by
/// 'inference_request' and must not be modified or freed by the
/// caller.
///
/// \param inference_request The request object.
/// \param id Returns the ID.
/// \return a TRITONSERVER_Error indicating success or failure.
struct TRITONSERVER_Error*
TRITONSERVER_InferenceRequestId(
    struct TRITONSERVER_InferenceRequest* inference_request, const char** id);

/// Set the ID for a request.
///
/// \param inference_request The request object.
/// \param id The ID.
/// \return a TRITONSERVER_Error indicating success or failure.
struct TRITONSERVER_Error*
TRITONSERVER_InferenceRequestSetId(
    struct TRITONSERVER_InferenceRequest* inference_request, const char* id);

/// Set the priority for a request. The default is 0 indicating that
/// the request does not specify a priority and so will use the
/// model's default priority.
///
/// \param inference_request The request object.
/// \param priority The priority level.
/// \return a TRITONSERVER_Error indicating success or failure.
struct TRITONSERVER_Error*
TRITONSERVER_InferenceRequestSetPriorityUInt64(
    struct TRITONSERVER_InferenceRequest* inference_request, uint64_t priority);

/// Get the timeout for a request, in microseconds. The default is 0
/// which indicates that the request has no timeout.
///
/// \param inference_request The request object.
/// \param timeout_us Returns the timeout, in microseconds.
/// \return a TRITONSERVER_Error indicating success or failure.
struct TRITONSERVER_Error*
TRITONSERVER_InferenceRequestTimeoutMicroseconds(
    struct TRITONSERVER_InferenceRequest* inference_request,
    uint64_t* timeout_us);

/// Get model used to produce a response. The caller does not own the
/// returned model name value and must not modify or delete it. The
/// lifetime of all returned values extends until 'inference_response'
/// is deleted.
///
/// \param inference_response The response object.
/// \param model_name Returns the name of the model.
/// \param model_version Returns the version of the model used to produce
/// this response.
/// \return a TRITONSERVER_Error indicating success or failure.
struct TRITONSERVER_Error*
TRITONSERVER_InferenceResponseModel(
    struct TRITONSERVER_InferenceResponse* inference_response,
    const char** model_name, int64_t* model_version);
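
A hedged usage sketch for the request API above. It reuses the FAIL_IF_ERR helper and the server shared_ptr from the main() excerpt; "my_model" and the request ID are placeholders, and the input/output setup required before actually running inference is omitted.

void RequestApiSketch(const std::shared_ptr<TRITONSERVER_Server>& server)
{
  TRITONSERVER_InferenceRequest* irequest = nullptr;
  FAIL_IF_ERR(
      TRITONSERVER_InferenceRequestNew(
          &irequest, server.get(), "my_model",
          -1 /* let the server pick the version per the model's policy */),
      "creating inference request");

  FAIL_IF_ERR(
      TRITONSERVER_InferenceRequestSetId(irequest, "req-0"),
      "setting request ID");
  FAIL_IF_ERR(
      TRITONSERVER_InferenceRequestSetPriorityUInt64(
          irequest, 0 /* 0 = use the model's default priority */),
      "setting request priority");

  uint64_t timeout_us = 0;
  FAIL_IF_ERR(
      TRITONSERVER_InferenceRequestTimeoutMicroseconds(irequest, &timeout_us),
      "querying request timeout");

  FAIL_IF_ERR(
      TRITONSERVER_InferenceRequestDelete(irequest),
      "deleting inference request");
}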
zhangjun commented 10 months ago

Model management

zhangjun commented 10 months ago

scheduler