ni / nidaqmx-python

A Python API for interacting with NI-DAQmx

read_many_sample cannot use views #90

Open spalatofhi opened 4 years ago

spalatofhi commented 4 years ago

This is an improvement suggestion.

The read_many_sample methods of the stream_reader classes place odd requirements on the data array: its shape must be (n_channels, n_samples) and it must be C_CONTIGUOUS. In NI-DAQmx terms, the data array must be 'non-interleaved'. This causes extra difficulty in continuous acquisition mode, where the exact number of samples may not always be known in advance. In particular, it prevents passing an array view as the argument to read_many_sample. Let's review the root of this problem, show some example code, and introduce some possible solutions.

NumPy arrays can be either C_CONTIGUOUS (row-major, C order) or F_CONTIGUOUS (column-major, Fortran order). This controls the memory layout of multidimensional arrays. For 2D arrays, C_CONTIGUOUS arrays store the elements one row after the other, i.e. arr[i, j] is next to arr[i, j+1] in memory (see https://docs.scipy.org/doc/numpy/glossary.html#term-row-major). This means the number of rows can be changed without reorganizing the entire array. Furthermore, a slice containing all rows but only some columns is not C_CONTIGUOUS, whereas a slice containing all columns but only some rows is. Since nidaqmx maps channels onto rows and samples onto columns, a slice over a subset of samples is not C_CONTIGUOUS and cannot be used as an argument to stream_readers.AnalogMultiChannelReader.read_many_sample. It also means the transfer from the driver buffer to the NumPy array is not in native order, and is thus less efficient.

import numpy as np

n_chan, n_samp = 4, 100
c = np.zeros((n_chan, n_samp), order="C") # default order
assert c.flags.c_contiguous
c_cols = c[:, 10:15]  # all channels, a subset of samples
assert not c_cols.flags.c_contiguous
c_rows = c[1:3, :]    # a subset of channels, all samples
assert c_rows.flags.c_contiguous
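For contrast (this demonstration is an addition, not from the original report), the transposed layout behaves the opposite way: with samples on rows and channels on columns, a view selecting a block of samples stays C-contiguous, which is exactly what the transposition workaround in the follow-up comment relies on.

```python
import numpy as np

n_chan, n_samp = 4, 100

# Transposed layout: samples on rows, channels on columns.
t = np.zeros((n_samp, n_chan), order="C")
assert t.flags.c_contiguous

# A view selecting a block of samples (all channels) stays C-contiguous,
# so it could be passed to a reader that accepts this layout.
t_rows = t[10:15, :]
assert t_rows.flags.c_contiguous

# Equivalently, the transpose of a C-ordered array is F-contiguous.
c = np.zeros((n_chan, n_samp), order="C")
assert c.T.flags.f_contiguous
```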

As a demonstration, let's perform a continuous acquisition. This version requires a temporary buffer for each call. (This particular example could also be written with finite acquisition, but the pattern is useful in other applications.)

import numpy as np
import nidaqmx as ni
from nidaqmx.constants import AcquisitionType
from nidaqmx.stream_readers import AnalogMultiChannelReader
from time import sleep

##### SETUP
n_tot = 100000
sample_rate = 200000

with ni.Task("signals") as task:
    task.ai_channels.add_ai_voltage_chan(
        "DevT/ai0:1", 
        min_val=-10, max_val=10,
    ) 
    n_channels = task.number_of_channels
    task.timing.cfg_samp_clk_timing(
        rate=sample_rate,
        sample_mode=AcquisitionType.CONTINUOUS,
        samps_per_chan=n_tot,
    )
    reader = AnalogMultiChannelReader(task.in_stream)
    read_buffer = np.ones((n_channels, n_tot))*-1000 # impossible output
    i = 0
    ##### START
    task.start()
    while not task.is_task_done() and i < n_tot:
        sleep(0.01) # pretend to be busy with other tasks
        n = reader._in_stream.avail_samp_per_chan
        if n == 0: continue
        n = min(n, n_tot-i) # prevent reading too many samples
        ##### READ
        tmp = np.ones((n_channels, n)) * -1001
        r = reader.read_many_sample(
            tmp, 
            number_of_samples_per_channel=n
        )
        read_buffer[:,i:i+n] = tmp
        i += r
    ##### STOP AND CHECK RESULTS
    task.stop()
    assert np.all(read_buffer > -1000)

If views could be used, the intervening tmp array could be dropped. The code would then be:

# [setup as previously]
        ##### READ
        i += reader.read_many_sample(
            read_buffer[:, i:i+n], # read directly into array using a view
            number_of_samples_per_channel=n
        )
# [stop and check results as previously]

Enabling this compatibility would simplify client code (it would "just work") and should improve interoperability with NumPy-based code, such as memory-mapped arrays, h5py, etc.
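Until then, a client-side workaround is possible. The sketch below is hypothetical (`read_into_view` and `fake_reader` are made-up names, and `reader_fn` stands in for a bound call such as `reader.read_many_sample`): it falls back to a temporary C-contiguous buffer only when the target view actually needs one.

```python
import numpy as np

def read_into_view(reader_fn, view):
    """Read into `view`, copying through a temporary C-contiguous
    buffer only when the view's layout requires it.

    `reader_fn(buf)` must fill `buf` in place and return the number
    of samples read per channel.
    """
    if view.flags.c_contiguous:
        return reader_fn(view)  # zero-copy fast path
    tmp = np.empty(view.shape, dtype=view.dtype)  # C-contiguous scratch
    n = reader_fn(tmp)
    view[...] = tmp  # one extra copy, as in the example above
    return n

# Demonstration with a fake reader that fills the buffer with 1.0:
def fake_reader(buf):
    buf[...] = 1.0
    return buf.shape[-1]

read_buffer = np.zeros((2, 10))
n = read_into_view(fake_reader, read_buffer[:, 3:7])  # non-contiguous view
assert n == 4
assert np.all(read_buffer[:, 3:7] == 1.0)
```

If read_many_sample ever accepts views directly, the fast path covers every call and the wrapper costs nothing extra.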

There are a few possible paths for a solution, all of which have potential drawbacks; one of them is explored in the follow-up comment below.

The tests were performed with the following configuration: Python 3.8.2, NumPy 1.18.1, nidaqmx-python 0.5.7, NI-DAQmx 19.6, on Windows 10. A simulated device (USB-6356) was created in NI MAX with the name DevT.

spalatofhi commented 4 years ago

Currently, the first call to a low-level function such as _read_analog_f_64 sets the expected datatype. As such, any change to the datatype would be global; allowing both C and F orders thus seems dangerous.

I opted to transpose the data array, i.e. to always use shape (n_samples, n_channels). Here is a working example that uses a memory-mapped array. Note that this example does not handle resizing the array for further reading, or truncating it when aborting a read. The results are identical to those of the previous examples and can be checked by visual inspection.

import numpy as np
import nidaqmx as ni
from nidaqmx import DaqError
from nidaqmx.error_codes import DAQmxErrors
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx._task_modules.read_functions import _read_analog_f_64
from nidaqmx.constants import READ_ALL_AVAILABLE, FillMode, AcquisitionType
from time import sleep

class TAMR(AnalogMultiChannelReader): # TAMR is subclass
    """
    Transposed Analog Multichannel Reader.
    """
    # essentially a copy of the parent function, with an inverted `array_shape`
    def _verify_array(self, data, number_of_samples_per_channel,
                      is_many_chan, is_many_samp):
        if not self._verify_array_shape:
            return
        channels_to_read = self._in_stream.channels_to_read
        number_of_channels = len(channels_to_read.channel_names)
        array_shape = (number_of_samples_per_channel, number_of_channels)
        if array_shape is not None and data.shape != array_shape:
            raise DaqError(
                'Read cannot be performed because the NumPy array passed into '
                'this function is not shaped correctly. You must pass in a '
                'NumPy array of the correct shape based on the number of '
                'channels in task and the number of samples per channel '
                'requested.\n\n'
                'Shape of NumPy Array provided: {0}\n'
                'Shape of NumPy Array required: {1}'
                .format(data.shape, array_shape),
                DAQmxErrors.UNKNOWN.value, task_name=self._task.name)

    # copy of parent method, simply using a different fill_mode argument
    def read_many_sample(self, data, 
            number_of_samples_per_channel=READ_ALL_AVAILABLE, timeout=10.0):
        number_of_samples_per_channel = (
            self._task._calculate_num_samps_per_chan(
                number_of_samples_per_channel))

        self._verify_array(data, number_of_samples_per_channel, True, True)

        return _read_analog_f_64(self._handle, data,
            number_of_samples_per_channel, timeout,
            fill_mode=FillMode.GROUP_BY_SCAN_NUMBER)

##### SETUP
n_tot = 100000
sample_rate = 200000

with ni.Task("signals") as task:
    task.ai_channels.add_ai_voltage_chan(
        "DevT/ai0:1", 
        min_val=-10, max_val=10,
    ) 
    n_channels = task.number_of_channels
    task.timing.cfg_samp_clk_timing(
        rate=sample_rate,
        sample_mode=AcquisitionType.CONTINUOUS,
        samps_per_chan=n_tot,
    )
    reader = TAMR(task.in_stream)
    read_buffer = np.memmap(
        "test.tmp",
        dtype=np.float64,  # np.float was removed in NumPy 1.24
        mode="w+",
        shape=(n_tot, n_channels))
    read_buffer[:] = -1000 # impossible output
    i = 0
    ##### START
    task.start()
    while not task.is_task_done() and i < n_tot:
        sleep(0.01) # pretend to be busy with other tasks
        n = reader._in_stream.avail_samp_per_chan
        if n == 0: continue
        n = min(n, n_tot-i) # prevent reading too many samples
        ##### READ
        i += reader.read_many_sample(
            read_buffer[i:i+n, :], # read directly into array using a view
            number_of_samples_per_channel=n
        )
    ##### STOP AND CHECK RESULTS
    task.stop()
    read_buffer.flush()
    assert np.all(read_buffer > -1000)
print("Complete")
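As a pure-NumPy aside (no DAQ hardware or nidaqmx import needed; the file path and sizes here are illustrative), the transposed shape is what makes the memmap usable: a block of consecutive samples is a C-contiguous view of the file, so a reader filling with FillMode.GROUP_BY_SCAN_NUMBER can write into it directly.

```python
import os
import tempfile
import numpy as np

n_tot, n_channels = 1000, 2
path = os.path.join(tempfile.mkdtemp(), "test.tmp")

# Transposed shape (n_samples, n_channels), as in the TAMR example.
# mode="w+" creates a zero-filled file of the right size.
buf = np.memmap(path, dtype=np.float64, mode="w+", shape=(n_tot, n_channels))

# A block of consecutive samples is a C-contiguous view of the memmap,
# so it can be filled in place without a temporary array.
block = buf[100:200, :]
assert block.flags.c_contiguous

block[...] = 1.0  # stand-in for an in-place read
buf.flush()
assert np.all(buf[100:200] == 1.0)
```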

slartz42 commented 2 months ago

Any chance we can get this feature implemented? It would make my program much more efficient: right now I have to allocate a new NumPy array on every loop iteration rather than once before the data acquisition starts.