Maillol / aiohttp-pydantic

Aiohttp View that validates request body and query sting regarding the annotations declared in the View method
MIT License
67 stars 21 forks source link

pydantic type #27

Closed bbartling closed 3 years ago

bbartling commented 3 years ago

I am trying to follow this tutorial for recursive pydantic models.

I am getting tripped up on the class ReadMultModel that calls class ReadSingleModel

Is this legit Pydantic?

class ReadMultModel(BaseModel):
    devices: Dict[str, ReadSingleModel]

This is my models.py below and

from typing import Any, AsyncIterator, Awaitable, Callable, Dict
from pydantic import BaseModel

class ReadSingleModel(BaseModel):
    address: str
    object_type: str
    object_instance: str

class ReadMultModel(BaseModel):
    devices: Dict[str, ReadSingleModel]

In Insomnia I am trying to do this: 192.168.0.105:8080/bacnet/read/multiple


{"devices":{
    "boiler":{
    "address":"12345:2",
    "object_type":"analogInput",
    "object_instance":"2"
    },
    "cooling_plant":{
    "address":"12345:2",
    "object_type":"analogInput",
    "object_instance":"2"
    },
    "air_handler_1":{
    "address":"12345:2",
    "object_type":"analogInput",
    "object_instance":"2"
    },
    "air_handler_2":{
    "address":"12345:2",
    "object_type":"analogInput",
    "object_instance":"2"
    },
    "hot_water_valve_1":{
    "address":"12345:2",
    "object_type":"analogInput",
    "object_instance":"2"
    }
}}

This will error: AttributeError: 'ReadMultModel' object has no attribute 'address'

This is main.py for what its worth:

from aiohttp.web import Application, json_response, middleware
import asyncio
from pathlib import Path
from aiohttp_pydantic import PydanticView
from aiohttp import web
from aiohttp_pydantic import oas
from models import ReadSingleModel,WriteSingleModel,ReleaseSingleModel
from models import ReadMultModel

app = Application()
oas.setup(app, version_spec="1.0.1", title_spec="BACnet Rest API App")

# Create your PydanticView and add annotations.
class ReadSingleView(PydanticView):
    async def get(self, bacnet_req: ReadSingleModel):
        read_result = [
        bacnet_req.address,
        bacnet_req.object_type,
        bacnet_req.object_instance
        ]
        response_obj = {"status":"success", "present_value" : read_result}
        return web.json_response(response_obj)

class ReadMultView(PydanticView):
    async def get(self, bacnet_req: ReadMultModel):
        for device,values in bacnet_req:

            read_result = [
            bacnet_req.address,
            bacnet_req.object_type,
            bacnet_req.object_instance
            ]

            device_mapping[device] = {'pv':read_result_round}

        response_obj = {"status":"success", "data": device_mapping }    
        return web.json_response(response_obj)

app.router.add_view('/bacnet/read/single', ReadSingleView)
app.router.add_view('/bacnet/read/multiple', ReadMultView)
web.run_app(app, host='0.0.0.0', port=8080)
bbartling commented 3 years ago

This was a user operator error! Sorry!