ayrat555 / fang

Background processing for Rust
https://fang.badykov.com/
MIT License
609 stars 25 forks source link

Sqlite and MySql support for asynk #141

Closed pxp9 closed 5 months ago

pxp9 commented 1 year ago
pxp9 commented 1 year ago

There is one bug in sqlite implementation or test.

For some reason

asynk::async_queue::sqlite::fetch_and_touch_test fails because it does not fetch nothing because SQLite has locked the entry.

when this FOR UPDATE SKIP LOCKED is deleted it from SQLite fetch_task_type query, works fine.

pxp9 commented 1 year ago

This PR will partially close #84

pxp9 commented 1 year ago

could someone help me with mysql queries ?

pxp9 commented 1 year ago

i have an idea, let me work

pxp9 commented 1 year ago

A few explanations before reviewing.

This mysql impl is almost done.

it has one bug remaining.

image

For some reason this test gets stuck.

Why VARCHAR(2048) instead of TEXT in MySQL schema ?

sqlx decodes as BLOB the Mysql TEXT type and that breaks FromRow impl in lib.rs line 183.

pxp9 commented 1 year ago

I have been researching and it gets stuck in this func mysql_impl_insert_task_uniq at let affected_rows = sqlx::query(query), probably it is a future error.

danmx commented 6 months ago

Hi, I see that the PR is a bit stale. Would it be possible to remove MySQL from it and just merge sqlite part?

pxp9 commented 6 months ago

Hi, I see that the PR is a bit stale. Would it be possible to remove MySQL from it and just merge sqlite part?

Dear @danmx , it is stale for 2 reasons.

PR what I am talking about

danmx commented 6 months ago

In that case would it make sense to move MySQL to a separate PR or sqlite depends on that too?

pxp9 commented 6 months ago

In that case would it make sense to move MySQL to a separate PR or sqlite depends on that too?

As you can see in this code, my fork fix is affecting every backend decoding. PostgreSQL, MySQL and Sqlite.

https://github.com/ayrat555/fang/pull/141/files#diff-d6910112c105b30683c633451a1ada3fb1ea2671280e13834396a60d9fb70ffdR195-207

pxp9 commented 6 months ago

In that case would it make sense to move MySQL to a separate PR or sqlite depends on that too?

As you can see in this code, my fork fix is affecting every backend decoding. PostgreSQL, MySQL and Sqlite.

https://github.com/ayrat555/fang/pull/141/files#diff-d6910112c105b30683c633451a1ada3fb1ea2671280e13834396a60d9fb70ffdR195-207

@danmx I have sync my fork and reopened my PR

pxp9 commented 6 months ago

I have been researching mysql issue, i have found that it may be a sqlx issue. this captures are taken with tokio tracing and tokio console. This is generated when the problematic test is executed.

RUSTFLAGS="--cfg tokio_unstable" cargo test asynk::async_queue::mysql::remove_tasks_by_metadata -- --color always --nocapture in one terminal

tokio-console in other terminal

Screenshot from 2024-04-08 21-28-18

Screenshot from 2024-04-08 21-25-44

pxp9 commented 6 months ago

Workflow should test blocking feature and fang_derive_error crate

pxp9 commented 5 months ago

It seems we have this issue with sqlx

pxp9 commented 5 months ago

I have done a downgrade to sqlx 0.6.3 which is latest version that is not broken.

I need help with MySQL workflow , i know it is working because i tested it locally with no problem.

pxp9 commented 5 months ago

LOL. Mysql as always trolling with localhost and 127.0.0.1, for some reason its treatment is different

pxp9 commented 5 months ago

README should be updated.

pxp9 commented 5 months ago

@Dopplerian, @ayrat555 and @0xCAB0

Could you review plz ?

pxp9 commented 5 months ago

we also need to bump the version

pxp9 commented 5 months ago

i think we can make a major release since it is a breaking change

pxp9 commented 5 months ago

Referring this

I need to use the same decoder for the 3 backends.

I am decoding AnyRow to Task struct , the problem is that changing this to u64 will change a lot of things in the API and in the implementation.

