Многопоточная обработка событий из epoll_wait с помощью сопрограмм

#c #coroutine #epoll #c -coroutine

#c #сопрограмма #epoll #c -сопрограмма

Вопрос:

У меня есть такой класс контекста ввода-вывода:

     class IOContext
    {
    public:
        explicit IOContext(uint32_t eventPoolCount);
        ~IOContext();

        IOContext(IOContextamp;) = delete;
        IOContextamp; operator=(IOContextamp;) = delete;
        IOContext(IOContextamp;amp;) = delete;
        IOContextamp; operator=(IOContextamp;amp;) = delete;

        void processAwaitingEvents(int timeout);

        void scheduleOperation(std::derived_from<IOOperation> autoamp; operation)
        {
            // ok?
            if(0 == epoll_ctl(this->epollFD, EPOLL_CTL_ADD, operation.fd, amp;operation.settings))
                return;

            if(errno == EEXIST)
                // re-init event
                if(0 == epoll_ctl(this->epollFD, EPOLL_CTL_MOD, operation.fd, amp;operation.settings))
                    return;

            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

    private:
        tinycoro::Generator<IOOperation::CoroHandle> yieldAwaitingEvents(int timeout);

        std::unique_ptr<epoll_event[]> eventsList;
        const uint32_t eventPoolCount;
        int epollFD = -1;
    };

 
     //.
    //. //constructor, destructor
    //.
    void IOContext::processAwaitingEvents(int timeout)
    {
        for(autoamp; event : this->yieldAwaitingEvents(timeout))
        {
            event.resume();
        }
    }

    tinycoro::Generator<IOOperation::CoroHandle> IOContext::yieldAwaitingEvents(int timeout)
    {
        int eventCount = epoll_wait(this->epollFD, eventsList.get(), this->eventPoolCount, timeout);
        if(eventCount == -1)
        {
            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

        for(int i = 0; i < eventCount;   i)
        {
            co_yield std::coroutine_handle<IOOperation>::from_address(eventsList[i].data.ptr);
        }
    }
 

и я хочу включить многопоточную обработку для событий ввода-вывода (IOOperation — это базовый класс, который содержит fd), поэтому, когда некоторые события будут готовы, сначала из N поток получит их и обработает.
Итак, я думаю, что я мог бы переместить eventsList в cpp как переменную thread_local, например:

     //.
    //. //constructor, destructor
    //.
    void IOContext::processAwaitingEvents(int timeout)
    {
        for(autoamp; event : this->yieldAwaitingEvents(timeout))
        {
            event.resume();
        }
    }

    tinycoro::Generator<IOOperation::CoroHandle> IOContext::yieldAwaitingEvents(int timeout)
    {
        thread_local std::unique_ptr<epoll_event[]> eventsList = std::make_unique<epoll_event[]>(this->eventPollCount);
        int eventCount = epoll_wait(this->epollFD, eventsList.get(), this->eventPoolCount, timeout);
        if(eventCount == -1)
        {
            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

        for(int i = 0; i < eventCount;   i)
        {
            co_yield std::coroutine_handle<IOOperation>::from_address(eventsList[i].data.ptr);
        }
    }
 

Но я не уверен, что это хорошая идея, т.Е. Безопасно ли это решение (я провожу несколько тестов, все было в порядке, но, несмотря на это, у меня есть сомнения).
Я мог бы сделать yieldAwaitingEvents общедоступной функцией, но я не думаю, что предоставление необработанного дескриптора сопрограммы — хорошая идея …
Спасибо!

РЕДАКТИРОВАТЬ: я немного изменил код — инициализация переменной thread_local была перенесена в функцию yieldAwaitingEvents

Комментарии:

1. Итак, вы хотите иметь несколько потоков, все вызывающих epoll_wait один и тот же fd, верно? И вы полагаетесь на то, что каждый из них читает не более 12 событий для балансировки нагрузки? Это должно быть хорошо, но события не обязательно будут равномерно распределены по потокам.

2. @Бесполезно Да, точно. Для одного вызова не обязательно должно быть 12 (например, жестко запрограммированных) событий, но да. Конечно, я также принимаю во внимание, что это не обязательно должно быть равномерно. Другая идея состоит в том, чтобы вернуть дескриптор сопрограммы из yieldAwaitingEvents, тогда я мог бы распределять более равномерно — но в этом случае я предоставляю пользователю функцию resume (), которую я не хочу делать.