Open ccrvlh opened 2 years ago
I take my hat off to you, @lowercase00
That is excellent writing worth being shared in a separate post.
I would love to link it in the main branch and I want to re-read it later with a fresh mind.
Thank you.
Local Session Management - It's not wrong to create multiple sessions per request when it makes sense - think graphql resolvers or something similar 🤔
@zhanymkanov glad it was worth the read! Of course, feel free to use this in any way you think is worthwhile.
@ThirVondukr I don't really think the way we managed sessions makes much sense to be honest. probably there are situations where this is reasonable, but I think our code is just bad in that aspect, really...
In most REST endpoints it's enough to use one session per endpoint since it's a single operation that either completes or not.
I am truly grateful for this sharing of information !
I would like to know a few more things, how do you handle pagination in your project (limit/offset)?
That depends on the use case, some things need page based pagination, some need cursor pagination
Okay, so you've made your own pagination system and didn't use an external package (such as fastapi-pagination, for example)?
Yes, because creating page based pagination isn't that hard compared to keyset/cursor pagination.
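To make the difference between the two styles concrete, here is a minimal sketch over an in-memory list. The names `ROWS`, `page_based` and `cursor_based` are made up for illustration and are not from the project discussed above; a real implementation would push the same logic into the SQL query.

```python
from typing import Optional

# Hypothetical in-memory "table", ordered by a unique, increasing id.
ROWS = [{"id": i, "name": f"item-{i}"} for i in range(1, 11)]


def page_based(rows: list[dict], page: int, size: int) -> list[dict]:
    """Limit/offset pagination: skip (page - 1) * size rows, then take `size`."""
    offset = (page - 1) * size
    return rows[offset : offset + size]


def cursor_based(
    rows: list[dict], after_id: Optional[int], size: int
) -> tuple[list[dict], Optional[int]]:
    """Keyset pagination: take `size` rows whose id is greater than the cursor."""
    candidates = rows if after_id is None else [r for r in rows if r["id"] > after_id]
    chunk = candidates[:size]
    # The next cursor is the last id returned, or None when the page came back short.
    next_cursor = chunk[-1]["id"] if len(chunk) == size else None
    return chunk, next_cursor
```

The page-based version is only a slice, which is why it is easy to write by hand; the cursor version has to carry state (the last seen id) between requests, and stays stable when rows are inserted or deleted between calls.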
I have been looking for something like this for a while. It seems impossible to find good examples and discussion about the structure and architecture of actual large applications.
I am encountering the session management problem too. If I have service layers that consume each other, with the same session lifecycles, then the consumer service can be problematic for the nested service. For example, if the nested service performs an external change, but the consumer service has made the call within a nested transaction that it rolls back, the nested service has now lost sync between the action it performed and its own state. That is among other issues with session transaction/commit ownership.
I am wondering how you are handling business logic validation of payloads.
I currently have a large app that I am migrating to FastAPI (for the dependency injection, which makes tests cleaner, and for how clean the OpenAPI setup is).
My application uses Flask and Marshmallow, with some mixins for class based views that make the marshmallow schemas work pretty seamlessly.
In the application, the views use marshmallow schemas that both validate the payload content with basic type and format checks, but also apply business logic validations. So if the payload contains ids, these ids fields will have checks applied to them to confirm they are usable by the authenticated user.
Some will argue that this is a mixing of layers, but in my case it has been handled in a way that works well (in my opinion). The marshmallow schemas are given the user as a context, and then, when validating each field, the validator calls a service layer interface.
My reasoning for this is:
In FastAPI there doesn't seem to be any clear way to pass a context to a pydantic schema. Reading the issues on pydantic's GitHub, there was a reference to using ContextVars. So if I could override how the payload gets implicitly loaded, I could wrap it in a context manager that sets the user on a ContextVar, and then use that within the pydantic schema, but I don't see any way to handle this (and I wouldn't want every controller to have to retrieve the raw body and load it manually). I could add a middleware that takes the user and adds it to a context, but then we lose the benefit of def controller(user: User = Depends(get_user)) (unless get_user accesses the request state to get the user).
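For reference, the ContextVar idea can be sketched with the standard library alone. Everything here (`User`, `current_user`, `user_context`, `validate_item_id`) is a hypothetical name, and the plain function stands in for a schema validator; how to hook this into FastAPI's implicit body loading is exactly the open question.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class User:
    id: int
    allowed_item_ids: frozenset


# Holds the authenticated user while one request is being processed.
current_user: ContextVar[Optional[User]] = ContextVar("current_user", default=None)


@contextmanager
def user_context(user: User) -> Iterator[None]:
    """Set the user for the duration of the block, then restore the old value."""
    token = current_user.set(user)
    try:
        yield
    finally:
        current_user.reset(token)


def validate_item_id(value: int) -> int:
    """Plays the role of a field validator that needs the user as context."""
    user = current_user.get()
    if user is None:
        raise RuntimeError("No user in context")
    if value not in user.allowed_item_ids:
        raise ValueError("Invalid item id")
    return value
```

The payload would have to be parsed inside `with user_context(user): ...` for the validator to see the user.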
So what other approaches are there?
def get_valid_item(item_id: int = Body(..., embed=False), user: User = Depends(get_user)):
    item = service.get_item(item_id, user)
    if not item:
        raise Something
    return item

def my_controller(item: Item = Depends(get_valid_item)):
    ...
but in that case, it seems it will get messy if you have payloads with numerous ids (and I am not sure whether it will return all ID errors, or just the error from the first dependency that fails).
So to summarise, do you have a best practice for performing business logic validations [that require a context] on fields in a way that returns as many errors as possible in a single request, and that maps correctly to the API interface, which doesn't necessarily map to the service interface?
A big disclaimer: I’m not really sure I can talk about the so called “best practices”, but I can share my experience, it might help somehow.
By your text I understand that your validation is a bit more business-oriented rather than just types/fields. The way we do it is: the route/controller is responsible for the interaction with the client, so if a string parameter is wrong for example, we perform the validation within the route (this happens when we accept a comma separated list of strings for example, that is not natively supported by FastAPI as a type per se). Otherwise most of our validation logic sits within the service layer.
Pydantic is great for the types/fields validation, but it is also extremely flexible, so you could have a field that’s hidden for the API and use this field internally for example, or even allow Extras. With context within the schema you can now leverage all of Pydantic's strengths (like the validator decorators, @root_validator, etc.). You can always add methods to Pydantic models as well; I guess it is a matter of personal preference.
I’m personally not a big fan of too much business logic within Pydantic models. Simple stuff (like making sure two passwords are the same for example) is mostly fine, but I tend to prefer to have the schemas.py (where we keep all of our Pydantic schemas) hold mostly just data containers that parse and validate data input (and not business logic).
The way I personally like to think about this is: imagine that the API is not the only entrypoint to the application, but rather just one implementation (for example having both a CLI and an API as entry points). If the validation would be needed both for the CLI and the API then most likely this is a business rule that should live in the service layer. If this is something API specific that hasn’t been solved by Pydantic already (type/format validation), then I might as well handle it in the router/controller (or maybe by injecting a dependency). At the end of the day, we mostly have logic in the services, and the routes are usually catching exceptions from services and translating them to the proper HTTP exceptions, sometimes making small validations with the params (and responding with helpful error messages for the client). I honestly think that these are all just fine, and mostly personal preference (considering the trade-offs on testing capabilities, separation of concerns, readability, etc.).
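A rough sketch of that "two entrypoints" way of thinking, using only the standard library. `ItemNotFound`, `ITEMS` and the two adapter functions are illustrative names, not code from our project; in FastAPI the API adapter would raise an `HTTPException` instead of returning a tuple.

```python
class ItemNotFound(Exception):
    """Domain error raised by the service layer; it knows nothing about HTTP."""


ITEMS = {1: "keyboard"}  # hypothetical stand-in for a repository


def get_item(item_id: int) -> str:
    """Service layer: the business rule lives here, shared by every entrypoint."""
    try:
        return ITEMS[item_id]
    except KeyError:
        raise ItemNotFound(f"Item {item_id} not found") from None


def api_get_item(item_id: int) -> tuple[int, dict]:
    """API entrypoint: translates the domain error into (status_code, body)."""
    try:
        return 200, {"name": get_item(item_id)}
    except ItemNotFound as exc:
        return 404, {"detail": str(exc)}


def cli_get_item(item_id: int) -> int:
    """CLI entrypoint: same rule, reported as a process exit code instead."""
    try:
        print(get_item(item_id))
        return 0
    except ItemNotFound as exc:
        print(f"error: {exc}")
        return 1
```

Since both adapters call the same service function, the rule is tested once and each entrypoint only decides how to present the failure.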
At the beginning we were handling sessions within methods, which is less than ideal for a lot of reasons (unit of work, rollback, extra verbosity and context managers all around, etc, etc). We recently changed to a dependency injection approach, that goes like:
# .../module/depends.py
from typing import AsyncGenerator

async def repository_injector() -> AsyncGenerator[MyRepository, None]:
    # Create the session before the try block so `finally` never sees an unbound name.
    session = async_session()
    try:
        yield MyRepository(db=session)
    finally:
        await session.close()

# .../module/repository.py
from sqlalchemy.ext.asyncio import AsyncSession

class MyRepository(...):
    def __init__(self, db: AsyncSession, ...):
        self._db = db
        super().__init__(db, ...)

    @property
    def db(self) -> AsyncSession:
        if not self._db:
            raise Exception("Database session is not initialized.")
        if not isinstance(self._db, AsyncSession):
            raise Exception(
                "Database session invalid. Expected AsyncSession, got {}".format(type(self._db))
            )
        return self._db

    @db.setter
    def db(self, db: AsyncSession) -> None:
        if not isinstance(db, AsyncSession):
            raise Exception("Session is not valid.")
        self._db = db

    async def my_method(self, ...):
        # AsyncSession.execute is a coroutine, so it has to be awaited.
        await self.db.execute(...)

# .../module/services.py
class MyService(...):
    def __init__(self, db: AsyncSession, repo: MyRepository):
        ...
Now the repository, service (or whatever business layer uses the session) already has the session injected, and you gain control over the transaction (you can commit or roll back a whole block, instead of individual pieces). One way to do that is by using session.flush(), which sends the statement to the database (generating automatic primary keys for example) but without committing, so you still have rollback() at hand if anything goes wrong. For nested services you already have the session at hand, so you can just pass it along, no issues there.
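The same idea, shown with the standard library's sqlite3 instead of SQLAlchemy so it runs without any setup (that swap is an assumption made for this sketch; table and function names are made up). The inner functions receive the connection and never commit; only the outer function owns the commit or rollback, and the generated primary key is available before the commit, which is the role flush() plays in SQLAlchemy.

```python
import sqlite3


def create_item(conn: sqlite3.Connection, name: str) -> int:
    # The INSERT is sent inside the open transaction, so the generated
    # primary key is available before anything is committed.
    cursor = conn.execute("INSERT INTO items (name) VALUES (?)", (name,))
    return cursor.lastrowid


def create_tag(conn: sqlite3.Connection, item_id: int, label: str) -> None:
    if not label:
        raise ValueError("label must not be empty")
    conn.execute("INSERT INTO tags (item_id, label) VALUES (?, ?)", (item_id, label))


def create_item_with_tag(conn: sqlite3.Connection, name: str, label: str) -> bool:
    """Only this outer function decides whether the whole block is kept."""
    try:
        item_id = create_item(conn, name)
        create_tag(conn, item_id, label)
    except ValueError:
        conn.rollback()  # undoes the item insert as well
        return False
    conn.commit()
    return True


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("CREATE TABLE tags (item_id INTEGER, label TEXT)")
    conn.commit()
    return conn


def count_items(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
```

When the tag step fails, the already-inserted item disappears with it, which is the behaviour you lose when each method opens and commits its own session.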
What we also found in our case, is that when we started having too much nesting, there was something wrong with our design, actually that’s the reason why we started using the repository pattern more often (although with a single implementation) and having it injected to a service for example. We found that things were a lot cleaner (and easier) when doing that.
Now, the issue starts with concurrency (asyncio.gather(…)) because a single session can’t be used by several tasks at the same time. I’m still trying to figure out how to deal with it, have a few ideas in mind, and just opened a discussion in the SQLAlchemy repo to better understand the issue.
@lowercase00 Databases don't support concurrent operations on the same connection, so that's not really a SQLAlchemy issue; you can spawn more sessions if you need to. The session dependency could be created like this, no need for try/finally:
async def get_db() -> AsyncIterable[AsyncSession]:
    async with async_session() as session:
        yield session
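On the concurrency point, here is a minimal stdlib-only sketch of "one session per concurrent task". `FakeSession` is a made-up stand-in that refuses overlapping use, the way a single connection would; it is not SQLAlchemy's `AsyncSession`, and `session_scope` and `run_query` are hypothetical helpers.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSession:
    """Stand-in for a DB session: rejects a second statement while one is running."""

    def __init__(self) -> None:
        self._busy = False

    async def execute(self, query: str) -> str:
        if self._busy:
            raise RuntimeError("session is already running a statement")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulated I/O
            return f"result of {query}"
        finally:
            self._busy = False


@asynccontextmanager
async def session_scope() -> AsyncIterator[FakeSession]:
    # A real factory would also close the session when the block exits.
    yield FakeSession()


async def run_query(query: str) -> str:
    # Each concurrent task opens its own session.
    async with session_scope() as session:
        return await session.execute(query)


async def shared_session_fails() -> bool:
    """Two gathered statements on one session: the second one is rejected."""
    session = FakeSession()
    results = await asyncio.gather(
        session.execute("a"), session.execute("b"), return_exceptions=True
    )
    return any(isinstance(r, RuntimeError) for r in results)


async def main() -> list[str]:
    return await asyncio.gather(run_query("a"), run_query("b"))
```

The cost is that the gathered statements no longer share a transaction, so this fits independent reads better than a unit of work that must commit or roll back together.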
In my opinion pydantic should only perform simple validation that doesn't have anything to do with request state or database
If you need to "automatically" raise an error maybe create a method for that, e.g. get_or_404 to retrieve your models or raise a 404 error. You can also make it a simple function that throws an exception if it receives None 🤔
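A helper like that can be very small. In this sketch `NotFoundError` is a hypothetical exception that the API layer would translate into a 404; nothing here is FastAPI-specific.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


class NotFoundError(Exception):
    """Hypothetical exception the API layer would map to a 404 response."""


def get_or_404(value: Optional[T], message: str = "Not found") -> T:
    """Return the value unchanged, or raise if the lookup produced None."""
    # Compare against None explicitly so falsy results such as 0 or "" still pass.
    if value is None:
        raise NotFoundError(message)
    return value
```

Usage would look like `item = get_or_404(repo.get(item_id), "Item not found")`, where `repo.get` is whatever lookup returns `None` on a miss.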
@ThirVondukr Sure, I don't think it is a SQLAlchemy issue, but rather a pattern to solve somehow (I haven't solved it). If asyncio's concurrency shines with I/O, there should be a way to enjoy these benefits: using different connections perhaps?
From what I understand there is no (practical) difference between the try/finally and the context manager? Both will enforce closing the session afterwards, given the close() in the finally block. I tend to prefer calling .close() explicitly since you only do it in a single place.
@ThirVondukr btw, if you would like to contribute, or are interested in the discussion about SQLAlchemy patterns to use with asyncio concurrency there's a discussion here.
Using a context manager is just a cleaner, more pythonic way, since it would not be possible to, say, forget to call .close() in that case.
I appreciate your reply @lowercase00, I will follow-up a bit because you appear very knowledgeable on the subject. Regarding the validation I will give an example.
Considering a payload like:
class Package(BaseModel):
    item_id: int
    tag_id: int
    # Maybe a bunch more ids
    ...
So from the controller perspective, we could just use this schema and validate that the ids are integers. From a business logic perspective, we need to apply access control logic to these ids. So while the format/type may be correct, the value should be considered as invalid because it doesn't exist for the authenticated user.
In my service layer I can do (I am neglecting some deps and sessions and whatnot in the snippets):
class PackageService:
    def __init__(self, user: User, ...):
        self.user = user

    def create_package(self, package: PackageSchema):
        item = ItemRepo.get(package.item_id, user_id=self.user.id)
        tag = TagRepo.get(package.tag_id, user_id=self.user.id)
        ...  # Repeat for all ids
        package = PackageRepo.create(...)
The repos can raise whatever, which then gets mapped to some HTTP_40X error. But: 1) if there are a lot of ids, this procedural approach to checking each id seems unorganised, and as-is, if there are 10 invalid ids, you would need 11 requests to eventually fix them all and get your 201 (fix one, send the request, get another error for the next id, repeat); 2) the raised exception loses granularity for the API consumer.
The service layer can't raise anything based on the API interface, so whatever it raises either needs to be remapped in the controller, or is generic and independent of the interface, such as raising something that returns {"message": "Item not found"}. The service can't meaningfully raise {"message": ..., "details": [{"validation error with loc"}]} because the input to the service doesn't necessarily map directly to the interface defined on the API controller (in this example it does, but in practice it's not always the case).
So as a consumer of the API, for better error handling I would like the error to be:
{
    "detail": [
        {
            "loc": [
                "body",
                "tag_id"
            ],
            "msg": "invalid value",
            "type": "value_error.invalid"
        },
        ... # Repeat for the other ids
    ]
}
If you have a frontend UI with a form, you can then map an error to the specific input element (ignoring that the frontend shouldn't be showing ids that the user can't use, because this general business logic validation applies to other things, more than just access control).
For this sort of granularity, we need this validation to be performed in the controller. Now we don't want business logic to be performed in the controller as such, but we can have services expose methods that allow the controller to enrich its validations and return more useful errors.
Like:
class PackageSchema(BaseModel):
    item_id: int
    tag_id: int
    # Maybe a bunch more ids
    ...

    @validator("item_id")
    def validate_item_id(cls, value):
        user = ...  # User from somewhere, contextvars maybe
        if not ItemService(user=user).get_item(value):
            raise ValueError("Invalid item id")
        return value  # a validator must return the (possibly transformed) value
But here the schema has no dependency injection and no clear way to handle how we pass the user context to the validators.
So I am wondering how you are approaching this.
Are you performing business logic validations within the service layer, in a way where the returned error is not mapped to the specific field of the input?
Are you distinguishing between field level validations only handling basic validations of type/format/required of the value, with business logic validations of fields being less linked to the given field?
@reritom I'd personally recommend to not perform any database actions in your schema, better move them into your service layer 🤔
@ThirVondukr So would you have service layers raise exceptions like raise SomeException("Item not found") and dismiss the more granular error handling on the API, or would you somehow raise specific errors from the service layer that the controller then maps to the correct API schema field:
class Service:
    ...
    def create_package(self, package: PackageSchema):
        try:
            TagRepo.get_tag(package.tag_id, user_id=self.user.id)
        except NotFound:
            raise ExceptionWithSomeData("Tag not found", field="tag_id")
        ...

@app.post(...)
def controller(package: PackageSchema, user: User = Depends(get_user)):
    try:
        PackageService(...).create_package(...)
    except ExceptionWithSomeData as e:
        raise SomeValidationError(field=e.field)  # Here the mapping is one to one, but in practice it might not be
@reritom I think there are a few different things here. I’m assuming that when you say “the ids” you are talking about all field_id fields in the Package schema, right? So to be able to perform some random action, you have to check whether all of your fields are valid (business wise). Just looking at this specific problem, you could iterate through the schema structure, get the field names, and have a strategy (map) { field_a: validator_a } to run all validations. But… it honestly looks a bit strange; there might be better ways to achieve what you want.
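Just to make the strategy map concrete, a minimal sketch. `ALLOWED`, `VALIDATORS` and `validate_payload` are illustrative names, the lookups stand in for service calls, and the error shape simply copies the FastAPI-style objects from the example above.

```python
from typing import Callable

# Hypothetical per-user lookup: which ids the authenticated user may reference.
ALLOWED = {"item_id": {1, 2}, "tag_id": {7}}

# Strategy map: field name -> check that returns True when the value is usable.
VALIDATORS: dict[str, Callable[[int], bool]] = {
    "item_id": lambda value: value in ALLOWED["item_id"],
    "tag_id": lambda value: value in ALLOWED["tag_id"],
}


def validate_payload(payload: dict[str, int]) -> list[dict]:
    """Run every registered check and collect all failures, not just the first."""
    errors = []
    for field, is_valid in VALIDATORS.items():
        if field in payload and not is_valid(payload[field]):
            errors.append(
                {
                    "loc": ["body", field],
                    "msg": "invalid value",
                    "type": "value_error.invalid",
                }
            )
    return errors
```

Because every check runs before anything is raised, a payload with several bad ids produces all of its errors in one response.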
First, the frontend should only be loading ids that the user is allowed to see, so when performing the GET /items and GET /tags that is needed to load items on a select component for example, this should already be filtered by what the user can see, so this is the first part of the authorization. When you do that, the frontend only has valid IDs as potential values and hence a first layer of validation for the ids.
About mapping potential issues to specific fields, like: tag_id is invalid. We don’t usually do that automatically; I normally send specific responses using the DefaultResponse (described in the first post) and then pass details (such as these) to describe the specific issue. And then the frontend should be responsible for dealing with it.
If you still want to validate the content from a business perspective, I would probably implement validate methods (in the service) that you call before even performing the action itself:
is_tag_valid = await service.validate_tag(...)
if not is_tag_valid:
    raise TagNotFoundException("Tag ID was not found")
And then catching the exception to translate it to a 404 for example. You could obviously do that sequentially or in bulk (performing all validations and then returning a specific message), returning maybe an array of objects each with known fields, maybe something like:
class ValidationDetailSchema(BaseModel):
    field_name: str
    error_code: int
    detail: str
I really like known objects with predictable keys, tends to be more friendly for parsing.
There’s a trade-off here: you can call a ton of validate methods before going to create_package, but you would be going to the database quite a lot of times (once for each validation, once for the creation). I can’t say whether this is worth it, but it’s just something to keep in mind.
I’m not a big fan of having business logic in the @validator but it is possible, sure. I would still prefer to have a more complex service that handles it all.
That all being said… it does look like this situation should probably not even be possible if your access control is working from the start, and your business layer is "complete" and properly separated from the Repository for example. From your text I got the impression that the API and the frontend are tightly coupled (you mention controlling form messages for example); this is not good or bad, but it does make specific issues appear that more “generic” APIs wouldn’t worry too much about. It does seem worth revisiting the architecture to see if there’s something to be done differently.
In one project where our business logic was fairly complex, what we’ve done is we had a class XYZHandler (somewhat along the lines of what DDD calls a UseCase) and the Handler (which is a callable class, with the __call__ method implemented) calls the services. This has been very useful in situations where there were a whole lot of steps to perform for a single action (like SendPackage in your case), but it is overkill if it’s a simple CRUD. In this case we had a HandlerInterface and several handlers, basically one for each UseCase, but that’s a totally different paradigm than the controller > service structure. In this specific project, it really helped me to think of the UseCase, the business process itself, instead of the methods that I would call (I think it’s pretty common to start at the code, and then try to solve the business issue, but the other way round tends to be more effective). And then the classes would change completely, like no more XYZService but rather PackageDispatcher (that would probably use a few services, acting more like utility methods, and repositories).
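A rough sketch of what such a callable handler could look like. Only `HandlerInterface`, `PackageDispatcher` and `__call__` come from the description above; the individual steps, the `log` list and the returned id are hypothetical placeholders for real service and repository calls.

```python
from dataclasses import dataclass, field
from typing import Protocol


class HandlerInterface(Protocol):
    """Every use case exposes the same callable shape."""

    def __call__(self, payload: dict) -> dict: ...


@dataclass
class PackageDispatcher:
    """One use case: runs every step needed to send a package, in order."""

    log: list = field(default_factory=list)

    def _validate(self, payload: dict) -> None:
        if "item_id" not in payload:
            raise ValueError("item_id is required")
        self.log.append("validated")

    def _persist(self, payload: dict) -> int:
        self.log.append("persisted")
        return 1  # hypothetical generated package id

    def _notify(self, package_id: int) -> None:
        self.log.append("notified")

    def __call__(self, payload: dict) -> dict:
        self._validate(payload)
        package_id = self._persist(payload)
        self._notify(package_id)
        return {"package_id": package_id}
```

A route would then build the handler and call it like a function, `PackageDispatcher()(payload)`, and the class reads top to bottom as the business process.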
Well, I really don’t know what would be the best way, again, just sharing experiences that might be relevant for your use case, hope it helps somehow!
First, the frontend should only be loading ids that the user is allowed to see, so when performing the GET /items and GET /tags that is needed to load items on a select component for example, this should already be filtered by what the user can see, so this is the first part of the authorization. When you do that, the frontend only has valid IDs as potential values and hence a first layer of validation for the ids.
This is correct. When the frontend populates the input, it is using ids that are relevant to the given user and context. However, it is possible for the ids that are being listed to become outdated.
If the ids (well, their labels linked to an id) shown on the frontend view contain an id that another user for the same tenant has deleted (after this user loaded the view presenting the ids), then we have the case where you can post an invalid id. Ideally when the other user deletes an entity, it will trigger an event which will be sent to the current user using Pusher or some form of websockets, which can then invalidate the cache (or manually refresh) the ids presented to the user as options. But there is always the chance that this event will be lost or late, or that this solution is simply not implemented.
In this case when we POST the payload, we can have different types of field specific errors that the frontend can use to perform different actions.
If the payload were {"tag_id": 1, "name": "somethingUnique"} we have the cases:
{"error": {"details": [{"field": "name", "message": "taken"}]}} (Note: the message could be constants, enums, or error codes). In this case, the frontend can choose to put a red outline around the input (I am using the frontend as an example, but in practice it's for giving more context to any API consumer).
{"field": "tag_id", "message": "invalid"}. In this case, the frontend can be clever enough to know that the tag_id or any <entity>_id with invalid (or whatever enum or status code) should trigger a cache invalidation or re-retrieval of the entities related to that id.
So if I have multiple ids in my payload and one fails, it will use this information to just refresh the ids presented to the user (with some warning/error message), while an error for a non-id field might just be used to show a red outline.
Again though, this sort of enriched validation needs to happen at the controller level, combining various service interfaces/APIs, because if these validations exist purely within the service layer, there will be overhead in the controller layer in mapping them to match the API definition.
Using your example here with the tag validation:
is_tag_valid = await service.validate_tag(...)
if not is_tag_valid:
    raise TagNotFoundException("Tag ID was not found")
We are able to achieve a similar goal of giving the API consumer enough information to know that their ids cache needs invalidating. Changing it to NotFoundException("...", entity="tag") (where entity maybe maps to the resource name in the OpenAPI spec) could add some flexibility. Again, this is limited to returning one error at a time, while the field-level errors can trigger multiple consumer-side invalidations from a single error response.
I’m not a big fan of having business logic in the @validator but it is possible, sure. I would still prefer to have a more complex service that handles it all.
I agree with this in principle, but also want to provide the API consumer with the tools to be as efficient as possible at correcting for bad requests.
From your text I got the impression that the API and the frontend are tightly coupled
The extent to which my frontend and backend are coupled is purely by the API contract. My existing error messages are marshmallow based so resemble this:
{
    "type": "ValidationError",
    "message": "...",
    "details": {
        "item_id": ["invalid"],
        "other_id": ["invalid"],
        "other_field": ["taken",] # Or other strings that indicate the length restrictions, etc,
    }
}
And with this sort of error my frontend is able to know that the ids for those two entities need refreshing, while the other_field only needs to display some visual indication that the input isn't valid.
I prefer the FastAPI approach of error formats with the list of objects that represent the locator, message, etc. As you mentioned, objects with predictable keys are generally easier to work with.
Anyway thanks for your input, I found it interesting and enjoyed reading all of your replies in this thread.
Well, to be honest it seems like you kinda have it figured out... it is mostly a matter of putting the pieces together in a way that makes sense to you and your team (for readability, maintainability, etc.).
Perhaps you can have a ValidationService. In my experience the most important thing in this scenario is to have a clear contract for the return value. I would totally give up on trying to mix this with the Pydantic validation and just have a custom validation schema exactly like the one you already have; you would enforce a contract for the data validation that would make it easy for your client to parse. My only suggestion here is to invert the logic, and to have predictable keys and custom values: so instead of having item_id: ['invalid'], something like field: 'item_id', status: True. A schema could be something like:
class FieldValidationResponse(BaseModel):
    field_name: str
    status: bool
    details: dict[str, Any]

class ValidationResponse(BaseModel):
    status: bool
    msg: str
    field_status: list[FieldValidationResponse]
This makes for a much easier parsing experience for the client, and makes it easier for you to build the object you are returning. With this schema you could then have a Union[ValidationResponse, PackageResponse] on your response model, or maybe even a separate route for validation if you think it's worth the extra request (it would help to keep the routes more consistent). Then Pydantic would validate the data types, you would have a business validation layer, and then the service, which seems totally reasonable.
But again, total personal preference, I'm sure there are a whole lot of ways to deal with this...
PS: One thing that also comes to mind in this scenario is the Envelope pattern which is pretty nice, since you can encapsulate different types of messages while maintaining a predictable contract. This is a good article about it, I've used in another project and I quite enjoyed it, maybe it's worth to take a look.
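For anyone who hasn't seen the envelope idea, a minimal sketch with dataclasses. The field names (`kind`, `success`, `payload`) and the two helper functions are my own assumptions for illustration and are not taken from the article.

```python
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class Envelope:
    """Fixed outer shape; only `payload` varies between message kinds."""

    kind: str  # which message the payload carries, e.g. "package" or "validation"
    success: bool
    payload: dict[str, Any] = field(default_factory=dict)


def package_created(package_id: int) -> dict:
    return asdict(Envelope(kind="package", success=True, payload={"id": package_id}))


def validation_failed(field_status: list[dict]) -> dict:
    return asdict(
        Envelope(kind="validation", success=False, payload={"field_status": field_status})
    )
```

The client always reads the same top-level keys first and only then decides how to parse the payload.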
Amazing! that is what I want. Thank you @zhanymkanov, @lowercase00.
Hi @lowercase00
Great post. Had a look at the project structure and was piqued by your extension package in regards to the correlation id middleware.
I had used ContextVars to manage the correlation id in conjunction with a well known key in the request header which if not present, a random uuid will be generated. If outbound requests are made, the correlation id is then propagated forward by adding it to the outbound headers payload.
It feels a bit clunky, how did you go about managing correlation ids?
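For context, the approach described above can be sketched roughly like this with the standard library. The header name `X-Correlation-ID` and the function names are assumptions for illustration; in a real app `start_request` would live in a middleware.

```python
import uuid
from contextvars import ContextVar
from typing import Mapping, Optional

HEADER = "X-Correlation-ID"  # assumed "well known" header name

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")


def start_request(headers: Mapping[str, str]) -> str:
    """Reuse the inbound id when present, otherwise generate a random one."""
    # A plain dict lookup is case-sensitive; real header containers usually are not.
    value = headers.get(HEADER) or uuid.uuid4().hex
    correlation_id.set(value)
    return value


def outbound_headers(extra: Optional[Mapping[str, str]] = None) -> dict:
    """Copy the current correlation id onto headers for an outbound call."""
    headers = dict(extra or {})
    headers[HEADER] = correlation_id.get()
    return headers
```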
Hi @lowercase00 Thanks a lot for this post, Can you explain more on the role of the Message Hub, if possible describe a usecase.
@terrysclaw I just use the package asgi_correlation_id; the idea has always been to track inbound requests though, so I'm not sure this is the same use case. We just wanted a way for the client to tell us the correlation ID so we could track actions on our side.
@Uncensored-Developer yeah, sure.
So the MessageHub is basically just a way to decouple things. Think of it as an internal task queue or pubsub system, but with some benefits and without the overhead.
Say you have ServiceA on modulea and, in a complex workflow, after it finishes a certain task, it triggers ServiceB on moduleb. You don't want ServiceA calling ServiceB directly for a whole lot of reasons: circular dependency, business logic coupling, potentially blocking operations, an error in ServiceB could affect the ServiceA response, etc. So it's just as if you sent the message to a task queue or to a pubsub (kafka/redis-stream) system: fire and forget. ServiceA has done its job, and can tell everyone interested that it has finished. Now ServiceB knows that ServiceA finished and can run whatever.
It's almost as if ServiceA and ServiceB were microservices, but they live in the same codebase, and you won't have all the challenges related to microservices (database, gateway, authentication, etc), while also keeping all the benefits of a monolith.
Now that you have a hub on your application, you can build things on top of it: you can record every message that's going through the hub, you can have multiple handlers for a certain event (now when ServiceA finishes, run ServiceB and ServiceC in parallel), you can make handling synchronous or asynchronous dynamically depending on the event body (sending to Celery/RQ). There are just a lot of benefits (in my opinion) without the very heavy lifting of an external pubsub system.
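A very small sketch of the idea, assuming an in-process hub with async handlers. The method names (`subscribe`, `publish`), the `history` list and the event name are illustrative; the real hub has more to it.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict], Awaitable[Any]]


class MessageHub:
    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}
        self.history: list[tuple[str, dict]] = []  # record of every message

    def subscribe(self, event: str, handler: Handler) -> None:
        self._handlers.setdefault(event, []).append(handler)

    async def publish(self, event: str, body: dict) -> None:
        self.history.append((event, body))
        handlers = self._handlers.get(event, [])
        # Handlers run concurrently; return_exceptions keeps a failing
        # handler from propagating its error back to the publisher.
        await asyncio.gather(*(h(body) for h in handlers), return_exceptions=True)


async def main() -> tuple[list, list]:
    hub = MessageHub()
    calls: list = []

    async def service_b(body: dict) -> None:
        calls.append(("b", body["order_id"]))

    async def service_c(body: dict) -> None:
        raise RuntimeError("ServiceC failed")

    hub.subscribe("service_a.finished", service_b)
    hub.subscribe("service_a.finished", service_c)
    await hub.publish("service_a.finished", {"order_id": 1})
    return calls, hub.history
```

Note that this sketch still awaits the handlers before `publish` returns; for true fire and forget the hub would schedule them as background tasks or hand them to a worker queue.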
@ccrvlh
Please could you explain the role of the modules sub-folders and the core folders, because I noticed they have similar contents to other folders like auth, users, tenants, etc.
Hey @Uncensored-Developer sorry for the delay.
So when you use the "modular" approach, you'll find that it's fairly easy to end up with a mess of circular dependencies (one module depending on the other). But you'll also find that those circular dependencies are usually all the same (or very similar), so what we've learned was that it made sense to have a sort of "global" set of packages that most of the modules use. The easiest way to think about this is: those are the modules that the application actually needs to make sense (e.g. users, or auth, or tenants), or some random set of functions that everybody needs (e.g. core functions).
So what we did was: we promoted the ones that were inherently necessary for the application to function to the root of the package (app/....), and put everything that was just a bunch of functionalities that you could turn on/off within modules. That way your application is actually modular: you can add or remove any set of modules you want, and you won't break things. All you have to do most of the time is register the route (in your global routes) and the models (in your db), and you're ready to go with a new module (new set of features).
If you ever find yourself having circular dependencies, there are probably better ways to organize your modules. Normally you'll add another module (module A) that imports from two other modules (modules B and C), and you'll break the circular dependency between modules B and C. In our experience this will almost always make 100% sense from a business perspective, and the app becomes obvious for anyone that looks at it - it's almost a perfect reflection of the underlying business mechanics.
That's why I added a core (some set of functions/classes that almost every module uses, and your app just can't live without), and promoted some modules (users, tenants) that are also 100% linked to your app - your app just doesn't make sense without that base.
Hopefully this makes sense.
@ccrvlh thanks for sharing your insights and experience with these type of projects, they are much appreciated 🙏
Noticed you switched from Celery to RQ: we've recently switched from RQ to Arq, its async counterpart, which is deeply inspired by RQ.
@ccrvlh Thanks for the great write-ups. How are the modular models being picked up by Alembic for migration autogeneration?
Hey @bitstein I'd have a models.py file importing the models from every module, and then import the models file in Alembic's env.py.
I did change this approach on my last projects though, and now I have all tables in a single db/models.py file. It's a rather long file, which is the downside, but I think it helps by (1) allowing more flexibility in terms of which module can use a model, (2) detaching the persistence layer from the business logic layer, and (3) making it easier to reason about the database schema and relationships. Also, I like to fold things, and use comments to separate groups of tables in a somewhat logical manner, so I feel it makes a lot of sense and is easier to work with.
Thank you!
@ccrvlh
Many thanks for sharing.
I'm wondering:
@copdips
You mean `model` or `module`? If `module_a` needs to communicate with `module_b` there are a few options: they are both part of the same "workflow", in which case it might make sense to create `module_c` that depends on `a` and `b`. It might be that they are not part of the same workflow, and this could be done through the message hub / external worker (e.g. updating a record triggers some other action in another module, but those are independent). Where: it kind of depends - I have some things being tracked on the route layer, and others on the service layer - as I'm mostly doing APIs, the route ends up being the most relevant external interface (CLIs, for example, are mostly for internal usage, so no need to track those, and SDKs are based on the API, so the codebase itself is not meant to be a library - in that case tracking stuff on the service layer is less relevant).
As for the DB, in my latest projects I have moved the (db) models out of the `modules` to a dedicated `db` folder. This has given me a lot fewer headaches when designing the database, and a lot more freedom for modules to use the persistence layer as they see fit - sometimes you may want more flexibility when querying a table that is theoretically "owned" by another module, for example. I do try to restrict data manipulation to the module that owns the set of tables I'm working on, but this is just a very general guideline rather than a hard rule.
@ccrvlh ,
I apologize for the misunderstanding earlier. But you correctly interpreted my intent :)
My initial questions were about:
This morning I searched on Google and found a doc that, while designed for Django, has a very similar structure to what's recommended here: https://phalt.github.io/django-api-domains/examples/
The author introduces two new layers:
- `interfaces`: Contains all outgoing methods.
- `apis`: Contains all public incoming methods (not the API endpoints themselves).

There's a separate file (`urls.py`) that defines routes that consume the `apis` layer and then interact with backend database models through the services layer.
If I understand correctly, when domain A needs to communicate with domain B, it would call `domain_b/apis.py` from `domain_a/interfaces.py`. Behind the scenes, `domain_b/apis.py` could call `domain_b/services.py`.
So a `services.py` could have two consumers: one is the `interfaces.py` from another domain, and the other is the `urls.py` in the same domain for routes.
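To make that call chain concrete, here is a minimal single-file sketch. The comments mark which file each piece would live in; every function, class, and data name (`get_author_name`, `AuthorInterface`, `describe_book`, the `_AUTHORS` dict) is hypothetical and not taken from the linked guide.

```python
# --- domain_b/services.py: business logic, private to domain B ---
_AUTHORS = {1: "Ada"}  # stand-in for a database table


def _find_author_name(author_id: int) -> str:
    return _AUTHORS[author_id]


# --- domain_b/apis.py: the only entry point other domains may call ---
def get_author_name(author_id: int) -> str:
    return _find_author_name(author_id)


# --- domain_a/interfaces.py: every outgoing call from domain A ---
class AuthorInterface:
    @staticmethod
    def author_name(author_id: int) -> str:
        # In a real layout this would be `from domain_b.apis import get_author_name`.
        return get_author_name(author_id)


# --- domain_a/services.py: domain A's logic only talks to its own interface ---
def describe_book(title: str, author_id: int) -> str:
    return f"{title} by {AuthorInterface.author_name(author_id)}"
```

With this split, domain A never imports `domain_b/services.py` directly, so domain B can change its internals as long as `apis.py` stays stable.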
The author does not explicitly discuss many-to-many relationships, only mentioning that each domain possesses its own database model file. Regarding your solution to centralize the database models from the domain layer into a single folder, the question remains: how are association tables handled? Is there a single file for all tables, or does each domain maintain its own database model file in this centralized folder ? If it's the latter, where are the association tables placed?
@ccrvlh Thanks for sharing.
Here is how I handle request-coupled session management: set a ContextVar for every request, so the session is bound to a single request context. LINK. Pros: no need to initiate the session explicitly. Cons: the middleware's `finally` runs only after all BackgroundTasks (bgt) are done, so the session pool could be drained when there is a time-consuming bgt. In that case, move this code to the router layer with a decorator or context manager, or remove the bgt.
I had to consider a single transaction across multiple services, so I use the UoW pattern. (I am using PDI; the global session is injected directly into the repository.)
This is the transactional context manager:
```python
import contextlib
import typing
from traceback import print_exc

from dependency_injector.wiring import Provide, Provider
from sqlalchemy.ext.asyncio import async_scoped_session

# global session is injected by PDI
session_provider: Provider[async_scoped_session] = Provide["session.provider"]


@contextlib.asynccontextmanager
async def transactional(
    can_commit: bool = True,
    can_flush: bool = True,
) -> typing.AsyncIterator[async_scoped_session]:
    """
    Transactional context manager for database operations.

    Usage:
        async with transactional() as session:
            await session.execute(...)

    If can_commit is False, only a flush is executed.
    If can_flush is also False, neither commit nor flush is executed.
    This is for when you want to control commit and flush manually and
    prevent list item errors with SQLAlchemy instances.
    """
    session = session_provider()
    try:
        yield session
        # Commit or flush only when the block finished without an error,
        # so a failed block is rolled back and never committed afterwards.
        if can_commit:
            await session.commit()
        elif can_flush:
            # sometimes, I need to disable autoflush
            await session.flush()
    except Exception:
        await session.rollback()
        print_exc()
        raise
```
```python
class ServiceA:
    _repo: RepositoryA

    async def do_a(self, *args, commit: bool = True) -> None:
        async with transactional(can_commit=commit):
            # assumes the repository methods are coroutines
            await self._repo.do_a(*args)
```
- transaction on multi service
```python
class MultiService:
    service_a: ServiceA
    service_b: ServiceB

    async def combine_a_and_b(self, *args) -> None:
        # commit() is called once, when both service calls are done
        async with transactional(can_commit=True):
            await self.service_a.do_a(*args, commit=False)
            await self.service_b.do_b(*args, commit=False)
```
@rumbarum Wouldn't it be better to just use a `begin` context manager once in a dependency or a middleware:
```python
from collections.abc import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncSession

async def get_session() -> AsyncIterator[AsyncSession]:
    async with async_session_factory.begin() as session:
        yield session
```
And if you need to manage transactions inside of your endpoint (e.g. creating nested transactions and rolling them back) you can create a constrained interface/protocol for the SQLAlchemy session, so other methods aren't used in inappropriate places?
Hi, @ThirVondukr
The session is already initiated in the middleware. `transactional` is used to control the commit scope and autoflush; I had trouble with autoflush:
```python
async with transactional() as session:
    with session.no_autoflush:
        ...
```
I could convert it to a decorator, but this is much simpler and makes it explicit how the commit is handled.
I don't understand what you mean by "create a constrained interface/protocol for sqlalchemy session, so other methods aren't used in inappropriate places?"
Could you explain it in more detail, please?
Generally you wouldn't want to have more than one (normal, not nested/savepoint) transaction per request. Having your commit called in a FastAPI dependency makes a lot of sense since they're terminated just before your response is sent (that wasn't the case some time ago).
If some high-level parts of your application want to control transactions, it doesn't make much sense to expose the whole `AsyncSession` object/interface to them, so you can create a wrapper or just a protocol for your `AsyncSession` that only has the methods necessary to control transactions.
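As a rough illustration of that idea, a `typing.Protocol` can describe only the transaction-related methods. The names `TransactionControl`, `FakeSession`, and `finish_unit_of_work` are made up for this sketch, and `FakeSession` is just a stand-in so it runs without a database; a real `AsyncSession` also has async `commit()` and `rollback()` methods, so it would satisfy the same protocol.

```python
import asyncio
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class TransactionControl(Protocol):
    """Hypothetical narrow view of a session: transaction methods only."""

    async def commit(self) -> None: ...

    async def rollback(self) -> None: ...


class FakeSession:
    """Stand-in for AsyncSession so the sketch runs without a database."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def execute(self, statement: str) -> None:
        # Exists on the session, but is not part of the protocol above.
        self.calls.append(f"execute:{statement}")


async def finish_unit_of_work(tx: TransactionControl, ok: bool) -> None:
    """High-level code is typed against commit/rollback only."""
    if ok:
        await tx.commit()
    else:
        await tx.rollback()
```

A type checker would flag `tx.execute(...)` inside `finish_unit_of_work`, even though the underlying object has that method. Nothing is enforced at runtime; the constraint lives in the type annotations.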
@ThirVondukr Thanks for your comment.
"they're terminated just before your response is sent": is that true? On my version they finish after the response.
@ThirVondukr Oh, I found the change in 0.106.
@zhanymkanov thanks for the write-up. It’s great to have some benchmarks on professional implementations. This is awesome and one of the most valuable repositories, full of production-ready and architecture tips. Great stuff, thanks a lot for sharing this!
To our (very positive) surprise, this is very similar to what we are doing on our side. I thought it was worth sharing our experiences and the choices we've made along the way, good and bad.
Project Structure
This is very similar to what we are doing. The functional way of splitting things doesn’t really work except for really small projects, so we also have a “module” based approach. Our application looks something like:
A few comments:
- `modules` module. We have been using this structure for the past couple of years and have been pretty happy with the separation of concerns it brings. We even reuse the same blueprint for different projects; we mostly just change the `modules`, which is great.
- `db` module on the top level has helped a lot, giving us flexibility to have more robust `Mixin` classes, better engine configuration and some other goodies.
- `core` module on the top level. This gives us flexibility to do things like a specific mock service, a taskStatus route or more generic resources.

Permissions & Auth
Although the “recommended” way of doing authentication in FastAPI would be the dependency injection, we have chosen to use a class-based decorator to control access on the route level. So our routes look something like:
And our `access_control` class looks like:
A few benefits we encountered, and a few drawbacks:
- `module` or `action` or `superuser=True` and things like that.
- `access_control` class itself) is fairly easy to work on, being very powerful at the same time, since it has the `*args` and `**kwargs` from the request, and the full context (current user, path, tenant, etc), so all sorts of checks can be used. As we increase the granularity over access control we have been considering implementing a `permissions` decorator for each module, so we can have more specific control over a given resource. But WIP still.

Class-based Services
Our service module `service.py` started to get big and a mess of functions, so we started having a few class-based services, which have been working very well. Something like `TenantService`, `UserService`. This almost looks like a `repository` for simple modules (in some cases we even split the service into `service` and `repository` for more complex business logic). Now each service module has anywhere from 1 to 10 service classes; this greatly improved our organization and readability.
Class-based views
Earlier this year we refactored all of our routes to use a `class based view` that is included in the `fastapi-utils` package, and this made our code a lot cleaner. The main benefit for us is that the basic authentication process (reading the `token` and the `X-Tenant-ID` from the `header`) is done in one place only, so we don’t have to repeat the dependencies. What we’ve done is, we have a custom `commons_deps` function, and at the beginning of each route class we do something like:
We have been experimenting with something slightly different nowadays, which is having the `service` instantiated with the `tenant_id` and `current_user` in a dependency injection, so that our service starts up a bit more complete.
Task Queues
We are long-time Celery users, but Celery is overwhelming and fairly difficult to reason about when you get to the internals and specifics. We just switched to RQ and couldn’t be happier, with a few caveats. The logic is amazing (the `Queue` and `Job` objects are really intuitive and easy to work with, as are `dependency` chains with `depends_on`). The thing is that there’s an issue with `async` functions. They work if you use the worker, but won’t work if you run in the same process, which is kind of a pain when debugging. We haven’t experimented with Starlette’s background jobs, as we always valued having a centralized dashboard for tasks and an easy way to get a task status, for example. As we deploy most of our applications in Kubernetes, being able to scale the workers easily and indefinitely is awesome and we are really glad with it. I have been experimenting with a few different snippets to try to open a PR and make RQ compatible in every scenario.
The fancy architecture
In some cases (actually projects) we slightly changed our module architecture to account for a proper business-oriented `Model` object.
For fancier implementations this worked very well, although it is a lot more complex to start with. This gives us a proper `EntityModel` and great separation of concerns, but it gets a lot more verbose really quickly, so we found it was only worth it for very complex projects, but it’s also a possibility.
Custom Response Serializers & BaseSchema
We found that the `response_class` in FastAPI also serializes the data in Pydantic, so it’s not purely for documentation. You can, however, overwrite the default response behavior by making a custom response class, which we did, gaining a bit of performance (anywhere from 50-100ms) and flexibility. So we have something like:
And then we use the response directly like:
This gave us a cleaner `router` since we can use the status code on the response itself, which was more intuitive for us, gained a bit of performance with the `orjson` encoder, and we just like it better. The (big) downside is that we face the risk of having documentation/API inconsistencies. In our case it happened once or twice, but we think it’s still worth it.
Just as you guys, we also have a `BaseSchema` base for all Pydantic schemas we use that has a couple of configurations like `orm_mode`, `enum`, etc.
Using a `DefaultResponse` class
On several occasions the response is kind of generic, so we make a lot of use of a schema called `DefaultResponse`:
This is a kind of standardized way of communicating with our client (we have a React frontend) so the front devs always know what to look for when getting a `DefaultResponse`.
Configuration
Although Pydantic is nice for configuration as well, we couldn’t be happier using the amazing @Dynaconf lib, developed and maintained by @BrunoRocha. This was a game changer in our settings management.
All of our settings/secrets went to `.toml` files and a few things happened:
- `toml` headers
- `settings.py` file has ~10 lines:

And now everywhere we can just
And don’t need to change anything when deploying to K8S since we already inject everything with env vars (config). Can’t recommend it more. We still have to experiment with the `Vault` integration, which is the next step.
The Message Hub
This helped a lot while we were trying to further decouple our services. The hub is a centralized `hub` to share messages between modules, something like:
And in most modules we have a `handlers.py` module that will have a few functions that handle events. The `services` themselves usually dispatch events, like `hub.MessageHub.handle(event_created_by_the_service)`, and we also use it to track application activity, normally called by the route: `hub.MessageHub.track(application_activity_schema)`
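For readers who want a starting point, a hub like this can be sketched with the standard library alone. This is not the code from our project: only the `MessageHub.handle(...)` call shape comes from the description above, while `register`, `Event`, `UserCreated`, and `send_welcome_email` are names assumed for the illustration.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List, Type


@dataclass
class Event:
    """Base class for messages passed between modules."""


@dataclass
class UserCreated(Event):
    user_id: int


class MessageHub:
    """Maps event types to the handler functions that modules register."""

    _handlers: DefaultDict[Type[Event], List[Callable]] = defaultdict(list)

    @classmethod
    def register(cls, event_type: Type[Event], handler: Callable) -> None:
        # Would typically be called from each module's handlers.py (assumption).
        cls._handlers[event_type].append(handler)

    @classmethod
    def handle(cls, event: Event) -> None:
        # Dispatch to every handler registered for this exact event type.
        for handler in cls._handlers[type(event)]:
            handler(event)


# A handler as it might appear in some module's handlers.py
welcomed: List[int] = []


def send_welcome_email(event: UserCreated) -> None:
    welcomed.append(event.user_id)


MessageHub.register(UserCreated, send_welcome_email)

# A service dispatching an event after doing its work
MessageHub.handle(UserCreated(user_id=7))
```

The service that emits `UserCreated` does not import the module that reacts to it, which is where the decoupling comes from. Handlers here run synchronously in the caller; pushing them to a worker queue would be a separate design decision.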
Types & Docs
100% of arguments are typed and 100% of methods / functions have docstrings. I honestly can't live without them anymore. Now wondering if we could just compile the whole code to C and make it fly? Nuitka, MyPyC maybe? TBC...
Now the bad part, and our (really) bad practices
Local Session Management
For a couple of reasons we didn’t implement the request-coupled session management (inject the session through FastAPI’s dependency injection system) and we ended up having a lot of services that handle the session locally, which is not cool and not recommended by SQLAlchemy. Think of:
Managing the session lifecycle itself is fairly ok and it works for really simple services, but what we found is that for more complex services, where methods call one another, you end up nesting sessions, which is terrible. Imagine calling `other_method` from `module_method` that also has the same session lifecycle management: now you just opened a session within another session. Just terrible. We are gradually moving to better session management, but we are still trying to find better ways of handling it.
Little use of the Dependency Injection
Your write-up has a lot of great examples of how to properly use and leverage the power of dependency injection. We don’t use much of those, and we definitely should.
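One direction that would address the nested sessions described above is to hand the session to the service instead of letting each method open its own. The sketch below is only an illustration of that idea, not our current code: `FakeSession` stands in for a SQLAlchemy session so it runs without a database, and `UserService`, `add_audit_entry`, and `handle_request` are hypothetical names.

```python
from typing import List


class FakeSession:
    """Stand-in for a SQLAlchemy session; it only records what happens."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def add(self, item: str) -> None:
        self.log.append(f"add:{item}")

    def commit(self) -> None:
        self.log.append("commit")


class UserService:
    """Hypothetical service that receives its session instead of opening one."""

    def __init__(self, session: FakeSession, tenant_id: int) -> None:
        self.session = session
        self.tenant_id = tenant_id

    def create_user(self, name: str) -> None:
        self.session.add(f"user:{self.tenant_id}:{name}")
        # Calling another method reuses the same session: nothing is nested.
        self.add_audit_entry(f"created {name}")

    def add_audit_entry(self, text: str) -> None:
        self.session.add(f"audit:{self.tenant_id}:{text}")


def handle_request(session: FakeSession) -> None:
    """Plays the role of the route: one session and one commit per request."""
    service = UserService(session, tenant_id=42)
    service.create_user("ana")
    session.commit()
```

In a FastAPI app the `session` argument would come from a dependency, so the route (or the dependency itself) owns the commit and the service methods never manage the session lifecycle.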
Lack of Context in Services
Sometimes we found ourselves having a Service class that didn’t even have an initializer and was purely for organization. This is fine, but we are missing a lot of benefits of having some context in the service (example: `tenant_id` and `session`), which would save us from having the `tenant_id` passed to every single method in a service class. So there’s definitely a lot to improve here.
There's obviously a lot to improve and a whole lot more bad things that I probably forgot to mention, but again, I thought it was worth sharing the experience. And to finish, our Dockerfile, which is also pretty simple (using `poetry` and leveraging its `dev-dependencies` logic, something that was mentioned here as well: https://github.com/zhanymkanov/fastapi-best-practices/issues/1):