Скопированный сокет не обрабатывается

#python #python-3.x #rust #multiprocessing #pyo3

Вопрос:

Я пытаюсь скопировать сокет и отправить его в другой процесс на Python.

Сокет создается в rust и используется как объект Python через PyO3.

Вот общий код сокета

 
use pyo3::prelude::*;

use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;

#[pyclass]
#[derive(Debug)]
pub struct SocketHeld {
    pub socket: Socket,
}

#[pymethods]
impl SocketHeld {
    #[new]
    pub fn new(address: String, port: i32) -> PyResult<SocketHeld> {
        let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
        println!("{}", address);
        let address: SocketAddr = address.parse()?;
        socket.set_reuse_address(true)?;
        //socket.set_reuse_port(true)?;
        socket.bind(amp;address.into())?;
        socket.listen(1024)?;

        Ok(SocketHeld { socket })
    }

    pub fn try_clone(amp;self) -> PyResult<SocketHeld> {
        let copied = self.socket.try_clone()?;
        Ok(SocketHeld { socket: copied })
    }
}

impl SocketHeld {
    pub fn get_socket(amp;self) -> Socket {
        self.socket.try_clone().unwrap()
    }
}


 

Ниже приведен код python, в котором я пытаюсь запустить два разных процесса. Я пытался использовать собственную многопроцессорную библиотеку, библиотеку fork of multiprocess и даже библиотеку pathos.

 

    def start(self, url="127.0.0.1", port=5000):
        """
        [Starts the server]

        :param port [int]: [reperesents the port number at which the server is listening]
        """
        socket = SocketHeld(f"0.0.0.0:{port}", port)
        if not self.dev:
            from pathos.pools import ProcessPool
            pool = ProcessPool(nodes=2)
            # spawned_process(url, port, self.routes, socket.try_clone(), f"Process {1}")
            pool.map(spawned_process, [(url, port, self.routes, socket.try_clone(), f"Process {1}"), (url, port, self.routes, socket.try_clone(), f"Process {2}")])
            # for i in range(2):
            #     copied = socket.try_clone()
            #     p = Pool().map(
            #         spawned_process,
            #         args=(self.routes, copied, f"Process {i}"),
            #     )
            #     p.start()

            # input("Press Cntrl   C to stop n")
            # self.server.start(url, port)
        else:
            ...

 

Тем не менее, я все еще получаю сообщение об ошибке, что объект не может быть сериализован.

I get the following error:

 
Traceback (most recent call last):
  File "integration_tests/base_routes.py", line 75, in <module>
    app.start(port=5000, url='0.0.0.0')
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/robyn/__init__.py", line 95, in start
    pool.map(spawned_process, [(url, port, self.routes, socket.try_clone(), f"Process {1}"), (url, port, self.routes, socket.try_clone(), f"Process {2}")])
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/pathos/multiprocessing.py", line 139, in map
    return _pool.map(star(f), zip(*args)) # chunksize
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 771, in get
    raise self._value
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/connection.py", line 209, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/reduction.py", line 54, in dumps
    cls(buf, protocol, *args, **kwds).dump(obj)
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/dill/_dill.py", line 498, in dump
    StockPickler.dump(self, obj)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 485, in dump
    self.save(obj)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 899, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 899, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 576, in save
    rv = reduce(self.proto)
TypeError: cannot pickle 'builtins.SocketHeld' object


 

Я иду не так концептуально где-то здесь? Каково решение для этого?

PS:

Я пытаюсь запустить серверную среду выполнения в процессе.

 
def spawned_process(url, port, handlers, socket, name):
    import asyncio
    import uvloop

    uvloop.install()
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

    print(handlers)
    server = Server()


    for i in handlers:
        route_type, endpoint, handler, is_async, number_of_params = i
        print(i)
        server.add_route(route_type, endpoint, handler, is_async, number_of_params)

    print(socket, name)
    server.start(url, port, socket, name)
    asyncio.get_event_loop().run_forever()

 

Комментарии:

1. Я не думаю, что вы можете отправить сокет другому процессу (может быть, в другой поток). Вы действительно пытались сделать то же самое в python с самой библиотекой сокетов?

2. @Netwave , я еще этого не сделал. Поскольку большая часть кодовой базы находится только в rust. Я пытался придерживаться этого. Кроме того, я не понимаю, как это могло бы быть иначе концептуально. С тех пор сокет клонируется нормально. Я думал, что это должно работать нормально.

Ответ №1:

Сокеты — это, по сути, просто относящаяся к процессу ссылка на некоторую структуру ядра ОС. Поскольку травление включает в себя только часть пользовательского пространства этой ссылки, а не структуру ядра, сокеты не могут быть просто маринованы и восстановлены в каком-то другом процессе, на другой машине и т.д.

В системах UNIX файловые дескрипторы могут передаваться между процессами через доменные сокеты UNIX, что создаст другую ссылку на ту же структуру ядра в другом процессе. Однако это не будет работать с сокетами SSL, поскольку существует некоторое состояние пользовательского пространства, связанное с этим сокетом, которое не является ни частью процесса дескриптора файла, ни специфичной для Python обработки.

Комментарии:

1. какой подход я должен использовать вместо этого?

2. @SanskarJethi, возможно, подойдет какой-нибудь шаблон производителя / потребителя. Но нам не хватает контекста того, что вы пытаетесь сделать.

3. @SanskarJethi: Вы спрашиваете только о проблеме Y проблемы XY . Эта проблема Y не может быть решена общим способом. Поэтому вам может потребоваться пересмотреть свой подход к решению проблемы X. Мы не можем здесь помочь, поскольку не знаем X.

4. Извините за ограниченную информацию. Я пытаюсь запустить среду выполнения в каждом процессе. По сути, я пытаюсь прочитать TCP-сокет из нескольких процессов. Я обновил фрагмент кода в описании, возможно, это будет более полезно.

5. Подумав об этом еще раз, возможно, попытка создания очередей сработает. Поскольку я все равно копирую сокеты в основном процессе, и мне не нужно совместно использовать один сокет для всех процессов.