dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Schedules don't work in production (dagster.check.CheckError), but tasks run OK manually #4261

Open · galloramiro opened 3 years ago

galloramiro commented 3 years ago

Summary

I have a Dagster service in a Docker Compose setup that uses PostgreSQL, Dagit, and the Dagster daemon with the default DagsterDaemonScheduler. When I run this locally it works fine: the schedules run on time without errors. But when I put it in production, the schedules don't run and give me the following error:

dagster.check.CheckError: Invariant failed. Description: Attempted to deserialize class "ExternalScheduleExecutionData" which is not in the whitelist.
  File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 204, in launch_scheduled_runs_for_schedule
    yield from _schedule_runs_at_time(
  File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 254, in _schedule_runs_at_time
    schedule_execution_data = repo_location.get_external_schedule_execution_data(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/repository_location.py", line 682, in get_external_schedule_execution_data
    return sync_get_external_schedule_execution_data_grpc(
  File "/usr/local/lib/python3.8/site-packages/dagster/api/snapshot_schedule.py", line 45, in sync_get_external_schedule_execution_data_grpc
    api_client.external_schedule_execution(
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/client.py", line 277, in external_schedule_execution
    return deserialize_json_to_dagster_namedtuple(
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/serdes.py", line 241, in deserialize_json_to_dagster_namedtuple
    dagster_namedtuple = _deserialize_json_to_dagster_namedtuple(
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/serdes.py", line 252, in _deserialize_json_to_dagster_namedtuple
    return _unpack_value(seven.json.loads(json_str), whitelist_map=whitelist_map)
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/serdes.py", line 276, in _unpack_value
    check.invariant(
  File "/usr/local/lib/python3.8/site-packages/dagster/check/__init__.py", line 167, in invariant
    raise CheckError(f"Invariant failed. Description: {desc}")

However, if I run the tasks manually in production, they run fine. From the documentation this seemed like a type-check error, so I removed all the typing from the tasks, but nothing changed.

Reproduction

I don't know how to reproduce this: everything works fine in the local environment, the tasks run fine when triggered manually in production, and I can't find the source of the error.

Dagit UI/UX Issue Screenshots

**Error**

![Screenshot from 2021-06-08 10-23-58](https://user-images.githubusercontent.com/25160187/121192848-b5990300-c843-11eb-9753-580d541fc0d8.png)

**Runs on production**

![Screenshot from 2021-06-08 10-25-21](https://user-images.githubusercontent.com/25160187/121193473-42dc5780-c844-11eb-8868-f06a0ffc56e7.png)

Additional Info about Your Environment

Example of repositories

repository.py

from dagster import repository

# The pipelines and schedules below are imported from the project's own modules (imports omitted here)
@repository
def cement_scraper_repository():
    return [cement_scrapper_pipeline, daily_cement_scrapper_schedule]

@repository
def mysql_intercement_bi_repository():
    return [
        real_calls_pipeline, daily_real_calls_schedule,
        # forecast_pipeline, daily_forecast_schedule  # We don't use forecast at the moment
    ]

@repository
def call_center_repository():
    return [call_center_pipeline, daily_call_center_schedule]

@repository
def br_concrete_repository():
    return [br_concrete_pipeline, br_concrete_schedule]

Example of schedules

call_center.py

from datetime import time, datetime

from dagster import daily_schedule

@daily_schedule(
    pipeline_name="call_center_pipeline", 
    start_date=datetime(2021, 1, 1), 
    execution_time=time(hour=5, minute=15),
    execution_timezone='America/Argentina/Buenos_Aires',
)
def daily_call_center_schedule(date):
    return dict(solids=dict(
        get_service=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_yesterday_general_values=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_calls_for_options=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_calls_status_for_month_by_year=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_abandon_calls_count_by_day_and_queue=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_top_15_client_calls_by_sales_representatives=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_top_15_customers=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_top_15_sales_representatives_calls_to_the_call_center=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_top_no_customers=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        save_customers_vs_no_customers=dict(config=dict(date=dict(value=date.strftime("%Y-%m-%d")))),
        ),
    )

Example of tasks

from datetime import date, datetime
from typing import Tuple
from dateutil.relativedelta import relativedelta

from dagster import solid, Failure, pipeline
from scrapper.call_center.factory import build_general_call_center_service
from scrapper.services.gitlab_logger import GitLabLogger
from scrapper.services.utils import get_yesterday_and_first_day_of_that_month

logger = GitLabLogger()

@solid
def get_service(_):
    return build_general_call_center_service()

@solid
def save_yesterday_general_values(context, service):
    yesterday = (date.today() - relativedelta(days=1)).strftime('%Y-%m-%d')
    was_saved, was_saved_str = service.save_general_values_on_db_by_date(initial_date=yesterday, end_date=yesterday)

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_yesterday_general_values:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)
    return was_saved

@solid
def save_calls_for_options(context, service, general_values_saved):
    yesterday_year = (date.today() - relativedelta(days=1)).strftime('%Y')
    was_saved, was_saved_str = service.save_calls_for_options_by_year(year=yesterday_year)

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_calls_for_options:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@solid
def save_calls_status_for_month_by_year(context, service, general_values_saved):
    yesterday_year = (date.today() - relativedelta(days=1)).strftime('%Y')
    was_saved, was_saved_str = service.save_calls_status_for_month_by_year(year=yesterday_year)

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_calls_status_for_month_by_year:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@solid
def save_abandon_calls_count_by_day_and_queue(context, service, general_values_saved):
    yesterday = (date.today() - relativedelta(days=1)).strftime('%Y-%m-%d')
    was_saved, was_saved_str = service.save_abandon_calls_count_by_day_and_queue(initial_date=yesterday, end_date=yesterday)

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_abandon_calls_count_by_day_and_queue:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@solid
def save_top_15_client_calls_by_sales_representatives(context, service, general_values_saved):
    yesterday, first_day_of_the_month = get_yesterday_and_first_day_of_that_month()
    was_saved, was_saved_str = service.save_top_15_client_calls_by_sales_representatives(
        initial_date=first_day_of_the_month, end_date=yesterday
        )

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_top_15_client_calls_by_sales_representatives:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@solid
def save_top_15_customers(context, service, general_values_saved):
    yesterday, first_day_of_the_month = get_yesterday_and_first_day_of_that_month()
    was_saved, was_saved_str = service.save_top_15_customers(initial_date=first_day_of_the_month, end_date=yesterday)

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_top_15_customers:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@solid
def save_top_15_sales_representatives_calls_to_the_call_center(context, service, general_values_saved):
    yesterday, first_day_of_the_month = get_yesterday_and_first_day_of_that_month()
    was_saved, was_saved_str = service.save_top_15_sales_representatives_calls_to_the_call_center(
        initial_date=first_day_of_the_month, end_date=yesterday
    )

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_top_15_sales_representatives_calls_to_the_call_center:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@solid
def save_top_no_customers(context, service, general_values_saved):
    yesterday, first_day_of_the_month = get_yesterday_and_first_day_of_that_month()
    was_saved, was_saved_str = service.save_top_no_customers(initial_date=first_day_of_the_month, end_date=yesterday)

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_top_no_customers:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@solid
def save_customers_vs_no_customers(context, service, general_values_saved):
    yesterday, first_day_of_the_month = get_yesterday_and_first_day_of_that_month()
    was_saved, was_saved_str = service.save_customers_vs_no_customers(initial_date=first_day_of_the_month, end_date=yesterday)

    if not was_saved:
        context.log.info(was_saved_str)
        logger.send(
            title='call_center', 
            description=f'Could not save_customers_vs_no_customers:\n{was_saved_str}',
            start_time=datetime.now(),
            severity=1,
        )
        raise Failure(was_saved_str)

    context.log.info(was_saved_str)

    return was_saved

@pipeline
def call_center_pipeline():
    service = get_service()
    general_values_saved = save_yesterday_general_values(service=service)

    # NOTE: this check runs when the pipeline is *defined*, where general_values_saved
    # is an output handle (always truthy), so this Failure is never actually raised;
    # the real failure handling happens inside save_yesterday_general_values
    if not general_values_saved:
        raise Failure('No general values saved, we cant proceed')

    save_calls_for_options(service=service, general_values_saved=general_values_saved)
    save_calls_status_for_month_by_year(service=service, general_values_saved=general_values_saved)
    save_abandon_calls_count_by_day_and_queue(service=service, general_values_saved=general_values_saved)
    save_top_15_client_calls_by_sales_representatives(service=service, general_values_saved=general_values_saved)
    save_top_15_customers(service=service, general_values_saved=general_values_saved)
    save_top_15_sales_representatives_calls_to_the_call_center(service=service, general_values_saved=general_values_saved)
    save_top_no_customers(service=service, general_values_saved=general_values_saved)
    save_customers_vs_no_customers(service=service, general_values_saved=general_values_saved)

Message from the maintainers:

Impacted by this bug? Give it a 👍. We factor engagement into prioritization.

gibsondan commented 3 years ago

Hi @galloramiro - is it possible to share your workspace.yaml file / say more about what the difference is between your production setup and your local setup? This error looks like your pipeline code may be running with an older version of dagster than your daemon. If you're running your own gRPC server, I would make sure that you built it using the same version of dagster that your daemon and dagit are using (you could verify this by re-building or re-installing it from scratch).
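
A quick way to compare the installed dagster version in each running container is something like the following (a sketch; the Compose service names are assumptions based on this thread):

# Print the dagster packages installed in each service (service names assumed)
docker-compose exec scrappers_web_tasks pip freeze | grep -i dagster
docker-compose exec dagit pip freeze | grep -i dagster
docker-compose exec dagster_daemon pip freeze | grep -i dagster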

galloramiro commented 3 years ago

Hi @gibsondan, thanks for the answer. Here is my workspace.yaml:

load_from:
  - grpc_server:
      host: scrappers_web_tasks
      port: 4000
      location_name: "scrappers_web_tasks"
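
For reference, a gRPC server matching this workspace entry is typically started inside the scrappers_web_tasks container with something like the following (a sketch; the repository file name is an assumption):

# Serve the repositories over gRPC on the host/port that workspace.yaml points at
dagster api grpc --host 0.0.0.0 --port 4000 --python-file repository.py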

And here is my dagster.yaml:

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      hostname: postgresql_db
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      hostname: postgresql_db
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      hostname: postgresql_db
      username:
        env: DAGSTER_POSTGRES_USER
      password:
        env: DAGSTER_POSTGRES_PASSWORD
      db_name:
        env: DAGSTER_POSTGRES_DB
      port: 5432

telemetry:
  enabled: false

I think there is no difference between local and prod, but in production they are running on pods; could that be a difference? I'm not very familiar with the DevOps world, haha.

gibsondan commented 3 years ago

Got it - can you make sure that the version of dagster that your gRPC server is using is fully up to date? The "ExternalScheduleExecutionData" object mentioned in the error no longer exists in the codebase and is returned from your gRPC server, which implies pretty strongly to me that it's using an older version of dagster.
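
One way to confirm which version the gRPC server process is actually importing (a sketch; the service name is taken from the workspace.yaml above):

# Ask the Python environment inside the gRPC container for its dagster version
docker-compose exec scrappers_web_tasks python -c "import dagster; print(dagster.__version__)"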

galloramiro commented 3 years ago

Sorry if it's a newbie question, but how can I make sure of that?

Here are my requirements for dagster:

dagster==0.11.10
dagit==0.11.10
dagster-cron==0.11.10
dagster-pandas==0.11.10
dagster-docker==0.11.10
dagster-postgres==0.11.10

From what I understand, the gRPC server comes from the scrappers_web_tasks Docker service; is that right?

gibsondan commented 3 years ago

You probably need to rebuild the scrappers_web_tasks service (and make sure that its Dockerfile is using the same requirements that you just listed there). I'd need to see your exact Dockerfile to give specific advice on how to do that, but the ultimate goal is to make sure that it's installing dagster 0.11.10 and not some older version.
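
A rebuild that bypasses Docker's layer cache, so the pinned requirements are actually reinstalled, would look something like this (a sketch using the service name from this thread):

# Force a clean rebuild of the user-code image and verify the installed version
docker-compose build --no-cache scrappers_web_tasks
docker-compose up -d scrappers_web_tasks
docker-compose exec scrappers_web_tasks pip show dagster   # should report the pinned version (0.11.10 here)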

galloramiro commented 3 years ago

Sorry for the delay; I was trying to coordinate with the people who deploy this. We rebuilt everything and it still gives the same error.

These are the libraries installed in each container/pod:

scrappe_web_tasks

dagit                        0.11.10
dagster                      0.11.10
dagster-cron                 0.11.10
dagster-docker               0.11.10
dagster-graphql              0.11.10
dagster-pandas               0.11.10
dagster-postgres             0.11.10

dagit

dagit                  0.11.12
dagster                0.11.12
dagster-docker         0.11.12
dagster-graphql        0.11.12
dagster-postgres       0.11.12

dagster-daemon

dagit                  0.11.12
dagster                0.11.12
dagster-docker         0.11.12
dagster-graphql        0.11.12
dagster-postgres       0.11.12

And here is the Dockerfile for scrappe_web_tasks:

FROM python:3.8-slim-buster
ENV PYTHONUNBUFFERED=1

RUN apt-get update

# Set general dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    libgflags-dev \
    libsnappy-dev \
    zlib1g-dev \
    libbz2-dev \
    liblz4-dev \
    librocksdb-dev \
    libzstd-dev \
    cron \
    wget \
    xvfb \
    unzip \
    unixodbc \
    gcc \
    g++ \
    gnupg \
    gnupg1 \
    gnupg2 \
    unixodbc-dev \
    freetds-dev \
    freetds-bin \
    tdsodbc \
    tzdata \
    python3-dev

# Set up chrome dependencies
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add -
RUN echo "deb http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list
RUN apt-get update -y
RUN apt-get install -y google-chrome-stable

# Add SQL Server ODBC Driver 17
RUN apt-get install curl \
    && curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \
    && curl https://packages.microsoft.com/config/debian/10/prod.list > /etc/apt/sources.list.d/mssql-release.list \
    && apt-get update
RUN ACCEPT_EULA=Y apt-get install -y --allow-unauthenticated msodbcsql17 \
    mssql-tools \
    libgssapi-krb5-2

RUN echo "[FreeTDS]\n\
Description = FreeTDS Driver\n\
Driver = /usr/lib/x86_64-linux-gnu/odbc/libtdsodbc.so\n\
Setup = /usr/lib/x86_64-linux-gnu/odbc/libtdsS.so" >> /etc/odbcinst.ini

RUN chmod +rwx /etc/ssl/openssl.cnf
RUN sed -i 's/TLSv1.2/TLSv1.0/g' /etc/ssl/openssl.cnf
RUN sed -i 's/SECLEVEL=2/SECLEVEL=1/g' /etc/ssl/openssl.cnf

# Install fbprophet
RUN pip install --upgrade setuptools
RUN pip install lxml
RUN pip install Cython==0.29.15
RUN pip install numpy==1.18.1
RUN pip install pystan==2.19.1.1
RUN pip install fbprophet==0.7.1
RUN pip install zeep==4.0.0

# SAP by Pyrfc
# configure SAP NW RFC SDK files path
ENV SAPNWRFC_HOME=/usr/local/sap/nwrfcsdk
COPY nwrfc_sdk/nwrfcsdk.conf /etc/ld.so.conf.d/nwrfcsdk.conf
# copy SAP NW RFC SDK files
COPY nwrfc_sdk/nwrfcsdk $SAPNWRFC_HOME
# Install Pyrfc
RUN pip --no-cache-dir install https://github.com/SAP/PyRFC/releases/download/v2.4.1/pyrfc-2.4.1-cp38-cp38-linux_x86_64.whl

# Create the workdir
WORKDIR scrape_web_tasks

# RUN (not CMD) so the directory is created at build time rather than at container start
RUN mkdir -p scrape_web_tasks/src
COPY requirements.txt src/
RUN pip install -r src/requirements.txt

# Clear dependencies
RUN apt-get remove -y --purge \
    gcc \
    g++ \
# clear apt-get cache & remove unnecessary files
    && apt-get -y autoremove \
    && apt-get clean && rm -rf /var/lib/apt/lists/*

ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV SRCDIR "/scrape_web_tasks/src/"
ENV DAGSTER_HOME "${SRCDIR}"
ENV PYTHONPATH "${SRCDIR}"
ENV LC_ALL=es_AR.utf8
ENV LANG=es_AR.utf8
ENV LANGUAGE=es_AR.utf8
ENV TZ=America/Argentina/Buenos_Aires

gibsondan commented 3 years ago

Could you update the versions of the libraries on scrappe_web_tasks to 0.11.12 like the rest of them to see if that fixes the problem?
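
That would mean bumping the requirements for scrappe_web_tasks to match the rest of the deployment (a sketch, assuming the same package set listed earlier in the thread):

# requirements.txt pinned to the daemon/dagit version
dagster==0.11.12
dagit==0.11.12
dagster-cron==0.11.12
dagster-pandas==0.11.12
dagster-docker==0.11.12
dagster-postgres==0.11.12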

galloramiro commented 3 years ago

OK, I'm on it. Something interesting: when I run this locally it runs OK, but I see there are version differences between local and production. So I pinned the dagster versions in the Docker images to 0.11.12 and updated my scrape_web_tasks. I'll tell you tomorrow how it goes.

galloramiro commented 3 years ago

That was the problem! I pinned all the Docker images to the same dagster version and everything works fine. Thanks @gibsondan for all your help!!