Возможно ли использовать вывод процесса во время выполнения процесса?

#c #boost #boost-asio #boost-process

Вопрос:

Boost.process позволяет использовать Boost.asio для выполнения асинхронного чтения.

Насколько я понимаю, это полезно для чтения вывода во время выполнения процесса без необходимости ждать завершения процесса.

Но чтобы получить доступ к этому выводу, нужно ли ждать завершения процесса или можно получить к нему доступ во время выполнения процесса и как?

На самом деле мне нужно получить доступ к началу вывода процесса (чтобы проверить, что он начался так, как ожидалось), сохраняя при этом его выполнение.

Чтобы детализировать контекст, я запускаю процесс, который хочу сохранить до конца выполнения:

 boost::asio::io_service ios;
std::vector<char> buf;

bp::child c("process_that_needs_to_keep_running", args, 
bp::std_out > boost::asio::buffer(buf), ios);

ios.run();
// I DON'T WANT WAIT FOR c TO TERMINATE
// but I want to check that buf contains some text that ensures me it started correctly
// the issue I have here is that I don't know how to read from buf, since its size and content might not be consistent
// is it possible to take a snapshot for instance?
check_started_correctly(buf);
 

Здесь проблема в том, что производитель создает вывод, который я не контролирую, я просто выдаю вывод.

Комментарии:

1. Если вы не смогли прочитать поток вывода процесса до его завершения, вам придется где-то хранить весь вывод. У вас много оперативной памяти?

2. @EOF итак, как я могу получить к нему доступ во время выполнения процесса, например, для получения некоторой информации в строке 5, возможно ли это из boots ::asio::buffer?

3. @FlashMcQueen Предполагая, что вы имеете в виду файловый ввод-вывод в операционной системе * NIX: пока ввод-вывод был сброшен в ОС и не застрял в пользовательском буфере, все в порядке. Но это много «если», поэтому может быть полезно немного больше контекста.

4. Весь смысл каналов unix заключается в том, что они позволяют нескольким программам формировать параллельный конвейер для обработки данных. Первоначально, когда Unix был изобретен на однопроцессорных машинах, это не давало параллелизма , но это означало, что потоковые процессы могли работать над проблемами, которые потребовали бы больше памяти для хранения некоторого промежуточного результата, чем было доступно на любой существующей машине. Производитель может запускаться, заполнять буфер (буфер канала) данными, приостанавливать работу, когда буфер заполнен. Затем потребитель может быть запланирован и считан до тех пор, пока буфер не опустеет. Промойте и повторите.

5. @Frank Я попытался предоставить некоторый код для более подробной информации, надеясь, что это прояснит мой вопрос.

Ответ №1:

Если вы используете bp::std_out > some_kind_of_buffer_or_future , вы обычно получаете результат только при выходе.

Однако вы можете использовать async_pipe :

 bp::async_pipe pipe(io);

bp::child c( //
    "/bin/bash",
    std::vector<std::string>{
        "-c",
        "for a in {1..20}; do sleep 1; echo message $a; done",
    },                    //
    bp::std_out > pipe,   //
    bp::on_exit(on_exit), //
    io);
 

Теперь вам нужно явно выполнить ввод-вывод в этом канале:

 boost::asio::streambuf sb;
async_read_until(                //
    pipe, sb, "message 5n",     //
    [amp;](error_code ec, size_t) { //
        std::cout << "Got message 5 (" << ec.message() << ")" << std::endl;
    });
 

Это работает:

Жить на Coliru

 #include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <iostream>

namespace bp = boost::process;
using boost::system::error_code;

namespace /*file-static*/ {
    using namespace std::chrono_literals;
    static auto       now = std::chrono::steady_clock::now;
    static const auto t0  = now();

    static auto timestamp() {
        return std::to_string((now() - t0) / 1.s)   "s ";
    }
} // namespace

int main() {
    boost::asio::io_context io;
    bp::async_pipe pipe(io);

    auto on_exit = [](int code, std::error_code ec) {
        std::cout << timestamp() << "on_exit: " << ec.message() << " code "
                  << code << std::endl;
    };

    bp::child c( //
        "/bin/bash",
        std::vector<std::string>{
            "-c",
            "for a in {1..20}; do sleep 1; echo message $a; done",
        },                    //
        bp::std_out > pipe,   //
        bp::on_exit(on_exit), //
        io);

    boost::asio::streambuf sb;
    async_read_until(                //
        pipe, sb, "message 5n",     //
        [amp;](error_code ec, size_t) { //
            std::cout << timestamp() << "Got message 5 (" << ec.message() << ")"
                      << std::endl;
        });

    io.run();
}
 

С принтами

 5.025400s Got message 5 (Success)
20.100547s on_exit: Success code 0
 

Таким образом, вы можете реагировать на контент, который вы ищете, когда это происходит. Имейте в виду, что ОС и оболочки выполняют буферизацию потоков в каналах, но по умолчанию используется буферизация строк, поэтому вы можете ожидать получения входных данных сразу после печати новой строки.