Maybe in all databases can store it as a number and retrieve the number and use chrono::DateTime::from_timestamp

What do you think @ayrat555 and @Dopplerian

pxp9 commented 5 months ago

Referring this

I need to use the same decoder for the 3 backends.

I am decoding AnyRow to Task struct , the problem is that changing this to u64 will change a lot of things in the API and in the implementation.

Maybe in all databases can store it as a number and retrieve the number and use chrono::DateTime::from_timestamp

What do you think @ayrat555 and @Dopplerian

Also I am not sure if sqlite has something that can store a u64 , maybe as bytes.

Could someone do some research about it ?

Dopplerian commented 5 months ago

Referring this I need to use the same decoder for the 3 backends. I am decoding AnyRow to Task struct , the problem is that changing this to u64 will change a lot of things in the API and in the implementation. Maybe in all databases can store it as a number and retrieve the number and use chrono::DateTime::from_timestamp What do you think @ayrat555 and @Dopplerian

Also I am not sure if sqlite has something that can store a u64 , maybe as bytes.

Could someone do some research about it ?

Checkout [this Stack Overflow question], although I'm not sure whether these options will work with the 'only using 1 decoder for the 3 backends' limitations.

TL;DR: There are 2 options. The first one is using a blob, and the second one is casting it to a signed 64-bit integer and then storing it as an INTEGER value.

pxp9 commented 5 months ago

Referring this I need to use the same decoder for the 3 backends. I am decoding AnyRow to Task struct , the problem is that changing this to u64 will change a lot of things in the API and in the implementation. Maybe in all databases can store it as a number and retrieve the number and use chrono::DateTime::from_timestamp What do you think @ayrat555 and @Dopplerian

Also I am not sure if sqlite has something that can store a u64 , maybe as bytes.

Could someone do some research about it ?

Checkout [this Stack Overflow question], although I'm not sure whether these options will work with the 'only using 1 decoder for the 3 backends' limitations.

TL;DR: There are 2 options. The first one is using a blob, and the second one is casting it to a signed 64-bit integer and then storing it as an INTEGER value.

I think the best option is to store it as blob. I think casting to i64 may lead to an integer overflow.

But how better is to store it as BLOB instead of store it as Varchar(32) ?

Also we need to check if it we do not have any issue with diesel migrations.

pxp9 commented 5 months ago

Referring this

I need to use the same decoder for the 3 backends.

I am decoding AnyRow to Task struct , the problem is that changing this to u64 will change a lot of things in the API and in the implementation.

Maybe in all databases can store it as a number and retrieve the number and use chrono::DateTime::from_timestamp

What do you think @ayrat555 and @Dopplerian

looking good. a couple of comment

@Dopplerian, @ayrat555 and @0xCAB0

Also plz see this discussion, I think I have tried bytes storage approach before.

0xCAB0 commented 5 months ago

Didn't thought about blob storage, it's probably the best strategy since we avoid integer sizing and at least it's not in string format

0xCAB0 commented 5 months ago

Referring this

I need to use the same decoder for the 3 backends.

I am decoding AnyRow to Task struct , the problem is that changing this to u64 will change a lot of things in the API and in the implementation.

Maybe in all databases can store it as a number and retrieve the number and use chrono::DateTime::from_timestamp

What do you think @ayrat555 and @Dopplerian

looking good. a couple of comment

@Dopplerian, @ayrat555 and @0xCAB0

Also plz see this discussion, I think I have tried bytes storage approach before.

Because Serde is implemented I realized you can just manipulate the type to Unix epoch on serialization.

My point was not about optimizing data size, I wanted to use Unix epoch since it's the standard for SQLite ORMs, if you don't mind to create a migration and @ayrat555 @Dopplerian agree, I would go with Blob

pxp9 commented 5 months ago

Referring this

I need to use the same decoder for the 3 backends.

I am decoding AnyRow to Task struct , the problem is that changing this to u64 will change a lot of things in the API and in the implementation.

Maybe in all databases can store it as a number and retrieve the number and use chrono::DateTime::from_timestamp

