baidu / tera

An Internet-Scale Database.
BSD 3-Clause "New" or "Revised" License
1.89k stars 437 forks source link

异步Put操作,Table实例释放问题。 #1324

Closed Lambert-zw closed 6 years ago

Lambert-zw commented 6 years ago

您好: 我看了下teracli_main.cc中关于BatchPutOp使用方式的例子,OpenTable操作用一个智能指针接收,通过IsPutFinished来判断异步的put操作是否执行完, 但是我看 image cur_commit_pending_counter_.Sub(row_mutation->MutationNum()); 这个代码后面还有一些关于Table实例成员变量的操作,假设极限情况 这行语句刚执行,那边的IsPutFinished就出去了,函数走完,Table的智能指针释放了,Table实例就释放掉了。cur_commit_pending_counter_.Sub(row_mutation->MutationNum()); 这段代码后面的所有访问Table实例成员变量的代码都是不安全的。 这种问题会产生崩溃吗 ?

Lambert-zw commented 6 years ago

`int32_t BatchPutOp(Client client, int32_t argc, std::string argv, ErrorCode* err) { if (argc != 4) { LOG(ERROR) << "args number error: " << argc << ", need 4."; PrintCmdHelpInfo(argv[1]); return -1; }

std::string tablename = argv[2];
std::string record_file = argv[3];
TablePtr table(client->OpenTable(tablename, err));
if (table == NULL) {
    LOG(ERROR) << "fail to open table";
    return -1;
}
const int32_t buf_size = 1024 * 1024;
char buf[buf_size];
std::ifstream stream(record_file.c_str());

// input record format: rowkey columnfamily:qualifier value
// or: key:value
std::vector<std::string> input_v;
g_start_time = time(NULL);
while (stream.getline(buf, buf_size)) {
    SplitString(buf, " ", &input_v);
    if (input_v.size() != 3 && input_v.size() != 2) {
        LOG(ERROR) << "input file format error, skip it: " << buf;
        continue;
    }
    std::string rowkey = input_v[0];
    if (FLAGS_readable && !ParseDebugString(input_v[0], &rowkey)) {
        LOG(ERROR) << "input file format error, skip it: " << buf;
        continue;
    }
    std::string family;
    std::string qualifier;
    std::string value = input_v[input_v.size() - 1];
    if (FLAGS_readable && !ParseDebugString(input_v[input_v.size() - 1], &value)) {
        LOG(ERROR) << "input file format error, skip it: " << buf;
        continue;
    }
    RowMutation* mutation = table->NewRowMutation(rowkey);
    if (input_v.size() == 2) {
        // for kv mode
        mutation->Put(value);
    } else {
        // for table mode, put(family, qulifier, value)
        ParseCfQualifier(input_v[1], &family, &qualifier);
        mutation->Put(family, qualifier, value);
    }
    mutation->SetCallBack(BatchPutCallBack);
    table->ApplyMutation(mutation);
}
// 这里如果产生极限情况的话
while (!table->IsPutFinished()) {
    usleep(100000);
}

g_end_time = time(NULL);
g_used_time = g_end_time-g_start_time;
LOG(INFO) << "Write done,write_key_num=" << g_key_num << " used_time=" << g_used_time <<std::endl;
return 0;

} `

datonli commented 6 years ago

@NightStarSoul 感谢你的反馈,这个问题确实存在。在tera内部版本的代码中已经把sdk端OpenTable返回的Table 指针——其实是TableWrapper 指针,所包含的TableImpl* 改为shared_ptr的了,table的生命周期有shared_ptr保证,这个问题会得到解决。

datonli commented 6 years ago

新版本已更新,问题已修复