Native Erlang client for CQL3 over Cassandra's latest binary protocol v4.
CQErl offers a simple Erlang interface to Cassandra using the latest CQL version.
CQErl was designed to be as simple as possible on your side. You just provide the configuration you want as environment variables. Under the hood, CQErl maintains a pool of persistent connections with Cassandra and selects the client to use based on a hash of the requesting PID (and, optionally, the connected node if Token Aware Policy is enabled, see below). This ensures relatively even distribution of work amongst clients when there is a lot of concurrent activity (where distribution is most important).
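The PID-hash selection described above can be illustrated with a small standalone sketch. This is not CQErl's actual code: the module name `pool_pick_sketch` and the function `pick/2` are hypothetical, and the client list is just a list of arbitrary terms. It only shows why a given process keeps hitting the same client while many processes spread across the pool.

```erlang
-module(pool_pick_sketch).
-export([pick/2]).

%% Choose one element of a non-empty list from a hash of the given PID.
%% erlang:phash2/2 returns an integer in 0..Range-1, so 1 is added
%% before indexing with lists:nth/2 (which is 1-based).
pick(Pid, Clients) when is_pid(Pid), Clients =/= [] ->
    Index = erlang:phash2(Pid, length(Clients)),
    lists:nth(Index + 1, Clients).
```

Calling `pool_pick_sketch:pick(self(), Clients)` repeatedly from the same process returns the same client each time, as long as the list is unchanged.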
If you installed Cassandra and didn't change any configuration related to authentication or SSL, you should be able to connect like this:
cqerl:add_group(["localhost"], [], 1).
The first argument to cqerl:add_group/3 is the set of nodes to which you wish to connect in the form {IP, Port}, IP, or Hostname. IP may be a string, binary or tuple as described in the inet manual.
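To make the accepted node forms concrete, here is a minimal sketch that normalizes each of them to a `{Host, Port}` pair. It is an illustration only and makes no claim about how CQErl parses nodes internally: the module `node_spec_sketch` is hypothetical, and the default port 9042 is assumed because it is Cassandra's default native transport port.

```erlang
-module(node_spec_sketch).
-export([normalize/1]).

%% Assumption: fall back to Cassandra's default native transport port.
-define(DEFAULT_PORT, 9042).

%% {IP, Port} or {Hostname, Port}
normalize({Host, Port}) when is_integer(Port) ->
    {host(Host), Port};
%% Bare IP (string, binary or tuple) or bare hostname
normalize(Host) ->
    {host(Host), ?DEFAULT_PORT}.

host(Bin) when is_binary(Bin) ->
    host(binary_to_list(Bin));
host(Tuple) when is_tuple(Tuple) ->
    Tuple;                                   % already an inet-style address tuple
host(Str) when is_list(Str) ->
    case inet:parse_address(Str) of
        {ok, Addr}      -> Addr;             % textual IP becomes a tuple
        {error, einval} -> Str               % not an IP, treat as hostname
    end.
```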
The second argument is a proplist of options. Valid options are:
- keyspace, which determines in which keyspace all subsequent requests operate, on that connection.
- auth (described below).
- protocol_version, to connect to older Cassandra instances.
- ssl, which, if present, is either true or a list of options to be passed to ssl:connect/4. If absent or false, SSL will not be used.
- tcp_opts, which allows you to specify extra options to be passed to gen_tcp:connect/4 when the client connects. If SSL is enabled, this will be ignored.
If you've set a simple username/password authentication scheme on Cassandra, you can provide those to CQErl:
{ok, Client} = cqerl:add_group(["localhost"], [{auth, {cqerl_auth_plain_handler, [{"test", "aaa"}]}}]).
Since Cassandra implements pluggable authentication mechanisms, CQErl also allows you to provide custom authentication modules (here cqerl_auth_plain_handler). The options you pass along with it are given to the module's auth_init/3 as its first argument.
The third argument is the number of clients to start per node.
The cqerl:add_group/[3,4] call blocks until at least one client from the group is connected and available. Once the call returns, queries can be run using clients from it.
Groups may be named, using cqerl:add_group/4 and supplying a name (any term) as the first parameter. See Named Groups below.
Groups of clients may be configured through the application config, for example:
{cqerl, [
    {client_groups, [
        {client_group, [
            {name, cluster1},
            {hosts, ["10.1.1.107", "10.1.1.108", {"10.1.1.109", 6666}]},
            {opts, [{keyspace, user_db}, {ssl, true}]},
            {clients_per_server, 10}
        ]},
        {client_group, [
            {name, config_cluster},
            {hosts, ["10.1.1.107", "10.1.1.108", {"10.1.1.109", 6666}]},
            {opts, [{keyspace, config_db}]},
            {clients_per_server, 2}
        ]}
    ]}
]}
Doing so will fire up connection pools as soon as the CQErl application is started. Once cqerl has started, clients should be available immediately.
The name and opts fields are optional; hosts and clients_per_server must be included.
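The required/optional rule for a `client_group` entry can be checked before starting the application. The following is a hedged sketch, not part of CQErl: the module `group_config_sketch` and its `validate/1` function are hypothetical helpers that only test for the presence of the two mandatory keys named above.

```erlang
-module(group_config_sketch).
-export([validate/1]).

%% Return ok when both mandatory keys are present, otherwise list the
%% missing ones. The optional name and opts keys are not inspected.
validate({client_group, Props}) when is_list(Props) ->
    Required = [hosts, clients_per_server],
    case [K || K <- Required, not proplists:is_defined(K, Props)] of
        []      -> ok;
        Missing -> {error, {missing, Missing}}
    end.
```

For instance, an entry holding only `{name, cluster1}` and `{hosts, [...]}` would be reported as missing `clients_per_server`.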
There are several application environment variables that may be set to change query behaviour:
- {text_uuids, true} will cause timeuuid and uuid fields to be returned as binary strings in canonical form (e.g. <<"5620c844-e98d-11e5-b97b-08002719e96e">>) rather than pure binary.
- {strategy, Strategy} determines how nodes are chosen for queries. The default value of token_aware attempts to use Token Aware Policy. Setting this to simple will disable TAP.
- {query_timeout, T} provides a timeout in milliseconds after which a run_query call will time out. This may be infinity.
Performing a query can be as simple as this:
{ok, Result} = cqerl:run_query(my_keyspace, "SELECT * FROM users;").
% Equivalent to
{ok, Result} = cqerl:run_query(my_keyspace, <<"SELECT * FROM users;">>).
% Also equivalent to
{ok, Result} = cqerl:run_query(#cql_query{statement = <<"SELECT * FROM users;">>, keyspace = my_keyspace}).
It can also be performed asynchronously using:
Tag = cqerl:send_query(my_keyspace, "SELECT * FROM users;"),
receive
    {cqerl_result, Tag, Result} ->
        ok
end.
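A bare `receive` like the one above blocks forever if no reply arrives. The sketch below wraps the wait in a hypothetical helper, `await_sketch:await/2`, that adds a timeout. The two message shapes it matches, `{cqerl_result, Tag, Result}` and `{error, Tag, Reason}`, are taken from the examples in this README and should be treated as assumptions about what CQErl sends.

```erlang
-module(await_sketch).
-export([await/2]).

%% Wait for the reply carrying Tag, giving up after Timeout milliseconds.
%% Tag is already bound, so only messages for this request are matched;
%% anything else stays in the mailbox.
await(Tag, Timeout) ->
    receive
        {cqerl_result, Tag, Result} -> {ok, Result};
        {error, Tag, Reason}        -> {error, Reason}
    after Timeout ->
        {error, timeout}
    end.
```

Typical use would be `Tag = cqerl:send_query(...)` followed by `await_sketch:await(Tag, 5000)`.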
Here's a rundown of the possible return values:
- SELECT queries will yield a result of type #cql_result{} (more details below).
- Queries that change the database schema will yield a result of type #cql_schema_changed{type, keyspace, table}.
- Other queries will yield void if everything worked correctly.
- Errors are returned as {error, Reason} in the synchronous case, and {error, Tag, Reason} in the asynchronous case.
#cql_result{}
The return value of SELECT queries will be a #cql_result{} record, which can be used to obtain rows as proplists and fetch more results if available:
{ok, _SchemaChange} = cqerl:run_query(test_keyspace, "CREATE TABLE users(id uuid, name varchar, password varchar)"),
{ok, void} = cqerl:run_query(Client, #cql_query{
    statement = "INSERT INTO users(id, name, password) VALUES(?, ?, ?);",
    values = #{
        id => new,
        name => "matt",
        password => "qwerty"
    },
    keyspace = test_keyspace
}),
{ok, Result} = cqerl:run_query(test_keyspace, "SELECT * FROM users;").
Row = cqerl:head(Result),
Tail = cqerl:tail(Result),
{Row, Tail} = cqerl:next(Result),
1 = cqerl:size(Result),
0 = cqerl:size(Tail),
empty_dataset = cqerl:next(Tail),
[Row] = cqerl:all_rows(Result),
#{name := <<"matt">>, password := <<"qwerty">>} = Row.
#cql_result{} can also be used to fetch more results, synchronously or asynchronously:
% Synchronously
case cqerl:has_more_pages(Result) of
    true -> {ok, Result2} = cqerl:fetch_more(Result);
    false -> ok
end,
% Or asynchronously
Tag2 = cqerl:fetch_more_async(Result),
receive
    {result, Tag2, Result3} -> ok
end.
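Walking every page of a result follows directly from the synchronous calls shown above. The sketch below is a hypothetical helper, not part of CQErl. To stay self-contained it takes the three operations as funs rather than calling the library directly, and it assumes that `all_rows` returns only the rows of the current page.

```erlang
-module(paging_sketch).
-export([collect/4]).

%% Accumulate the rows of the current page, then recurse while more pages
%% exist. AllRows, HasMore and FetchMore are 1-arity funs; FetchMore is
%% expected to return {ok, NextResult}.
collect(Result, AllRows, HasMore, FetchMore) ->
    Rows = AllRows(Result),
    case HasMore(Result) of
        true ->
            {ok, Next} = FetchMore(Result),
            Rows ++ collect(Next, AllRows, HasMore, FetchMore);
        false ->
            Rows
    end.
```

With the functions named in this README, the call would look like `paging_sketch:collect(Result, fun cqerl:all_rows/1, fun cqerl:has_more_pages/1, fun cqerl:fetch_more/1)`. For very large result sets, processing each page as it arrives is likely preferable to building one big list.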
#cql_schema_changed{}
#cql_schema_changed{} is returned from queries that change the database schema somehow (e.g. ALTER, DROP, CREATE, and so on). It includes:
- The type of change, either created, updated or dropped
- The keyspace where the change happened, as a binary
- The table on which the change was applied, as a binary
When performing queries, you can provide more information than just the query statement using the #cql_query{} record, which includes the following fields:
- The query statement, as a string or binary.
- values for binding variables from the query statement (see next section).
- You can tell CQErl to consider a query reusable or not (see below for what that means). By default, it will detect binding variables and consider the query reusable if it contains any (named or not). Queries containing named binding variables will be considered reusable no matter what you set reusable to. If you explicitly set reusable to false on a query having positional variable bindings (?), you would provide values in {Type, Value} pairs instead of {Key, Value}.
- You can specify how many rows you want in every result page using the page_size (integer) field. The devs at Cassandra recommend a value of 100 (which is the default).
- You can also specify what consistency you want the query to be executed under. Possible values include:
    - any
    - one
    - two
    - three
    - quorum
    - all
    - local_quorum
    - each_quorum
    - local_one
- In case you want to perform a lightweight transaction using INSERT or UPDATE, you can also specify the serial_consistency that will be used when performing it. Possible values are:
    - serial
    - local_serial
In the #cql_query{} record, you can provide values as a map, where the keys match the column names or binding variable names in the statement, in lowercase.
Example:
% Deriving the value key from the column name
#cql_query{statement = "SELECT * FROM table1 WHERE id = ?", values = #{id => SomeId}},
% Explicitly providing a binding variable name
#cql_query{statement = "SELECT * FROM table1 WHERE id = :id_value", values = #{id_value => SomeId}},
Special cases include:
- The TTL and TIMESTAMP options in statements, in which case the key would be [ttl] and [timestamp] respectively. Note that, while values for a column of type timestamp are provided in milliseconds, a value for the TIMESTAMP option is expected in microseconds.
- UPDATE keyspace SET set = set + ? WHERE id = 1; The name for this variable binding is set, the name of the column, and it's expected to be an Erlang list of values.
- UPDATE keyspace SET list = list + ? WHERE id = 1; The name for this variable binding is list, the name of the column, and it's expected to be an Erlang list of values.
- UPDATE keyspace SET map[?] = 1 WHERE id = 1; The name for this variable binding is key(map), where map is the name of the column.
- UPDATE keyspace SET map['key'] = ? WHERE id = 1; The name for this variable binding is value(map), where map is the name of the column.
- UPDATE keyspace SET list[?] = 1 WHERE id = 1; The name for this variable binding is idx(list), where list is the name of the column.
- SELECT * FROM keyspace LIMIT ? The name for the LIMIT variable is [limit].
Also, when providing the value for a uuid-type column, you can give the value new, strong or weak, in which case CQErl will generate a random UUID (v4), with either a strong or weak random number generator.
Finally, when providing the value for a timeuuid or timestamp column, you can give the value now, in which case CQErl will generate a normal timestamp, or a UUID (v1) matching the current date and time.
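The millisecond versus microsecond distinction noted above for timestamp columns and the TIMESTAMP option is easy to get wrong. The sketch below uses only standard Erlang calls; the module `ts_units_sketch` and its function names are hypothetical and are not CQErl functions.

```erlang
-module(ts_units_sketch).
-export([column_timestamp/0, option_timestamp/0, ms_to_us/1]).

%% Value suitable for a column of type timestamp (milliseconds since epoch).
column_timestamp() ->
    erlang:system_time(millisecond).

%% Value suitable for the TIMESTAMP option of a statement
%% (microseconds since epoch).
option_timestamp() ->
    erlang:system_time(microsecond).

%% Convert an existing millisecond value for use with the TIMESTAMP option.
ms_to_us(Ms) when is_integer(Ms) ->
    Ms * 1000.
```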
To perform batched queries (which can include any non-SELECT DML statements), simply put one or more #cql_query{} records in a #cql_query_batch{} record, and run it in place of a normal #cql_query{}. #cql_query_batch{} includes the following fields:
- The consistency level to apply when executing the batch of queries.
- The mode of the batch, which can be logged, unlogged or counter. Running a batch in unlogged mode removes the performance penalty of enforcing atomicity. The counter mode should be used to perform batched mutation of counter values.
- The list of queries.
InsertQ = #cql_query{statement = "INSERT INTO users(id, name, password) VALUES(?, ?, ?);"},
{ok, void} = cqerl:run_query(Client, #cql_query_batch{
    mode=unlogged,
    queries=[
        InsertQ#cql_query{values = #{id => new, name => "sean", password => "12312"}},
        InsertQ#cql_query{values = #{id => new, name => "jenna", password => "11111"}},
        InsertQ#cql_query{values = #{id => new, name => "kate", password => "foobar"}}
    ]
}).
If any of the following is true:
- you set the #cql_query{}'s reusable field to true
- the query contains positional variable bindings (?) and you did not explicitly set reusable to false
- the query contains named variable bindings (:name) (ignores the value of reusable)
the query is considered reusable. This means that the first time this query is performed, CQErl will ask the connected Cassandra node to prepare the query, after which, internally, a query ID will be used instead of the query statement when executing it. That particular Cassandra node will hold on to the prepared query on its side, and subsequent queries that use exactly the same statement will be performed faster and with less network traffic.
CQErl can tell which query has been previously prepared on which node by keeping a local cache, so this happens transparently. Note that prepared queries are stored per-connection, so each client process will need to prepare its own copy of any given query (though this also occurs transparently).
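The reusability rules listed above can be written as a small predicate. This is a rough sketch, not CQErl's detection logic: the module `reusable_sketch` is hypothetical, the atom `undefined` stands in for "reusable was not set", and the binding detection is naive (it would also match a `?` or `:name` inside a string literal).

```erlang
-module(reusable_sketch).
-export([is_reusable/2]).

%% Statement is a string or binary; Flag is true, false or undefined.
is_reusable(Statement, Flag) ->
    S = iolist_to_binary(Statement),
    %% Named binding: a colon followed by the start of an identifier.
    Named = re:run(S, <<":[A-Za-z_]">>) =/= nomatch,
    %% Positional binding: any question mark.
    Positional = binary:match(S, <<"?">>) =/= nomatch,
    Named                                        % always reusable
        orelse Flag =:= true                     % explicitly requested
        orelse (Positional andalso Flag =/= false).
```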
Here is the correspondence between Cassandra column types and their equivalent Erlang types (bold denotes what will be used in result sets, the rest is what is accepted).
Cassandra Column Type | Erlang types |
---|---|
ascii | binary, string (only US-ASCII) |
bigint | integer (signed 64-bit) |
blob | binary |
boolean | true, false |
counter | integer (signed 64-bit) |
decimal | {Unscaled :: integer(), Scale :: integer()} |
double | float (signed 64-bit) |
float | float (signed 32-bit) |
int | integer (signed 32-bit) |
timestamp | integer (milliseconds, signed 64-bit), now, binary or string |
uuid | binary, new |
varchar | binary, string |
varint | integer (arbitrary precision) |
timeuuid | binary, now |
inet | {X1, X2, X3, X4} (IPv4), {Y1, Y2, Y3, Y4, Y5, Y6, Y7, Y8} (IPv6), string or binary |
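Two rows of the table above are less obvious than the rest: the `{Unscaled, Scale}` tuple for decimal, and the raw 16-byte binary for uuid. The sketch below shows one way to turn each into a more familiar form. It assumes the usual Cassandra meaning of a decimal (value = Unscaled × 10^-Scale) and the canonical 8-4-4-4-12 hexadecimal UUID layout; the module `cql_types_sketch` is hypothetical and not part of CQErl.

```erlang
-module(cql_types_sketch).
-export([decimal_to_float/1, uuid_to_text/1]).

%% Convert {Unscaled, Scale} to a float. This loses the arbitrary
%% precision of the tuple form, so it is only suitable for display.
decimal_to_float({Unscaled, Scale}) when is_integer(Unscaled), is_integer(Scale) ->
    Unscaled / math:pow(10, Scale).

%% Render a 16-byte UUID binary in canonical lowercase text form.
%% "~8.16.0b" means: width 8, base 16, padded with zeros.
uuid_to_text(<<A:32, B:16, C:16, D:16, E:48>>) ->
    iolist_to_binary(
      io_lib:format("~8.16.0b-~4.16.0b-~4.16.0b-~4.16.0b-~12.16.0b",
                    [A, B, C, D, E])).
```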
By default, this client library assumes we're talking to a 2.2+ or 3+ instance of Cassandra and uses the latest native protocol (v4), which is required to use some of the newest datatypes and optimizations. To tell CQErl to use the older protocol version (v3), which is required to connect to a 2.1.x instance of Cassandra, you can set the protocol_version option to the integer 3 in your configuration file, i.e.
[
    {cqerl, [
        {cassandra_nodes, [{"127.0.0.1", 9042}]},
        {protocol_version, 3}
    ]}
].
or in a cqerl:add_group/3 call:
G = cqerl:add_group(["127.0.0.1:9042"], [{protocol_version, 3}, {keyspace, oper}], 1).
Just include this repository in your project's rebar.config file and run ./rebar get-deps. See rebar for more details on how to use rebar for Erlang project management.
As noted earlier, this library uses Cassandra's newest native protocol versions (v4, or v3 optionally), which is said to perform better than the older Thrift-based interface. It also speaks CQL version 3, and uses new features available in Cassandra 2.X, such as paging, parametrization, query preparation and so on.
All this means is that this library works with Cassandra 2.1.x (2.2+ or 3+ recommended), configured to enable the native protocol. This documentation page gives details about how to configure this protocol. In the cassandra.yaml configuration file of your Cassandra installation, start_native_transport needs to be set to true, and you need to take note of the value of native_transport_port, which is the port used by this library.
This library implements Token Aware Policy (TAP) for improved query efficiency. TAP allows the client to calculate which node(s) hold the data being read/written and use a direct connection to that node when available. By contrast, without TAP a node will be randomly chosen from all available and, if that node does not contain the data, it will forward the query on to the appropriate node, increasing load, network traffic and latency. If, for any reason, the TAP algorithm cannot determine or access the appropriate node, it will automatically fall back to non-TAP operation for that query.
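The general idea of token-aware selection with a random fallback can be sketched as follows. This is a simplified illustration of the concept, not CQErl's implementation: the module `tap_sketch` is hypothetical, the ring is assumed to be a non-empty list of `{Token, Node}` pairs sorted by ascending token, each node is assumed to own the range ending at its token, and computing the token from the partition key (which depends on the cluster's partitioner) is left out.

```erlang
-module(tap_sketch).
-export([owner/2, pick/3]).

%% Find the node owning Token: the first ring entry whose token is
%% greater than or equal to it, wrapping to the first entry otherwise.
owner(Token, [{_, First} | _] = Ring) ->
    case lists:dropwhile(fun({T, _}) -> T < Token end, Ring) of
        [{_, Node} | _] -> Node;
        []              -> First
    end.

%% Prefer the owning node when a connection to it exists; otherwise fall
%% back to a random connected node, as in non-TAP operation.
pick(Token, Ring, Connected) when Connected =/= [] ->
    Node = owner(Token, Ring),
    case lists:member(Node, Connected) of
        true  -> Node;
        false -> lists:nth(rand:uniform(length(Connected)), Connected)
    end.
```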
The current TAP implementation has the following limitations:
For example, this query will use TAP:
cqerl:run_query(#cql_query{statement = "SELECT * FROM user WHERE id = ?",
                           values = #{id => 'user1'},
                           keyspace = user_db})
However this will not:
cqerl:run_query(user_db, "SELECT * FROM user WHERE id = 'user1'")
cqerl:run_query(#cql_query{statement = "SELECT * FROM user_db.user WHERE id = ?",
                           values = #{id => 'user1'}})
As an alternative to client groups that are selected based on keyspace, groups may also be assigned a name. This is done either by using cqerl:add_group/4 or by including the name attribute in the client_group configuration item.
Named groups are never automatically used by - instead
This rework contains a number of compatibility-breaking changes from the original cqerl. Specifically:
- The get_client and close_client calls no longer exist nor are needed. Instead, see add_group.
CQErl includes a test suite that you can run yourself, especially if you plan to contribute to this project.
- Edit test/test.config and put in your own Cassandra configuration
- Run make test
The MIT License (MIT)
Copyright (c) 2013 Mathieu D'Amours
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.