I/O in a concurrent program
Question
I'm working on a concurrent program; it has two threads, one of which listens for messages from a server while the other sends messages to it. I need to obtain commands from the user (using cin?) and show messages coming from the server at the same time.
How can I handle that situation? The problem is that if I'm reading a command from the user when a message arrives, the user's input gets mixed up with the other output.
Thanks in advance.
Recommended answer
I took my old sample code and tried to turn it into an MCVE. ("Minimal" does not necessarily mean "short", does it?)
This is a very simple concept of a "shell" which supports one thread for input while multiple threads may do output.
The keyboard input is read without echo. This is non-portable, so I provide two implementations of the function getChar(): one for MS Windows and another for non-MS Windows (which actually considers only *ix OSes). The latter is "strongly inspired" by SO: How to implement getch() function of C in Linux?.
The input characters are stored in a std::string.
The output erases the prompt and the current input text (by writing a run of "\b", then a run of " ", then a run of "\b" again), prints the output text (incl. newline), and then prints the prompt and the current input buffer again.
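The erase step can be illustrated in isolation. The following is only a sketch: `eraseSequence` is a hypothetical helper name that does not appear in the sample code, which produces the same characters with `std::setfill` and `std::setw`. It assumes a terminal where "\b" moves the cursor one column to the left without deleting anything.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of miniShell.cc): builds the character
// sequence that wipes `width` characters to the left of the cursor.
std::string eraseSequence(std::size_t width)
{
  return std::string(width, '\b')   // move the cursor back over the text
       + std::string(width, ' ')    // overwrite the text with blanks
       + std::string(width, '\b');  // move the cursor back once more
}
```

Writing `eraseSequence(prompt.size() + input.size())` to the terminal before the new text leaves the cursor at the start of a blank line, so the message can be printed there and the prompt redrawn below it.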
The output is guarded by a mutex to ensure thread-safety.
This is the sample code miniShell.cc:
// system header:
#ifdef _WIN32
#include <conio.h>
#else // (not) _WIN32
#include <termios.h>
#include <unistd.h>
#include <stdio.h>
#endif // _WIN32
/// reads a character from console without echo.
#ifdef _WIN32
inline int getChar() { return _getch(); }
#else // (not) _WIN32
int getChar()
{
struct termios oldattr;
tcgetattr(STDIN_FILENO, &oldattr);
struct termios newattr = oldattr;
newattr.c_lflag &= ~(ICANON | ECHO);
tcsetattr(STDIN_FILENO, TCSANOW, &newattr);
const int ch = getchar();
tcsetattr(STDIN_FILENO, TCSANOW, &oldattr);
return ch;
}
#endif // _WIN32
// standard C/C++ header:
#include <cstring>
#include <mutex>
#include <string>
/* provides a class for a simple thread-safe mini-shell.
*
* It is assumed that one thread may await user input (read()) while
* another thread may (or may not) output text from time to time.
* The mini-shell grants that the input line is always the last line.
*/
class Console {
// variable:
private:
// mutex for console I/O
std::mutex _mtx;
// current input
std::string _input;
// prompt output
std::string _prompt;
// methods:
public:
/// constructor.
Console() { }
// disabled:
Console(const Console&) = delete;
Console& operator = (const Console&) = delete;
// reads a line from console and returns input string
std::string read();
/* writes text to console.
*
* text the text
* size size of text
*/
void write(const char *text, size_t size);
void write(const char *text) { write(text, strlen(text)); }
void write(const std::string &text) { write(text.c_str(), text.size()); }
};
// standard C/C++ header:
#include <atomic>
#include <chrono>
#include <functional> // std::ref
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>
std::string Console::read()
{
{ // activate prompt
std::lock_guard<std::mutex> lock(_mtx);
_prompt = "> "; _input.clear();
std::cout << _prompt << std::flush;
}
#ifdef _WIN32
enum { Enter = '\r', BackSpc = '\b' };
#else // (not) _WIN32
enum { Enter = '\n', BackSpc = 127 };
#endif // _WIN32
// input loop
for (;;) {
switch (int c = getChar()) {
case Enter: {
std::lock_guard<std::mutex> lock(_mtx);
std::string input = _input;
_prompt.clear(); _input.clear();
std::cout << std::endl;
return input;
} // unreachable: break;
case BackSpc: {
std::lock_guard<std::mutex> lock(_mtx);
if (_input.empty()) break; // nothing to do
_input.pop_back();
std::cout << "\b \b" << std::flush;
} break;
default: {
if (c < ' ' || c >= '\x7f') break;
std::lock_guard<std::mutex> lock(_mtx);
_input += c;
std::cout << (char)c << std::flush;
} break;
}
}
}
void Console::write(const char *text, size_t len)
{
if (!len) return; // nothing to do
bool eol = text[len - 1] == '\n';
std::lock_guard<std::mutex> lock(_mtx);
// remove current input echo
if (size_t size = _prompt.size() + _input.size()) {
std::cout
<< std::setfill('\b') << std::setw(size) << ""
<< std::setfill(' ') << std::setw(size) << ""
<< std::setfill('\b') << std::setw(size) << "";
}
// print text
std::cout << text;
if (!eol) std::cout << std::endl;
// print current input echo
std::cout << _prompt << _input << std::flush;
}
// a sample application
// shared data for main thread and data processing thread
struct Shared {
// flag: true ... exit communication thread and main loop
std::atomic<bool> exit;
// flag: true ... start data processing
std::atomic<bool> start;
// the mini console
Console console;
// constructor.
Shared(): exit(false), start(true) { }
};
void dataProc(Shared &shared)
{
while (!shared.exit) {
// "busy" wait for start (condition would be more elegant)
while (!shared.start) {
if (shared.exit) return;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// do data processing
shared.console.write("Starting data processing.");
for (int i = 0, n = 20; i < n; ++i) {
// "busy" wait for start (condition would be more elegant)
if (!shared.start) {
shared.console.write("Data processing stopped.");
while (!shared.start) {
if (shared.exit) return;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
shared.console.write("Data processing restarted.");
}
// consume some time (to simulate heavy computation)
std::this_thread::sleep_for(std::chrono::milliseconds(250));
// do some console output about progress
{ std::ostringstream fmt;
fmt << "Step " << i + 1 << '/' << n;
shared.console.write(fmt.str());
}
}
shared.console.write("Data processing done.");
shared.start = false;
}
}
void processInput(const std::string &input, Shared &shared)
{
if (input == "start") shared.start = true;
else if (input == "stop") shared.start = false;
else if (input == "exit") shared.exit = true;
else if (input.size()) shared.console.write("Wrong command!");
}
int main()
{
Shared shared;
// start a thread for some kind of data processing
std::thread threadDataProc(&dataProc, std::ref(shared));
// main loop
while (!shared.exit) {
shared.console.write("Commands: start stop exit");
std::string input = shared.console.read();
processInput(input, shared);
}
// join data processing thread
threadDataProc.join();
// done
return 0;
}
I compiled and tested this in VS2013 and in the bash/Xterm of cygwin on Windows 10. (cygwin was the closest to Linux I had at hand.)
Please keep in mind that when I wrote this code, simplicity was more important than perfection or comfort.
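The comments in dataProc() note that a condition would be more elegant than the "busy" wait. A minimal sketch of that idea follows, assuming the two atomic flags of struct Shared were replaced by one small class. `StartGate` and its member functions are hypothetical names and are not part of the sample code.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical replacement for the start/exit flags of struct Shared.
class StartGate {
  std::mutex _mtx;
  std::condition_variable _cond;
  bool _start = false;
  bool _exit = false;
public:
  // sets or clears the start request and wakes up any waiting thread.
  void setStart(bool start)
  {
    { std::lock_guard<std::mutex> lock(_mtx); _start = start; }
    _cond.notify_all();
  }
  // requests exit and wakes up any waiting thread.
  void setExit()
  {
    { std::lock_guard<std::mutex> lock(_mtx); _exit = true; }
    _cond.notify_all();
  }
  // blocks until start or exit is requested; returns false on exit.
  bool waitForStart()
  {
    std::unique_lock<std::mutex> lock(_mtx);
    _cond.wait(lock, [this] { return _start || _exit; });
    return !_exit;
  }
};
```

With something like this, the sleeping loops in dataProc() could become `if (!gate.waitForStart()) return;`, and the worker thread would wake up as soon as a command arrives instead of polling every 100 ms.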