Большие буферы?

В приведенном выше виде предполагается, что вы можете буферизировать весь вывод до интересного сообщения. Что, если это гигабайты? Пока ваш шаблон не имеет гигабайт, вы можете продолжать чтение до тех пор, пока не будут выполнены критерии.

Давайте преобразуем наш пример в асинхронный grep, который ищет регулярное classs*w _heap выражение во всех заголовках boost. Конечно, это много мегабайт данных, но мы используем только буфер размером 10 КБ:

 std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte

size_t total_received =0;
boost::regex const re(R"(classs*w _heap)");
 

Теперь мы создаем цикл чтения, который считывает до совпадения или когда буфер заполнен:

 std::function<void()> wait_for_message;
wait_for_message = [amp;] {
    async_read_until(                         //
        pipe, buf, re,                        //
        [amp;](error_code ec, size_t received) { //
            std::cerr << 'x0d' << timestamp() << "Checking for message ("
                      << ec.message() << ", total " << total_received
                      << ")                ";

            if (received || ec != boost::asio::error::not_found) {
                total_received  = received;
                buf.consume(received);

                boost::smatch m;
                if (regex_search(text, m, re)) {
                    std::cout << "n" << timestamp()
                              << "Found: " << std::quoted(m.str()) << " at "
                              << (total_received - m.size()) << " bytes"
                              << std::endl;
                }
            } else {
                // discard 90% of buffer capacity
                auto discard =
                    std::min(buf.max_size() / 10 * 9, buf.size());
                total_received  = discard;
                buf.consume(discard);
            }

            if (!ec | (ec == boost::asio::error::not_found))
                wait_for_message();
            else
                std::cout << "n" << timestamp() << ec.message() << std::endl;
        });
};
 

Конечно, эта система может пропустить совпадения, если совпадение превышает 10% от размера буфера (потому что мы сохраняем только 10% предыдущего содержимого буфера, чтобы совпадения перекрывали границы чтения).

Опять же, посмотрите это в прямом эфире на Coliru

 #include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <boost/regex.hpp>
#include <iostream>
#include <iomanip>

namespace bp = boost::process;
using boost::system::error_code;

namespace /*file-static*/ {
    using namespace std::chrono_literals;
    static auto       now = std::chrono::steady_clock::now;
    static const auto t0  = now();

    static auto timestamp() {
        return std::to_string((now() - t0) / 1.s)   "s ";
    }
} // namespace

int main() {
    boost::asio::io_context io;
    bp::async_pipe pipe(io);

    auto on_exit = [](int code, std::error_code ec) {
        std::cout << timestamp() << "on_exit: " << ec.message() << " code "
                  << code << std::endl;
    };

    bp::child c( //
        "/usr/bin/find",
        std::vector<std::string>{"/usr/local/include/boost", "-name",
                                 "*.hpp", "-exec", "cat", "{}", " "},
        bp::std_out > pipe,   //
        bp::on_exit(on_exit), //
        io);

    std::string text;
    auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte

    size_t total_received =0;
    boost::regex const re(R"(classs*w _heap)");

    std::function<void()> wait_for_message;
    wait_for_message = [amp;] {
        async_read_until(                         //
            pipe, buf, re,                        //
            [amp;](error_code ec, size_t received) { //
                std::cerr << 'x0d' << timestamp() << "Checking for message ("
                          << ec.message() << ", total " << total_received
                          << ")                ";

                if (received || ec != boost::asio::error::not_found) {
                    total_received  = received;
                    buf.consume(received);

                    boost::smatch m;
                    if (regex_search(text, m, re)) {
                        std::cout << "n" << timestamp()
                                  << "Found: " << std::quoted(m.str()) << " at "
                                  << (total_received - m.size()) << " bytes"
                                  << std::endl;
                    }
                } else {
                    // discard 90% of buffer capacity
                    auto discard =
                        std::min(buf.max_size() / 10 * 9, buf.size());
                    total_received  = discard;
                    buf.consume(discard);
                }

                if (!ec | (ec == boost::asio::error::not_found))
                    wait_for_message();
                else
                    std::cout << "n" << timestamp() << ec.message() << std::endl;
            });
    };

    wait_for_message();
    io.run();

    std::cout << timestamp() << " - Done, total_received: " << total_received << "n";
}
 

Который печатает

 2.033324s Found: "class d_ary_heap" at 6747512 bytes
2.065290s Found: "class pairing_heap" at 6831390 bytes
2.071888s Found: "class binomial_heap" at 6860833 bytes
2.072715s Found: "class skew_heap" at 6895677 bytes
2.073348s Found: "class fibonacci_heap" at 6921559 bytes
34.729355s End of file
34.730515s on_exit: Success code 0
34.730593s  - Done, total_received: 154746011
 

Или в прямом эфире с моей машины:

введите описание изображения здесь