A major benefit of actors is that they are very easy to test. No complex mocking, no simulation, simply sending and receiving messages. Unfortunately, this does not apply to brokers, a special type of actor that performs I/O. Sending messages to a broker still works, of course, but testing a broker means testing its I/O, not just its message handling. Does your actor parse its input correctly? Does it close connections appropriately? This blog post describes how CAF supports unit testing of the I/O aspect outside the message passing realm.
A key concern of unit testing is reproducibility. When writing a test, you want to be in control of every single step. This is generally not the case when performing I/O operations, since they involve components of the operating system, e.g., sockets and the TCP stack. Fortunately, brokers in CAF never talk to the OS directly, which allows us to place them in a fake environment.
Before we talk about how to take control of I/O, we first need to understand how the machinery behind brokers works.
The UML diagram above shows the relations a broker has with I/O-related classes in CAF. The middleman is a singleton in CAF that provides access to various I/O-related functionality. The three classes that are responsible for doing I/O are multiplexer, scribe, and doorman.
A scribe decouples a broker from the underlying networking APIs. It provides the broker with an output buffer and creates new_data_msg messages for the broker whenever it has received enough data according to the read policy set by the broker (via configure_read).
Note that the user-facing API of the broker does not expose the scribe to the programmer. All the broker needs to worry about is a connection_handle. This handle identifies the corresponding scribe, and the broker forwards each operation on the handle to that scribe. For example, if you call self->configure_read(hdl, receive_policy::at_most(1024)); on a broker, it looks up which scribe manages hdl and forwards the second argument to this scribe.
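To make the forwarding concrete without depending on CAF, here is a minimal sketch of the idea. All names in it (connection_handle as a plain integer wrapper, toy_scribe, toy_broker) are hypothetical stand-ins rather than CAF's classes; the sketch only assumes that a broker keeps a map from handles to scribes and forwards each call to the scribe it finds.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for CAF's connection_handle: just an integer ID.
struct connection_handle {
  long id;
  bool operator<(const connection_handle& other) const { return id < other.id; }
};

// Hypothetical scribe: it only remembers the read policy it was given.
struct toy_scribe {
  std::size_t max_bytes = 0;
  void configure_read(std::size_t at_most) { max_bytes = at_most; }
};

// Hypothetical broker: users only ever see handles, never scribes.
class toy_broker {
public:
  void add_scribe(connection_handle hdl) {
    scribes_[hdl] = std::make_unique<toy_scribe>();
  }

  // Looks up the scribe managing hdl and forwards the policy to it.
  void configure_read(connection_handle hdl, std::size_t at_most) {
    auto i = scribes_.find(hdl);
    if (i == scribes_.end())
      throw std::invalid_argument("unknown connection handle");
    i->second->configure_read(at_most);
  }

  const toy_scribe& scribe(connection_handle hdl) const {
    return *scribes_.at(hdl);
  }

private:
  std::map<connection_handle, std::unique_ptr<toy_scribe>> scribes_;
};
```

After b.add_scribe(connection_handle{1}), calling b.configure_read(connection_handle{1}, 1024) updates only that scribe; in this sketch, an unknown handle throws.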
It is also worth mentioning that the new_data_msg received by the broker always contains the same buffer. The scribe merely rewrites its content over and over again, so a broker that needs the data after its handler returns has to copy it.
A doorman, just like a scribe, decouples the broker from underlying networking APIs. Whenever a new network connection has been established, the doorman generates a new_connection_msg for the broker.
The multiplexer is an I/O loop and a factory for scribes and doormen. If you want to change which networking API CAF is using, this is the (abstract) class you need to implement. The UML diagram omits its member functions for brevity, but here are the important ones we need to know:
class multiplexer {
public:
/// Assigns an unbound scribe identified by `hdl` to `ptr`.
/// @warning Do not call from outside the multiplexer's event loop.
virtual void assign_tcp_scribe(abstract_broker* ptr,
connection_handle hdl) = 0;
/// Assigns an unbound doorman identified by `hdl` to `ptr`.
/// @warning Do not call from outside the multiplexer's event loop.
virtual void assign_tcp_doorman(abstract_broker* ptr, accept_handle hdl) = 0;
/// Simple wrapper for runnables
struct runnable;
using runnable_ptr = intrusive_ptr<runnable>;
/// Runs the multiplexer's event loop.
virtual void run() = 0;
/// Invokes @p fun in the multiplexer's event loop, calling `fun()`
/// immediately when called from inside the event loop.
/// @threadsafe
template <class F>
void dispatch(F fun);
/// Invokes @p fun in the multiplexer's event loop, forcing
/// execution to be delayed when called from inside the event loop.
/// @threadsafe
template <class F>
void post(F fun);
// ...
};
Calling multiplexer::assign_tcp_scribe will create a new scribe (for TCP-like communication) and assign this scribe to the given broker. The function assign_tcp_doorman does the same thing for doormen. If you have ever used ASIO, run, dispatch, and post will remind you of ASIO's io_service, and rightly so: CAF's asio_multiplexer is simply implemented using an io_service. The function run is called in a thread created by the middleman on startup. Whenever a broker receives a message from other actors, it calls post to schedule handling of the message for later by creating a runnable for this task.
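The difference between post and dispatch is easiest to see in a single-threaded toy loop. The class below is a hypothetical sketch, not CAF's multiplexer: it performs no I/O, is not thread-safe, and stores std::function objects where CAF uses intrusive runnable pointers. In it, dispatch runs a function right away when called from inside the loop, while post always defers it.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical single-threaded event loop illustrating post vs. dispatch.
class toy_loop {
public:
  using job = std::function<void()>;

  // Always enqueue; the job runs on a later iteration of the loop.
  void post(job f) { queue_.push_back(std::move(f)); }

  // Run immediately if we are already inside the loop, otherwise enqueue.
  void dispatch(job f) {
    if (inside_loop_)
      f();
    else
      post(std::move(f));
  }

  // Runs one pending job; returns false if the queue was empty.
  bool try_exec_one() {
    if (queue_.empty())
      return false;
    job f = std::move(queue_.front());
    queue_.pop_front();
    inside_loop_ = true;
    f();
    inside_loop_ = false;
    return true;
  }

  // Runs jobs until none are left, including jobs posted by other jobs.
  void flush() {
    while (try_exec_one()) {
      // keep going
    }
  }

private:
  std::deque<job> queue_;
  bool inside_loop_ = false;
};
```

Because whoever drives such a loop decides when try_exec_one or flush is called, deferred work runs only at points of its choosing.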
For testing brokers without actually doing I/O, CAF has a multiplexer implementation that is solely meant for testing (since 0.14.1):
class test_multiplexer : public multiplexer {
public:
/// A buffer storing bytes.
using buffer_type = std::vector<char>;
/// Models pending data on the network, i.e., the network
/// input buffer usually managed by the operating system.
buffer_type& virtual_network_buffer(connection_handle hdl);
/// Returns the output buffer of the scribe identified by `hdl`.
buffer_type& output_buffer(connection_handle hdl);
/// Returns the input buffer of the scribe identified by `hdl`.
buffer_type& input_buffer(connection_handle hdl);
/// Returns `true` if this handle has been closed
/// for reading, `false` otherwise.
bool& stopped_reading(connection_handle hdl);
/// Returns `true` if this handle has been closed
/// for reading, `false` otherwise.
bool& stopped_reading(accept_handle hdl);
/// Stores `hdl` as a pending connection for `src`.
void add_pending_connect(accept_handle src, connection_handle hdl);
/// Accepts a pending connect on `hdl`.
void accept_connection(accept_handle hdl);
/// Reads data from the external input buffer until
/// the configured read policy no longer allows receiving.
void read_data(connection_handle hdl);
/// Appends `buf` to the virtual network buffer of `hdl`
/// and calls `read_data(hdl)` afterwards.
void virtual_send(connection_handle hdl, const buffer_type& buf);
/// Waits until a `runnable` is available and executes it.
void exec_runnable();
/// Returns `true` if a `runnable` was available, `false` otherwise.
bool try_exec_runnable();
/// Executes all pending `runnable` objects.
void flush_runnables();
// ...
};
The test_multiplexer has several member functions that “fake” network events and allow you to manipulate the buffers of scribes directly. In particular, virtual_send allows you to fake incoming data on a connection_handle. This will cause the scribe to generate one or more new_data_msg messages (depending on the configured receive policy). Those messages are handled by the broker immediately.
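The interplay of virtual_send and read_data can be modeled in a few lines. In the hypothetical sketch below, toy_connection holds a virtual network buffer, and read_data moves at most max_bytes at a time into one reused input buffer, handing each chunk to a callback that plays the role of the broker's new_data_msg handler. Only an at_most-style policy is modeled; the names and the callback are assumptions, not CAF's implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

using buffer_type = std::vector<char>;

// Hypothetical model of one fake connection in a test multiplexer.
struct toy_connection {
  buffer_type virtual_network_buffer; // bytes "on the wire"
  buffer_type input_buffer;           // reused for every chunk
  std::size_t max_bytes = 128;        // like receive_policy::at_most(128)
  std::function<void(const buffer_type&)> on_data; // the "broker"

  // Drains the virtual network buffer in chunks of at most max_bytes.
  void read_data() {
    while (!virtual_network_buffer.empty()) {
      auto n = std::min(max_bytes, virtual_network_buffer.size());
      auto first = virtual_network_buffer.begin();
      auto last = first + static_cast<std::ptrdiff_t>(n);
      // overwrite the same input buffer instead of allocating a new one
      input_buffer.assign(first, last);
      virtual_network_buffer.erase(first, last);
      on_data(input_buffer);
    }
  }

  // Fakes incoming data, then delivers it right away.
  void virtual_send(const buffer_type& buf) {
    virtual_network_buffer.insert(virtual_network_buffer.end(),
                                  buf.begin(), buf.end());
    read_data();
  }
};
```

Sending 300 bytes with max_bytes = 128 yields chunks of 128, 128, and 44 bytes, all delivered through the same buffer object.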
To simulate a remote connection, one needs to create a pending connection using add_pending_connect and cause the corresponding doorman to accept it via accept_connection.
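Conceptually, a pending connection is just a handle waiting in a queue until the test decides to accept it. The hypothetical toy_acceptors class below mirrors add_pending_connect and accept_connection in spirit only: integer IDs replace CAF's handle types, the first-in-first-out order is an assumption of this sketch, and a callback stands in for the new_connection_msg.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <vector>

// Hypothetical integer handles; CAF uses dedicated handle types instead.
using accept_id = int;
using connection_id = int;

// Hypothetical model of pending connections in a test multiplexer.
class toy_acceptors {
public:
  // Called once per accepted connection, like a new_connection_msg.
  std::function<void(accept_id, connection_id)> on_new_connection;

  // Queues conn as a pending connection on acceptor src.
  void add_pending_connect(accept_id src, connection_id conn) {
    pending_[src].push_back(conn);
  }

  // Accepts the oldest pending connection on src; false if none is pending.
  bool accept_connection(accept_id src) {
    auto i = pending_.find(src);
    if (i == pending_.end() || i->second.empty())
      return false;
    connection_id conn = i->second.front();
    i->second.erase(i->second.begin());
    on_new_connection(src, conn);
    return true;
  }

private:
  std::map<accept_id, std::vector<connection_id>> pending_;
};
```

Nothing reaches the handler until the test calls accept_connection, which is what makes the connection setup fully deterministic.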
Using the test multiplexer requires calling set_middleman(new network::test_multiplexer) in main, before calling any I/O-related function in CAF. Before showing the test multiplexer in action, we first implement a broker we want to test.
The application we are going to test is a simplistic HTTP server. We do not want to bother with actually parsing HTTP, so we only consider the first line of an HTTP header and check whether it is equal to "GET / HTTP/1.1". If we receive anything else, we send a 404 as response. However, we are going to deal with input that arrives in chunks, and we (pedantically) require "\r\n" for line breaks. We are not going to generate HTTP dynamically.
Instead, we use the following constants:
constexpr char http_valid_get[] = "GET / HTTP/1.1";
constexpr char http_get[] = "GET / HTTP/1.1\r\n"
"Host: localhost\r\n"
"Connection: close\r\n"
"Accept: text/plain\r\n"
"User-Agent: CAF/0.14\r\n"
"Accept-Language: en-US\r\n"
"\r\n";
constexpr char http_ok[] = "HTTP/1.1 200 OK\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
// one chunk of 0xc = 12 bytes ("Hi there! :)"), then the last chunk
"c\r\n"
"Hi there! :)\r\n"
"0\r\n"
"\r\n";
constexpr char http_error[] = "HTTP/1.1 404 Not Found\r\n"
"Connection: close\r\n"
"\r\n";
constexpr char newline[2] = {'\r', '\n'};
When receiving input chunks, we need to keep track of the state of our parser. We will also store each received line in a vector of strings and make use of CAF’s stateful actors, as shown below.
enum parser_state {
receive_new_line,
receive_continued_line,
receive_second_newline_half
};
struct http_state {
http_state(abstract_broker* self) : self_(self) {
// nop
}
~http_state() {
aout(self_) << "http worker finished with exit reason: "
<< self_->planned_exit_reason()
<< endl;
}
std::vector<std::string> lines;
parser_state ps = receive_new_line;
abstract_broker* self_;
};
using http_broker = caf::experimental::stateful_actor<http_state, broker>;
To make bookkeeping easier, we use one broker (our server) that accepts incoming connections and spawns a new HTTP worker per connection. The server does not need any additional state and is quite simple.
behavior server(broker* self) {
aout(self) << "server up and running" << endl;
return {
[=](const new_connection_msg& ncm) {
aout(self) << "fork on new connection" << endl;
self->fork(http_worker, ncm.handle);
},
others >> [=] {
aout(self) << "unexpected: "
<< to_string(self->current_message()) << endl;
}
};
}
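The benefit of forking is that each worker owns exactly one parser state. The following sketch shows the same bookkeeping with plain objects instead of brokers; toy_server, worker, and the integer handles are hypothetical. The server creates a worker for each new connection, routes data to the worker owning the handle, and drops the worker when the connection closes, which in the broker version corresponds to the worker calling quit.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical per-connection state, playing the role of http_state.
struct worker {
  std::string received;
};

// Hypothetical server: one worker per connection, keyed by handle ID.
class toy_server {
public:
  void on_new_connection(int hdl) { workers_[hdl] = worker{}; }

  // Each worker only ever sees the data of its own connection.
  void on_data(int hdl, const std::string& chunk) {
    workers_.at(hdl).received += chunk;
  }

  void on_closed(int hdl) { workers_.erase(hdl); }

  const worker* find(int hdl) const {
    auto i = workers_.find(hdl);
    return i == workers_.end() ? nullptr : &i->second;
  }

  std::size_t size() const { return workers_.size(); }

private:
  std::map<int, worker> workers_;
};
```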
And here is the implementation of our http_worker using the state from above.
behavior http_worker(http_broker* self, connection_handle hdl) {
// tell network backend to receive any number of bytes between 1 and 128
self->configure_read(hdl, receive_policy::at_most(128));
return {
[=](const new_data_msg& msg) {
assert(! msg.buf.empty());
assert(msg.handle == hdl);
// extract lines from received buffer
auto& lines = self->state.lines;
auto i = msg.buf.begin();
auto e = msg.buf.end();
// search position of first newline in data chunk
auto nl = std::search(i, e, std::begin(newline), std::end(newline));
// store whether we are continuing a previously started line
auto append_to_last_line = self->state.ps == receive_continued_line;
// check whether our last chunk ended between \r and \n
if (self->state.ps == receive_second_newline_half) {
if (msg.buf.front() == '\n') {
// simply skip this character
++i;
}
}
// read line by line from our data chunk
while (i != e) {
if (append_to_last_line) {
append_to_last_line = false;
auto& back = lines.back();
back.insert(back.end(), i, nl);
} else {
lines.emplace_back(i, nl);
}
// if our last search didn't find a newline, we're done
if (nl == e)
break;
// skip newline and seek the next one
i = nl + sizeof(newline);
nl = std::search(i, e, std::begin(newline), std::end(newline));
}
// store current state of our parser
if (msg.buf.back() == '\r') {
self->state.ps = receive_second_newline_half;
lines.back().pop_back(); // drop '\r' from our last read line
} else if (msg.buf.back() == '\n') {
self->state.ps = receive_new_line; // we've got a clean cut
} else {
self->state.ps = receive_continued_line; // interrupted in the middle
}
// we don't need to check for completion in any intermediate state
if (self->state.ps != receive_new_line)
return;
// we have received the HTTP header if we have an empty line at the end
if (lines.size() > 1 && lines.back().empty()) {
auto& out = self->wr_buf(hdl);
// we only look at the first line in our example and reply with our
// OK message if we receive exactly "GET / HTTP/1.1", otherwise
// we send a 404 HTTP response
// note: std::end(...) - 1 skips the terminating '\0' of each constant
if (lines.front() == http_valid_get)
out.insert(out.end(), std::begin(http_ok), std::end(http_ok) - 1);
else
out.insert(out.end(), std::begin(http_error), std::end(http_error) - 1);
// write data and close connection
self->flush(hdl);
self->quit();
}
},
[=](const connection_closed_msg&) {
self->quit();
},
others >> [=] {
aout(self) << "unexpected: "
<< to_string(self->current_message()) << endl;
}
};
}
Our HTTP worker receives chunks of at most 128 bytes. Once it has detected the end of the HTTP header (a blank line), it looks at the first line to see whether it is equal to http_valid_get. If so, it sends the OK message; otherwise it sends the 404.
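Since the parsing logic does not need a broker, it can also be tested in isolation. The sketch below is a hypothetical alternative to the worker's three-state parser, not the code used in the example: line_splitter buffers the unfinished tail of the input (possibly a lone '\r') and splits on "\r\n" once enough bytes have arrived. The trade-off is an extra string copy per chunk in exchange for a single piece of parser state.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, CAF-independent variant of the worker's line parsing.
// Instead of tracking three parser states, it buffers the unfinished tail
// of the input (which may end in a lone '\r') until more data arrives.
class line_splitter {
public:
  // Feeds one chunk of input. Returns true once the blank line that ends
  // the HTTP header has been seen (after at least one other line).
  bool feed(const std::string& chunk) {
    pending_ += chunk;
    std::string::size_type pos = 0;
    while (!header_done_ && (pos = pending_.find("\r\n")) != std::string::npos) {
      lines_.push_back(pending_.substr(0, pos));
      pending_.erase(0, pos + 2); // drop the line and its "\r\n"
      if (lines_.size() > 1 && lines_.back().empty())
        header_done_ = true;
    }
    return header_done_;
  }

  const std::vector<std::string>& lines() const { return lines_; }

private:
  std::string pending_;            // bytes not yet terminated by "\r\n"
  std::vector<std::string> lines_; // complete lines, without "\r\n"
  bool header_done_ = false;
};
```

A useful property to check is that splitting the same request into chunks of any size, including splits between '\r' and '\n', produces the same lines.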
A minimal application using our brokers that always tries to open port 8080 is a three-liner:
int main() {
spawn_io_server(server, 8080);
await_all_actors_done();
shutdown();
}
Without further ado, here is our complete unit test for the example application.
namespace {
class fixture {
public:
fixture() {
// note: the middleman will take ownership of mpx_, but using
// this pointer is safe at any point before calling `shutdown`
mpx_ = new network::test_multiplexer;
set_middleman(mpx_);
// spawn the actor-under-test
aut_ = spawn_io(server);
// assign the acceptor handle to the AUT
aut_ptr_ = static_cast<abstract_broker*>(actor_cast<abstract_actor*>(aut_));
mpx_->assign_tcp_doorman(aut_ptr_, acceptor_);
// "open" a new connection to our server
mpx_->add_pending_connect(acceptor_, connection_);
mpx_->assign_tcp_scribe(aut_ptr_, connection_);
mpx_->accept_connection(acceptor_);
}
~fixture() {
anon_send_exit(aut_, exit_reason::kill);
// run the exit message and other pending messages explicitly,
// since we do not invoke any "I/O" from this point on that would
// trigger those messages implicitly
mpx_->flush_runnables();
await_all_actors_done();
shutdown();
}
// helper class for a nice-and-easy "mock(...).expect(...)" syntax
class mock_t {
public:
mock_t(fixture* thisptr) : this_(thisptr) {
// nop
}
mock_t(const mock_t&) = default;
mock_t& expect(const std::string& what) {
auto& buf = this_->mpx_->output_buffer(this_->connection_);
CAF_REQUIRE((buf.size() >= what.size()));
CAF_REQUIRE((std::equal(buf.begin(), buf.begin() + what.size(),
what.begin()));
buf.erase(buf.begin(), buf.begin() + what.size());
return *this;
}
fixture* this_;
};
// mocks some input for our AUT and allows to
// check the output for this operation
mock_t mock(const char* what) {
std::vector<char> buf;
for (char c = *what++; c != '\0'; c = *what++)
buf.push_back(c);
mpx_->virtual_send(connection_, std::move(buf));
return {this};
}
actor aut_;
abstract_broker* aut_ptr_;
network::test_multiplexer* mpx_;
accept_handle acceptor_ = accept_handle::from_int(1);
connection_handle connection_ = connection_handle::from_int(1);
};
} // namespace <anonymous>
CAF_TEST_FIXTURE_SCOPE(http_tests, fixture)
CAF_TEST(valid_response) {
// write a GET message and expect an OK message as result
mock(http_get).expect(http_ok);
}
CAF_TEST(invalid_response) {
// write a GET with invalid path and expect a 404 message as result
mock("GET /kitten.gif HTTP/1.1\r\n\r\n").expect(http_error);
}
CAF_TEST_FIXTURE_SCOPE_END()
The constructor of our fixture spawns the server that we are going to test. Since we need an actual pointer to the broker, we use actor_cast to get a pointer and downcast it afterwards. Using this pointer, we can set up a connection with the test multiplexer. These steps create a new_connection_msg for the server, which spawns an http_worker in response.
With the mock member function, we create a simple API that allows us to correlate inputs and outputs. The mock function is a simple wrapper around virtual_send, and expect compares the output a broker has written to its output_buffer with the output we expect. Since expect also removes the matched bytes, consecutive expect calls check consecutive parts of the output.
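The mock(...).expect(...) idiom itself does not depend on CAF. In the self-contained sketch below, toy_fixture and its handler (which upper-cases its input and appends '!') are hypothetical; expect consumes matching bytes from the output buffer like the fixture above, but throws std::runtime_error instead of using CAF_REQUIRE.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical fixture: handler stands in for the actor under test. It reads
// the input bytes and appends its reply to output.
struct toy_fixture {
  std::vector<char> output;
  std::function<void(const std::string&, std::vector<char>&)> handler;

  struct mock_t {
    toy_fixture* self;
    // Checks that the output starts with what, then consumes those bytes.
    mock_t& expect(const std::string& what) {
      auto& buf = self->output;
      if (buf.size() < what.size() ||
          !std::equal(what.begin(), what.end(), buf.begin()))
        throw std::runtime_error("unexpected output");
      buf.erase(buf.begin(),
                buf.begin() + static_cast<std::ptrdiff_t>(what.size()));
      return *this;
    }
  };

  // Feeds what to the handler and returns an object for checking output.
  mock_t mock(const std::string& what) {
    handler(what, output);
    return mock_t{this};
  }
};
```

Usage: with a handler that upper-cases its input, f.mock("hi").expect("HI").expect("!") passes and leaves the output buffer empty.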
The test uses the unit test framework from CAF. Using a different test framework, e.g., Boost.Test, is straightforward. The complete source code can be found on GitHub.