apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

I want to use Arrow to rewrite some projects, but when I use Arrow to read a CSV and compute some indicators, Arrow C++ is even slower than Python code. Is something wrong? #38370

Open tianjixuetu opened 1 year ago

tianjixuetu commented 1 year ago


I want to use Arrow to rewrite some projects, for example empyrical, pyfolio and backtrader. The first target is reading CSV data and computing the Sharpe ratio. I tested three approaches, and Arrow C++ is the slowest:

  1. arrow c++ component
  2. pure python
  3. pure c++

[Image: test speed]

I pushed my code and data to a repo named learn_arrow.

I suspect something is wrong. Is there any way to speed up reading data and computing with Arrow?

The Arrow code is below; you can get all the code and data from learn_arrow.

#include <arrow/api.h>
#include <arrow/io/api.h>
#include "arrow/csv/api.h"
#include <arrow/compute/api.h>
#include <iostream>
#include <chrono>
//#include "../empyrical/empyrical.h"

arrow::Status RunMain(){
    std::shared_ptr<arrow::io::ReadableFile> infile;

    ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("./fund_nav.csv"));
    // (Doc section: CSV table declaration)
    std::shared_ptr<arrow::Table> csv_table;
    // The CSV reader has several option objects for different settings. For now, we use the defaults.
    ARROW_ASSIGN_OR_RAISE(
        auto csv_reader,
        arrow::csv::TableReader::Make(
            arrow::io::default_io_context(), infile, arrow::csv::ReadOptions::Defaults(),
            arrow::csv::ParseOptions::Defaults(), arrow::csv::ConvertOptions::Defaults()));
    // Read the table.
    ARROW_ASSIGN_OR_RAISE(csv_table, csv_reader->Read());

    // Print the Table's metadata
    // std::cout << "Table Metadata:" << std::endl;
    // std::cout << "Number of columns: " << csv_table->num_columns() << std::endl;
    // std::cout << "Number of rows: " << csv_table->num_rows() << std::endl;
    // std::cout << "Schema: " << csv_table->schema()->ToString() << std::endl;

    // Print the Table's data
    // for (int i = 0; i < csv_table->num_columns(); ++i) {
    //   std::shared_ptr<arrow::Array> column = csv_table->column(i);
    //   std::cout << "Column " << i << ": " << column->ToString() << std::endl;
    // }

    // 1. One way to print the table to std::cout
    // std::shared_ptr<arrow::RecordBatch> record_batch;
    // arrow::Result<std::shared_ptr<arrow::RecordBatch>> result = csv_table->CombineChunksToBatch(); // perform an operation that returns a Result
    // if (result.ok()) {
    //   record_batch = result.ValueOrDie();
    //   // use record_batch here
    // } else {
    //   // handle the error
    //   std::cerr << "Error: " << result.status().ToString() << std::endl;
    // }
    // //arrow::PrettyPrint(*record_batch, 2, &std::cout);
    // arrow::Status status = arrow::PrettyPrint(*record_batch, 2, &std::cout);
    // if (!status.ok()) {
    //   // handle the error, e.g. print the error message
    //   std::cerr << "Error: " << status.ToString() << std::endl;
    // }
    // 2. Another way to print the table to std::cout
    // std::cout << csv_table->ToString() << std::endl;
    // 3. A third way to print the table to std::cout
    // arrow::Status status = arrow::PrettyPrint(*csv_table, 2, &std::cout);
    // if (!status.ok()) {
    //   // handle the error, e.g. print the error message
    //   std::cerr << "Error: " << status.ToString() << std::endl;
    // }
    // Start computing the Sharpe ratio
    // std::cout << "Trading days per year: " << AnnualizationFactors::DAILY << " days" << std::endl;
    // std::cout << DAILY << std::endl;

    // Compute returns
    // Note: the timer starts here, after the CSV has already been read.
    auto start_time = std::chrono::high_resolution_clock::now();
    arrow::Datum fund_returns;
    arrow::Datum fund_diff;
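    // "复权净值" is the adjusted (cumulative) NAV column in fund_nav.csv
    std::shared_ptr<arrow::ChunkedArray> cum_nav = csv_table->GetColumnByName("复权净值");
    if (cum_nav == nullptr) return arrow::Status::Invalid("column 复权净值 not found");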
    std::shared_ptr<arrow::ChunkedArray> now_cum_nav = cum_nav->Slice(1,cum_nav->length()-1);
    std::shared_ptr<arrow::ChunkedArray> pre_cum_nav = cum_nav->Slice(0,cum_nav->length()-1);
    ARROW_ASSIGN_OR_RAISE(fund_diff, arrow::compute::CallFunction(
                                          "subtract", {now_cum_nav,pre_cum_nav}));
    ARROW_ASSIGN_OR_RAISE(fund_returns, arrow::compute::CallFunction(
                                          "divide", {fund_diff,pre_cum_nav}));
    // // Get the result array
    // std::cout << "Datum kind: " << fund_returns.ToString()
    //           << " content type: " << fund_returns.type()->ToString() << std::endl;

    // // std::cout << fund_returns.scalar_as<arrow::DoubleScalar>().value << std::endl;
    // std::cout << fund_returns.chunked_array()->ToString() << std::endl;
    // Compute the Sharpe ratio
    arrow::Datum avg_return;
    arrow::Datum avg_std;
    arrow::Datum daily_sharpe_ratio;
    arrow::Datum sharpe_ratio;
    arrow::Datum sqrt_year;
    // Create an Arrow double scalar
    double days_of_year_double = 252.0;
    std::shared_ptr<arrow::Scalar> days_of_year = arrow::MakeScalar(days_of_year_double);
    ARROW_ASSIGN_OR_RAISE(sqrt_year, arrow::compute::CallFunction(
                                          "sqrt", {days_of_year}));
    ARROW_ASSIGN_OR_RAISE(avg_return, arrow::compute::CallFunction(
                                          "mean", {fund_returns}));
    arrow::compute::VarianceOptions variance_options;
    variance_options.ddof = 1;
    ARROW_ASSIGN_OR_RAISE(avg_std, arrow::compute::CallFunction(
                                          "stddev", {fund_returns},&variance_options));
    ARROW_ASSIGN_OR_RAISE(daily_sharpe_ratio, arrow::compute::CallFunction(
                                          "divide", {avg_return,avg_std}));
    ARROW_ASSIGN_OR_RAISE(sharpe_ratio, arrow::compute::CallFunction(
                                          "multiply", {daily_sharpe_ratio,sqrt_year}));

    std::cout << "Computed Sharpe ratio: " << sharpe_ratio.scalar_as<arrow::DoubleScalar>().value << std::endl;

    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);

    std::cout << "Total time for C++ to read the data and compute the Sharpe ratio: " << duration.count()/1000.0 << " ms" << std::endl;

    return arrow::Status::OK();
  }

int main() {
  arrow::Status st = RunMain();
  if (!st.ok()) {
    std::cerr << st << std::endl;
    return 1;
  }

  return 0;
}

Component(s)

C++

tianjixuetu commented 12 months ago

Why does Arrow C++ compute so slowly?

mapleFU commented 12 months ago
  1. Are you building in release mode?
  2. Can you provide the corresponding Python code?
tianjixuetu commented 12 months ago

Yes, all the code is in the repo: Arrow C++, pure Python, and pure C++ @mapleFU

mapleFU commented 12 months ago

Can you provide the corresponding Python code?

And what is a.out here?

Also, pandas is not "pure Python": depending on the version, it might switch to Arrow or other underlying implementations.

tianjixuetu commented 12 months ago

a.out is the compiled pure C++ program; the pure C++ code is:

#include <iostream>
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#include <cmath>
#include <chrono>

// Compute the mean
double Mean(const std::vector<double>& data) {
    double sum = 0.0;
    for (const double& value : data) {
        sum += value;
    }
    return sum / data.size();
}

// Compute the sample standard deviation (divides by n - 1)
double StandardDeviation(const std::vector<double>& data) {
    double mean = Mean(data);
    double variance = 0.0;
    for (const double& value : data) {
        variance += std::pow(value - mean, 2);
    }
    return std::sqrt(variance / (data.size()-1));
}

int main() {

    auto start_time = std::chrono::high_resolution_clock::now();
    std::ifstream file("./fund_nav.csv");
    if (!file.is_open()) {
        std::cerr << "Failed to open the CSV file." << std::endl;
        return 1;
    }

    std::vector<double> returns;
    double previous_nav = 0.0;
    std::string line;
    getline(file, line); // Skip the header line

    while (getline(file, line)) {
        std::istringstream ss(line);
        std::string date, nav_str,cum_nav_str;

        getline(ss, date, ',');
        getline(ss, nav_str, ',');
        getline(ss, cum_nav_str, ',');

        //std::cout << line << std::endl;

        try {
            double nav = std::stod(cum_nav_str);
            if (previous_nav > 0.0) {
                double daily_return = (nav - previous_nav) / previous_nav;
                returns.push_back(daily_return);
            }
            previous_nav = nav;
        } catch (const std::invalid_argument& e) {
            std::cerr << "Invalid data found: " << cum_nav_str << std::endl;
        }
    }

    file.close();

    if (returns.empty()) {
        std::cerr << "No returns data found." << std::endl;
        return 1;
    }

    // Compute the mean and standard deviation
    double mean_return = Mean(returns);
    double std_deviation = StandardDeviation(returns);

    // Compute the Sharpe ratio
    double trading_days_per_year = 252.0;
    double sqrt_trading_days_per_year = std::sqrt(trading_days_per_year);
    double sharpe_ratio = (mean_return * sqrt_trading_days_per_year) / std_deviation;
    // std::cout << "mean_return " << mean_return << std::endl;
    // std::cout << "std_deviation " << std_deviation << std::endl;
    // std::cout << "sqrt_trading_days_per_year " << sqrt_trading_days_per_year << std::endl;

    std::cout << "the result of sharpe ratio : " << sharpe_ratio << std::endl;
    auto end_time = std::chrono::high_resolution_clock::now();
    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);

    std::cout << "Time for pure C++ to read the data and calculate the Sharpe ratio: " << duration.count()/1000.0 << " ms" << std::endl;

    return 0;
}

The Python code is:

import pandas as pd 
import empyrical as ep
import time 
a = time.perf_counter()
data = pd.read_csv("./fund_nav.csv")
returns = data['复权净值'].pct_change().dropna()
sharpe_ratio = ep.sharpe_ratio(returns)
# print("mean_return ",returns.mean())
# print("std_deviation ",returns.std())
# print("sqrt_trading_days_per_year ",252**0.5)
print("the result of sharpe ratio : ", sharpe_ratio)
b = time.perf_counter()
print(f"Time for Python to read the data and calculate the Sharpe ratio: {(b-a)*1000.0} ms")


You're right, the Python code is not pure Python; it uses pandas and NumPy.

mapleFU commented 12 months ago

Would code like the following help?

  arrow::Datum avg_return;
  arrow::Datum avg_std;
  double daily_sharpe_ratio;
  // Annualization factor as a plain double (no Arrow scalar needed)
  double days_of_year_double = 252.0;
  double sqrt_year = std::sqrt(days_of_year_double);
  ARROW_ASSIGN_OR_RAISE(avg_return, arrow::compute::CallFunction(
                                        "mean", {fund_returns}));
  arrow::compute::VarianceOptions variance_options;
  variance_options.ddof = 1;
  ARROW_ASSIGN_OR_RAISE(avg_std, arrow::compute::CallFunction(
                                     "stddev", {fund_returns},&variance_options));
  daily_sharpe_ratio = avg_return.scalar_as<::arrow::DoubleScalar>().value / avg_std.scalar_as<::arrow::DoubleScalar>().value;

  // Annualize by multiplying by sqrt(252), as the original code does
  std::cout << "Computed Sharpe ratio: " << daily_sharpe_ratio * sqrt_year << std::endl;

  auto end_time = std::chrono::high_resolution_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);

  std::cout << "Total time for C++ to read the data and compute the Sharpe ratio: " << duration.count()/1000.0 << " ms" << std::endl;

  return arrow::Status::OK();
tianjixuetu commented 12 months ago

Thank you very much, but here fund_returns isn't calculated from the fund_nav values.

mapleFU commented 12 months ago

I didn't change the reading-from-CSV part (though it can also be optimized: currently more columns than needed are read).

tianjixuetu commented 12 months ago

OK, thank you. However, I don't think it's a valid approach.

mapleFU commented 12 months ago

Can you explain why this is not a valid way?

And what about the performance now?

tianjixuetu commented 12 months ago

The performance is just a little better than before, but still worse than the Python code. You only calculate the Sharpe ratio in plain C++ and everything else is the same, so it is not a valid approach. By the way, is this code generated by ChatGPT?

mapleFU commented 12 months ago

Calling a compute function on scalars is like the volcano model in a database: each call incurs the cost of:

  1. Finding the function (dispatch)
  2. Detecting the input types
  3. Computing (this is the only logic we actually need)
  4. Wrapping the output

The pure C++ code is a bit like code generation in a database system: you already know the types (though reading from the file might not perform optimally). So computing in raw C++ with your own types will be faster. You can achieve similar performance by using templates to compute the logic directly.

So I don't think calling compute functions on scalars is a good approach when you already know which function to call and its input/output types. Also, when I ran the benchmark locally, it was mainly slow in:

  1. Setting up the framework.
  2. Dispatching functions.

So you may want to benchmark only the "compute time" rather than the whole run. Initializing arrow::compute can take some time.

Specifically, you can:

  // Build the global function registry before the timer starts
  auto registry = ::arrow::compute::GetFunctionRegistry();
  // Compute returns
  auto start_time = std::chrono::high_resolution_clock::now();

By the way, is this code generated by ChatGPT?

No.

tianjixuetu commented 12 months ago

Your points make a lot of sense, but when Arrow is used as a standalone module for computation, especially in C++, performance that is unexpectedly slower than Python code is somewhat unacceptable, and there is significant room for improvement. Are you familiar with Arrow? Do you have specific methods for reading and computing data that achieve speeds close to plain C++?

mapleFU commented 12 months ago

IMO this comparison isn't fair: when arrow::compute initializes, it registers lots of functions. This happens only once per program execution.

The Python code may do its initialization before the real computation starts (its imports run before the timer). Have you tried:

  auto registry = ::arrow::compute::GetFunctionRegistry();
  auto start_time = std::chrono::high_resolution_clock::now();

Also, using compute functions on scalars works, but it isn't advisable since you already know the types yourself.

tianjixuetu commented 12 months ago

I just tried it, but with this code the timing fluctuates significantly, and there's no clear improvement @mapleFU

mapleFU commented 12 months ago

You can try to find out where the remaining time is spent. Once these are pre-initialized, the remaining time is determined by reading the CSV, processing, and some memcpy.

The workload is memory-bound and may involve some threading, so the unstable timings may come from threading or initialization.

Also, I don't think running the program once is a suitable benchmark; you could run it multiple times or use Google Benchmark. On my machine, most of the time goes to loading the registry and reading the small CSV file, and it takes about 0.08 ms. So I don't know what occupies your execution time.
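A small, library-free sketch of the "run it multiple times" advice: run the work a few times untimed, then report the median of several timed runs. `MedianMillis` is a hypothetical name and the default counts are arbitrary.

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <vector>

// Hypothetical helper: discard `warmup` runs, then return the median wall time
// (upper median when `runs` is even) of `runs` timed calls, in milliseconds.
template <typename Work>
double MedianMillis(Work&& work, std::size_t runs = 20, std::size_t warmup = 3) {
  if (runs == 0) return 0.0;
  for (std::size_t i = 0; i < warmup; ++i) work();

  std::vector<double> samples;
  samples.reserve(runs);
  for (std::size_t i = 0; i < runs; ++i) {
    const auto start = std::chrono::steady_clock::now();
    work();
    const auto end = std::chrono::steady_clock::now();
    samples.push_back(std::chrono::duration<double, std::milli>(end - start).count());
  }
  // A partial sort is enough to place the middle element.
  auto mid = samples.begin() + static_cast<std::ptrdiff_t>(samples.size() / 2);
  std::nth_element(samples.begin(), mid, samples.end());
  return *mid;
}
```

For example, `MedianMillis([&] { (void)RunMain(); })` would time the whole body of the original program, CSV read included. Google Benchmark does the same job more thoroughly and chooses iteration counts automatically.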

tianjixuetu commented 12 months ago

Well, I'll give up on Arrow C++ for now and use pure C++ to rewrite empyrical. Looking forward to Arrow becoming more efficient.
