#python #apache-arrow
Вопрос:
Я следовал этому руководству о том, как настроить и использовать полет стрелы Apache.
Из примера, server.py:
import pyarrow as pa
import pyarrow.flight as fl
def create_table_int():
data = [
pa.array([1, 2, 3]),
pa.array([4, 5, 6])
]
return pa.Table.from_arrays(data, names=['column1', 'column2'])
def create_table_dict():
keys = pa.array(["x", "y", "z"], type=pa.utf8())
data = [
pa.chunked_array([
pa.DictionaryArray.from_arrays([0, 1, 2], keys),
pa.DictionaryArray.from_arrays([0, 1, 2], keys)
]),
pa.chunked_array([
pa.DictionaryArray.from_arrays([1, 1, 1], keys),
pa.DictionaryArray.from_arrays([2, 2, 2], keys)
])
]
return pa.Table.from_arrays(data, names=['column1', 'column2'])
class FlightServer(fl.FlightServerBase):
def __init__(self, location="grpc://0.0.0.0:8815", **kwargs):
super(FlightServer, self).__init__(location, **kwargs)
self.tables = {
b'table_int': create_table_int(),
b'table_dict': create_table_dict(),
}
def do_get(self, context, ticket):
table = self.tables[ticket.ticket]
return fl.RecordBatchStream(table)
# return fl.GeneratorStream(table.schema, table.to_batches(max_chunksize=1024))
def main():
FlightServer().serve()
if __name__ == '__main__':
main()
client.py
import argparse
import sys
import pyarrow as pa
import pyarrow.flight as fl
def get_by_ticket(args, client):
ticket_name = args.name
response = client.do_get(fl.Ticket(ticket_name)).read_all()
print_response(response)
def get_by_ticket_pandas(args, client):
ticket_name = args.name
response = client.do_get(fl.Ticket(ticket_name)).read_pandas()
print_response(response)
def print_response(data):
print("=== Response ===")
print(data)
print("================")
def main():
parser = argparse.ArgumentParser()
subcommands = parser.add_subparsers()
cmd_get_by_t = subcommands.add_parser('get_by_ticket')
cmd_get_by_t.set_defaults(action='get_by_ticket')
cmd_get_by_t.add_argument('-n', '--name', type=str, help="Name of the ticket to fetch.")
cmd_get_by_tp = subcommands.add_parser('get_by_ticket_pandas')
cmd_get_by_tp.set_defaults(action='get_by_ticket_pandas')
cmd_get_by_tp.add_argument('-n', '--name', type=str, help="Name of the ticket to fetch.")
args = parser.parse_args()
if not hasattr(args, 'action'):
parser.print_help()
sys.exit(1)
commands = {
'get_by_ticket': get_by_ticket,
'get_by_ticket_pandas': get_by_ticket_pandas,
}
client = fl.connect("grpc://0.0.0.0:8815")
commands[args.action](args, client)
if __name__ == '__main__':
main()
Я запускаю сервер в кластере k8s, доступ к которому осуществляется через службу, а различные другие модули совершают вызовы на сервер. Это работает нормально, ЗА исключением случаев, когда второй вызов выполняется на сервер до того, как вернется первый вызов. В этом случае я не получаю должного ответа от первого звонка, но, похоже, я также не получаю никаких ошибок. Я не уверен, что это правильный термин, но есть ли способ сделать сервер «блокирующим», чтобы он закончил обработку первого вызова до того, как начнет второй, или какой-то другой способ исправить это?
Комментарии:
1. Я также пытаюсь понять, как работает параллелизм для полета стрелы Apache. Вам удалось это выяснить?