blackbeam / mysql_async

Asyncronous Rust Mysql driver based on Tokio.
Apache License 2.0
372 stars 113 forks source link

When use the query_stream method, the error "bytes remaining on stream" occurs. #265

Open artorias1024 opened 8 months ago

artorias1024 commented 8 months ago
image

This is the logic of my code.

    async fn create_data_events(&self, captured_tables: HashSet<TableId>) -> core::result::Result<(), MySqlConnectorError> {
        let pool = &self.context.pool;
        let sender = &self.sender;

        let max_concurrent_tasks = 2;
        let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));

        let mut handlers = Vec::new();
        for table_id in captured_tables {
            let table = self.context.schema.table_for(&table_id).unwrap().clone();
            let connection = pool.get_conn().await?;
            let sender_clone = sender.clone();
            // Self::export_data(connection, table_id, table, sender.clone()).await?;
            let semaphore_clone = semaphore.clone();
            let join_handler = tokio::spawn(async move {
                let _permit = semaphore_clone.acquire().await.unwrap();
                Self::export_data(connection, table_id, table, sender_clone).await
            });
            handlers.push(join_handler);
        }

        for handle in handlers {
            if let Ok(snapshot_result) = handle.await {}
        }

        Ok(())
    }
async fn export_data(
        mut connection: Conn,
        table_id: TableId,
        table: Arc<Table>,
        sender: Sender<FlatMessage>,
    ) -> core::result::Result<(), MySqlConnectorError> {
        {
            info!("Exporting data from table '{}'", table_id);
            let export_start_time = Instant::now();
            let mut stream: ResultSetStream<Row, TextProtocol> = connection.query_stream(format!("select * from {}", table_id)).await?;
            let mut rows_count = 0;
            while let Some(next) = stream.next().await {
                match next {
                    Ok(row) => {
                        let message = table.generate_snapshot_record(row);
                        match sender.send(message).await {
                            Ok(_) => {}
                            Err(err) => error!("{}", err),
                        }
                    }
                    Err(err) => {
                        error!("Snapshotting of table {} failed : {:#?}", table_id, err);
                    }
                }

                rows_count += 1;
            }
            drop(stream);
            info!(
                "Finished exporting {} records for table'{}';total duration {:#?}",
                rows_count,
                table_id,
                export_start_time.elapsed()
            );
        }

        let _ = connection.disconnect().await?;
        Ok(())
    }
artorias1024 commented 8 months ago

It's very strange that when I set the Semaphore to 1, which means that the connection is not used concurrently, no error occurs.