Status: Open — engimatic opened this issue 1 year ago.
I also registered a cancel handler with `setOnCancelHandler`, as shown below, but it never took effect.
// Reporter's attempt to react to client disconnects: register a callback that
// completes the stream and logs an error. As described above, it never fired
// after the client process was killed.
// NOTE(review): completing a stream from inside its own cancel callback looks
// questionable — a cancelled call presumably needs no completion signal; confirm
// against the Arrow Flight ServerStreamListener documentation.
listener.setOnCancelHandler(() -> {
listener.completed();
log.error("---getStream cancel");
});
@davisusanibar
Hi @engimatic, could you clarify which parts are the Java client code and which parts are the Java server code, so that I can reproduce this as faithfully as possible? Thank you in advance.
@davisusanibar Server code:
import cn.hutool.core.thread.ThreadFactoryBuilder;
import io.netty.util.internal.PlatformDependent;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class Server {
/**
* main.
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException, IOException {
int corePoolSize = Math.max(Runtime.getRuntime().availableProcessors(), 2) * 2;
int maxPoolSize = corePoolSize * 2;
ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 300,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
ThreadFactoryBuilder.create().setNamePrefix("globalExecutor_pool_").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
long maxSize = PlatformDependent.maxDirectMemory();
long limit = maxSize - 1;
Location location = Location.forGrpcInsecure("0.0.0.0", 8000);
try (BufferAllocator allocator = new RootAllocator(limit)) {
try (FlightServer flightServer = FlightServer.builder(allocator, location, new NoOpFlightProducer() {
@Override
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
FlightDescriptor flightDescriptor = FlightDescriptor.path(
new String(ticket.getBytes(), StandardCharsets.UTF_8));
String sql = flightDescriptor.getPath().get(0);
Connection connection = null;
PreparedStatement statement = null;
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
JdbcToArrowUtils.getUtcCalendar())
.build();
try {
Class<?> driverClass = Class.forName("com.mysql.jdbc.Driver");
Driver driver = (Driver) driverClass.newInstance();
String jdbcUrl = "jdbc:mysql://{ip}:{port}/test?"
+ "useUnicode=true&socketTimeout=1800000&characterEncoding=UTF-8&autoReconnect=true&"
+ "useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai";
Properties info = new Properties();
info.put("user", user);
info.put("password", pass);
connection = driver.connect(jdbcUrl, info);
statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(Integer.MIN_VALUE);
} catch (ClassNotFoundException | SQLException | InstantiationException
| IllegalAccessException e) {
log.error(e.getMessage());
}
int index = 0;
VectorSchemaRoot vectorSchemaRoot = null;
try {
try (ResultSet resultSet = statement.executeQuery();
ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
resultSet, config)) {
while (iterator.hasNext() && !listener.isCancelled()) {
if (listener.isReady()) {
try (VectorSchemaRoot root = iterator.next()) {
index++;
if (vectorSchemaRoot == null) {
vectorSchemaRoot = root;
listener.start(vectorSchemaRoot);
}
VectorLoader loader = new VectorLoader(vectorSchemaRoot);
VectorUnloader unloader = new VectorUnloader(root);
ArrowRecordBatch arb = unloader.getRecordBatch();
loader.load(arb);
listener.putNext();
arb.close();
}
log.info("currentThreadName: {}, index: {}, "
+ "allocator used {}, max {}, direct buffer userd {}",
Thread.currentThread().getName(), index, allocator.getAllocatedMemory(),
allocator.getLimit(), PlatformDependent.usedDirectMemory());
}
}
}
} catch (SQLException | IOException e) {
log.error(e.getMessage());
} finally {
listener.completed();
if (vectorSchemaRoot != null) {
vectorSchemaRoot.close();
}
}
}
}).executor(executorService).build()) {
flightServer.start();
log.info("ArrowFlightApp: Server (Location): Listening on port {}, max buffer size {}",
flightServer.getPort(), allocator.getLimit());
flightServer.awaitTermination();
}
}
}
}
Client code (Python):
import numpy as np
import pyarrow.flight as pf
import pyarrow as pa
import time
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
# Flight endpoint of the Java server; "{ip}" is presumably a placeholder for the
# server host left in the report — replace before running.
FLIGHT_URI = "grpc://{ip}:8000"
client = pf.FlightClient(FLIGHT_URI)
def query(sql):
    """Run ``sql`` on the Flight server and return how many rows came back.

    The SQL text is sent as the ``DoGet`` ticket. Each streamed record batch is
    converted to a pandas DataFrame, with decimal columns cast to float64, and all
    batches are concatenated. The elapsed conversion time is printed.

    :param sql: query text; converted with ``str()`` and UTF-8 encoded.
    :return: total number of rows across all streamed batches.
    """
    ticket = pf.Ticket(str(sql).encode('utf-8'))
    start_time = time.time()
    reader = client.do_get(ticket)
    frames = []
    for chunk in reader:
        batch = chunk.data
        # Pair each column with its field name, in schema order, so the frame keeps
        # the server's column order and names. The previous `column(num - 1)`
        # indexing started at -1, which rotated the columns.
        columns = []
        for name, column in zip(batch.schema.names, batch.columns):
            values = column.to_pandas()
            if pa.types.is_decimal(column.type):
                # Decimal columns come back as Python Decimal objects; cast for numeric use.
                values = values.astype(np.float64)
            columns.append(values.rename(name))
        if columns:
            frames.append(pd.concat(columns, axis=1))
        else:
            frames.append(pd.DataFrame(index=range(batch.num_rows)))
    # Concatenate once: calling pd.concat inside the loop recopies all earlier rows
    # on every batch (quadratic in the number of batches).
    result = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    print('convert data use time is : {}'.format(time.time() - start_time))
    return len(result)
# Load test: submit the same query 100 times to a pool of 20 worker threads and
# print each query's row count as soon as it finishes, numbered by completion order.
with ThreadPoolExecutor(max_workers=20) as pool:
    sql = '''SELECT * FROM test limit 100000'''
    futures = [pool.submit(query, sql) for _ in range(100)]
    for finished_count, finished in enumerate(as_completed(futures), start=1):
        rows = finished.result()
        print(finished_count, "Got rows total", rows)
client.close()
Maven dependencies:
<!-- Arrow 10.0.0 artifacts used by the server above: flight-core provides the
     org.apache.arrow.flight classes (FlightServer, NoOpFlightProducer, ...), and
     arrow-jdbc provides org.apache.arrow.adapter.jdbc (JdbcToArrow, ArrowVectorIterator).
     NOTE(review): the server also imports lombok, cn.hutool and io.netty classes and loads
     the MySQL driver reflectively; none of these is declared here — presumably they are
     declared elsewhere in the POM or arrive transitively. Confirm. -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-core</artifactId>
<version>10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-jdbc</artifactId>
<version>10.0.0</version>
</dependency>
Hi @engimatic, I can run both your client and server code.
Could you share the steps you use to trigger the endless loop? I cannot reproduce it.
@davisusanibar
Let the client run for a little while, then force-kill it. You will find that the server is stuck in an endless loop at this code:
while (iterator.hasNext() && !listener.isCancelled()) {
if (listener.isReady()) {
The server's CPU usage stays consistently high, and the next request never returns.
@davisusanibar Hello, is anything unclear about this? Or is this a bug in Arrow Flight?
Hi @engimatic, I reproduced it on my last attempt:
Flight Server
python clientuser.py
python clientuser.py
pkill -9 -f clientuser.py
Let me debug this to understand the error in more detail.
In my request trace log, I see the server responding with a TCP Zero Window segment, which means "don't send me any more data; I cannot handle it right now." The ready/cancel handling may need tuning for this scenario to avoid endless loops. I'll investigate this further.
We need to review how the Java Flight server handles the window size for:
@davisusanibar Maybe I should wait for the bug to be fixed. Is there a quick workaround in the meantime?
Describe the usage question you have. Please include as many useful details as possible.
I handle the MySQL ResultSet like this:
But when I forcibly disconnect the client, `listener.isCancelled()` and `listener.isReady()` both stay false, and the server is stuck in an endless loop. How can I resolve this? Server exception message:
Component(s)
FlightRPC