How to avoid data race with `asio::ip::tcp::iostream`?
How do I avoid a data race when using two threads to send and receive over an asio::ip::tcp::iostream?
I am writing a program that uses an asio::ip::tcp::iostream for input and output. The program accepts commands from the (remote) user over port 5555 and sends messages over that same TCP connection to the user. Because these events (commands received from the user or messages sent to the user) occur asynchronously, I have separate transmit and receive threads.
In this toy version, the commands are "one", "two" and "quit". Of course "quit" quits the program. The other commands do nothing, and any unrecognized command causes the server to close the TCP connection.
The transmitted messages are simple serial-numbered messages that are sent once per second.
In both this toy version and the real code I'm trying to write, the transmit and receive processes are both using blocking IO, so there doesn't appear to be a good way to use a std::mutex or other synchronization mechanism. (In my attempts, one process would grab the mutex and then block, which isn't going to work for this.)
g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread
To test, I run the server with this command:
valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent
Then I use telnet localhost 5555 in another window to create a connection to the server. What helgrind correctly points out is that there is a data race because both runTx and runRx are trying to access the same stream asynchronously:
==16188== Possible data race during read of size 1 at 0x1FFEFFF1CC by thread #1
#include <asio.hpp>
#include <algorithm>
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream *in, std::ostream *out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream *in);
    int runRx(std::ostream *out);
    bool want_quit;
    bool want_reset;
};

// Reads commands from the remote user until quit or reset is requested.
int Console::runTx(std::istream *in) {
    static const std::array<std::string, 3> cmds{
        "quit", "one", "two",
    };
    std::string command;
    while (!want_quit && !want_reset && *in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

// Sends one serial-numbered message per second to the remote user.
int Console::runRx(std::ostream *out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        (*out) << "This is message number " << i << '\n';
        out->flush();
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    return 0;
}

int Console::run(std::istream *in, std::ostream *out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, out};
    int status = runTx(in);
    t1.join();
    return status;
}

int main()
{
    Console con;
    asio::io_service ios;
    // IPv4 address, port 5555
    asio::ip::tcp::acceptor acceptor(ios,
            asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        con.run(&stream, &stream);
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}
Yeah, you're sharing the socket that underlies the stream, without synchronization.
Sidenote: the same goes for the boolean flags, which can easily be "fixed" by changing them to:
std::atomic_bool want_quit;
std::atomic_bool want_reset;
How To Solve
To be honest, I don't think there is a good solution. You said it yourself: the operations are asynchronous, so you'll be in trouble if you try to do them synchronously.
But we could hack one up using Boost Iostreams:
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

// .... later:

// HACK: procure a _separate_ `ostream` to prevent the race, using the same fd
namespace bio = boost::iostreams;
bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

con.run(stream, hack_ostream);
Don't do that. It's a kludge. You're complicating things, apparently in an attempt to avoid using asynchronous code. I'd bite the bullet.
If you would like to learn about some middle ground, look at stackful coroutines.
Note I refactored to remove the need for pointers. You're not transferring ownership, so a reference will do. In case you didn't know how to pass the reference to a std::thread constructor, the trick is in the std::ref you'll see.
[For stress testing I have greatly reduced the delays.]
#include <boost/asio.hpp>
#include <algorithm>
#include <atomic>
#include <functional>
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream &in, std::ostream &out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream &in);
    int runRx(std::ostream &out);
    std::atomic_bool want_quit;
    std::atomic_bool want_reset;
};

int Console::runTx(std::istream &in) {
    static const std::array<std::string, 3> cmds{
        {"quit", "one", "two"},
    };
    std::string command;
    while (!want_quit && !want_reset && in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream &out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        out << "This is message number " << i << '\n';
        out.flush();
        // delay shortened for stress testing (exact value assumed)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return 0;
}

int Console::run(std::istream &in, std::ostream &out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, std::ref(out)};
    int status = runTx(in);
    t1.join();
    return status;
}

#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream.hpp>

int main()
{
    Console con;
    boost::asio::io_service ios;
    // IPv4 address, port 5555
    boost::asio::ip::tcp::acceptor acceptor(ios, boost::asio::ip::tcp::endpoint{boost::asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        boost::asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        {
            // HACK: procure a _separate_ `ostream` to prevent the race, using the same fd
            namespace bio = boost::iostreams;
            bio::file_descriptor_sink fds(stream.rdbuf()->native_handle(), false); // close_on_exit flag is deprecated
            bio::stream<bio::file_descriptor_sink> hack_ostream(fds);

            con.run(stream, hack_ostream);
        }
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}
netcat localhost 5555 <<<quit
This is message number 0
This is message number 1
This is message number 2
commands=( one two one two one two one two one two one two one two three )
while sleep 0.1; do echo ${commands[$(($RANDOM%${#commands[@]}))]}; done | (while netcat localhost 5555; do sleep 1; done)
This runs indefinitely, occasionally resetting the connection (when the command "three" has been sent).

