Open yorickpeterse opened 4 months ago
An example I've been playing with that led to this is the following:
client.inko:
import std.endian.little
import std.net.ip (IpAddress)
import std.net.socket (TcpClient)
import std.stdio (STDOUT)
let GET = 0
let SET = 1
# Builds the wire frame for a SET command: a single command byte (SET),
# followed by the length-prefixed key and the length-prefixed value.
# Lengths are encoded as 8-byte little-endian signed integers.
fn set(key: String, value: String) -> ByteArray {
let buf = ByteArray.new
# Byte 0 is the command tag.
buf.push(SET)
# The key
# Reserve 8 bytes for the key length, then write it at offset 1,
# immediately after the command byte.
buf.resize(size: buf.size + 8, value: 0)
little.write_i64(key.size, into: buf, at: 1)
buf.append(key.to_byte_array)
# The value
# Reserve 8 bytes for the value length; after the resize it occupies the
# last 8 bytes of the buffer, hence the `buf.size - 8` offset.
buf.resize(size: buf.size + 8, value: 0)
little.write_i64(value.size, into: buf, at: buf.size - 8)
buf.append(value.to_byte_array)
buf
}
# Builds the wire frame for a GET command: a single command byte (GET),
# followed by the key's size as an 8-byte little-endian integer and the
# key bytes themselves.
fn get(key: String) -> ByteArray {
let buf = ByteArray.new
# Byte 0 is the command tag.
buf.push(GET)
# The key
# Reserve 8 bytes for the key length, written at offset 1 (right after
# the command byte), then append the raw key bytes.
buf.resize(size: buf.size + 8, value: 0)
little.write_i64(key.size, into: buf, at: 1)
buf.append(key.to_byte_array)
buf
}
# Example client entry point: connects to the server on port 9999,
# issues a SET followed by a GET for the same key, then prints the GET
# response to STDOUT.
class async Main {
fn async main {
let client = TcpClient.new(IpAddress.v4(0, 0, 0, 0), port: 9999).or_panic(
'failed to connect to the server',
)
# Store a value, then immediately request it back.
client.write_bytes(set('name', 'Alice')).or_panic('SET failed')
client.write_bytes(get('name')).or_panic('GET failed')
let resp = ByteArray.new
# The response is length-prefixed: first read the 8-byte little-endian
# payload size. A short read is treated as a protocol error.
let size = match client.read(into: resp, size: 8) {
case Ok(8) -> little.read_i64(from: resp, at: 0)
case Ok(_) -> panic('not enough bytes read')
case Error(e) -> panic('failed reading the size: ${e}')
}
resp.clear
# Reuse the buffer to read exactly `size` payload bytes.
match client.read(into: resp, size: size) {
case Ok(n) if n == size -> {}
case Ok(_) -> panic('not enough bytes read')
case Error(e) -> panic('failed reading the bytes: ${e}')
}
let stdout = STDOUT.new
stdout.write_bytes(resp)
stdout.write_string('\n')
}
}
server.inko:
import std.endian.little
import std.hash.siphash (SipHasher13)
import std.int (MAX)
import std.io (BufferedReader, Error)
import std.net.ip (IpAddress)
import std.net.socket (TcpClient, TcpServer)
import std.stdio (STDIN, STDOUT)
let GET = 0
let SET = 1
# Scores a (node index, key) pair using SipHash-1-3 with fixed keys
# (100, 200): the node index is hashed first, followed by every byte of
# the key. Used by select_node so the same key produces a different,
# deterministic score per node.
fn hash(node: Int, key: ref ByteArray) -> Int {
let hasher = SipHasher13.new(100, 200)
hasher.write(node)
key.iter.each(fn (b) { hasher.write(b) })
hasher.finish
}
# Picks the node responsible for `key` by scoring every node index
# against the key and choosing the highest score (rendezvous /
# highest-random-weight hashing). Returns the winning score and the
# node, recovered as a unique tuple.
#
# NOTE(review): `max` starts at 0 and the comparison is `hash > max`,
# so if every score for a key is negative the loop never updates `node`
# and index 0 always wins. Still deterministic, but the distribution is
# skewed for such keys — confirm whether this is intended.
fn select_node(nodes: ref Array[Node], key: ref ByteArray) -> uni (Int, Node) {
let mut node = 0
let mut max = 0
let mut idx = 0
let len = nodes.size
while idx < len {
let hash = hash(idx, key)
if hash > max {
node = idx
max = hash
}
idx += 1
}
recover (max, nodes.get(node))
}
# A storage process holding a shard of the key/value data. Clients hand
# a Node their connection; the Node writes any result to it and then
# hands the connection back by re-invoking the client's main loop.
class async Node {
# TODO: If we pass the node specific Hash, we can just reuse that.
# TODO: we don't need to maintain the order here
# The key/value pairs owned by this node.
let @data: Map[ByteArray, ByteArray]
fn static new -> Node {
Node(data: recover Map.new)
}
# Looks up `key`. If present, the value is written to the connection
# (length-prefixed via Connection.write_bytes); either way the
# connection is returned to `client` so it can resume its loop.
fn async get(key: uni ByteArray, client: Client, connection: uni Connection) {
let key = recover key
let connection = match @data.opt(key) {
case Some(v) -> {
# TODO: until https://github.com/inko-lang/inko/issues/589 is fixed, we
# have to clone() here and do this awful dance to be able to write the
# bytes.
let val = recover v.clone
recover {
let val = recover val
let con = recover connection
con.write_bytes(val)
con
}
}
case _ -> connection
}
client.main(connection)
}
# Stores `value` under `key`, then hands the connection back to the
# client. Note no acknowledgement is written to the connection here.
fn async mut set(
key: uni ByteArray,
value: uni ByteArray,
client: Client,
connection: uni Connection,
) {
@data.set(key, value)
client.main(connection)
}
}
# Wraps a client socket together with a scratch buffer, and implements
# the wire protocol primitives: single command bytes, and 8-byte
# little-endian length-prefixed byte payloads.
#
# NOTE(review): the methods below rely on @buffer being empty when they
# are called — each one drains or clears it before returning, so this
# holds as long as calls don't interleave. Confirm if that invariant is
# guaranteed elsewhere.
class Connection {
let @socket: TcpClient
let @buffer: ByteArray
fn static new(socket: TcpClient) -> Connection {
Connection(socket: socket, buffer: recover ByteArray.new)
}
# Reads exactly one byte (the command tag). A short read is mapped to
# ConnectionReset.
fn mut read_byte -> Result[Int, Error] {
match @socket.read(into: @buffer, size: 1) {
case Ok(1) -> Result.Ok(@buffer.pop.get)
case Ok(_) -> Result.Error(Error.ConnectionReset)
case Error(e) -> Result.Error(e)
}
}
# Reads a length-prefixed payload: an 8-byte size (via read_int)
# followed by exactly that many bytes, returned as a fresh ByteArray.
fn mut read_bytes -> Result[ByteArray, Error] {
let size = try read_int
let buf = ByteArray.new
match @socket.read(into: buf, size: size) {
case Ok(n) if n == size -> Result.Ok(buf)
case Ok(_) -> Result.Error(Error.ConnectionReset)
case Error(e) -> Result.Error(e)
}
}
# Writes `value` as an 8-byte little-endian integer, staging it in the
# shared buffer (resized to exactly 8, so the buffer must be empty).
# The socket write result is deliberately discarded.
fn mut write_int(value: Int) {
@buffer.resize(size: 8, value: 0)
little.write_i64(value, into: @buffer, at: 0)
let _ = @socket.write_bytes(@buffer)
@buffer.clear
}
# Writes a length-prefixed payload: the size first, then the raw bytes.
# Write errors are ignored, matching write_int.
fn mut write_bytes(bytes: ref ByteArray) {
write_int(bytes.size)
let _ = @socket.write_bytes(bytes)
}
# Reads an 8-byte little-endian integer into the shared buffer and
# decodes it, clearing the buffer afterwards. Short reads map to
# ConnectionReset.
fn mut read_int -> Result[Int, Error] {
match @socket.read(into: @buffer, size: 8) {
case Ok(8) -> {}
case Ok(_) -> throw Error.ConnectionReset
case Error(e) -> throw e
}
let size = little.read_i64(@buffer, at: 0)
@buffer.clear
Result.Ok(size)
}
}
# A per-connection process: reads a command byte, dispatches to GET or
# SET, and forwards the request (together with itself and the unique
# connection) to the owning Node. The Node calls `main` again with the
# connection when it is done, forming the request loop.
class async Client {
# The storage nodes this client can route requests to.
let @nodes: Array[Node]
fn static new(nodes: ref Array[Node]) -> Client {
Client(recover nodes.iter.to_array)
}
# One iteration of the request loop: read the command tag and dispatch.
# Invalid commands and read failures end the loop (and thus the
# process) after logging.
fn async mut main(connection: uni Connection) {
let stdout = STDOUT.new
stdout.print('Waiting for client command...')
let byte = match connection.read_byte {
case Ok(v) -> v
case Error(e) -> {
stdout.print('failed to read the command type: ${e}')
# TODO: what would we do here?
return
}
}
match byte {
case GET -> get(connection)
case SET -> set(connection)
case _ -> {
stdout.print('command ${byte} is invalid')
# TODO: what would we do here? Communicate the results back?
return
}
}
}
# Handles GET: reads the key, picks the owning node via select_node,
# and hands off key + self + connection to that node. On a read error
# the loop restarts via `return main(connection)`.
fn mut get(connection: uni Connection) {
let key = recover {
match connection.read_bytes {
case Ok(v) -> v
case Error(e) -> {
STDOUT.new.print('error while reading the key: ${e}')
return main(connection)
}
}
}
STDOUT.new.print('GET ${key}')
match select_node(@nodes, key) {
case (_hash, node) -> node.get(key, self, connection)
}
}
# Handles SET: reads the key and the value, then forwards both to the
# owning node. Read errors restart the loop, same as in `get`.
fn mut set(connection: uni Connection) {
let key = recover {
match connection.read_bytes {
case Ok(v) -> v
case Error(e) -> {
STDOUT.new.print('error while reading the key: ${e}')
return main(connection)
}
}
}
let val = recover {
match connection.read_bytes {
case Ok(v) -> v
case Error(e) -> {
STDOUT.new.print('error while reading the value: ${e}')
return main(connection)
}
}
}
STDOUT.new.print('SET ${key} = ${val}')
match select_node(@nodes, key) {
case (_hash, node) -> node.set(key, val, self, connection)
}
}
}
# Server entry point: spawns three storage nodes, listens on port 9999,
# and accepts connections forever, spawning a Client process per
# accepted connection.
class async Main {
fn async main {
let nodes = [Node.new, Node.new, Node.new]
let server = TcpServer
.new(ip: IpAddress.v4(0, 0, 0, 0), port: 9999)
.or_panic('failed to set up the server')
loop {
# Failed accepts are silently skipped via `next`; only successful
# sockets are wrapped in a Connection.
let con = recover {
match server.accept {
case Ok(v) -> Connection.new(v)
case _ -> next
}
}
# Each connection gets its own Client process, sharing the nodes.
Client.new(nodes).main(con)
}
}
}
In the server code we can see the mentioned "pass data to the nodes" pattern, specifically the `Client` and `uni Connection` values being passed around.
To add another challenge to consider: given an `rc T`, any borrow of a sub value must also be an `rc T`, otherwise it's possible for process A to non-atomically increment the reference count while process B atomically increments/decrements it, resulting in an inconsistent state.
Another issue is that if a sub-value borrowed as `ref T` is turned into `rc T`, we can't tell the difference between a borrowed and an owned sub value. This means that when we get rid of the `rc T`, we can't know if we're supposed to just decrement it, or if we should also check whether the value now needs to be dropped.
Perhaps an alternative is to introduce class rc
or something along those lines, with the restriction that such types can only store other value or class rc
types. This however could be tricky to enforce correctly, and probably be of limited value.
I think that if we introduce atomic reference counting, we need not only `rc T` but also `rc ref T`. Here `rc T` is the atomic version of `T`, while `rc ref T` is the atomic equivalent of `ref T`. An `rc ref T` would be produced for any `ref T` obtained through an `rc T`. The difference between `rc T` and `rc ref T` is that for `rc T` decrements are followed by a drop check, while for `rc ref T` it's just an atomic decrement.
This still leaves the challenge of running destructors, as we'd have to ensure that for an `rc T` object graph all reference count checks use atomic operations, because it's possible for a process to retain an atomic reference to a sub value in the graph.
Description
Inko's shared-nothing approach generally works well enough, but there are cases where one really wants to use some form of (immutable) shared data. An example of this is something like memcached: you have a process accepting connections, spawning a new process to handle each connection. Data in turn is stored in one or more nodes, which are also processes. Client processes then read data from the socket and determine what node process to get their data from:
Without the ability to share data, client processes must not only send e.g. the key they're interested in to a node, but also a reference to themselves and the socket to send the data back to. The node process then gets the data, writes it to the socket, and sends a message back to the client process containing the socket, such that the client process can resume its work.
In other words, instead of nodes sharing their data with clients, clients tell nodes where to write the data to, and nodes then wake up clients again, passing the necessary data back to it.
While this model works, it means nodes now need to handle writing to sockets, error handling, etc; instead of just being dumb data storage processes. It also leads to a somewhat clunky API where data used by clients is passed around as arguments (such that we can pass it between clients and nodes), instead of this data being stored in a client process field.
In contrast, if we introduce immutable, atomically reference counted types then this problem goes away: instead of nodes writing to sockets, they simply send an immutable value back to the client. The client then writes this value to the socket and discards it.
The foundations for this are already in place: internally we already support atomic reference counting for types such as `String` and `Channel`, so we could introduce the type `rc T` for arbitrary types. Such types would be created using `recover`, just as how one creates `uni T` values. This should be safe because:

- `recover` guarantees no outside borrows to the value exist.
- `rc T` types would only allow the use of `fn` methods and not `fn mut` methods.
- `mut T` references stored in the `rc T` graph would be turned into `ref T` references upon retrieval, due to the previous fact/limitation; i.e. `Array.get` returns `ref T` even if the value is stored as `mut T`, and `Array.get_mut` wouldn't be available.

`recover` itself likely needs no changing; instead we can implement this by allowing the casting of `uni T` to `rc T` values, such that you can simply write this: Or this:
The downside of this approach is having to use explicit type signatures, which could get verbose for complex generic types. An alternative is to introduce syntax like this:
Downsides
Using an approach like this we're essentially "cheating" the shared-nothing idea, because we are very much sharing data. On the other hand, this isn't that different from e.g. Erlang's ETS feature.
Another downside is a more complicated type system, and in particular it feels a bit like a slippery slope towards a complicated capabilities system as found in Pony, something I'd like to avoid.
To put it differently, I prefer fewer features capable of doing more things rather than introduce many specific-purpose features.
Related work
No response