mobiusklein / ms_deisotope

A library for deisotoping and charge state deconvolution of complex mass spectra
https://mobiusklein.github.io/ms_deisotope
Apache License 2.0
33 stars 14 forks source link

Unable to run on Apache Spark #18

Closed im281 closed 3 years ago

im281 commented 4 years ago

I get this error when trying to us the ms_deisotope package in the spark context

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

` def deconv_spark(peak_list): import ms_deisotope from lib.componentdetection.pdata import Feature feature_list = []

To return a new list, use the sorted() built-in function...

sorted_peak_list = sorted(peak_list[1], key=lambda p: p.MZ, reverse=False)
group = []
for peak in sorted_peak_list:
    group.append((peak.MZ,peak.Intensity,peak.RT))
deconvoluted_peaks, targeted = ms_deisotope.deconvolute_peaks(group, averagine=ms_deisotope.peptide,
                                                              scorer=ms_deisotope.MSDeconVFitter(10.))
if deconvoluted_peaks.peaks > 0:
    for peak in deconvoluted_peaks.peaks:
        f = Feature()
        f.RT = np.median([x[2] for x in group])
        f.DeconvolutedMass = peak.neutral_mass
        feature_list.append(f)
    return feature_list
else:
    return []

`

mobiusklein commented 4 years ago

I'm not familiar with Apache Spark, so it's not immediately clear to me where the error message is. This looks like several separate expressions on the same line. Could you show me the formatted code you're trying to run please? Does Spark expect an actual Python list or will it accept any iterable/sequence?

mobiusklein commented 4 years ago

I was able to install Spark and PySpark on a system and run a simple test:

Python 2.7.16 (default, Mar  8 2019, 17:43:33)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
2020-07-16 20:40:17 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 2.7.16 (default, Mar  8 2019 17:43:33)
SparkSession available as 'spark'.
>>> import ms_deisotope
>>> peaks = [(1204.078121, 312.8150641383043), (1204.5797680470216, 328.3714066200455),
(1205.0811790341174, 213.741136441843), (1205.5825517643248, 103.75167278645647),
(1206.0838776722621, 41.32072001335061), (607.123, 1777.99696234838),
(607.4572859498758, 1755.5958572945165), (607.7912718269597, 1025.2348062956905),
(608.1251757149944, 441.1723740614121)]
>>> sc.parallelize([peaks, peaks], 2).map(ms_deisotope.deconvolute_peaks).collect()
[DeconvolutionProcessResult(<DeconvolutedPeakSet 2 Peaks>, []),
 DeconvolutionProcessResult(<DeconvolutedPeakSet 2 Peaks>, [])]

So in theory, it should work on Spark.

im281 commented 4 years ago

Hi Joshua, Spark accepts a list of arbitrary objects. Consider the following example:

for peak_list in peaks: deconv_spark(peak_list)

In spark it is simply a 'one-liner' to parallelize an arbitrary collection (In our case we want to deconvolute a list of arbitrary peaks which are grouped by retention time and sorted by m/z) spark.rdd.map(lambda peak_list: detect.deconv_spark(peak_list)).collect()

The function is a 'delegate function' and it doesn't really matter but here it is:

In the 'Spark context' each separate peak list that is grouped by retention time is passed to a 'mapping function'. This handles all the data distribution across nodes and it is abstracted away (thank goodness) So effectively, the same function operates on a list but across a distributed cluster of nodes. In my function below, I group the detected peaks by retention time windows and assign a 'window string' as it's key and the list of retention time-grouped peaks as the value. I will publish the code but I need to get some approvals first which is why I am 'dragging my feet' to get you what you need. If this works we could probably publish a paper together...

def deconv_spark(peak_list): import ms_deisotope from lib.componentdetection.pdata import Feature feature_list = []

To return a new list, use the sorted() built-in function...

sorted_peak_list = sorted(peak_list[1], key=lambda p: p.MZ, reverse=False)
group = []
for peak in sorted_peak_list:
    group.append((peak.MZ,peak.Intensity,peak.RT))
deconvoluted_peaks, targeted =

ms_deisotope.deconvolute_peaks(group, averagine=ms_deisotope.peptide,

scorer=ms_deisotope.MSDeconVFitter(10.)) if deconvoluted_peaks.peaks > 0: for peak in deconvoluted_peaks.peaks: f = Feature() f.RT = group[0][2] # key f.DeconvolutedMass = peak.neutral_mass feature_list.append(f) return feature_list else: return []

