[x] I have included information about relevant versions
[x] I have verified that the issue persists when using the master branch of Faust.
Steps to reproduce
start a single Faust worker (python -m opm worker -l info)
wait for worker to report: [^Worker]: Ready
verify that the web-view exposing tables (e.g. http://localhost:6066/object/1480) returns JSON
stop the worker
clear the application state using kafka-streams-application-reset Kafka tool: : kafka-streams-application-reset --application-id opm-v71 --bootstrap-servers 127.0.0.1:9092 --input-topics opm.pmdb.PMObjects,opm.pmdb.PMObjectProperties,opm.pmdb.PMObjectPropertyParams,opm.pmdb.PMPropertyTypes,opm.pmdb.PMPlants
restart the worker
request the same web-view again
Faust worker then raises exception
App code:
import faust
import os
kafka_host = 'kafka://localhost'
if 'KAFKA_HOST' in os.environ:
kafka_host = os.environ['KAFKA_HOST']
app = faust.App(
'opm',
autodiscover=True,
broker=kafka_host,
origin='opm',
store='rocksdb://',
topic_partitions=4,
version=71
)
def main() -> None:
app.main()
Agent code:
import faust
from opm.app import app
from opm.pmdb_service.models import PMObjects, PMObjectsRecord, \
PMObjectProperties, PMObjectPropertiesRecord, \
PMObjectPropertyParams, PMObjectPropertyParamsRecord, \
PMPlants, PMPlantsRecord, \
PMPropertyTypes, PMPropertyTypesRecord
# Kafka PMDB source topics
plants_topic = app.topic('opm.pmdb.PMPlants', value_type=PMPlantsRecord)
objects_topic = app.topic('opm.pmdb.PMObjects', value_type=PMObjectsRecord)
params_topic = app.topic('opm.pmdb.PMObjectPropertyParams', value_type=PMObjectPropertyParamsRecord)
properties_topic = app.topic('opm.pmdb.PMObjectProperties', value_type=PMObjectPropertiesRecord)
property_types_topic = app.topic('opm.pmdb.PMPropertyTypes', value_type=PMPropertyTypesRecord)
plants = app.Table('plants', default=PMPlants)
# a table of object properties, keyed by object ID
objects = app.Table('objects', default=PMObjects)
# a table of object properties (sensors), keyed by object property (sensor) ID
properties = app.Table('properties', default=PMObjectProperties)
# a table of property types, mostly useful for looking up icons
property_types = app.Table('propertytypes', default=PMPropertyTypes)
# a table of sensor parameters, keyed by parameter ID
parameters = app.Table('params', default=PMObjectPropertyParams)
# a list of directly related child IDs of an object, keyed by object ID
object_children = app.Table('children', default=list)
# a list of sensor IDs, keyed by object ID
sensors = app.Table('sensors', default=list)
# a table of sensor parameter IDs, keyed by sensor ID
sensor_params = app.Table('sensorparams', default=list)
@app.agent(objects_topic)
async def process_objects(stream):
obj: PMObjectsRecord
async for obj in stream:
oid = obj.payload.ID # PMObjects.ID
objects[oid] = obj.payload
# make a list of child objects (NB: not sensors) associated with this object
pid = obj.payload.ParentObject
if pid:
# get a reference to the current child ID list
# creates a new one if it does not yet exist, due to default=list table argument above
siblings = object_children[pid]
siblings.append(oid)
# important ! write it back
object_children[pid] = siblings
@app.agent(plants_topic)
async def process_plants(stream):
plant: PMPlantsRecord
async for plant in stream:
plants[plant.payload.ID] = plant.payload
@app.agent(property_types_topic)
async def process_property_types(stream):
prop_types: PMPropertyTypesRecord
async for prop_types in stream:
property_types[prop_types.payload.ID] = prop_types.payload
@app.agent(properties_topic)
async def process_properties(stream):
props: PMObjectPropertiesRecord
async for props in stream:
properties[props.payload.ID] = props.payload
# make a list of sensors belonging to this object
oid = props.payload.Object
if oid:
siblings = sensors[oid]
siblings.append(props.payload.ID)
sensors[oid] = siblings
@app.agent(params_topic)
async def process_params(stream):
param: PMObjectPropertyParamsRecord
async for param in stream:
parameters[param.payload.ID] = param.payload
# make a list of parameters associated with this sensor
sensor_id = param.payload.Property
if sensor_id:
param_id_list = sensor_params[sensor_id]
param_id_list.append(param.payload.ID)
sensor_params[sensor_id] = param_id_list
webview code:
from anytree import AnyNode
from anytree.exporter import DictExporter
from opm.app import app
from opm.pmdb_service.agents import object_children, objects, parameters, plants, \
properties, property_types, sensors, sensor_params
def add_property_types(prop_type: int, kwargs):
kwargs['icon'] = property_types[prop_type].ICON
kwargs['type'] = property_types[prop_type].Description
def get_sensor_parameters(sensor_id: int):
kwargs = {}
for prop_id in set(sensor_params[sensor_id]):
kwargs[parameters[prop_id].Name] = parameters[prop_id].Value
return kwargs
def get_object_properties(object_id: int):
obj = objects[object_id]
kwargs = {'object_id': object_id,
'label': obj.Description,
'name': obj.Name}
add_property_types(obj.PropertyType, kwargs)
return kwargs
def add_sensors_maybe(node: AnyNode, oid: int):
for sid in set(sensors[oid]):
kwargs = {'sensor_id': sid,
'label': properties[sid].Description,
'name': properties[sid].Name,
'units': properties[sid].EU,
'source': properties[sid].SourceName,
'priority': properties[sid].Priority}
sensor_kwargs = get_sensor_parameters(sid)
add_property_types(properties[sid].PropertyType, sensor_kwargs)
sensor_node = AnyNode(parent=node, **kwargs, **sensor_kwargs)
def build_object_tree(node: AnyNode, oid: int):
for child in set(object_children[oid]):
kwargs = get_object_properties(child)
child_node = AnyNode(parent=node, **kwargs)
add_sensors_maybe(child_node, child)
# recurse to add the children of the current child
build_object_tree(child_node, child)
@app.page('/object/{web_id}')
@app.table_route(table=objects, match_info='web_id')
async def get_object(web, request, web_id):
oid = int(web_id) # PMObjects.ID
if oid in objects:
kwargs = get_object_properties(oid)
root = AnyNode(**kwargs)
if oid in object_children:
build_object_tree(root, oid)
exporter = DictExporter()
return web.json({
"children": exporter.export(root)
})
else:
return web.json({
oid: "No data for the object was found."
})
Expected behavior
Faust worker webview returns JSON
Actual behavior
Web browser (Firefox) reports 500 Internal Server Error
Faust worker webview raises exception (see traceback below)
Increasing the version in app.py to e.g. 72 resolves the problem - webview returns valid JSON again.
Checklist
master
branch of Faust.Steps to reproduce
python -m opm worker -l info
)[^Worker]: Ready
http://localhost:6066/object/1480
) returns JSONkafka-streams-application-reset
Kafka tool: :kafka-streams-application-reset --application-id opm-v71 --bootstrap-servers 127.0.0.1:9092 --input-topics opm.pmdb.PMObjects,opm.pmdb.PMObjectProperties,opm.pmdb.PMObjectPropertyParams,opm.pmdb.PMPropertyTypes,opm.pmdb.PMPlants
App code:
Agent code:
webview code:
Expected behavior
Actual behavior
500 Internal Server Error
app.py
to e.g.72
resolves the problem - webview returns valid JSON again.Full traceback
The
TypeError
on the last line referers to the type ofproperty_types
table (referenced inviews.py
) and is show below:Versions
3.6.6
1.2.1
Ubuntu 18.04.1 LTS
2.0.0-cp1
5.8