quantile-development / dagster-ext


Error: Could not process json - on `meltano invoke dagster:start` #2

Open seanglynn-thrive opened 1 year ago

seanglynn-thrive commented 1 year ago

I am getting a JSON parsing error when I run meltano invoke dagster:start from my Meltano project directory. Steps I have followed:

meltano add utility dagster-ext

meltano install

meltano invoke dagster:initialize

# Fails here:
meltano invoke dagster:start

meltano.yml (utilities section):

...
  utilities:
  - name: dagster
    variant: quantile-development
    pip_url: dagster-ext
    config:
      dagster_home: $MELTANO_PROJECT_ROOT/.meltano/dagster
      repository_dir: $MELTANO_PROJECT_ROOT/orchestrator/dagster
...

Full error trace

Error loading repository.py. Try reloading the repository location after resolving the issue.
ValueError: Could not process json:
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster/_grpc/server.py", line 241, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster/_grpc/server.py", line 104, in __init__
    loadable_targets = get_loadable_targets(
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster/_grpc/utils.py", line 33, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster/_core/workspace/autodiscovery.py", line 27, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster/_core/code_pointer.py", line 86, in load_python_file
    return import_module_from_path(module_name, python_file)
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster/_seven/__init__.py", line 51, in import_module_from_path
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 850, in exec_module
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/Users/user/DEV/meltano-pipelines/orchestrator/dagster/repository.py", line 16, in <module>
    meltano_jobs = load_jobs_from_meltano_project(MELTANO_PROJECT_DIR)
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/generation.py", line 32, in load_jobs_from_meltano_project
    return list(meltano_jobs)
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/meltano_resource.py", line 76, in jobs
    for meltano_job in self.meltano_jobs:
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/meltano_resource.py", line 54, in meltano_jobs
    meltano_job_list = self.meltano_yaml["jobs"]
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/meltano_resource.py", line 48, in meltano_yaml
    jobs, schedules = asyncio.run(self.gather_meltano_yaml_information())
  File "/Users/user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/user/.pyenv/versions/3.9.13/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/meltano_resource.py", line 38, in gather_meltano_yaml_information
    jobs, schedules = await asyncio.gather(
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/meltano_resource.py", line 30, in load_json_from_cli
    _, log_results = await self.meltano_invoker.exec(
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/meltano_invoker.py", line 120, in exec
    raise log_result
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/log_processing/__init__.py", line 33, in process_logs
    return self.results
  File "/Users/user/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_meltano/log_processing/json_processor.py", line 22, in results
    raise ValueError(f"Could not process json: {self.log_lines}")
# Meltano vars:
MELTANO_PROJECT_DIR: /Users/xxx/DEV/meltano-pipelines
MELTANO_BIN: meltano
JulesHuisman commented 1 year ago

With the following meltano.yml, it doesn't throw any errors for me:

version: 1
default_environment: dev
project_id: 3275c848-a861-49c5-a55a-f2c70e2b40a3
environments:
- name: dev
- name: staging
- name: prod
plugins:
  utilities:
  - name: dagster
    variant: quantile-development
    pip_url: dagster-ext
    config:
      repository_dir: $MELTANO_PROJECT_ROOT/orchestrate/dagster

What do the commands:

meltano job list --format=json

and

meltano schedule list --format=json

return for you?
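One way to answer this, and to see the exit status and stderr alongside stdout, is a small script that runs both commands non-interactively. This is a minimal sketch, not part of dagster-ext: `COMMANDS`, `classify_output`, and `check` are hypothetical names, and it assumes `meltano` is on PATH and the script is run from the Meltano project root.

```python
import json
import subprocess
from typing import List

# The two commands asked about above.
COMMANDS = [
    ["meltano", "job", "list", "--format=json"],
    ["meltano", "schedule", "list", "--format=json"],
]


def classify_output(stdout: str) -> str:
    """Label CLI stdout as 'empty', 'invalid json', or 'ok'."""
    if not stdout.strip():
        return "empty"
    try:
        json.loads(stdout)
    except json.JSONDecodeError:
        return "invalid json"
    return "ok"


def check(args: List[str]) -> None:
    """Run one command and print its exit status, stdout classification, and stderr."""
    try:
        result = subprocess.run(args, capture_output=True, text=True)
    except FileNotFoundError:
        print(f"{args[0]!r} not found on PATH")
        return
    print(f"$ {' '.join(args)}")
    print(f"  exit status: {result.returncode}")
    print(f"  stdout: {classify_output(result.stdout)}")
    if result.stderr.strip():
        print(f"  stderr: {result.stderr.strip()}")


if __name__ == "__main__":
    for cmd in COMMANDS:
        check(cmd)
```

If stdout is reported as "empty" here while the same command prints JSON in an interactive shell, the difference lies in how the command is being run rather than in the project's jobs or schedules.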

seanglynn-thrive commented 1 year ago

Hi @JulesHuisman, thanks for getting back so soon.

The output of the two commands:


╰─$ meltano job list --format=json

{
  "jobs": [
    {
      "job_name": "tap-okta-target-bigquery",
      "tasks": [
        "tap-okta target-bigquery"
      ]
    }
  ]
}

╰─$ meltano schedule list --format=json

{
  "schedules": {
    "job": [],
    "elt": [
      {
        "name": "postgres-to-jsonl",
        "extractor": "tap-postgres",
        "loader": "target-jsonl",
        "transform": "skip",
        "interval": "@once",
        "start_date": "2010-01-01",
        "env": {},
        "cron_interval": null,
        "last_successful_run_ended_at": null,
        "elt_args": [
          "tap-postgres",
          "target-jsonl",
          "--transform=skip",
          "--state-id=postgres-to-jsonl"
        ]
      },
      {
        "name": "postgres-to-bigquery",
        "extractor": "tap-postgres",
        "loader": "target-bigquery",
        "transform": "skip",
        "interval": "@once",
        "start_date": "2010-01-01",
        "env": {},
        "cron_interval": null,
        "last_successful_run_ended_at": null,
        "elt_args": [
          "tap-postgres",
          "target-bigquery",
          "--transform=skip",
          "--state-id=postgres-to-bigquery"
        ]
      },
      {
        "name": "tap-cloudflare-graphql-to-bigquery",
        "extractor": "tap_cloudflare_graphql",
        "loader": "target-bigquery",
        "transform": "skip",
        "interval": "@once",
        "start_date": "2010-01-01",
        "env": {},
        "cron_interval": null,
        "last_successful_run_ended_at": null,
        "elt_args": [
          "tap_cloudflare_graphql",
          "target-bigquery",
          "--transform=skip",
          "--state-id=tap-cloudflare-graphql-to-bigquery"
        ]
      },
      {
        "name": "cloudflare-graphql-to-csv",
        "extractor": "tap-cloudflare-graphql",
        "loader": "target-csv",
        "transform": "skip",
        "interval": "@once",
        "start_date": "2010-01-01",
        "env": {},
        "cron_interval": null,
        "last_successful_run_ended_at": null,
        "elt_args": [
          "tap-cloudflare-graphql",
          "target-csv",
          "--transform=skip",
          "--state-id=cloudflare-graphql-to-csv"
        ]
      },
      {
        "name": "csv-to-bigquery",
        "extractor": "tap-csv",
        "loader": "target-bigquery",
        "transform": "skip",
        "interval": "@once",
        "start_date": "2010-01-01",
        "env": {},
        "cron_interval": null,
        "last_successful_run_ended_at": null,
        "elt_args": [
          "tap-csv",
          "target-bigquery",
          "--transform=skip",
          "--state-id=csv-to-bigquery"
        ]
      }
    ]
  }
}
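For reference, the two outputs above can be reduced to the names and arguments an orchestrator integration would need. The sketch below is a hypothetical illustration, not dagster-ext's actual parsing code; `summarize_jobs` and `summarize_elt_schedules` are made-up names, and the only thing taken from this thread is the key layout (`jobs`, `job_name`, `tasks`, `schedules`, `elt`, `name`, `elt_args`).

```python
import json
from typing import Dict, List


def summarize_jobs(job_list_json: str) -> Dict[str, List[str]]:
    """Map each job name to its tasks, from `meltano job list --format=json` output."""
    data = json.loads(job_list_json)
    return {job["job_name"]: job["tasks"] for job in data["jobs"]}


def summarize_elt_schedules(schedule_list_json: str) -> Dict[str, List[str]]:
    """Map each ELT schedule name to its elt_args, from `meltano schedule list --format=json` output."""
    data = json.loads(schedule_list_json)
    # Job-based schedules live under "job" (empty in the output above); only "elt" is handled here.
    return {s["name"]: s["elt_args"] for s in data["schedules"]["elt"]}


if __name__ == "__main__":
    sample_jobs = '{"jobs": [{"job_name": "tap-okta-target-bigquery", "tasks": ["tap-okta target-bigquery"]}]}'
    print(summarize_jobs(sample_jobs))
    # {'tap-okta-target-bigquery': ['tap-okta target-bigquery']}
```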
JulesHuisman commented 1 year ago

Mmm, that is strange. dagster-ext runs these commands behind the scenes to get the jobs and schedules, and then parses their JSON responses.

The exception is constructed so that it shows the string it tried to parse (the part after "Could not process json:"). In this case that string is empty, which means meltano job list --format=json returned an empty string when the extension ran it.
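A minimal sketch of that error pattern, assuming a processor that hands the raw CLI text to `json.loads` and embeds it in the message on failure. Only the message format comes from the traceback (`Could not process json: {self.log_lines}`); `process_json` is a hypothetical name, not the dagster_meltano function. An empty input yields a message with nothing after the colon, matching the error reported in this issue.

```python
import json
from typing import Any


def process_json(log_lines: str) -> Any:
    """Parse CLI output, putting the raw text into the error message if parsing fails."""
    try:
        return json.loads(log_lines)
    except json.JSONDecodeError as exc:
        # Same message shape as in the traceback: the unparsed text follows the colon.
        raise ValueError(f"Could not process json: {log_lines}") from exc


if __name__ == "__main__":
    try:
        process_json("")  # what an empty stdout would produce
    except ValueError as exc:
        print(exc)  # nothing follows "Could not process json:"
```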

And does everything work if you create an empty project with just the dagster extension?

JulesHuisman commented 1 year ago

@seanglynn-thrive Did you eventually figure out what the issue was?

seanglynn-thrive commented 1 year ago


@JulesHuisman, my apologies for the late response. We could not get this to work, even with the meltano.yml above. We now get a different exception on the meltano invoke dagster:initialize command:

╰─$ poetry run meltano invoke dagster:initialize

2022-12-12T09:36:14.018975Z [info     ] Environment 'dev' is active
Where do you want to install the Dagster project? ($MELTANO_PROJECT_ROOT/orchestrate/dagster): 
Traceback (most recent call last):
  File "/Users/sean.glynn/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_ext/main.py", line 34, in initialize
    ext.initialize(force)
  File "/Users/sean.glynn/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_ext/extension.py", line 79, in initialize
    self.set_meltano_config(
  File "/Users/sean.glynn/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/dagster_ext/extension.py", line 64, in set_meltano_config
    self.get_invoker_by_name("meltano").run(
  File "/Users/sean.glynn/DEV/meltano-pipelines/.meltano/utilities/dagster/venv/lib/python3.9/site-packages/meltano/edk/process.py", line 88, in run
    return subprocess.run(
  File "/Users/sean.glynn/.pyenv/versions/3.9.13/lib/python3.9/subprocess.py", line 528, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['meltano', 'config', 'dagster', 'set', 'repository_dir', '$MELTANO_PROJECT_ROOT/orchestrate/dagster']' returned non-zero exit status 1.
initialize failed with uncaught exception, please report to maintainer
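The CalledProcessError above reports only the exit status, not why the command failed. Rerunning the same argument list with stderr captured may show Meltano's own error message. This is a minimal sketch under the assumption that it is run from the project root with the same Meltano installation (for example via poetry run python); `run_with_stderr` is a hypothetical helper.

```python
import subprocess
from typing import List, Tuple


def run_with_stderr(args: List[str]) -> Tuple[int, str]:
    """Run a command and return its exit status and stripped stderr instead of raising."""
    result = subprocess.run(args, capture_output=True, text=True)
    return result.returncode, result.stderr.strip()


if __name__ == "__main__":
    # Same argument list as in the traceback. With a list and no shell,
    # "$MELTANO_PROJECT_ROOT" is passed literally, exactly as the extension passed it.
    cmd = [
        "meltano", "config", "dagster", "set",
        "repository_dir", "$MELTANO_PROJECT_ROOT/orchestrate/dagster",
    ]
    code, err = run_with_stderr(cmd)
    print(f"exit status: {code}")
    print(err or "(no stderr output)")
```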

Environment info:


╰─$ poetry run python --version
Python 3.9.13

╰─$ poetry show
aiodocker              0.21.0      Docker API client for asyncio
aiohttp                3.8.3       Async http client/server framework (asyncio)
aiosignal              1.3.1       aiosignal: a list of registered asynchronous callbacks
alembic                1.8.1       A database migration tool for SQLAlchemy.
aniso8601              9.0.1       A library for parsing ISO 8601 strings.
anyio                  3.6.2       High level compatibility layer for multiple asynchronous event loop implementations
asgiref                3.5.2       ASGI specs, helper code, and adapters
async-timeout          4.0.2       Timeout context manager for asyncio programs
atomicwrites           1.4.1       Atomic file writes.
attrs                  22.1.0      Classes Without Boilerplate
authlib                1.2.0       The ultimate Python library in building OAuth and OpenID Connect servers and clients.
babel                  2.11.0      Internationalization utilities
backoff                2.2.1       Function decoration for backoff and retry
bcrypt                 3.2.2       Modern password hashing for your software and your servers
blinker                1.5         Fast, simple object-to-object and broadcast signaling
cached-property        1.5.2       A decorator for caching properties in classes.
certifi                2022.12.7   Python package for providing Mozilla's CA Bundle.
cffi                   1.15.1      Foreign Function Interface for Python calling C code.
charset-normalizer     2.1.1       The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet.
click                  8.1.3       Composable command line interface toolkit
click-default-group    1.2.2       Extends click.Group to invoke a command without explicit subcommand name
commonmark             0.9.1       Python parser for the CommonMark Markdown spec
croniter               1.3.8       croniter provides iteration for datetime object with cron like format
cryptography           38.0.4      cryptography is a package which provides cryptographic recipes and primitives to Python developers.
dnspython              2.2.1       DNS toolkit
email-validator        1.3.0       A robust email address syntax and deliverability validation library.
fasteners              0.17.3      A python package that provides useful locks
flask                  2.1.3       A simple framework for building complex web applications.
flask-babelex          0.9.4       Adds i18n/l10n support to Flask applications
flask-cors             3.0.10      A Flask extension adding a decorator for CORS support
flask-executor         0.10.0      An easy to use Flask wrapper for concurrent.futures
flask-login            0.6.1       User authentication and session management for Flask.
flask-mail             0.9.1       Flask extension for sending email
flask-principal        0.4.0       Identity management for flask
flask-restful          0.3.9       Simple framework for creating REST APIs
flask-sqlalchemy       2.5.1       Adds SQLAlchemy support to your Flask application.
flask-wtf              1.0.1       Form rendering, validation, and CSRF protection for Flask with WTForms.
flatten-dict           0.4.2       A flexible utility for flattening and unflattening dict-like objects in Python.
frozenlist             1.3.3       A list-like structure which implements collections.abc.MutableSequence
gunicorn               20.1.0      WSGI HTTP Server for UNIX
h11                    0.14.0      A pure-Python, bring-your-own-I/O implementation of HTTP/1.1
httptools              0.5.0       A collection of framework independent HTTP protocol utils.
idna                   3.4         Internationalized Domain Names in Applications (IDNA)
importlib-metadata     5.1.0       Read metadata from Python packages
importlib-resources    5.10.1      Read resources from Python packages
itsdangerous           2.1.2       Safely pass data to untrusted environments and back.
jinja2                 3.1.2       A very fast and expressive template engine.
jsonschema             4.17.3      An implementation of JSON Schema validation for Python
mako                   1.2.4       A super-fast templating language that borrows the best ideas from the existing templating languages.
markdown               3.4.1       Python implementation of Markdown.
markupsafe             2.1.1       Safely add untrusted strings to HTML/XML markup.
meltano                2.11.1      Meltano: Your DataOps Platform Infrastructure
meltano-flask-security 0.1.0       Simple security for Flask apps.
multidict              6.0.3       multidict implementation
packaging              21.3        Core utilities for Python packages
passlib                1.7.4       comprehensive password hashing framework supporting over 30 schemes
psutil                 5.9.4       Cross-platform lib for process and system monitoring in Python.
psycopg2-binary        2.9.5       psycopg2 - Python-PostgreSQL Database Adapter
pycparser              2.21        C parser in Python
pygments               2.13.0      Pygments is a syntax highlighting package written in Python.
pyhumps                3.8.0       🐫  Convert strings (and dictionary keys) between snake case, camel case and pascal case in Python. Inspired by Humps for Node
pyparsing              3.0.9       pyparsing module - Classes and methods to define and execute parsing grammars
pyrsistent             0.19.2      Persistent/Functional/Immutable data structures
python-dateutil        2.8.2       Extensions to the standard Python datetime module
python-dotenv          0.20.0      Read key-value pairs from a .env file and set them as environment variables
python-gitlab          3.12.0      Interact with GitLab API
pytz                   2022.6      World timezone definitions, modern and historical
pytz-deprecation-shim  0.1.0.post0 Shims to make deprecation of pytz easier
pyyaml                 6.0         YAML parser and emitter for Python
requests               2.28.1      Python HTTP for Humans.
requests-toolbelt      0.10.1      A utility belt for advanced users of python-requests
rich                   12.6.0      Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal
ruamel-yaml            0.17.21     ruamel.yaml is a YAML parser/emitter that supports roundtrip preservation of comments, seq/map flow style, and map key order
ruamel-yaml-clib       0.2.7       C version of reader, parser and emitter for ruamel.yaml derived from libyaml
setuptools             65.6.3      Easily download, build, install, upgrade, and uninstall Python packages
six                    1.16.0      Python 2 and 3 compatibility utilities
smart-open             6.3.0       Utils for streaming large files (S3, HDFS, GCS, Azure Blob Storage, gzip, bz2...)
smtpapi                0.4.12      Simple wrapper to use SendGrid SMTP API
sniffio                1.3.0       Sniff out which async library your code is running under
snowplow-tracker       0.10.0      Snowplow event tracker for Python. Add analytics to your Python and Django apps, webapps and games
speaklater             1.3         implements a lazy string for python useful for use with gettext
sqlalchemy             1.4.45      Database Abstraction Library
structlog              21.5.0      Structured Logging for Python
typing-extensions      4.4.0       Backported and Experimental Type Hints for Python 3.7+
tzdata                 2022.7      Provider of IANA time zone data
tzlocal                4.2         tzinfo object for the local timezone
urllib3                1.26.13     HTTP library with thread-safe connection pooling, file post, and more.
uvicorn                0.17.6      The lightning-fast ASGI server.
uvloop                 0.17.0      Fast implementation of asyncio event loop on top of libuv
watchgod               0.8.2       Simple, modern file watching and code reload in python.
websockets             10.4        An implementation of the WebSocket Protocol (RFC 6455 & 7692)
werkzeug               2.1.2       The comprehensive WSGI web application library.
wtforms                3.0.1       Form validation and rendering for Python web development.
yarl                   1.8.2       Yet another URL library
zipp                   3.11.0      Backport of pathlib-compatible object wrapper for zip files