Как отключить блокировку сигнала сервера pyarrow flight rpc

#python #signals #pyarrow #apache-arrow

#python #сигналы #pyarrow #apache-стрелка

Вопрос:

Привязка к python сервера Apache Arrow Flight RPC блокирует SIGTERM и SIGINT подает сигналы во время длительных операций. И я не смог разблокировать эти сигналы.

Похоже enable_signal_handlers() , что метод from pyarrow.lib предназначен для точной цели включения / выключения блокировки сигнала, но он не работает, по крайней мере, для следующего примера кода,

  • server.py
 import signal
import threading
import time
from datetime import datetime

import pyarrow as pa
import pyarrow.flight as pf


class TickServer(pf.FlightServerBase):

    SCHEMA = pa.schema([
        ('time', pa.timestamp("us")),
    ])

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stopped = threading.Event()

    def _generate_ticks(self, context: pf.ServerCallContext):
        while not self.stopped.is_set() and not context.is_cancelled():
            yield pa.record_batch([
                pa.array([datetime.now(), ]),
            ], schema=TickServer.SCHEMA)
            time.sleep(1)

    def do_get(self, context: pf.ServerCallContext, ticket: pf.Ticket):
        key = ticket.ticket.decode('utf-8')
        if key == "tick":
            return pf.GeneratorStream(TickServer.SCHEMA, self._generate_ticks(context))
        raise pa.ArrowKeyError("Unknown ticket: {}".format(ticket))

    def shutdown(self):
        self.stopped.set()
        return super().shutdown()


if __name__ == "__main__":
    # NOTE: this doesn't work
    pa.enable_signal_handlers(False)

    with TickServer(location="grpc://127.0.0.1:8000") as server:

        def signal_handler(signum, frame):
            server.shutdown()
            print("interrupted")

        signal.signal(signal.SIGINT, signal_handler)
        server.serve()

 
  • client.py
 import pyarrow as pa
import pyarrow.flight as pf

if __name__ == "__main__":
    try:
        client: pf.FlightClient = pf.connect(location="grpc://127.0.0.1:8000")
        client.wait_for_available()
        for batch in client.do_get(pf.Ticket("tick")):
            table = pa.Table.from_batches([batch.data])
            for record in table:
                print(record[0])
    except KeyboardInterrupt:
        print("interrupted")
 

Текущее поведение server.py заключается в том, что оно может быть прервано только SIGINT (например, Ctrl C), когда ни один клиент не получает тиков. Я хотел бы иметь возможность прерывать его независимо от того, есть ли активные клиенты, а не нет.

Моя среда — Ubuntu 20.04 python 3.8 pyarrow 6.0.1.

Комментарии:

1. Для будущих читателей ознакомьтесь с этой проблемой GitHub: github.com/apache/arrow/issues/11932