pathwaycom / pathway

Python ETL framework for stream processing, real-time analytics, LLM pipelines, and RAG.
https://pathway.com
Other
2.84k stars 98 forks source link

[Bug]: Tutorial on Defining behaviors for streaming windows #8

Open nlpneurobot opened 3 months ago

nlpneurobot commented 3 months ago

Steps to reproduce

I try to reproduce your tutorial : https://pathway.com/developers/user-guide/exploring-pathway/from-jupyter-to-deploy/#part-2-from-static-data-exploration-to-interactive-dashboard-prototyping

I use only copy/paste your code and I obtain an error of input type when I execute the cell bellow "Please add the behavior argument to window definition as in the code snippet below."

python version : 3.11.8 pathway version : 0.8.2

Relevant log output

~~~python
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[23], line 3
      1 minute_20_stats = (
      2     data
----> 3     .windowby(
      4         pw.this.t, 
      5         window=pw.temporal.sliding(
      6             hop=datetime.timedelta(minutes=1),
      7             duration=datetime.timedelta(minutes=20)
      8         ),
      9         # Wait until the window collected all data before producing a result
     10         behavior=pw.temporal.exactly_once_behavior(),
     11         instance=pw.this.ticker
     12     )
     13     .reduce(
     14         ticker=pw.this._pw_instance,
     15         t=pw.this._pw_window_end,
     16         volume=pw.reducers.sum(pw.this.volume),
     17         transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),
     18         transact_total2=pw.reducers.sum(pw.this.volume * pw.this.vwap**2)
     19     )
     20     .with_columns(
     21         vwap=pw.this.transact_total [/](http://localhost:8888/) pw.this.volume
     22     )
     23     .with_columns(
     24         vwstd=(pw.this.transact_total2 [/](http://localhost:8888/) pw.this.volume - pw.this.vwap**2)**0.5
     25     ).with_columns(
     26         bollinger_upper=pw.this.vwap + 2 * pw.this.vwstd,
     27         bollinger_lower=pw.this.vwap - 2 * pw.this.vwstd
     28     )
     29 )
     31 minute_1_stats = (
     32     data.windowby(
     33         pw.this.t,
   (...)
     44     .with_columns(vwap=pw.this.transact_total [/](http://localhost:8888/) pw.this.volume)
     45 )

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/trace.py:129](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/trace.py#line=128), in trace_user_frame.<locals>._pathway_trace_marker(*args, **kwargs)
    127     return func(*args, **kwargs)
    128 except Exception as e:
--> 129     _reraise_with_user_frame(e)

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/trace.py:109](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/trace.py#line=108), in _reraise_with_user_frame(e, trace)
    106 if user_frame is not None:
    107     add_pathway_trace_note(e, user_frame)
--> 109 raise e

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/desugaring.py:341](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/desugaring.py#line=340), in desugar.<locals>.wrapper(*args, **kwargs)
    334     args = tuple(
    335         desugaring_context._desugaring.eval_expression(arg) for arg in args
    336     )
    337     kwargs = {
    338         key: desugaring_context._desugaring.eval_expression(value)
    339         for key, value in kwargs.items()
    340     }
--> 341 return func(*args, **kwargs)

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/arg_handlers.py:20](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/arg_handlers.py#line=19), in arg_handler.<locals>.wrapper.<locals>.inner(*args, **kwargs)
     17 @wraps(func)
     18 def inner(*args, **kwargs):
     19     args, kwargs = handler(*args, **kwargs)
---> 20     return func(*args, **kwargs)

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/arg_handlers.py:20](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/arg_handlers.py#line=19), in arg_handler.<locals>.wrapper.<locals>.inner(*args, **kwargs)
     17 @wraps(func)
     18 def inner(*args, **kwargs):
     19     args, kwargs = handler(*args, **kwargs)
---> 20     return func(*args, **kwargs)

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/runtime_type_check.py:19](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/runtime_type_check.py#line=18), in check_arg_types.<locals>.with_type_validation(*args, **kwargs)
     14 """Hides beartype dependency by reraising beartype exception as TypeError.
     15 
     16 Should not be needed after resolving https://github.com/beartype/beartype/issues/234
     17 """
     18 try:
---> 19     return beartype.beartype(f)(*args, **kwargs)
     20 except beartype.roar.BeartypeCallHintParamViolation as e:
     21     raise TypeError(e) from None

File <@beartype(pathway.stdlib.temporal._window.windowby) at 0x14cdad6c0>:108, in windowby(__beartype_func, __beartype_conf, __beartype_get_violation, __beartype_object_140551755614240, __beartype_object_140551766098816, __beartype_object_140551790108176, __beartype_object_5625875136, __beartype_object_5638154688, __beartype_object_140551766513840, *args, **kwargs)

File ~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/stdlib/temporal/_window.py:910, in windowby(self, time_expr, window, behavior, instance)
    858 @trace_user_frame
    859 @desugar
    860 @arg_handler(handler=shard_deprecation)
   (...)
    869     instance: pw.ColumnExpression | None = None,
    870 ) -> pw.GroupedTable:
    871     """
    872     Create a GroupedTable by windowing the table (based on `expr` and `window`),
    873     optionally with `instance` argument.
   (...)
    908     1        | 1     | 16    | 2
    909     """
--> 910     return window._apply(self, time_expr, behavior, instance)

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/runtime_type_check.py:19](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/internals/runtime_type_check.py#line=18), in check_arg_types.<locals>.with_type_validation(*args, **kwargs)
     14 """Hides beartype dependency by reraising beartype exception as TypeError.
     15 
     16 Should not be needed after resolving https://github.com/beartype/beartype/issues/234
     17 """
     18 try:
---> 19     return beartype.beartype(f)(*args, **kwargs)
     20 except beartype.roar.BeartypeCallHintParamViolation as e:
     21     raise TypeError(e) from None

File <@beartype(pathway.stdlib.temporal._window._SlidingWindow._apply) at 0x14cdac180>:98, in _apply(__beartype_func, __beartype_conf, __beartype_get_violation, __beartype_object_140551755614240, __beartype_object_140551766098816, __beartype_object_5625875136, __beartype_object_5638154688, __beartype_object_140551766513840, *args, **kwargs)

File ~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/stdlib/temporal/_window.py:340, in _SlidingWindow._apply(self, table, key, behavior, instance)
    332 @check_arg_types
    333 def _apply(
    334     self,
   (...)
    338     instance: pw.ColumnExpression | None,
    339 ) -> pw.GroupedTable:
--> 340     check_joint_types(
    341         {
    342             "time_expr": (key, TimeEventType),
    343             "window.hop": (self.hop, IntervalType),
    344             "window.duration": (self.duration, IntervalType),
    345             "window.origin": (self.origin, TimeEventType),
    346         }
    347     )
    349     key_dtype = eval_type(key)
    350     assign_windows = self._window_assignment_function(key_dtype)

File [~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/stdlib/temporal/utils.py:79](http://localhost:8888/lab/workspaces/~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/stdlib/temporal/utils.py#line=78), in check_joint_types(parameters)
     75 else:
     76     expected_types_string = " or ".join(
     77         repr(tuple(ex_types.values())) for ex_types in expected_types
     78     )
---> 79     raise TypeError(
     80         f"Arguments ({', '.join(parameters.keys())}) have to be of types "
     81         + f"{expected_types_string} but are of types {tuple(types.values())}."
     82     )

TypeError: Arguments (time_expr, window.hop, window.duration) have to be of types (INT, INT, INT) or (FLOAT, FLOAT, FLOAT) or (DATE_TIME_NAIVE, DURATION, DURATION) or (DATE_TIME_UTC, DURATION, DURATION) but are of types (INT, DURATION, DURATION).
Occurred here:
    Line: .windowby(
    File: [/var/folders/2l/p4vvj_3j3fq2h1l38s2dgtq00000gn/T/ipykernel_9280/4248455004.py:3](http://localhost:8888/var/folders/2l/p4vvj_3j3fq2h1l38s2dgtq00000gn/T/ipykernel_9280/4248455004.py#line=2)
~~~

What did you expect to happen?

A tutorial up to date ;)

Version

0.8.2

Docker Versions (if used)

No response

OS

MacOS

On which CPU architecture did you run Pathway?

x86-64

janchorowski commented 3 months ago

Thanks for the bug report - we'll get back to you shortly with a resolution.

nlpneurobot commented 3 months ago

I think I found this problem.

In part 1, the data are loaded and the t column is converted to date time format :

data = data.with_columns(t=data.t.dt.utc_from_timestamp(unit="ms"))

It's not present in part 2 after loading shown in "Switching to streaming data" section.

With conversion in datetime format, minute_20_stats and minute_1_stats appear to be correctly calculated.

nlpneurobot commented 3 months ago

Looking at your charts, the t column is indeed INT on your side. Unfortunately it doesn't work on my side. Applying conversion to datetime format, it's good for me.

I would like to share my experience on this part 2. You should send a message to the installation page of the panel module because it requires the pyviz_comms module ( https://panel.holoviz.org/getting_started/installation.html )

After the stupid and nasty execution of the cells that you propose, the output of pn.Row(...) leaves the static mode. I first need to recalculate minute_20_stats and minute_1_stats then re-execute the cell which calculates joint_stats (present in part 1 and not mentioned in part 2) to obtain streaming mode.

Then I can execute the pn.Row(...) command that you suggest for a nice display ready for streaming and finally the pw.run() command to start streaming.

When streaming, I have this warning which appears but which you must surely already be working on or have even corrected:

~/miniconda3/envs/mixtral_ollama/lib/python3.11/site-packages/pathway/stdlib/viz/table_viz.py:140: 
FutureWarning: DataFrame.applymap has been deprecated. Use DataFrame.map instead.
  df = df.applymap(_format_types)  # type: ignore