On Thu, Jul 16, 2020 at 3:52 PM Joshua Klein notifications@github.com wrote:

I'm not familiar with Apache Spark, so it's not immediately clear to me where the error message is. This looks like several separate expressions on the same line. Could you show me the formatted code you're trying to run please? Does Spark expect an actual Python list or will it accept any iterable/sequence?

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/mobiusklein/ms_deisotope/issues/18#issuecomment-659718713, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACM5I4D4CV3LOUXA6Q62TJTR36AB5ANCNFSM4O47W75A .

im281 commented 4 years ago

Thanks. I see you are using Python 2.7. I wonder if that could be the issue? Best regards, Iman

On Thu, Jul 16, 2020 at 5:44 PM Joshua Klein notifications@github.com wrote:

I was able to install Spark and PySpark on a system and run a simple test:

Python 2.7.16 (default, Mar 8 2019, 17:43:33) [GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2 Type "help", "copyright", "credits" or "license" for more information. 2020-07-16 20:40:17 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to


 / __/__  ___ _____/ /__
_\ \/ _ \/ _ `/ __/  '_/

/ / ./_,// //_\ version 2.3.0 //

Using Python version 2.7.16 (default, Mar 8 2019 17:43:33) SparkSession available as 'spark'.

import ms_deisotope peaks = [(1204.078121, 312.8150641383043), (1204.5797680470216, 328.3714066200455), (1205.0811790341174, 213.741136441843), (1205.5825517643248, 103.75167278645647), (1206.0838776722621, 41.32072001335061), (607.123, 1777.99696234838), (607.4572859498758, 1755.5958572945165), (607.7912718269597, 1025.2348062956905), (608.1251757149944, 441.1723740614121)] sc.parallelize([peaks, peaks], 2).map(ms_deisotope.deconvolute_peaks).collect() [DeconvolutionProcessResult(<DeconvolutedPeakSet 2 Peaks>, []), DeconvolutionProcessResult(<DeconvolutedPeakSet 2 Peaks>, [])]

So in theory, it should work on Spark.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/mobiusklein/ms_deisotope/issues/18#issuecomment-659758905, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACM5I4DCPPOKBLQ5IIRT2C3R36NG3ANCNFSM4O47W75A .

mobiusklein commented 4 years ago

The only line of that that doesn't look okay is if deconvoluted_peaks.peaks > 0:. I suspect the intent is to say "if the number of peaks in the deconvoluted peak set is greater than zero", but that attribute is a tuple, it's the underlying collection that the DeconvolutedPeakSet object holds its contents in. Using > on it with an integer will result in a nonsense comparison in Py2, and an outright TypeError on Py3. If you're on Py3, and you're not getting an actual traceback, this would be my first check.

Could you not just use if len(deconvoluted_peaks) > 0: or even just if deconvoluted_peaks:? The latter is True only if there are peaks in the result.

im281 commented 4 years ago

that is an error that I fixed (in different ways) before but to no avail

if len(deconvoluted_peaks.peaks) > 0:

if len(deconvoluted_peaks) > 0:

On Thu, Jul 16, 2020 at 5:57 PM Joshua Klein notifications@github.com wrote:

The only line of that that doesn't look okay is if deconvoluted_peaks.peaks > 0:. I suspect the intent is to say "if the number of peaks in the deconvoluted peak set is greater than zero", but that attribute is a tuple, it's the underlying collection that the DeconvolutedPeakSet object holds its contents in. Using > on it with an integer will result in a nonsense comparison in Py2, and an outright TypeError on Py3. If you're on Py3, and you're not getting an actual traceback, this would be my first check.

Could you not just use if len(deconvoluted_peaks) > 0: or even just if deconvoluted_peaks:? The latter is True only if there are peaks in the result.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/mobiusklein/ms_deisotope/issues/18#issuecomment-659763115, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACM5I4C635GDRFFQKWHYYLDR36OWJANCNFSM4O47W75A .

im281 commented 4 years ago

Let me check in the latest into : https://github.com/thermofisherlsms/parsec

although you can run the current implementation with the peak_detect class. Just clone the latest and try to run the following in 'parsec_runner.py'

peaks = spark.read.parquet(i) \
    .rdd.map(lambda scan: pc.kv_scan(scan)) \
    .filter(lambda scan: scan[1].MsOrder == 1).groupByKey() \
    .flatMap(lambda file: pc.bin_masses(file)) \
    .flatMap(lambda fileMassBin: pc.create_mass_traces(fileMassBin)) \
    .filter(lambda trace: len(trace[1].Trace) > 10) \
    .flatMap(lambda trace: detect.get_peak_list_v3(trace[1], 10)) \
    .map(lambda p: pc.map_peak(p, 0.1)) \
    .reduceByKey(lambda p1, p2: p1 + p2) \
    .flatMap(lambda peak_list: detect.deconv_spark(peak_list)).collect()

On Fri, Jul 17, 2020 at 7:50 AM Iman Mohtashemi iman.mohtashemi@gmail.com wrote:

that is an error that I fixed (in different ways) before but to no avail

if len(deconvoluted_peaks.peaks) > 0:

if len(deconvoluted_peaks) > 0:

On Thu, Jul 16, 2020 at 5:57 PM Joshua Klein notifications@github.com wrote:

The only line of that that doesn't look okay is if deconvoluted_peaks.peaks > 0:. I suspect the intent is to say "if the number of peaks in the deconvoluted peak set is greater than zero", but that attribute is a tuple, it's the underlying collection that the DeconvolutedPeakSet object holds its contents in. Using > on it with an integer will result in a nonsense comparison in Py2, and an outright TypeError on Py3. If you're on Py3, and you're not getting an actual traceback, this would be my first check.

Could you not just use if len(deconvoluted_peaks) > 0: or even just if deconvoluted_peaks:? The latter is True only if there are peaks in the result.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/mobiusklein/ms_deisotope/issues/18#issuecomment-659763115, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACM5I4C635GDRFFQKWHYYLDR36OWJANCNFSM4O47W75A .

from scipy import ndimage from scipy.signal import savgol_filter from scipy.ndimage import gaussian_filter1d from scipy.signal import find_peaks from peak import Peak import numpy as np

def get_peak_list_v3(trace, sn): peaks = [] stn = max([i.Intensity for i in trace.Trace]) / min([i.Intensity for i in trace.Trace]) if stn > sn: peak_loc = peak_detect(trace.Trace) if peak_loc is not None:

print(len(peak_loc))

        for index in peak_loc:
            p = Peak()
            p.MZ = trace.Trace[index].Mass
            p.Intensity = trace.Trace[index].Intensity
            p.RT = trace.Trace[index].RT
            peaks.append(p)
    if peaks is not None:
        return peaks
    else:
        return []
else:
    return []

def peak_detect(trace): peaks_positions = [] peaks_positions_traces = [] x = [] y = [] for i in range(len(trace)): x.append(trace[i].RT) y.append(trace[i].Intensity) retention_time = np.asarray(x) intensity = np.asarray(y) y = savgol_filter(intensity, 9, 2) base = tophat(y, 0.1) gauss = gaussianfilter1d(base, sigma=np.std(base)) peaks, = find_peaks(gauss, distance=10)

if len(peaks.tolist()) > 0:

return peaks

def tophat(data, factor): ''' data -- numpy array Factor determines how finely the filter is applied to data. A point factor of 0.01 is appropriate for the tophat filter. A smaller number is faster but a trade-off is imposed '''

data = np.array(data, dtype=np.float32)
struct_pts = int(round(len(data) * factor))
str_el = np.repeat([1], struct_pts)
t_fil = ndimage.white_tophat(data, None, str_el)
return t_fil

def sorted_group_peak_list(grouped_list): import pandas as pd sorted_group_list = [] for group in grouped_list[1]: df = pd.DataFrame(group, columns=['MZ', 'Intensity', 'RT']) df_sorted = df.sort_values(by=['MZ']) tuples = [tuple(x) for x in df_sorted.to_numpy()] sorted_group_list.append(tuples) return sorted_group_list

def deconv(grouped_list): import ms_deisotope deconv_peak_sets = [] rt_of_deconv_peak = [] for group in grouped_list: deconvoluted_peaks, targeted = ms_deisotope.deconvolute_peaks(group, averagine=ms_deisotope.peptide, scorer=ms_deisotope.MSDeconVFitter(10.)) if deconvoluted_peaks.peaks.len() > 0: deconv_peak_sets.append(deconvoluted_peaks) rt = np.median([x[2] for x in group]) rt_of_deconv_peak.append(rt) return deconv_peak_sets, rt_of_deconv_peak

def deconv_spark(peak_list): import ms_deisotope from lib.componentdetection.pdata import Feature feature_list = []

To return a new list, use the sorted() built-in function...

sorted_peak_list = sorted(peak_list[1], key=lambda p: p.MZ, reverse=False)
group = []
for peak in sorted_peak_list:
    group.append((peak.MZ,peak.Intensity,peak.RT))
deconvoluted_peaks, targeted = ms_deisotope.deconvolute_peaks(group, averagine=ms_deisotope.peptide,
                                                              scorer=ms_deisotope.MSDeconVFitter(10.))
if len(deconvoluted_peaks) > 0:
    for peak in deconvoluted_peaks.peaks:
        f = Feature()
        f.RT = np.median([x[2] for x in group])
        f.DeconvolutedMass = peak.neutral_mass
        feature_list.append(f)
    return feature_list
else:
    return []
im281 commented 4 years ago

Here is a small test file

On Fri, Jul 17, 2020 at 8:00 AM Iman Mohtashemi iman.mohtashemi@gmail.com wrote:

Let me check in the latest into : https://github.com/thermofisherlsms/parsec

although you can run the current implementation with the peak_detect class. Just clone the latest and try to run the following in 'parsec_runner.py'

peaks = spark.read.parquet(i) \
    .rdd.map(lambda scan: pc.kv_scan(scan)) \
    .filter(lambda scan: scan[1].MsOrder == 1).groupByKey() \
    .flatMap(lambda file: pc.bin_masses(file)) \
    .flatMap(lambda fileMassBin: pc.create_mass_traces(fileMassBin)) \
    .filter(lambda trace: len(trace[1].Trace) > 10) \
    .flatMap(lambda trace: detect.get_peak_list_v3(trace[1], 10)) \
    .map(lambda p: pc.map_peak(p, 0.1)) \
    .reduceByKey(lambda p1, p2: p1 + p2) \
    .flatMap(lambda peak_list:

detect.deconv_spark(peak_list)).collect()

On Fri, Jul 17, 2020 at 7:50 AM Iman Mohtashemi iman.mohtashemi@gmail.com wrote:

that is an error that I fixed (in different ways) before but to no avail

if len(deconvoluted_peaks.peaks) > 0:

if len(deconvoluted_peaks) > 0:

On Thu, Jul 16, 2020 at 5:57 PM Joshua Klein notifications@github.com wrote:

The only line of that that doesn't look okay is if deconvoluted_peaks.peaks > 0:. I suspect the intent is to say "if the number of peaks in the deconvoluted peak set is greater than zero", but that attribute is a tuple, it's the underlying collection that the DeconvolutedPeakSet object holds its contents in. Using > on it with an integer will result in a nonsense comparison in Py2, and an outright TypeError on Py3. If you're on Py3, and you're not getting an actual traceback, this would be my first check.

Could you not just use if len(deconvoluted_peaks) > 0: or even just if deconvoluted_peaks:? The latter is True only if there are peaks in the result.

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/mobiusklein/ms_deisotope/issues/18#issuecomment-659763115, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACM5I4C635GDRFFQKWHYYLDR36OWJANCNFSM4O47W75A .

mobiusklein commented 4 years ago

The attachment you sent was stripped by Github.

I cloned parsec and added a file at lib/componentdetection/detect.py with the contents you provided, although I replaced the from peak import Peak with from .pcore import Peak.

I bootstrapped a new Py3.7 environment with PySpark, and ran the following code:

$ pyspark
Python 3.7.7 (default, May 21 2020, 14:57:43)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.
2020-07-20 07:32:11 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0
      /_/

Using Python version 3.7.7 (default, May 21 2020 14:57:43)
SparkSession available as 'spark'.

In [1]: from lib.componentdetection import detect

In [2]: i = "../data/testfiles/blacktea_1.parquet"

In [3]: from parsec_runner import *

In [4]: peaks = spark.read.parquet(i) \
   ...:         .rdd.map(lambda scan: pc.kv_scan(scan)) \
   ...:         .filter(lambda scan: scan[1].MsOrder == 1).groupByKey() \
   ...:         .flatMap(lambda file: pc.bin_masses(file)) \
   ...:         .flatMap(lambda fileMassBin: pc.create_mass_traces(fileMassBin)) \
   ...:         .filter(lambda trace: len(trace[1].Trace) > 10) \
   ...:         .flatMap(lambda trace: detect.get_peak_list_v3(trace[1], 10)) \
   ...:         .map(lambda p: pc.map_peak(p, 0.1)) \
   ...:         .reduceByKey(lambda p1, p2: p1 + p2) \
   ...:         .flatMap(lambda peak_list: detect.deconv_spark(peak_list)).collect()
2020-07-20 07:32:41 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
[Stage 2:>                                                          (0 + 1) / 1]/usr2/postdoc/jaklein/current_virtualenvs/py37/lib/python3.7/site-packages/scipy/ndimage/filters.py:145: RuntimeWarning: divide by zero encountered in true_divide
  phi_x = numpy.exp(-0.5 / sigma2 * x ** 2)
/usr2/postdoc/jaklein/current_virtualenvs/py37/lib/python3.7/site-packages/scipy/ndimage/filters.py:145: RuntimeWarning: invalid value encountered in multiply
  phi_x = numpy.exp(-0.5 / sigma2 * x ** 2)

In [5]: peaks
Out[5]:
[<lib.componentdetection.pdata.Feature at 0x7ff2e63dbf50>,
 <lib.componentdetection.pdata.Feature at 0x7ff2e63dbf10>,
 <lib.componentdetection.pdata.Feature at 0x7ff2e63e7350>,
 <lib.componentdetection.pdata.Feature at 0x7ff2e63dbf90>,
 ...

I didn't modify anything else. While debugging where the missing import came from, I saw that readable Python tracebacks were produced. Is there any chance you could reproduce the actual Python traceback you received when you tried to do this?

im281 commented 4 years ago

Wow! This is amazing! Let me check

On Mon, Jul 20, 2020 at 4:49 AM Joshua Klein notifications@github.com wrote:

The attachment you sent was stripped by Github.

I cloned parsec and added a file at lib/componentdetection/detect.py with the contents you provided, although I replaced the from peak import Peak with from .pcore import Peak.

I bootstrapped a new Py3.7 environment with PySpark, and ran the following code:

$ pysparkPython 3.7.7 (default, May 21 2020, 14:57:43)Type 'copyright', 'credits' or 'license' for more informationIPython 7.16.1 -- An enhanced Interactive Python. Type '?' for help.2020-07-20 07:32:11 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicableSetting default log level to "WARN".To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).Welcome to


 / __/__  ___ _____/ /__
_\ \/ _ \/ _ `/ __/  '_/

/ / ./_,// //_\ version 2.3.0 // Using Python version 3.7.7 (default, May 21 2020 14:57:43)SparkSession available as 'spark'. In [1]: from lib.componentdetection import detect In [2]: i = "../data/testfiles/blacktea_1.parquet" In [3]: from parsec_runner import In [4]: peaks = spark.read.parquet(i) \ ...: .rdd.map(lambda scan: pc.kv_scan(scan)) \ ...: .filter(lambda scan: scan[1].MsOrder == 1).groupByKey() \ ...: .flatMap(lambda file: pc.bin_masses(file)) \ ...: .flatMap(lambda fileMassBin: pc.create_mass_traces(fileMassBin)) \ ...: .filter(lambda trace: len(trace[1].Trace) > 10) \ ...: .flatMap(lambda trace: detect.get_peak_list_v3(trace[1], 10)) \ ...: .map(lambda p: pc.map_peak(p, 0.1)) \ ...: .reduceByKey(lambda p1, p2: p1 + p2) \ ...: .flatMap(lambda peak_list: detect.deconv_spark(peak_list)).collect()2020-07-20 07:32:41 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException [Stage 2:> (0 + 1) / 1]/usr2/postdoc/jaklein/current_virtualenvs/py37/lib/python3.7/site-packages/scipy/ndimage/filters.py:145: RuntimeWarning: divide by zero encountered in true_divide phi_x = numpy.exp(-0.5 / sigma2 x * 2)/usr2/postdoc/jaklein/current_virtualenvs/py37/lib/python3.7/site-packages/scipy/ndimage/filters.py:145: RuntimeWarning: invalid value encountered in multiply phi_x = numpy.exp(-0.5 / sigma2 x ** 2) In [5]: peaksOut[5]: [<lib.componentdetection.pdata.Feature at 0x7ff2e63dbf50>, <lib.componentdetection.pdata.Feature at 0x7ff2e63dbf10>, <lib.componentdetection.pdata.Feature at 0x7ff2e63e7350>, <lib.componentdetection.pdata.Feature at 0x7ff2e63dbf90>, ...

I didn't modify anything else. While debugging where the missing import came from, I saw that readable Python tracebacks were produced. Is there any chance you could reproduce the actual Python traceback you received when you tried to do this?

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/mobiusklein/ms_deisotope/issues/18#issuecomment-660979495, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACM5I4FRZ4ZFBTAYRUOH7BDR4QVMNANCNFSM4O47W75A .

mobiusklein commented 4 years ago

Were you able to replicate the result, or are you still having an issue?