tarantool / tarantool-python

Python client library for Tarantool
https://www.tarantool.io
BSD 2-Clause "Simplified" License

iproto: support feature push #247

Closed GRISHNOV closed 1 year ago

GRISHNOV commented 1 year ago

Adds support for receiving out-of-band messages from a server that calls box.session.push.

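Implemented for the methods call, eval, select, insert, replace, update, upsert, and delete. Two new optional arguments handle out-of-band messages: on_push, a callback invoked for each pushed message, and on_push_ctx, a context value passed to that callback.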

Below is an example of the proposed API using the call and insert methods. In both cases, the out-of-band messages are processed by the specified callback before call or insert returns.

For comparison, see the implementation in the Lua version.

Server:
fiber = require('fiber')
box.cfg({listen = 3301})
box.schema.user.grant(
    'guest',
    'read,write,execute',
    'universe'
)
function server_function()
    x = {0,0}
    while x[1] < 3 do
        x[1] = x[1] + 1
        fiber.sleep(1)
        box.session.push(x)
    end
    fiber.sleep(1)
    return x
end
Client:
import tarantool

def callback(data, on_push_ctx=[]):
    print('run callback with data: ', data)
    data[0][1] = data[0][1] + 1
    on_push_ctx.append(data)

callback_res = []

conn = tarantool.connect(port=3301)
res = conn.call(
    'server_function',
    on_push=callback,
    on_push_ctx=callback_res
)
# receiving out-of-band messages,
# the conn.call is not finished yet.
>>> run callback with data: [[1, 0]]
>>> run callback with data: [[2, 0]]
>>> run callback with data: [[3, 0]]
# the conn.call is finished now.

print(res)
>>> [3, 0]
print(callback_res)
>>> [[[1, 1]], [[2, 1]], [[3, 1]]]
Server:
box.schema.create_space(
    'tester', {
        format = {
            {name = 'id', type = 'unsigned'},
            {name = 'name', type = 'string'},
        }
    }
)
box.space.tester:create_index(
    'primary_index', {
        parts = {
            {field = 1, type = 'unsigned'},
        }
    }
)
function on_replace_callback()
    x = {0,0}
    while x[1] < 300 do
        x[1] = x[1] + 100
        box.session.push(x)
    end
    return x
end
box.space.tester:on_replace(
    on_replace_callback
)
Client:
callback_res = []

conn_pool = tarantool.ConnectionPool(
    [{'host':'localhost', 'port':3301}],
    user='guest')
res = conn_pool.insert(
    'tester',
    (1, 'Mike'),
    mode=tarantool.Mode.PREFER_RO,
    on_push=callback,
    on_push_ctx=callback_res,
)
# receiving out-of-band messages,
# the conn_pool.insert is not finished yet.
>>> run callback with data: [[100, 0]]
>>> run callback with data: [[200, 0]]
>>> run callback with data: [[300, 0]]
# the conn_pool.insert is finished now.

print(res)
>>> [1, 'Mike']
print(callback_res)
>>> [[[100, 1]], [[200, 1]], [[300, 1]]]
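
The callback contract used in both examples can be tried without a running Tarantool instance. The sketch below is only an illustration under stated assumptions: fake_request is a hypothetical stand-in for conn.call and is not part of tarantool-python, and it assumes the client hands each pushed payload together with the on_push_ctx object to on_push before returning the final result. Unlike the callback above, collect_push takes the context as a required argument instead of a mutable default and stores a copy of the payload rather than modifying it in place.

```python
import copy


def collect_push(data, on_push_ctx):
    """Store an independent, modified copy of each pushed payload."""
    snapshot = copy.deepcopy(data)
    snapshot[0][1] += 1
    on_push_ctx.append(snapshot)


def fake_request(pushes, result, on_push=None, on_push_ctx=None):
    """Hypothetical stand-in for conn.call: deliver pushes, then return."""
    for payload in pushes:
        if on_push is not None:
            on_push(payload, on_push_ctx)
    return result


callback_res = []
pushes = [[[1, 0]], [[2, 0]], [[3, 0]]]
res = fake_request(
    pushes,
    [3, 0],
    on_push=collect_push,
    on_push_ctx=callback_res,
)
print(res)
print(callback_res)
```

Copying the payload keeps the received data untouched, which may matter if the same object is inspected again after the request returns.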

Closes #201

Totktonada commented 1 year ago

In the Lua API, on_push is a callback (called when a pushed value is received) and on_push_ctx is a value passed to the callback (the context). As I see from the PR description, the API here is different: pushed values are just stored in a provided collection. One can't react to a push immediately with code during a long request. Is this intentional?

GRISHNOV commented 1 year ago

> In the Lua API, on_push is a callback (called when a pushed value is received) and on_push_ctx is a value passed to the callback (the context). As I see from the PR description, the API here is different: pushed values are just stored in a provided collection. One can't react to a push immediately with code during a long request. Is this intentional?

I have updated the solution and the PR description. It is now possible to pass a callback, which is invoked as each out-of-band message arrives, before the main request (for example, call or eval) completes.
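
The streaming behaviour can be pictured as a read loop that hands every pushed chunk to the callback before the final response arrives. What follows is a minimal sketch and not the code from this PR: read_response, the (code, body) pairs, and the wire generator standing in for the socket are all hypothetical. The values 0x80 for a push chunk and 0x00 for a final OK response are assumed to match IPROTO_CHUNK and IPROTO_OK in the Tarantool binary protocol and should be checked against its documentation.

```python
IPROTO_OK = 0x00     # final successful response (assumed protocol value)
IPROTO_CHUNK = 0x80  # out-of-band message from box.session.push (assumed)


def read_response(messages, on_push=None, on_push_ctx=None):
    """Consume decoded (code, body) pairs until the final response."""
    for code, body in messages:
        if code == IPROTO_CHUNK:
            # Dispatch the push immediately; the request is still running.
            if on_push is not None:
                on_push(body, on_push_ctx)
            continue
        if code == IPROTO_OK:
            return body
        raise RuntimeError('unexpected response code: %#x' % code)
    raise ConnectionError('stream ended before the final response')


def wire():
    """Hypothetical stand-in for messages decoded from the socket."""
    yield IPROTO_CHUNK, [[1, 0]]
    yield IPROTO_CHUNK, [[2, 0]]
    yield IPROTO_OK, [2, 0]


def on_push(data, ctx):
    ctx.append(('push', data))


events = []
result = read_response(wire(), on_push=on_push, on_push_ctx=events)
events.append(('done', result))
```

Because the callback runs inside the loop, every push is recorded in events before the final result, which is the ordering the reviewer asked for.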

GRISHNOV commented 1 year ago

Thank you for your feedback! If the current version of the draft is suitable, I will start writing tests and documentation.

DifferentialOrange commented 1 year ago

> If the current version of the draft is suitable,

I think it is. To fix the docs build, I recommend rebasing on the master branch. Moreover, I think it's better to rebase on the not-yet-merged https://github.com/tarantool/tarantool-python/pull/251, since it contains fixes for potential test failures.

DifferentialOrange commented 1 year ago

Sorry, it seems that you'll need to rebase one more time. I think this will be the last time, and your PR will be merged next.

Master changes that are relevant to you: ConnectionPool is now supported on Python 3.6, so you don't need to skip tests anymore.

GRISHNOV commented 1 year ago

Thank you for your feedback! I tried to answer all the comments in the code review.