C++ - Boost Asio pattern with GUI and worker thread
I would like to implement a Boost.Asio pattern using one thread for the GUI and a worker thread for socket I/O.
The worker thread will use boost::asio::io_service to manage a socket client. All operations on sockets will be performed by the worker thread only.
The GUI thread needs to send messages to, and receive messages from, the worker thread.
I can't figure out how to implement this pattern using Boost.Asio.
I've already implemented the socket communication in the standard Asio way (I call io_service.run() from the worker thread and I use async_read_some/async_send). I don't need strands because io_service.run() is called from the worker thread only.
Now I'm trying to add a cross-thread message queue. How can I implement it?
Should I run the io_service from the GUI thread too?
Or should I just use strands with post to post messages from the GUI thread to the worker thread (without calling io_service.run() or io_service.poll_one() from the GUI thread), and use the operating system's GUI message loop to post messages from the worker thread to the GUI thread?
If I need to call io_service.run() or io_service.poll_one() from the GUI thread too, do I need to use strands on the socket operations, since the io_service is shared between two threads?
Edit: to clarify my question, I want to do whatever I can to implement the message queue using Boost.Asio, relying on other libraries only if Boost.Asio can't do the job.
Message passing is fairly generic. There are various ways to approach the problem, and the solution will likely depend on the desired behavioral details: for example, blocking or non-blocking operation, control over memory allocation, the context in which handlers run, etc.
Boost.Lockfree provides thread-safe, lock-free, non-blocking queues for single/multi consumers/producers. It tends to lend itself nicely to event loops, where it is not ideal for the consumer to be blocked, waiting for the producer to signal a synchronization construct.
    boost::lockfree::queue<message_type> worker_message_queue;

    void process_message();

    void send_worker_message(const message_type& message)
    {
      // Add the message to the worker message queue.
      worker_message_queue.push(message);

      // Add work to worker_io_service that will process the queue.
      worker_io_service.post(&process_message);
    }

    void process_message()
    {
      message_type message;

      // If a message was not retrieved, then return early.
      if (!worker_message_queue.pop(message)) return;

      ...
    }
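To make the "non-blocking" part concrete, here is a minimal sketch of a single-producer/single-consumer ring buffer built only on std::atomic. It is a simplified stand-in for illustration, not Boost.Lockfree's implementation, and SpscRing with its members are hypothetical names. It assumes exactly one thread calls push and exactly one thread calls pop. Like the pop in the snippet above, pop returns false instead of waiting when nothing is available.

```cpp
#include <array>
#include <atomic>
#include <cstddef>

// Hypothetical single-producer/single-consumer ring buffer.
// Assumption: one thread only ever calls push(), another only ever calls pop().
template <typename T, std::size_t Capacity>
class SpscRing {
    static_assert(Capacity > 1, "one slot is always kept empty");

public:
    // Producer side. Returns false instead of blocking when the ring is full.
    bool push(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % Capacity;
        if (next == head_.load(std::memory_order_acquire)) return false;
        slots_[tail] = value;
        // Publish the slot only after it has been written.
        tail_.store(next, std::memory_order_release);
        return true;
    }

    // Consumer side. Returns false instead of blocking when the ring is empty.
    bool pop(T& out) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return false;
        out = slots_[head];
        // Hand the slot back to the producer only after it has been read.
        head_.store((head + 1) % Capacity, std::memory_order_release);
        return true;
    }

private:
    std::array<T, Capacity> slots_{};
    std::atomic<std::size_t> head_{0};  // next slot to read
    std::atomic<std::size_t> tail_{0};  // next slot to write
};
```

Because one slot is kept empty to tell "full" from "empty", a ring declared with Capacity slots holds at most Capacity - 1 messages. The consumer still needs some notification to know when to call pop, which is the role the posted handler plays in the snippet above.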
Alternatively, Boost.Asio's io_service can function as the queue. The message just needs to be bound to the specified handler.

    void process_message(message_type& message);

    void send_worker_message(const message_type& message)
    {
      // Add work to worker_io_service that will process the message.
      worker_io_service.post(boost::bind(&process_message, message));
    }

    void process_message(message_type& message)
    {
      ...
    }
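If it helps to see why an io_service can play the role of a queue, here is a rough sketch of the idea using only the standard library. It is not how Boost.Asio is implemented; HandlerQueue is a hypothetical class whose post and poll_one only mimic the behavior relied on here: any thread may enqueue a bound handler, and the owning thread runs at most one handler per call without blocking.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the queueing role of an io_service.
class HandlerQueue {
public:
    // Callable from any thread: enqueue a handler for later execution.
    void post(std::function<void()> handler) {
        std::lock_guard<std::mutex> lock(mutex_);
        handlers_.push_back(std::move(handler));
    }

    // Called by the owning thread: run at most one ready handler and
    // return how many handlers were executed (0 or 1). Never blocks
    // waiting for work.
    std::size_t poll_one() {
        std::function<void()> handler;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (handlers_.empty()) return 0;
            handler = std::move(handlers_.front());
            handlers_.pop_front();
        }
        handler();  // Run outside the lock so a handler may post again.
        return 1;
    }

private:
    std::mutex mutex_;
    std::deque<std::function<void()>> handlers_;
};
```

The message travels inside the handler itself (captured or bound by value), so no separate message container is needed. The handler runs on whichever thread calls poll_one, not on the thread that called post.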
This comment suggests that the desire is for more than message passing. It sounds as though the end goal is to allow one thread to cause another thread to invoke arbitrary functions.
If this is the case, then consider:
- Using Boost.Signals2 for a managed signals and slots implementation. This allows arbitrary functions to register with a signal.
- Using Boost.Asio's io_service to set up signal emissions. If the GUI thread and the worker thread each have their own io_service, then the worker thread can post a handler into the GUI thread's io_service that will emit a signal. In its main loop, the GUI thread polls its io_service, emits the signal, and causes the slots to be invoked within the GUI thread's context.
Here is a complete example where two threads pass a message (as an unsigned int) to one another, causing arbitrary functions to be invoked within the other thread.
    #include <iostream>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/optional.hpp>
    #include <boost/signals2.hpp>
    #include <boost/thread.hpp>
    #include <boost/utility/in_place_factory.hpp>

    /// @brief io_service dedicated to the GUI.
    boost::asio::io_service gui_service;

    /// @brief io_service dedicated to the worker.
    boost::asio::io_service worker_service;

    /// @brief Work to keep gui_service from stopping prematurely.
    boost::optional<boost::asio::io_service::work> gui_work;

    /// @brief hello slot.
    void hello(int x)
    {
      std::cout << "hello " << x << " thread "
                << boost::this_thread::get_id() << std::endl;
    }

    /// @brief world slot.
    void world(int x)
    {
      std::cout << "world " << x << " thread "
                << boost::this_thread::get_id() << std::endl;
    }

    /// @brief Type for signals.
    typedef boost::signals2::signal<void (int)> signal_type;

    void emit_then_notify_gui(signal_type& signal, unsigned int x);

    /// @brief Emit signals, then message the worker.
    void emit_then_notify_worker(signal_type& signal, unsigned int x)
    {
      // Emit the signal, causing registered slots to run within this thread.
      signal(x);

      // If x has been exhausted, then cause the GUI service to run out of work.
      if (!x)
      {
        gui_work = boost::none;
      }
      // Otherwise, post work into the worker service.
      else
      {
        std::cout << "gui thread: " << boost::this_thread::get_id()
                  << " scheduling other thread emit signals" << std::endl;
        worker_service.post(boost::bind(
            &emit_then_notify_gui, boost::ref(signal), --x));
      }
    }

    /// @brief Emit signals, then message the GUI.
    void emit_then_notify_gui(signal_type& signal, unsigned int x)
    {
      // Emit the signal, causing registered slots to run within this thread.
      signal(x);

      // If x has been exhausted, then cause the GUI service to run out of work.
      if (!x)
      {
        gui_work = boost::none;
      }
      // Otherwise, post more work into the GUI service.
      else
      {
        std::cout << "worker thread: " << boost::this_thread::get_id()
                  << " scheduling other thread emit signals" << std::endl;
        gui_service.post(boost::bind(
            &emit_then_notify_worker, boost::ref(signal), --x));
      }
    }

    void worker_main()
    {
      std::cout << "worker thread: " << boost::this_thread::get_id()
                << std::endl;
      worker_service.run();
    }

    int main()
    {
      signal_type signal;

      // Connect slots to the signal.
      signal.connect(&hello);
      signal.connect(&world);

      boost::optional<boost::asio::io_service::work> worker_work(
          boost::ref(worker_service));
      gui_work = boost::in_place(boost::ref(gui_service));

      std::cout << "gui thread: " << boost::this_thread::get_id()
                << std::endl;

      // Spawn off the worker thread.
      boost::thread worker_thread(&worker_main);

      // Add work to the worker.
      worker_service.post(boost::bind(
          &emit_then_notify_gui, boost::ref(signal), 3));

      // Mocked-up GUI main loop.
      while (!gui_service.stopped())
      {
        // Do other GUI actions.

        // Perform message processing.
        gui_service.poll_one();
      }

      // Cleanup.
      worker_work = boost::none;
      worker_thread.join();
    }
And its output:
    gui thread: b7f2f6d0
    worker thread: b7f2eb90
    hello 3 thread b7f2eb90
    world 3 thread b7f2eb90
    worker thread: b7f2eb90 scheduling other thread emit signals
    hello 2 thread b7f2f6d0
    world 2 thread b7f2f6d0
    gui thread: b7f2f6d0 scheduling other thread emit signals
    hello 1 thread b7f2eb90
    world 1 thread b7f2eb90
    worker thread: b7f2eb90 scheduling other thread emit signals
    hello 0 thread b7f2f6d0
    world 0 thread b7f2f6d0