What do you think @ayrat555 and @Dopplerian

looking good. a couple of comment

@Dopplerian, @ayrat555 and @0xCAB0

Also plz see this discussion, I think I have tried bytes storage approach before.

Because Serde is implemented I realized you can just manipulate the type to Unix epoch on serialization.

My point was not about optimizing data size, I wanted to use Unix epoch since it's the standard for SQLite ORMs, if you don't mind to create a migration and @ayrat555 @Dopplerian agree, I would go with Blob

The main problem is PostgreSQL migration schema must change in order to store it as a BLOB.

pxp9 commented 5 months ago

https://github.com/ayrat555/fang/pull/141#discussion_r1564555315

This is not possible because in order to have agnosticism in queries and make something like this work

    let task: Task = sqlx::query_as(query)
        .bind(uuid_as_str)
        .bind(metadata_str)
        .bind(task_type)
        .bind(scheduled_at_str)
        .fetch_one(pool)
        .await?;

    Ok(task)
}

you need to implement impl<'a> FromRow<'a, AnyRow> for Task

the alternative is to use

impl From<PgRow> for AnyRow

impl From<MySqlRow> for AnyRow

and

impl From<SqliteRow> for AnyRow

and implement

impl<'a> FromRow<'a, PgRow> for Task

impl<'a> FromRow<'a, MySqlRow> for Task

impl<'a> FromRow<'a, SqliteRow> for Task

pxp9 commented 5 months ago

Look at this trait bounds

pub fn query_as<'q, DB, O>(sql: &'q str) -> QueryAs<'q, DB, O, <DB as HasArguments<'q>>::Arguments>
where
    DB: Database,
    O: for<'r> FromRow<'r, DB::Row>,

The type O requires FromRow<'r, AnyRow>

Because the sqlx database that we are using is Any as we are using Pool<Any>

pxp9 commented 5 months ago

#141 (comment)

This is not possible because in order to have agnosticism in queries and make something like this work

    let task: Task = sqlx::query_as(query)
        .bind(uuid_as_str)
        .bind(metadata_str)
        .bind(task_type)
        .bind(scheduled_at_str)
        .fetch_one(pool)
        .await?;

    Ok(task)
}

you need to implement impl<'a> FromRow<'a, AnyRow> for Task

the alternative is to use

impl From<PgRow> for AnyRow

impl From<MySqlRow> for AnyRow

and

impl From<SqliteRow> for AnyRow

and implement

impl<'a> FromRow<'a, PgRow> for Task

impl<'a> FromRow<'a, MySqlRow> for Task

impl<'a> FromRow<'a, SqliteRow> for Task

This approach is not possible since to use

impl From<MySqlRow> for AnyRow for example you need to consume AnyRow and QueryAs trait from sqlx just provides an inmutable reference to AnyRow.

pxp9 commented 5 months ago

The main issue we have is that we cannot have different decodings for each database.

This do not allow us to change a database schema without changing the others.

Ayrat and I think this is too complicated to implement in this PR because it will require to change a lot of things.

We are going to leave the schemas as we have some freedom to change it, we will change it.

I think we have to this work in a different PR that will explore this point.

pxp9 commented 5 months ago

Can i get approved ? @ayrat555 , @0xCAB0 and @Dopplerian

decoding issue and migrations issue will be addressed in #148 and other PR and this will be released as release candidate.

pxp9 commented 5 months ago

dont merge , let me cook , i think i can solve it this afternoon

pxp9 commented 5 months ago

i am cooking in the branch fix/independent_decoding

pxp9 commented 5 months ago

image

It is possible to implement it as generics just need a bunch trait bounds xd

pxp9 commented 5 months ago

my approach will be doing a trait to require this trait bounds

pxp9 commented 5 months ago

Now we should be able to modify with freedom different schemas without affecting each other.

pxp9 commented 5 months ago

Sometimes fetch and touch in mysql fetches the task that is created after that one that should fetch

pxp9 commented 5 months ago

Sometimes fetch and touch in mysql fetches the task that is created after that one that should fetch

this test has 3 different situations: