myriadrf / LimeSuite

Driver and GUI for LMS7002M-based SDR platforms
https://myriadrf.org/projects/lime-suite/
Apache License 2.0

SoapySDR Interface causing segfault for rx stream #261

Closed nunnsy closed 5 years ago

nunnsy commented 5 years ago

I've been trying to develop an application using the Python SoapySDR interface for a LimeSDR. While streaming samples, the Python interpreter crashes with a segfault after a variable amount of time.

Below is a snippet of the running code, followed by the output in my terminal.

def rx_chain():
    global big_boi
    sdr = limey.get_sdr()
    rxs = limey.get_rx_stream()
    while True:
        print("#0")
        print("SDR: " + str(sdr))
        print("RXS: " + str(rxs))
        print("ea0: " + str(hash(ea0.data.tobytes())))
        print("ea1: " + str(hash(ea1.data.tobytes())))
        sr = sdr.readStream(rxs, [ea0, ea1], len(ea0))
        # power, psd_freq = plt.psd(ea0, NFFT=1024, Fs=limey.get_sample_rate()["rx0"]/1e6, Fc=2.4e9/1e6)
        print("#1")
        print(sr)
        if (sr.ret > 0):
            big_boi = np.concatenate((ea0[0:sr.ret], big_boi[0:-sr.ret]))
            print("#2")
            plt.cla()
            print("#3")
            plt.specgram(big_boi, NFFT=samples, Fs=f_s, Fc=431e6)
            print("#4")
            plt.pause(0.05)
        time.sleep(1)

ret=512, flags=4, timeNs=92993349000
#2
#3
#4
#0
SDR: FX3:LimeSDR-USB
RXS: <Swig Object of type 'SoapySDR::Stream *' at 0x7fc12c268930>
ea0: -8597006528890416220
ea1: -8055776645099117789
#1
ret=512, flags=4, timeNs=94133301000
#2
#3
#4
#0
SDR: FX3:LimeSDR-USB
RXS: <Swig Object of type 'SoapySDR::Stream *' at 0x7fc12c268930>
ea0: 931463062287685360
ea1: 2884037698141062820
Segmentation fault (core dumped)
###@ubuntu:~###/$ 

Is there anything further I can look at within the Python environment to narrow this down? All objects passed to .readStream() appear valid. The crash usually occurs after about 10 loops, but the program can run for over 60 loops before it appears.

I'm assuming the issue will be contained within: https://github.com/myriadrf/LimeSuite/blob/master/SoapyLMS7/Streaming.cpp

Any help would be greatly appreciated!

rjonaitis commented 5 years ago

The best you can do is post the whole Python source, or at least the minimal amount of code that can be executed to reproduce the crash. Otherwise it's pointless to speculate about what could be happening without knowing what your program actually does.

nunnsy commented 5 years ago

Not a problem, @rjonaitis - please excuse the messy code, it has changed a lot while I've been trying to get initial tests to work smoothly.

The file which you saw run is:

import sys
sys.path.append("..") # Adds higher directory to python modules path.

import array, os
import numpy as np
from scipy import fftpack
import matplotlib
matplotlib.use('Qt5Agg')  # select the backend before pyplot is imported
import matplotlib.pyplot as plt
import math
import time
import threading

from lib.pySnowcone.sdr.limesdr import LimeSdr

samples = 512

limey = LimeSdr(431e6, 10e6)

# fa = os.open("./fifoA", os.O_WRONLY|os.O_CREAT)
# fb = os.open("./fifoB", os.O_WRONLY|os.O_CREAT)

ea0 = np.array([1]*samples, np.complex64)
ea1 = np.array([1]*samples, np.complex64)

big_boi = np.array([0]*samples*100, np.complex64)

f_s = limey.get_sample_rate()["rx0"]

def tx_carrier():
    wave_freq = 1e6
    rate = 10000000
    tx_stream = limey.get_tx_stream()
    sdr = limey.get_sdr()
    ampl = 0.8

    phase_acc = 0
    phase_inc = 2*math.pi*wave_freq/rate
    stream_mtu = sdr.getStreamMTU(tx_stream)
    samps_chan = np.array([ampl]*stream_mtu, np.complex64)

    time_last_print = time.time()
    total_samps = 0

    while True:
        phase_acc_next = phase_acc + stream_mtu*phase_inc
        phases = np.linspace(phase_acc, phase_acc_next, stream_mtu)
        samps_chan = ampl*np.exp(1j * phases).astype(np.complex64)
        phase_acc = phase_acc_next
        while phase_acc > math.pi * 2:
            phase_acc -= math.pi * 2

        print("Write 1")
        status = sdr.writeStream(tx_stream, [samps_chan], samps_chan.size, timeoutUs=100000000)
        print("Write 2")
        if status.ret != samps_chan.size:
            print(status.ret)
            # raise Exception("Expected writeStream() to consume all samples! %d" % status.ret)
            print("Expected writeStream() to consume all samples! %d" % status.ret)
        else:
            total_samps += status.ret

        if time.time() > time_last_print + 5.0:
            rate = total_samps / (time.time() - time_last_print) / 1e6
            print("Python siggen rate: %f Msps" % rate)
            total_samps = 0
            time_last_print = time.time()

def rx_chain():
    global big_boi
    sdr = limey.get_sdr()
    rxs = limey.get_rx_stream()
    while True:
        # print("#0")
        # print("SDR: " + str(sdr))
        # print("RXS: " + str(rxs))
        # print("ea0: " + str(hash(ea0.data.tobytes())))
        # print("ea1: " + str(hash(ea1.data.tobytes())))
        sr = sdr.readStream(rxs, [ea0, ea1], len(ea0))
        # power, psd_freq = plt.psd(ea0, NFFT=1024, Fs=limey.get_sample_rate()["rx0"]/1e6, Fc=2.4e9/1e6)
        # print("#1")
        print(sr)
        if (sr.ret > 0):
            big_boi = np.concatenate((ea0[0:sr.ret], big_boi[0:-sr.ret]))
            # print("#2")
            plt.cla()
            # print("#3")
            plt.specgram(big_boi, NFFT=samples, Fs=f_s, Fc=431e6)
            # print("#4")
            plt.pause(0.05)
        time.sleep(1)

threading.Thread(target=tx_carrier).start()
threading.Thread(target=rx_chain).start()

plt.show()

And the LimeSdr file contains:

import numpy as np
import os
import time

import SoapySDR
from SoapySDR import *

def generate_cf32_pulse(num_samps, width=5, scale_factor=0.3):
    """Create a sinc pulse."""
    rel_time = np.linspace(-width, width, num_samps)
    pulse = np.sinc(rel_time).astype(np.complex64)
    return pulse * scale_factor

# This is for the big LimeSDR

MAXIMUM_SAMPLE_RATE = 61400000              # 61.4 MHz
# Share the maximum sample rate between the three channels
DEFAULT_CHANNEL_SAMPLE_RATE = 10000000      # 10.0 MHz

DEFAULT_RX_GAIN = 30.00
DEFAULT_TX_GAIN = 40.00

TX_CHANNEL = 0

class LimeSdr:
    def __init__(self, frequency, bandwidth, \
        sample_rate=DEFAULT_CHANNEL_SAMPLE_RATE, rx_gain=DEFAULT_RX_GAIN, \
            tx_gain=DEFAULT_TX_GAIN):

        args = dict(driver="lime")
        self._sdr = SoapySDR.Device(args)

        if not self._sdr.hasHardwareTime():
            # Can't synchronise receive channels
            raise Exception('this device does not support timed streaming')

        # Configure port usage
        self._sdr.setAntenna(SOAPY_SDR_RX, 0, "LNAW")
        self._sdr.setAntenna(SOAPY_SDR_RX, 1, "LNAW")
        # TODO May need to change this antenna
        self._sdr.setAntenna(SOAPY_SDR_TX, TX_CHANNEL, "BAND1")

        # Delay 1st channel by this much to align data.
        # If negative, 2nd channel must be delayed.
        self._sample_offset = 0

        self.set_sample_rate(sample_rate)
        self.set_frequency(frequency)
        self.set_bandwidth(bandwidth)

        self._sdr.setGain(SOAPY_SDR_RX, 0, rx_gain)
        self._sdr.setGain(SOAPY_SDR_RX, 1, rx_gain)
        self._sdr.setGain(SOAPY_SDR_TX, TX_CHANNEL, tx_gain)

        self.setup_rx_stream()
        self.setup_tx_stream()

        # self.start_rx_stream()
        # self.start_tx_stream()

        time.sleep(10)
        self.find_sync()
        time.sleep(10)

    def set_sample_rate(self, sample_rate):
        self._sdr.setSampleRate(SOAPY_SDR_RX, 0, sample_rate)
        self._sdr.setSampleRate(SOAPY_SDR_RX, 1, sample_rate)

        self._sdr.setSampleRate(SOAPY_SDR_TX, TX_CHANNEL, sample_rate)

    def get_sample_rate(self):
        return {
            "rx0" : self._sdr.getSampleRate(SOAPY_SDR_RX, 0),
            "rx1" : self._sdr.getSampleRate(SOAPY_SDR_RX, 1),
            "tx0" : self._sdr.getSampleRate(SOAPY_SDR_TX, 0)
        }

    def set_bandwidth(self, bandwidth):
        self._sdr.setBandwidth(SOAPY_SDR_RX, 0, bandwidth)
        self._sdr.setBandwidth(SOAPY_SDR_RX, 1, bandwidth)

        self._sdr.setBandwidth(SOAPY_SDR_TX, TX_CHANNEL, bandwidth)

    def set_frequency(self, frequency):
        self._sdr.setFrequency(SOAPY_SDR_RX, 0, frequency)
        self._sdr.setFrequency(SOAPY_SDR_RX, 1, frequency)

        self._sdr.setFrequency(SOAPY_SDR_TX, TX_CHANNEL, frequency)

    def setup_rx_stream(self):
        self._rx_stream = self._sdr.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, \
            [0,1])
        time.sleep(1)

    def start_rx_stream(self):
        self._sdr.activateStream(self._rx_stream, SOAPY_SDR_HAS_TIME)

    def setup_tx_stream(self):
        self._tx_stream = self._sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, \
            [TX_CHANNEL])
        time.sleep(1)

    def start_tx_stream(self):
        self._sdr.activateStream(self._tx_stream, SOAPY_SDR_HAS_TIME)

    def get_sdr(self):
        return self._sdr

    def get_rx_stream(self):
        return self._rx_stream

    def get_tx_stream(self):
        return self._tx_stream

    def find_sync(self, num_tx_samps=2000, num_rx_samps=10000):
        tx_pulse = generate_cf32_pulse(num_tx_samps)
        tx_time_0 = int(self._sdr.getHardwareTime() + 0.5e9) #500ms
        # tx_flags = SOAPY_SDR_HAS_TIME | SOAPY_SDR_END_BURST
        tx_flags = SOAPY_SDR_HAS_TIME
        status = self._sdr.writeStream(self._tx_stream, [tx_pulse], len(tx_pulse), tx_flags, tx_time_0)
        if status.ret != len(tx_pulse):
            raise Exception('transmit failed %s'%str(status))

        #receive slightly before transmit time
        rx0_buffs = np.array([], np.complex64)
        rx1_buffs = np.array([], np.complex64)
        # rx_flags = SOAPY_SDR_HAS_TIME | SOAPY_SDR_END_BURST
        rx_flags = SOAPY_SDR_HAS_TIME
        #half of the samples come before the transmit time
        rate = self.get_sample_rate()["rx0"]
        receive_time = int(tx_time_0 - (num_rx_samps/rate) * 1e9 / 2)
        # self._sdr.activateStream(self._rx_stream, rx_flags, receive_time, num_rx_samps)
        self._sdr.activateStream(self._rx_stream, rx_flags, receive_time)
        rx_time_0 = None

        total_samples = 0

        #accumulate receive buffer into large contiguous buffer
        while True:
            rx0_buff = np.array([0]*1024, np.complex64)
            rx1_buff = np.array([0]*1024, np.complex64)
            timeout_us = int(5e5) #500 ms >> stream time
            status = self._sdr.readStream(self._rx_stream, [rx0_buff, rx1_buff], len(rx0_buff), timeoutUs=timeout_us)

            #stash time on first buffer
            if status.ret > 0 and rx0_buffs.size == 0:
                rx_time_0 = status.timeNs
                if (status.flags & SOAPY_SDR_HAS_TIME) == 0:
                    raise Exception('receive fail - no timestamp on first readStream %s'%(str(status)))

            #accumulate buffer or exit loop
            if total_samples <= num_rx_samps:
                total_samples += status.ret
                rx0_buffs = np.concatenate((rx0_buffs, rx0_buff[:status.ret]))
                rx1_buffs = np.concatenate((rx1_buffs, rx1_buff[:status.ret]))
            else:
                break

        # #cleanup streams
        # print("Cleanup streams")
        # sdr.deactivateStream(tx_stream)
        # sdr.closeStream(rx_stream)
        # sdr.closeStream(tx_stream)

        #check resulting buffer
        if len(rx0_buffs) < num_rx_samps:
            raise Exception(
                'receive fail 0 - captured samples %d out of %d'%(len(rx0_buffs), num_rx_samps))
        if len(rx1_buffs) < num_rx_samps:
            raise Exception(
                'receive fail 1 - captured samples %d out of %d'%(len(rx1_buffs), num_rx_samps))
        if rx_time_0 is None:
            raise Exception('receive fail - no valid timestamp')

        #clear initial samples because transients
        rx0_mean = np.mean(rx0_buffs)
        for i in range(len(rx0_buffs) // 100):
            rx0_buffs[i] = rx0_mean
        rx1_mean = np.mean(rx1_buffs)
        for i in range(len(rx1_buffs) // 100):
            rx1_buffs[i] = rx1_mean

        #normalize the samples
        def normalize(samps):
            samps = samps - np.mean(samps) #remove dc
            samps = np.absolute(samps) #magnitude
            samps = samps / max(samps) #norm ampl to peak
            # print (samps)
            return samps

        tx_pulse_norm = normalize(tx_pulse)
        rx0_buffs_norm = normalize(rx0_buffs)
        rx1_buffs_norm = normalize(rx1_buffs)

        # #dump debug samples
        # if dump_dir is not None:
        #     np.save(os.path.join(dump_dir, 'txNorm.npy'), tx_pulse_norm)
        #     np.save(os.path.join(dump_dir, 'rxNorm.npy'), rx_buffs_norm)
        #     np.save(os.path.join(dump_dir, 'rxRawI.npy'), np.real(rx_buffs))
        #     np.save(os.path.join(dump_dir, 'rxRawQ.npy'), np.imag(rx_buffs))

        #look for the peak index for time offsets
        rx0_argmax_index = np.argmax(rx0_buffs_norm)
        rx1_argmax_index = np.argmax(rx1_buffs_norm)
        tx_argmax_index = np.argmax(tx_pulse_norm)

        #check goodness of peak by comparing argmax and correlation
        rx0_coor_index = np.argmax(np.correlate(rx0_buffs_norm, tx_pulse_norm)) + len(tx_pulse_norm) // 2
        rx1_coor_index = np.argmax(np.correlate(rx1_buffs_norm, tx_pulse_norm)) + len(tx_pulse_norm) // 2

        # if abs(rx0_coor_index-rx0_argmax_index) > len(tx_pulse_norm)/4:
        #     raise Exception(
        #         'correlation(%d) 0 does not match argmax(%d), probably bad data' %
        #         (rx0_coor_index, rx0_argmax_index))

        # if abs(rx1_coor_index-rx1_argmax_index) > len(tx_pulse_norm)/4:
        #     raise Exception(
        #         'correlation(%d) 1 does not match argmax(%d), probably bad data' %
        #         (rx1_coor_index, rx1_argmax_index))

        #calculate time offset
        tx_peak_time = int(tx_time_0 + (tx_argmax_index / rate) * 1e9)
        rx0_peak_time = int(rx_time_0 + (rx0_argmax_index / rate) * 1e9)
        rx1_peak_time = int(rx_time_0 + (rx1_argmax_index / rate) * 1e9)
        time_delta_0 = rx0_peak_time - tx_peak_time
        time_delta_1 = rx1_peak_time - tx_peak_time
        print('>>> Time delta %f us'%((time_delta_1 - time_delta_0) / 1e3))
        self._sample_offset = rx1_argmax_index - rx0_argmax_index
        print('>>> Sample Delta %d'%(self._sample_offset))
        print("Done!")

With my so far limited experience with SoapySDR, I figured the best way to synchronise two channels was to follow the examples by @guruofquality: https://github.com/pothosware/SoapySDR/tree/master/python/apps
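
The core of that approach, estimating the sample offset between two receive channels from a shared pulse, can be tried offline on synthetic data. The sketch below is not taken from the SoapySDR examples: the function names estimate_sample_offset and align_channels are made up for illustration, and it uses cross-correlation of the magnitudes rather than comparing argmax positions. A positive result means channel 1 lags channel 0, the same sign convention as rx1_argmax_index - rx0_argmax_index above.

```python
import numpy as np


def estimate_sample_offset(rx0, rx1):
    """Return d such that rx1[n] is approximately rx0[n - d].

    Positive d means channel 1 lags channel 0.
    """
    # Work on mean-removed magnitudes so phase differences between the
    # channels do not affect the peak position.
    a = np.abs(rx0).astype(np.float64)
    b = np.abs(rx1).astype(np.float64)
    a -= a.mean()
    b -= b.mean()
    corr = np.correlate(b, a, mode="full")
    # In "full" mode, index len(a) - 1 corresponds to zero lag.
    return int(np.argmax(corr)) - (len(a) - 1)


def align_channels(rx0, rx1, offset):
    """Trim both arrays so that index n refers to the same instant."""
    if offset >= 0:
        rx1 = rx1[offset:]
    else:
        rx0 = rx0[-offset:]
    n = min(len(rx0), len(rx1))
    return rx0[:n], rx1[:n]


# Synthetic check: the same sinc pulse, 7 samples later on channel 1.
pulse = np.sinc(np.linspace(-5, 5, 200)).astype(np.complex64)
rx0 = np.zeros(1000, np.complex64)
rx1 = np.zeros(1000, np.complex64)
rx0[300:500] = pulse
rx1[307:507] = pulse

offset = estimate_sample_offset(rx0, rx1)
a0, a1 = align_channels(rx0, rx1, offset)
print("offset:", offset, "aligned length:", len(a0))
```

On real captures the peak is less clean than in this noise-free case, so it may be worth repeating the measurement and checking that the offset is stable before relying on it.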

I figured that, once a delay is measured, those streams should remain open. I understand that the buffer will fill in the meantime and that I'll need to empty it before the synchronised streaming starts, but using the same streams should, once the buffer is emptied, give roughly the same sample delay?
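
One way to empty the buffer without closing the streams is to keep reading and discard every buffer whose timestamp is older than a chosen reference time. This is a sketch under assumptions, not something confirmed for the LimeSDR driver: it assumes each read carries a timestamp (the log above shows flags=4, which is SOAPY_SDR_HAS_TIME), and discard_stale, StreamResult and read_fn are hypothetical names. StreamResult only stands in for the object returned by readStream() so the logic can be run without hardware.

```python
from collections import namedtuple

# Stand-in for the readStream() result, which exposes ret, flags and timeNs.
StreamResult = namedtuple("StreamResult", "ret flags timeNs")

SOAPY_SDR_HAS_TIME = 1 << 2  # same value as flags=4 in the log


def discard_stale(read_fn, not_before_ns, max_reads=100000):
    """Read until a buffer timestamped at or after not_before_ns arrives.

    Returns (result, discarded_reads). result is None if a negative
    error code is returned or max_reads is exhausted.
    """
    discarded = 0
    for _ in range(max_reads):
        res = read_fn()
        if res.ret < 0:
            return None, discarded
        fresh = (
            res.ret > 0
            and (res.flags & SOAPY_SDR_HAS_TIME)
            and res.timeNs >= not_before_ns
        )
        if fresh:
            return res, discarded
        discarded += 1
    return None, discarded


# Hardware-free demonstration: a fake reader whose timestamps advance
# by 100 ns per call, starting at 0.
def make_fake_reader(step_ns=100):
    state = {"t": 0}

    def read():
        res = StreamResult(ret=512, flags=SOAPY_SDR_HAS_TIME, timeNs=state["t"])
        state["t"] += step_ns
        return res

    return read


result, dropped = discard_stale(make_fake_reader(), not_before_ns=250)
print(result.timeNs, dropped)
```

With a real device, read_fn could be something like lambda: sdr.readStream(rxs, [ea0, ea1], len(ea0)) and not_before_ns the value of sdr.getHardwareTime() taken just before the loop. Whether the sample offset between channels survives this is a separate question that would need to be measured.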

If I am going about this completely wrong, please let me know and I'll pursue another avenue. I am tempted to move over to C/C++ due to the large amounts of streaming data I want to work with in the future. If this helps sort out streaming problems now, that would be even more reason to do so.

IgnasJarusevicius commented 5 years ago

Hi, I ran your code and found one place where SoapyLMS7 segfaults in Rx. For me, it no longer crashes after 8ee7053c1043262dbc78730a53525398e50c2355. Could you confirm that it fixes your problem?

nunnsy commented 5 years ago

Hi @IgnasJarusevicius,

I will test it as soon as I have the opportunity (next few days). Thank you for alerting me to this update!

nunnsy commented 5 years ago

Seems to have fixed the stability! Awesome, thank you for the assistance. 😄