mara / mara-pipelines

A lightweight, opinionated ETL framework, halfway between plain scripts and Apache Airflow
MIT License

ValueError: cannot find context for 'fork' #35

Closed (dyerrington closed this issue 4 years ago)

dyerrington commented 4 years ago

I assume this example code should run without error on my system:

from data_integration.commands.bash import RunBash
from data_integration.pipelines import Pipeline, Task
from data_integration.ui.cli import run_pipeline, run_interactively

pipeline = Pipeline(
    id='demo',
    description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')

pipeline.add(Task(id='ping_localhost', description='Pings localhost',
                  commands=[RunBash('ping -c 3 localhost')]))

sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')

for host in ['google', 'amazon', 'facebook']:
    sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
                          commands=[RunBash(f'ping -c 3 {host}.com')]))

sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
                      commands=[RunBash('ping foo')]), ['ping_amazon'])

pipeline.add(sub_pipeline, ['ping_localhost'])

pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
                  commands=[RunBash('sleep 2')]), ['sub_pipeline'])

run_pipeline(pipeline)

Here's the output of the script:

$ python historical.py
Traceback (most recent call last):
  File "C:\Users\david\Anaconda3\lib\multiprocessing\context.py", line 190, in get_context
    ctx = _concrete_contexts[method]
KeyError: 'fork'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "historical.py", line 28, in <module>
    run_pipeline(pipeline)
  File "C:\Users\david\Anaconda3\lib\site-packages\data_integration\ui\cli.py", line 46, in run_pipeline
    for event in execution.run_pipeline(pipeline, nodes, with_upstreams, interactively_started=interactively_started):
  File "C:\Users\david\Anaconda3\lib\site-packages\data_integration\execution.py", line 48, in run_pipeline
    multiprocessing_context = multiprocessing.get_context('fork')
  File "C:\Users\david\Anaconda3\lib\multiprocessing\context.py", line 238, in get_context
    return super().get_context(method)
  File "C:\Users\david\Anaconda3\lib\multiprocessing\context.py", line 192, in get_context
    raise ValueError('cannot find context for %r' % method)
ValueError: cannot find context for 'fork'

If it matters, I've also provisioned a PostgreSQL instance for mara:

import mara_db.auto_migration
import mara_db.config
import mara_db.dbs

mara_db.config.databases = lambda: {
    'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='postgres', password='', database='etl_mara')}

mara_db.auto_migration.auto_discover_models_and_migrate()

dyerrington commented 4 years ago

Also, in case it's helpful, this thread might shed some light on supporting multiprocessing on Windows: https://github.com/MVIG-SJTU/AlphaPose/issues/185

martin-loetzsch commented 4 years ago

Hi @dyerrington, forking is a central part of Mara data integration. We chose it over threads because it's more robust and (more importantly) avoids problems with memory leaks and garbage collection. Each task runs in a forked copy of the main process, so when the task finishes, all resources it allocated are released automatically as the subprocess terminates.
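The fork-per-task idea can be sketched with the standard library alone. This is a minimal illustration, not Mara's implementation: `run_task_in_fork`, `heavy_task` and `cache` are hypothetical names, and only the `multiprocessing.get_context('fork')` call mirrors what is visible in the traceback above. The child works on its own copy of the parent's memory, so whatever it allocates disappears when it exits.

```python
import multiprocessing

# Hypothetical module-level state standing in for whatever a task allocates.
cache = []


def heavy_task(n: int) -> None:
    """Hypothetical task: grows the child's copy of `cache`, never the parent's."""
    cache.extend(range(n))


def run_task_in_fork(task, *args) -> int:
    """Run `task` in a forked child process and return the child's exit code.

    The OS reclaims everything the child allocated when it terminates.
    """
    # Raises "ValueError: cannot find context for 'fork'" where fork is unavailable.
    ctx = multiprocessing.get_context('fork')
    process = ctx.Process(target=task, args=args)
    process.start()
    process.join()  # wait for the task to finish
    return process.exitcode


if __name__ == '__main__':
    exit_code = run_task_in_fork(heavy_task, 100_000)
    print(exit_code, len(cache))  # the parent's cache is still empty
```

Run as a script on Linux or macOS, this should print `0 0`: the child exits cleanly and the parent's `cache` is untouched.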

The alternative would be to use a task queue and worker processes (such as in Airflow), but I think that unnecessarily increases the number of moving parts.

Unfortunately, forking only works on POSIX-style operating systems. If you want to run Mara on Windows, please use the Windows Subsystem for Linux (https://en.wikipedia.org/wiki/Windows_Subsystem_for_Linux). I know quite a few people who run Mara successfully that way.
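Since the framework depends on this start method, one option is to check for it up front and fail with a clearer message. The sketch below uses only the standard library; `fork_available` and `require_fork` are hypothetical helpers, not part of Mara. `multiprocessing.get_all_start_methods()` lists only 'spawn' on native Windows, while Linux (including WSL) also lists 'fork'.

```python
import multiprocessing
import sys


def fork_available() -> bool:
    """Return True if this interpreter supports the 'fork' start method."""
    return 'fork' in multiprocessing.get_all_start_methods()


def require_fork() -> None:
    """Hypothetical guard: fail early with an actionable message instead of
    the bare ValueError raised by multiprocessing.get_context('fork')."""
    if not fork_available():
        raise RuntimeError(
            f"{sys.executable} (platform {sys.platform!r}) cannot fork; "
            "run the pipeline with a Python installed inside WSL or on another POSIX system."
        )


if __name__ == '__main__':
    require_fork()
    print('fork is available for', sys.executable)
```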

martin-loetzsch commented 4 years ago

I added a comment about this in README.md: https://github.com/mara/data-integration/commit/5ab58be71eebfcce94b41290af928c99bfab1245

dyerrington commented 4 years ago

For the record, I am using Ubuntu with the Windows Subsystem for Linux on regular Windows 10.

martin-loetzsch commented 4 years ago

Ah, OK. Reopening this, then.

What does your python -v say after source .venv/bin/activate?

dyerrington commented 4 years ago

On this particular machine, I am not using an environment (but I could). Here's the entirety of the output:

 dave  ⓔ  base  ~  python -v
import _frozen_importlib # frozen
import _imp # builtin
import '_thread' # <class '_frozen_importlib.BuiltinImporter'>
import '_warnings' # <class '_frozen_importlib.BuiltinImporter'>
import '_weakref' # <class '_frozen_importlib.BuiltinImporter'>
# installing zipimport hook
import 'zipimport' # <class '_frozen_importlib.BuiltinImporter'>
# installed zipimport hook
import '_frozen_importlib_external' # <class '_frozen_importlib.FrozenImporter'>
import '_io' # <class '_frozen_importlib.BuiltinImporter'>
import 'marshal' # <class '_frozen_importlib.BuiltinImporter'>
import 'posix' # <class '_frozen_importlib.BuiltinImporter'>
import _thread # previously loaded ('_thread')
import '_thread' # <class '_frozen_importlib.BuiltinImporter'>
import _weakref # previously loaded ('_weakref')
import '_weakref' # <class '_frozen_importlib.BuiltinImporter'>
# /home/dave/anaconda3/lib/python3.7/encodings/__pycache__/__init__.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/encodings/__init__.py
# code object from '/home/dave/anaconda3/lib/python3.7/encodings/__pycache__/__init__.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/__pycache__/codecs.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/codecs.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/codecs.cpython-37.pyc'
import '_codecs' # <class '_frozen_importlib.BuiltinImporter'>
import 'codecs' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d23190>
# /home/dave/anaconda3/lib/python3.7/encodings/__pycache__/aliases.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/encodings/aliases.py
# code object from '/home/dave/anaconda3/lib/python3.7/encodings/__pycache__/aliases.cpython-37.pyc'
import 'encodings.aliases' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d42410>
import 'encodings' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d9e5d0>
# /home/dave/anaconda3/lib/python3.7/encodings/__pycache__/utf_8.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/encodings/utf_8.py
# code object from '/home/dave/anaconda3/lib/python3.7/encodings/__pycache__/utf_8.cpython-37.pyc'
import 'encodings.utf_8' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d23150>
import '_signal' # <class '_frozen_importlib.BuiltinImporter'>
# /home/dave/anaconda3/lib/python3.7/encodings/__pycache__/latin_1.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/encodings/latin_1.py
# code object from '/home/dave/anaconda3/lib/python3.7/encodings/__pycache__/latin_1.cpython-37.pyc'
import 'encodings.latin_1' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d49bd0>
# /home/dave/anaconda3/lib/python3.7/__pycache__/io.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/io.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/io.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/__pycache__/abc.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/abc.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/abc.cpython-37.pyc'
import '_abc' # <class '_frozen_importlib.BuiltinImporter'>
import 'abc' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d4d610>
import 'io' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d49c90>
# /home/dave/anaconda3/lib/python3.7/__pycache__/site.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/site.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/site.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/__pycache__/os.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/os.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/os.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/__pycache__/stat.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/stat.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/stat.cpython-37.pyc'
import '_stat' # <class '_frozen_importlib.BuiltinImporter'>
import 'stat' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3ced1d0>
# /home/dave/anaconda3/lib/python3.7/__pycache__/posixpath.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/posixpath.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/posixpath.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/__pycache__/genericpath.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/genericpath.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/genericpath.cpython-37.pyc'
import 'genericpath' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3cf5fd0>
import 'posixpath' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3cedad0>
# /home/dave/anaconda3/lib/python3.7/__pycache__/_collections_abc.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/_collections_abc.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/_collections_abc.cpython-37.pyc'
import '_collections_abc' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3cfa990>
import 'os' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d5f210>
# /home/dave/anaconda3/lib/python3.7/__pycache__/_sitebuiltins.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/_sitebuiltins.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/_sitebuiltins.cpython-37.pyc'
import '_sitebuiltins' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d5f750>
# /home/dave/anaconda3/lib/python3.7/__pycache__/_bootlocale.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/_bootlocale.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/_bootlocale.cpython-37.pyc'
import '_locale' # <class '_frozen_importlib.BuiltinImporter'>
import '_bootlocale' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d5fc50>
# /home/dave/anaconda3/lib/python3.7/__pycache__/types.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/types.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/types.cpython-37.pyc'
import 'types' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c84d50>
# /home/dave/anaconda3/lib/python3.7/importlib/__pycache__/__init__.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/importlib/__init__.py
# code object from '/home/dave/anaconda3/lib/python3.7/importlib/__pycache__/__init__.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/__pycache__/warnings.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/warnings.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/warnings.cpython-37.pyc'
import 'warnings' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c8c650>
import 'importlib' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c825d0>
# /home/dave/anaconda3/lib/python3.7/importlib/__pycache__/util.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/importlib/util.py
# code object from '/home/dave/anaconda3/lib/python3.7/importlib/__pycache__/util.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/importlib/__pycache__/abc.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/importlib/abc.py
# code object from '/home/dave/anaconda3/lib/python3.7/importlib/__pycache__/abc.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/importlib/__pycache__/machinery.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/importlib/machinery.py
# code object from '/home/dave/anaconda3/lib/python3.7/importlib/__pycache__/machinery.cpython-37.pyc'
import 'importlib.machinery' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c9aa10>
import 'importlib.abc' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c91c50>
# /home/dave/anaconda3/lib/python3.7/__pycache__/contextlib.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/contextlib.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/contextlib.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/collections/__pycache__/__init__.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/collections/__init__.py
# code object from '/home/dave/anaconda3/lib/python3.7/collections/__pycache__/__init__.cpython-37.pyc'
# /home/dave/anaconda3/lib/python3.7/__pycache__/operator.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/operator.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/operator.cpython-37.pyc'
import '_operator' # <class '_frozen_importlib.BuiltinImporter'>
import 'operator' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c58690>
# /home/dave/anaconda3/lib/python3.7/__pycache__/keyword.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/keyword.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/keyword.cpython-37.pyc'
import 'keyword' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c5b3d0>
# /home/dave/anaconda3/lib/python3.7/__pycache__/heapq.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/heapq.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/heapq.cpython-37.pyc'
# extension module '_heapq' loaded from '/home/dave/anaconda3/lib/python3.7/lib-dynload/_heapq.cpython-37m-x86_64-linux-gnu.so'
# extension module '_heapq' executed from '/home/dave/anaconda3/lib/python3.7/lib-dynload/_heapq.cpython-37m-x86_64-linux-gnu.so'
import '_heapq' # <_frozen_importlib_external.ExtensionFileLoader object at 0x7f9ec3c6ac90>
import 'heapq' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c62dd0>
import 'itertools' # <class '_frozen_importlib.BuiltinImporter'>
# /home/dave/anaconda3/lib/python3.7/__pycache__/reprlib.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/reprlib.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/reprlib.cpython-37.pyc'
import 'reprlib' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c6afd0>
import '_collections' # <class '_frozen_importlib.BuiltinImporter'>
import 'collections' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c36190>
# /home/dave/anaconda3/lib/python3.7/__pycache__/functools.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/functools.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/functools.cpython-37.pyc'
import '_functools' # <class '_frozen_importlib.BuiltinImporter'>
import 'functools' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c36890>
import 'contextlib' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c9a450>
import 'importlib.util' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3c8f5d0>
# possible namespace for /home/dave/anaconda3/lib/python3.7/site-packages/mpl_toolkits
# destroy sphinxcontrib
# destroy sphinxcontrib
# destroy sphinxcontrib
# destroy sphinxcontrib
# destroy sphinxcontrib
# destroy sphinxcontrib
# possible namespace for /home/dave/anaconda3/lib/python3.7/site-packages/zope
import 'site' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec3d50a90>
Python 3.7.4 (default, Aug 13 2019, 20:35:49)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
# extension module 'readline' loaded from '/home/dave/anaconda3/lib/python3.7/lib-dynload/readline.cpython-37m-x86_64-linux-gnu.so'
# extension module 'readline' executed from '/home/dave/anaconda3/lib/python3.7/lib-dynload/readline.cpython-37m-x86_64-linux-gnu.so'
import 'readline' # <_frozen_importlib_external.ExtensionFileLoader object at 0x7f9ec2cbce90>
import 'atexit' # <class '_frozen_importlib.BuiltinImporter'>
# /home/dave/anaconda3/lib/python3.7/__pycache__/rlcompleter.cpython-37.pyc matches /home/dave/anaconda3/lib/python3.7/rlcompleter.py
# code object from '/home/dave/anaconda3/lib/python3.7/__pycache__/rlcompleter.cpython-37.pyc'
import 'rlcompleter' # <_frozen_importlib_external.SourceFileLoader object at 0x7f9ec2cc53d0>

martin-loetzsch commented 4 years ago

In your original comment, the Python path was C:\Users\david\Anaconda3.

The second example seems to use /home/dave/anaconda3/lib/python3.7/.

Could it be that in the original comment you didn't use the Python that you installed in WSL but instead were in a normal Windows shell?

If not, can you try the regular system Python 3 that you get with apt-get install python3?
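A quick way to answer this is to ask the interpreter itself. This is a hedged sketch: `describe_interpreter` is a hypothetical helper, and the WSL check is a heuristic assumption based on WSL kernel release strings containing "microsoft". A native Windows Python should report platform 'win32' and a C:\ path, while a Python installed inside WSL reports 'linux' and a /home/ path.

```python
import multiprocessing
import platform
import sys


def describe_interpreter() -> dict:
    """Hypothetical helper: report which Python is running and whether it can fork."""
    release = platform.release()
    return {
        'executable': sys.executable,  # a C:\ path on Windows, a /home/ path inside WSL
        'version': platform.python_version(),
        'platform': sys.platform,  # 'win32' on native Windows, 'linux' inside WSL
        # Heuristic (assumption): WSL kernel release strings contain "microsoft".
        'looks_like_wsl': sys.platform == 'linux' and 'microsoft' in release.lower(),
        'start_methods': multiprocessing.get_all_start_methods(),
    }


if __name__ == '__main__':
    for key, value in describe_interpreter().items():
        print(f'{key}: {value}')
```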