oatpp / oatpp-consul

oatpp client for consul
https://oatpp.io/
Apache License 2.0

how to make a requestExecutor? #1

Closed jackieliuqihe closed 5 years ago

jackieliuqihe commented 5 years ago

Hello, I am new to oatpp-consul. I have searched a lot for information about it, but I can't register a service with Consul. I think the problem is how to create a request executor. Please help me. Thank you.

lganzzzo commented 5 years ago

Hello @jackieliuqihe ,

Thanks for the question.

  // Create connection provider for Consul
  // If you need a secure connection provider to reach Consul over HTTPS, see oatpp-libressl and the tls-libressl example project
  auto connectionProvider = oatpp::network::client::SimpleTCPConnectionProvider::createShared("localhost", 8500 /* port */);

  // Create httpRequestExecutor
  auto requestExecutor = oatpp::web::client::HttpRequestExecutor::createShared(connectionProvider);

  // Create the Consul client
  auto client = oatpp::consul::Client::createShared(requestExecutor);

  // Create AgentServiceRegisterPayload with info of your service
  auto payload = oatpp::consul::rest::AgentServiceRegisterPayload::createShared();
  payload->id = "service_id";
  payload->name = "service_name";
  payload->port = 8000;

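  /* obtain the REST client; assumes oatpp::consul::Client exposes getRestClient() */
  auto restClient = client->getRestClient();

  /* make API call */
  auto response = restClient->agentServiceRegister(payload);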

This should be it. If you have any additional questions, please let me know.

Best Regards, lganzzzo

jackieliuqihe commented 5 years ago

Thank you, I have solved the problem your way.

lganzzzo commented 5 years ago

no problem

jackieliuqihe commented 5 years ago

Hello @lganzzzo, I have a new question. I now want to make an async request. I have looked at the async example, but I can't find a way to return the result. For example:

ENDPOINT_ASYNC("GET", "/body/string", EchoStringBody) {

ENDPOINT_ASYNC_INIT(EchoStringBody)

Action act() override {
  /* return Action to start child coroutine to read body */
  return request->readBodyToStringAsync(this, &EchoStringBody::returnResponse);
}

Action returnResponse(const oatpp::String& body){
  /* return Action to return created OutgoingResponse */
  return _return(controller->createResponse(Status::CODE_200, body));
}

};

How can I call returnResponse to return my JSON result? Thanks.

lganzzzo commented 5 years ago

Hello @jackieliuqihe

If you just want to return "static" json:

  ENDPOINT_ASYNC("GET", "/json-async", GetJSONAsync) {

    ENDPOINT_ASYNC_INIT(GetJSONAsync)

    Action act() override {
      auto dto = MessageDto::createShared();
      dto->message = "Hello json";
      return _return(controller->createDtoResponse(Status::CODE_200, dto));
    }

  };

Regards, Leonid

jackieliuqihe commented 5 years ago

Hello @lganzzzo. In fact, my scenario is that each incoming request triggers a large amount of computation in the background, so I want the request not to block. When the calculation completes, the result should be returned automatically.

lganzzzo commented 5 years ago

@jackieliuqihe Ok, so if your scenario is as follows:

1) --> Receive request
2) Make calculations
3) <-- Send response with result of calculations

Assuming you are trying to use the oatpp Async API, you have to make sure that your "calculations" are also computed asynchronously. If they are not, they will block the whole async mechanism.

So you have to make sure that you can program your calculations in an asynchronous way. Here is an example of an async iterative product calculation:

  class CalculationCoroutine : public oatpp::async::CoroutineWithResult<CalculationCoroutine, v_word32> {
  private:
    v_word32 m_a;
    v_word32 m_b;
    v_word32 m_counter;
    v_word32 m_result;
  public:

    CalculationCoroutine(v_int32 a, v_int32 b)
      : m_a(a)
      , m_b(b)
      , m_counter(0)
      , m_result(0)
    {}

    /**
     * Async representation of "for" loop
     * for(m_counter = 0; m_counter < m_b; m_counter++) {
     *    m_result += m_a;
     * }
     */
    Action act() override {
      if(m_counter < m_b) {
        m_counter ++;
        m_result += m_a;
        return repeat();
      }
      return _return(m_result);
    }

  };

  ENDPOINT_ASYNC("GET", "/multiply/{arg1}/{arg2}", MultiplyAsync) {

    ENDPOINT_ASYNC_INIT(MultiplyAsync)

    Action act() override {
      bool success;
      v_int32 a = oatpp::utils::conversion::strToInt32(request->getPathVariable("arg1"), success);
      OATPP_ASSERT_HTTP(success, Status::CODE_400, "Invalid arg1. Should be integer value");
      v_int32 b = oatpp::utils::conversion::strToInt32(request->getPathVariable("arg2"), success);
      OATPP_ASSERT_HTTP(success, Status::CODE_400, "Invalid arg2. Should be integer value");
      return startCoroutineForResult<CalculationCoroutine>(&MultiplyAsync::onResultCalculated, a, b);
    }

    Action onResultCalculated(v_word32 result) {
      auto dto = MultiplicationResultDto::createShared();
      dto->result = result;
      return _return(controller->createDtoResponse(Status::CODE_200, dto));
    }

  };

output:

$ curl http://localhost:8000/multiply/10/150
{"result": 1500}

jackieliuqihe commented 5 years ago

@lganzzzo Thank you very much! I have tested the code and it solved my problem. oatpp is a very good project!

lganzzzo commented 5 years ago

Good to hear that! Let me know if you have more questions.

jackieliuqihe commented 5 years ago

@lganzzzo I'm very sorry, but my problem actually hasn't been solved yet.

  Action act() override {
    if(m_counter < m_b) {
      m_counter ++;
      m_result += m_a;
      sleep(10);
      return repeat();
    }
    return _return(m_result);
  }

I added sleep(10) to the act() code, and if I set a large value for m_b, my program cannot accept any more requests. I don't know how to make the calculations async or how to return the result.

lganzzzo commented 5 years ago

Hello @jackieliuqihe

Async methods should not have sleep(..) calls because sleep is actually a blocking call.

Please give me more info - why do you need sleep(10) in the method?

jackieliuqihe commented 5 years ago

Hello @lganzzzo, the sleep call is test code; it just simulates a big calculation. Do you mean that I shouldn't do big calculations in act()?

When a request arrives, I need to hold it until the calculation completes and then return the result, but in the meantime I also need to accept other requests. If I put the calculations in act(), the program can't accept another request.

lganzzzo commented 5 years ago

@jackieliuqihe

The point is that you have to rewrite all your calculations in an async manner. There should be no blocking calls in async code.

Example: Blocking loop (the program blocks while the loop iterates)

for(m_counter=0; m_counter < m_b; m_counter ++ ) {
  m_result += m_a;
}

The same loop in a non-blocking, async manner:

Action act() override {
  if(m_counter < m_b) {
    m_counter ++;
    m_result += m_a;
    return repeat();
  }
  return _return(m_result);
}

This example is trivial and artificial. In most cases you don't need to decompose such loops in async methods. The point is that you have to understand your calculations and decompose the heavy blocking parts so that they work in an async manner.

If you show me the calculations that you are trying to do, I'll try to point out the parts you have to make async.

jackieliuqihe commented 5 years ago

Hello @lganzzzo, the calculations run in my colleague's program, and I must wait until it finishes to get the result. That's why I used sleep as a stand-in for testing.

lganzzzo commented 5 years ago

@jackieliuqihe How do you make the call to the other program? Can you please show me the code? Most probably it is possible to make it async.

jackieliuqihe commented 5 years ago

Hello @lganzzzo, through RabbitMQ. In my program I create two threads, one for receiving and one for sending. When I receive a request, I put it into the send queue and then wait for the result in ThreadRecvMessage(). Here is the code:

CMQConnect::CMQConnect(void) {

}

CMQConnect::~CMQConnect(void) {
if (m_pSend.ullThreadID != 0 && pthread_join(m_pSend.ullThreadID, NULL))
{
    printf("thread is not exit...\n");
}

if (m_pRecv.ullThreadID != 0 && pthread_join(m_pRecv.ullThreadID, NULL))
{
    printf("thread is not exit...\n");
}

}

bool CMQConnect::InitSendMQ() {
// Set up the MQ producer
MQConnectNode& sendNode = CMQConnect::GetInstance()->GetSendConnectNode();
int status;
amqp_socket_t *socket = NULL;

socket = amqp_tcp_socket_new(sendNode.pConn);
if (!socket) {
    die_on_error(1, "1");
}

status = amqp_socket_open(socket, sendNode.strHostName.c_str(), sendNode.iPort);
if (status) {
    die("opening TCP socket");
}

die_on_amqp_error(amqp_login(sendNode.pConn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
    sendNode.strUserName.c_str(), sendNode.strPassword.c_str()),
    "Logging in");
amqp_channel_open(sendNode.pConn, 1);
die_on_amqp_error(amqp_get_rpc_reply(sendNode.pConn), "Opening channel");

// Initialization complete; start the thread that sends MQ messages
int iErr = pthread_create(&sendNode.ullThreadID, NULL, &CMQConnect::ThreadSendMessage, sendNode.pConn);
if (iErr != 0)
{
    printf("%s:%s\n", "threadSendMess", strerror(iErr));
}

return true;

}

bool CMQConnect::InitRecvMQ() {
// Set up the MQ consumer
MQConnectNode& recvNode = CMQConnect::GetInstance()->GetRecvConnectNode();
int status;
amqp_socket_t *socket = NULL;

amqp_bytes_t queuename;
/* Allocate and initialize a new amqp_connection_state_t object. An object created
with this function must be released with amqp_destroy_connection(). */
//conn = amqp_new_connection();

/* Create a new TCP socket.
Call amqp_connection_close() to release the socket resources. */
socket = amqp_tcp_socket_new(recvNode.pConn);
if (!socket) {
    die("creating TCP socket");
}

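/**
* Open the socket connection.
*
* This function opens the socket connection returned by amqp_tcp_socket_new() or amqp_ssl_socket_new().
* It should be called after setting socket options and before using amqp_set_socket() to assign
* the socket to an AMQP connection.
*
* Returns AMQP_STATUS_OK on success, or an amqp_status_enum value on failure.
*/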
status = amqp_socket_open(socket, recvNode.strHostName.c_str(), recvNode.iPort);
if (status) {
    die("opening TCP socket");
}
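/**
* Log in to the broker.
*
* After amqp_open_socket and amqp_set_sockfd, call amqp_login to complete the connection to the broker.
*
* \param [in] state the connection object
* \param [in] vhost the virtual host to connect to on the broker; the default on most brokers is "/"
* \param [in] channel_max the limit on the number of channels for the connection; 0 means no limit; a good default is AMQP_DEFAULT_MAX_CHANNELS
* \param [in] frame_max the maximum size of an AMQP frame on the wire to request of the broker for this connection; minimum 4096,
* maximum 2^31-1; a good default is 131072 (128KB) or AMQP_DEFAULT_FRAME_SIZE
* \param [in] heartbeat the number of seconds between heartbeat frames requested of the broker; 0 disables heartbeats
* \param [in] sasl_method the SASL method used to authenticate with the broker. The implemented SASL methods are:
*             -AMQP_SASL_METHOD_PLAIN: takes two further arguments, in this order:
*               const char* username, and const char* password.
*             -AMQP_SASL_METHOD_EXTERNAL: takes one further argument:
*               const char* identity.
*
* Returns: an amqp_rpc_reply_t indicating success or failure
*/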
die_on_amqp_error(amqp_login(recvNode.pConn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
    recvNode.strUserName.c_str(), recvNode.strPassword.c_str()),
    "Logging in");

amqp_channel_open(recvNode.pConn, 1);
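/**
* Get the last global amqp_rpc_reply.
*
* The API methods corresponding to most synchronous AMQP methods return a pointer to the decoded method result.
*
* \param [in] state the connection object
* \return the most recent amqp_rpc_reply_t
* - r.reply_type == AMQP_RESPONSE_NORMAL. The RPC completed successfully
* - r.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION. The broker returned an exception
* - r.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION. An exception occurred within the library
*/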
die_on_amqp_error(amqp_get_rpc_reply(recvNode.pConn), "Opening channel");

{
    amqp_queue_declare_ok_t *r = amqp_queue_declare(
        recvNode.pConn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(recvNode.pConn), "Declaring queue");
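    /**
    * Duplicate an amqp_bytes_t buffer.
    *
    * Clones the buffer and copies its contents.
    *
    * The memory associated with the output is allocated with amqp_bytes_malloc() and should be freed with amqp_bytes_free().
    */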
    queuename = amqp_bytes_malloc_dup(r->queue);
    if (queuename.bytes == NULL) {
        fprintf(stderr, "Out of memory while copying queue name");
        return false;
    }
}
amqp_queue_bind(recvNode.pConn, 1, queuename, amqp_cstring_bytes(MQ_EXCHANGE_DIRECT),
    amqp_cstring_bytes(MQ_BINDINGKEY_DEVICE_MODULE), amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(recvNode.pConn), "Binding queue");

amqp_basic_consume(recvNode.pConn, 1, queuename, amqp_empty_bytes, 0, 1, 0,
    amqp_empty_table);
die_on_amqp_error(amqp_get_rpc_reply(recvNode.pConn), "Consuming");

// Initialization complete; start the thread that receives MQ messages
int iErr = pthread_create(&recvNode.ullThreadID, NULL, &CMQConnect::ThreadRecvMessage, recvNode.pConn);
if (iErr != 0)
{
    printf("%s:%s\n", "threadRecvMess", strerror(iErr));
}

return true;

}

void* CMQConnect::ThreadSendMessage(void* arg) {
printf("threadSendMess");
MQConnectNode& sendNode = CMQConnect::GetInstance()->GetSendConnectNode();
CMutexDeque& deqTask = CMQConnect::GetInstance()->GetTaskDeq();
Json::Value value;
/* set properties */
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG | AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes("text/plain");
props.delivery_mode = 2; /* persistent delivery mode */
props.reply_to = amqp_cstring_bytes(MQ_BINDINGKEY_DEVICE_MODULE);
if (props.reply_to.bytes == NULL) {
    fprintf(stderr, "Out of memory while copying queue name");
    return NULL;
}
props.correlation_id = amqp_cstring_bytes("1"); // not used for now

while (true)
{
    if (!deqTask.empty())
    {
        string strTask = deqTask.front();
        deqTask.pop_front();
        try
        {
            // parsed successfully
            if (g_JsonReader->parse(strTask, value))
            {
                string strDest = value["Dest"].asString();
                string strContent = value["Content"].toStyledString();

                cout <<"strContent="<< strContent << endl;
                cout <<"strDest=" << strDest << endl;

                // do send message
                die_on_error(amqp_basic_publish(sendNode.pConn, 1, amqp_cstring_bytes(MQ_EXCHANGE_DIRECT),
                    amqp_cstring_bytes(strDest.c_str()), 0, 0,
                    &props, amqp_cstring_bytes(strContent.c_str())),
                    "Publishing");
            }
        }
        catch (...)
        {
            cout << "parse jason error:" << strTask << endl;
        }
    }

}
pthread_exit(0);

}

void* CMQConnect::ThreadRecvMessage(void* arg) {
MQConnectNode& recvNode = CMQConnect::GetInstance()->GetRecvConnectNode();
printf("threadRecvMess");
while (true) {
    amqp_rpc_reply_t res;
    amqp_envelope_t envelope;

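    /**
    * Release the memory held by the amqp_connection_state_t.
    *
    * Releases memory owned by the amqp_connection_state_t object for any channel, allowing the library to reuse it.
    * Any memory the library returned before this call must not be used afterwards; doing so is undefined behavior.
    */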
    amqp_maybe_release_buffers(recvNode.pConn);

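    /**
    * Wait for and consume a message.
    *
    * Waits for a basic.deliver method on any channel; once a basic.deliver is received, it reads that message and returns.
    * If any other method is received before basic.deliver, this function returns an amqp_rpc_reply_t with
    * ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION and
    * ret.library_error == AMQP_STATUS_UNEXPECTED_STATE.
    * The caller should then call amqp_simple_wait_frame() to read this frame and take appropriate action.
    *
    * This function should be used after starting a consumer with amqp_basic_consume().
    *
    *  \returns an amqp_rpc_reply_t object; on success, ret.reply_type == AMQP_RESPONSE_NORMAL.
    *  If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION and
    *  ret.library_error == AMQP_STATUS_UNEXPECTED_STATE,
    *  a frame other than AMQP_BASIC_DELIVER_METHOD was received, and the caller should call amqp_simple_wait_frame()
    *  to read this frame and take appropriate action.
    */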
    res = amqp_consume_message(recvNode.pConn, &envelope, NULL, 0);

    if (AMQP_RESPONSE_NORMAL != res.reply_type) {
        break;
    }

    // received the message ,and deal it

    printf("Delivery %u, exchange %.*s routingkey %.*s\n",
        (unsigned)envelope.delivery_tag, (int)envelope.exchange.len,
        (char *)envelope.exchange.bytes, (int)envelope.routing_key.len,
        (char *)envelope.routing_key.bytes);

    if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
        printf("Content-type: %.*s\n",
            (int)envelope.message.properties.content_type.len,
            (char *)envelope.message.properties.content_type.bytes);
    }
    printf("----\n");

    amqp_dump(envelope.message.body.bytes, envelope.message.body.len);

    /**
    * Free the memory associated with the amqp_envelope_t that was allocated in amqp_consume_message()
    */
    amqp_destroy_envelope(&envelope);
}

return NULL;

}

MQConnectNode& CMQConnect::GetSendConnectNode() { return m_pSend; }

MQConnectNode& CMQConnect::GetRecvConnectNode() { return m_pRecv; }

void CMQConnect::StartSendMQ(CScString strHostName, UINT iPort, CScString strUserName, CScString strPassword) {
m_pSend.pConn = amqp_new_connection();
m_pSend.strHostName = strHostName;
m_pSend.iPort = iPort;
m_pSend.strUserName = strUserName;
m_pSend.strPassword = strPassword;

InitSendMQ();

}

void CMQConnect::StartRecvMQ(CScString strHostName, UINT iPort, CScString strUserName, CScString strPassword) {
m_pRecv.pConn = amqp_new_connection();
m_pRecv.strHostName = strHostName;
m_pRecv.iPort = iPort;
m_pRecv.strUserName = strUserName;
m_pRecv.strPassword = strPassword;

InitRecvMQ();

}

void CMQConnect::SendMQInfo(std::string strIdentify, std::string strContent) {
Json::Value jsTask;
jsTask["Dest"] = strIdentify;

Json::Value jsAddDevice;
jsAddDevice[JSON_KEY_JSONRPC] = JSON_VALUE_JSONRPC_JSONRPC20;
jsAddDevice[JSON_KEY_METHOD] = JSON_VALUE_METHOD_ADD_DEVICE;
jsAddDevice[JSON_KEY_PARAMS] = strContent;

jsTask["Content"] = jsAddDevice;

fprintf(stderr, "%s", jsTask.toStyledString().c_str());

m_DeqTask.push_back(jsTask.toStyledString());

}

CMutexDeque& CMQConnect::GetTaskDeq() { return m_DeqTask; }

lganzzzo commented 5 years ago

@jackieliuqihe

I looked through the rabbitmq-c library, and it seems to have no suitable non-blocking interface for coroutines.

So at this point I recommend going with the oatpp simple API (not Async).

I will also investigate other AMQP libraries and create an async example some time later.

Regards, Leonid

jackieliuqihe commented 5 years ago

Hello @lganzzzo, thank you very much for your help. I will wait, too.

lganzzzo commented 5 years ago

No problem