Реестр Actix SyncArbiter

#rust #rust-actix #actix-web

#Ржавчина #rust-actix #actix-web

Вопрос:

Я пытаюсь реализовать пул из 10 повторных подключений, используя SyncArbiter для использования разными участниками. Допустим, у нас есть актер по имени Боб, который должен использовать актера Redis для выполнения своей задачи.

Хотя это достижимо следующим образом:

 // crate, use and mod statements have been omitted to lessen clutter

/// FILE main.rs
pub struct AppState {
    pub redis: Addr<Redis>,
    pub bob: Addr<Bob>
}

fn main() {
    let system = actix::System::new("theatre");

    server::new(move || {
        let redis_addr = SyncArbiter::start(10, || Redis::new("redis://127.0.0.1").unwrap());
        let bob_addr = SyncArbiter::start(10, || Bob::new());

        let state = AppState {
            redis: redis_addr,
            bob: bob_addr
        };

        App::with_state(state).resource("/bob/eat", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::bob::eat)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}

/// FILE controllers/bob.rs
pub struct Food {
  name: String,
  kcal: u64
}

pub fn eat(
    (req, state): (Json<Food>, State<AppState>),
) -> impl Future<Item = HttpResponse, Error = Error> {
    state
        .bob
        .send(Eat::new(req.into_inner()))
        .from_err()
        .and_then(|res| match res {
            Ok(val) => {
                println!("==== BODY ==== {:?}", val);
                Ok(HttpResponse::Ok().into())
            }
            Err(_) => Ok(HttpResponse::InternalServerError().into()),
        })
}

/// FILE actors/redis.rs
#[derive(Debug)]
pub struct Redis {
    pub client: Client
}

pub struct RunCommand(Cmd);

impl RunCommand {
    pub fn new(cmd: Cmd) -> Self {
        RunCommand(cmd)
    }
}

impl Message for RunCommand {
    type Result = Result<RedisResult<String>, ()>;
}

impl Actor for Redis {
    type Context = SyncContext<Self>;
}

impl Handler<RunCommand> for Redis {
    type Result = Result<RedisResult<String>, ()>;

    fn handle(amp;mut self, msg: RunCommand, _context: amp;mut Self::Context) -> Self::Result {
        println!("Redis received command!");
        Ok(Ok("OK".to_string()))
    }
}

impl Redis {
    pub fn new(url: amp;str) -> Result<Self, RedisError> {
        let client = match Client::open(url) {
            Ok(client) => client,
            Err(error) => return Err(error)
        };

        let redis = Redis {
            client: client,
        };

        Ok(redis)
    }
}

/// FILE actors/bob.rs
pub struct Bob;

pub struct Eat(Food);

impl Message for Eat {
    type Result = Result<Bob, ()>;
}

impl Actor for Eat {
    type Context = SyncContext<Self>;
}

impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(amp;mut self, msg: Eat, _context: amp;mut Self::Context) -> Self::Result {
        println!("Bob received {:?}", amp;msg);

        // How to get a Redis actor and pass data to it here?

        Ok(msg.datapoint)
    }
}

impl Bob {
    pub fn new() -> () {
        Bob {}
    }
}
  

Из приведенной выше реализации дескриптора в Bob неясно, как Bob мог получить адрес участника Redis. Или отправьте любое сообщение любому, Actor запущенному в SyncArbiter .

То же самое можно было бы достичь, используя обычные Arbiter и Registry , но, насколько мне известно, Actix не допускает нескольких одинаковых участников (например, мы не можем запустить 10 участников Redis, используя обычный Arbiter ).

Чтобы формализовать мои вопросы:

  • Есть ли Registry для SyncArbiter участников
  • Могу ли я запустить несколько однотипных участников в обычном Arbiter
  • Есть ли лучший / более канонический способ реализации пула соединений

Редактировать

Версии:

  • actix 0.7.9
  • actix_web 0.7.19
  • фьючерсы = «0.1.26»
  • rust 1.33.0

Ответ №1:

Я сам нашел ответ.

Из коробки невозможно получить Actor с SyncContext из реестра.

Учитывая мой приведенный выше пример. Для того, чтобы субъект Bob мог отправить какое-либо сообщение Redis субъекту, ему необходимо знать адрес Redis субъекта. Bob может получить адрес Redis , явно содержащийся в отправленном ему сообщении или считанный из какого-либо общего состояния.

Я хотел систему, похожую на Erlang, поэтому я решил не передавать адреса участников через сообщения, поскольку это казалось слишком трудоемким, подверженным ошибкам, и, на мой взгляд, это противоречит цели создания модели параллелизма на основе актера (поскольку ни один актер не может отправлять сообщения любому другому актеру).).

Поэтому я исследовал идею общего состояния и решил реализовать свое собственное, SyncRegistry которое было бы аналогом стандарта Actix Registry , который делает именно то, что я хочу, но не для участников с SyncContext .

Вот наивное решение, которое я придумал:https://gist.github.com/monorkin/c463f34764ab23af2fd0fb0c19716177

Со следующей настройкой:

 fn main() {
    let system = actix::System::new("theatre");

    let addr = SyncArbiter::start(10, || Redis::new("redis://redis").unwrap());
    SyncRegistry::set(addr);
    let addr = SyncArbiter::start(10, || Bob::new());
    SyncRegistry::set(addr);


    server::new(move || {
        let state = AppState {};

        App::with_state(state).resource("/foo", |r| {
            r.method(http::Method::POST)
                .with_async(controllers::foo::create)
        })
    })
    .bind("0.0.0.0:8080")
    .unwrap()
    .start();

    println!("Server started.");

    system.run();
}
  

Исполнитель Bob может получить адрес Redis следующим образом из любой точки программы:

 impl Handler<Eat> for Bob {
    type Result = Result<(), ()>;

    fn handle(amp;mut self, msg: Eat, _context: amp;mut Self::Context) -> Self::Result {
        let redis = match SyncRegistry::<Redis>::get() {
            Some(redis) => redis,
            _ => return Err(())
        };

        let cmd = redis::cmd("XADD")
            .arg("things_to_eat")
            .arg("*")
            .arg("data")
            .arg(amp;msg.0)
            .to_owned();

        redis.clone().lock().unwrap().send(RunCommand::new(cmd)).wait().unwrap();
    }
}