gordonwatts / hl_tables

A high-level tables dispatcher for putting together multiple table executors
0 stars 0 forks source link

Failed run fails due to co-routine if you try it again #13

Open gordonwatts opened 4 years ago

gordonwatts commented 4 years ago

This violates the idempotency rule - you should be able to re-run any cell in Jupyter:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
~/calms/hl_tables/hl_tables/servicex/xaod_runner.py in visit_ast_DataFrame(self, node)
    127                 if do_calc:
--> 128                     r = ast_awkward(await hep_tables.make_local_async(node.dataframe))
    129                     async with self._cached_lock:

/opt/conda/lib/python3.7/site-packages/hep_tables/local.py in make_local_async(df)
     95 
---> 96     return await _make_local_from_expression_async(expression, context, QueryVarTracker())
     97 

/opt/conda/lib/python3.7/site-packages/hep_tables/local.py in _make_local_from_expression_async(expression, context, qvt)
     65     # Run them all at once.
---> 66     results = await asyncio.gather(*results_async)
     67 

/opt/conda/lib/python3.7/site-packages/hep_tables/local.py in _result_from_source_async(s, statements, base_statement)
     42     if isinstance(result, ObjectStream):
---> 43         return (await result.AsAwkwardArray(['col1'])
     44                 .value_async())[default_col_name]

/opt/conda/lib/python3.7/site-packages/func_adl/ObjectStream.py in value_async(self, executor)
    144         # Run it
--> 145         return await exe(self._ast)
    146 

/opt/conda/lib/python3.7/site-packages/func_adl_xAOD/ServiceX.py in execute_result_async(self, a)
     62             q_str = python_ast_to_text_ast(top_level_ast)
---> 63             return await self._ds.get_data_awkward_async(q_str)
     64         else:

/opt/conda/lib/python3.7/site-packages/servicex/servicex_utils.py in cached_version_of_fn(*args, **kwargs)
     35             logger.debug(f"{h} - waiting for processing")
---> 36             await _in_progress_items[h].wait()
     37             logger.debug(f'{h} - done waiting for processing')

/opt/conda/lib/python3.7/asyncio/locks.py in wait(self)
    292         try:
--> 293             await fut
    294             return True

RuntimeError: Task <Task pending coro=<_result_from_source_async() running at /opt/conda/lib/python3.7/site-packages/hep_tables/local.py:43> cb=[gather.<locals>._done_callback() at /opt/conda/lib/python3.7/asyncio/tasks.py:691]> got Future <Future pending> attached to a different loop

The above exception was the direct cause of the following exception:

Exception                                 Traceback (most recent call last)
~/calms/hl_tables/hl_tables/servicex/xaod_runner.py in visit_ast_DataFrame(self, node)
    149 
--> 150                 await asyncio.gather(*results)
    151 

~/calms/hl_tables/hl_tables/ast_utils.py in visit(self, node)
     26         visitor = getattr(self, method, self.generic_visit)
---> 27         return await visitor(node)
     28 

~/calms/hl_tables/hl_tables/ast_utils.py in generic_visit(self, node)
     93         results = [eval_single_field(field, old_value) for field, old_value in iter_fields(node)]
---> 94         await asyncio.gather(*results)
     95 

~/calms/hl_tables/hl_tables/ast_utils.py in eval_single_field(field, old_value)
     86             elif isinstance(old_value, AST):
---> 87                 new_node = await self.visit(old_value)
     88                 if new_node is None:

~/calms/hl_tables/hl_tables/ast_utils.py in visit(self, node)
     26         visitor = getattr(self, method, self.generic_visit)
---> 27         return await visitor(node)
     28 

~/calms/hl_tables/hl_tables/ast_utils.py in generic_visit(self, node)
     93         results = [eval_single_field(field, old_value) for field, old_value in iter_fields(node)]
---> 94         await asyncio.gather(*results)
     95 

~/calms/hl_tables/hl_tables/ast_utils.py in eval_single_field(field, old_value)
     86             elif isinstance(old_value, AST):
---> 87                 new_node = await self.visit(old_value)
     88                 if new_node is None:

~/calms/hl_tables/hl_tables/ast_utils.py in visit(self, node)
     26         visitor = getattr(self, method, self.generic_visit)
---> 27         return await visitor(node)
     28 

~/calms/hl_tables/hl_tables/servicex/xaod_runner.py in visit_ast_DataFrame(self, node)
    155             bad_df = '\n'.join(dumps(node.dataframe))
--> 156             raise Exception(f'Internal Error: Failed to render DataFrame: {bad_df}') from e
    157 

Exception: Internal Error: Failed to render DataFrame: df_1 = DataFrame()
df_2 = df_1.Jets('AntiKt4EMTopoJets')
df_3 = df_2.pt
df_4 = df_3 / 1000.0
df_5 = df_4 > 35.0
df_6 = df_2.eta
df_7 = df_6.abs()
df_8 = df_7 < 2.5
df_9 = df_5 & df_8
df_10 = df_2[df_9]
df_11 = df_10.pt
df_12 = df_11 / 1000.0

The above exception was the direct cause of the following exception:

Exception                                 Traceback (most recent call last)
<ipython-input-10-a2d4c9ef1ab4> in <module>
      1 for s in all_samples:
----> 2     plot_pt(s)

<ipython-input-6-fbf830b6b8e9> in plot_pt(ds, func)
      1 def plot_pt(ds, func=lambda d: d.loose_jets.ptgev):
----> 2     histogram(func(ds['data']), bins=200, range=(0,800))
      3     _ = plt.ylabel('Count')
      4     _ = plt.title(ds['name'])
      5     _ = plt.xlabel('$p_T$ [GeV]')

/opt/conda/lib/python3.7/site-packages/make_it_sync/func_wrapper.py in wrapped_call(*args, **kwargs)
     61         @wraps(fn)
     62         def wrapped_call(*args, **kwargs):
---> 63             return _sync_version_of_function(fn, *args, **kwargs)
     64 
     65         return wrapped_call

/opt/conda/lib/python3.7/site-packages/make_it_sync/func_wrapper.py in _sync_version_of_function(fn, *args, **kwargs)
     24         future = exector.submit(get_data_wrapper, *args, **kwargs)
     25 
---> 26         return future.result()
     27 
     28 

/opt/conda/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    433                 raise CancelledError()
    434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
    436             else:
    437                 raise TimeoutError()

/opt/conda/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

/opt/conda/lib/python3.7/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

/opt/conda/lib/python3.7/site-packages/make_it_sync/func_wrapper.py in get_data_wrapper(*args, **kwargs)
     19             asyncio.set_event_loop(loop)
     20             assert not loop.is_running()
---> 21             return loop.run_until_complete(fn(*args, **kwargs))
     22 
     23         exector = ThreadPoolExecutor(max_workers=1)

/opt/conda/lib/python3.7/asyncio/base_events.py in run_until_complete(self, future)
    577             raise RuntimeError('Event loop stopped before Future completed.')
    578 
--> 579         return future.result()
    580 
    581     def stop(self):

~/calms/hl_tables/hl_tables/plot.py in histogram_async(df, bins, range, density)
     51 
     52     # Now render locally so we can plot it.
---> 53     h, bins = await local.make_local_async(hist_data)
     54     f, ax = plt.subplots()
     55     ax.fill_between(bins, np.r_[h, h[-1]], step='post')

~/calms/hl_tables/hl_tables/local.py in make_local_async(df)
     18     modified_df = df
     19     for r in runners:
---> 20         modified_df = await r.process(modified_df)
     21         if isinstance(modified_df, result):
     22             break

~/calms/hl_tables/hl_tables/servicex/xaod_runner.py in process(self, df)
    205     async def process(self, df: DataFrame) -> Union[DataFrame, Column, result]:
    206         'Process as much of the tree as we can process'
--> 207         return await _process(df)

~/calms/hl_tables/hl_tables/servicex/xaod_runner.py in _process(df)
    187     # Run the transformation to see what we can actually convert.
    188     t = _transform(marker)
--> 189     r = await t.visit(top_level_ast)
    190 
    191     if isinstance(r, ast_Column):

~/calms/hl_tables/hl_tables/ast_utils.py in visit(self, node)
     25         method = 'visit_' + node.__class__.__name__
     26         visitor = getattr(self, method, self.generic_visit)
---> 27         return await visitor(node)
     28 
     29     async def generic_visit(self, node: AST, context: Optional[Any] = None):

~/calms/hl_tables/hl_tables/servicex/xaod_runner.py in visit_ast_DataFrame(self, node)
    154             from dataframe_expressions import dumps
    155             bad_df = '\n'.join(dumps(node.dataframe))
--> 156             raise Exception(f'Internal Error: Failed to render DataFrame: {bad_df}') from e
    157 
    158     async def visit_ast_Column(self, node: ast_Column) -> ast.AST:

Exception: Internal Error: Failed to render DataFrame: df_1 = DataFrame()
df_2 = df_1.Jets('AntiKt4EMTopoJets')
df_3 = df_2.pt
df_4 = df_3 / 1000.0
df_5 = df_4 > 35.0
df_6 = df_2.eta
df_7 = df_6.abs()
df_8 = df_7 < 2.5
df_9 = df_5 & df_8
df_10 = df_2[df_9]
df_11 = df_10.pt
df_12 = df_11 / 1000.0
df_13 = df_12.histogram()

This happens only on the second (and subsequent) runs, and only when there was a real crash the first time (for example, ServiceX responding with a failed transform request).