#rust #rabbitmq
Вопрос:
Я ищу пример кода, иллюстрирующий потребление сообщений от RabbitMQ с помощью lapin, включая доступ к полезной нагрузке сообщений AMQP.
Я пытаюсь запустить «consumer.rs» пример из «https://raw.githubusercontent.com/CleverCloud/lapin/master/examples/consumer.rs» но я получаю нижеследующее
.ack(BasicAckOptions::default())
^^^ method not found in `(lapin::Channel, Delivery)
Ниже приведен код, полученный из «https://github.com/CleverCloud/lapin/blob/master/examples/consumer.rs»
use futures_lite::StreamExt;
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tracing::info;
fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
tracing_subscriber::fmt::init();
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672//".into());
async_global_executor::block_on(async {
let conn = Connection::connect(amp;addr, ConnectionProperties::default())
.await
.expect("connection error");
info!("CONNECTED");
//receive channel
let channel = conn.create_channel().await.expect("create_channel");
info!(state=?conn.status().state());
let queue = channel
.queue_declare(
"hello",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");
info!(state=?conn.status().state());
info!(?queue, "Declared queue");
info!("will consume");
let mut consumer = channel
.basic_consume(
"hello",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume");
info!(state=?conn.status().state());
while let Some(delivery) = consumer.next().await {
info!(message=?delivery, "received message");
if let Ok(delivery) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
}
}
})
}
Комментарии:
1. Пожалуйста, включите весь минимальный код, особенно если вы изменили пример.
2. Хотя голос был не от меня, я должен извиниться за свое предположение, что вы изменили пример и нарушили его. Тем не менее, по-прежнему полезно публиковать код в вопросе, так как гиперссылки могут исчезнуть, особенно если вы ссылаетесь на
master
движущуюся цель , а не на конкретную фиксацию Git.
Ответ №1:
Ваша проблема в том, что вы, скорее всего, зависите от последней стабильной версии lapin
in crates.io, который на данный момент равен 1.8. Однако пример, который вы используете, взят из master
ветви. Похоже, разработчик готовится к новой основной версии 2.0, и поэтому API библиотеки отличается master
.
Если вы используете тот же файл примера из ветви 1.8, код компилируется.
Альтернативно: используйте пример из master
и ящик из master
, а не последнюю стабильную версию.
Ответ №2:
Ниже приведен мой тестовый код, который исключает функцию main (). Существует значительное количество дополнительного кода, включая конфигурацию ведения журнала и так далее. Я очистил этот код в надежде, что он может быть полезен кому-то еще.
Все еще существует проблема, все сообщения в очереди, по-видимому, назначаются только одному потребителю, и все эти сообщения перемещаются из состояния «Готово» в состояние «Распаковано» почти мгновенно, даже если только одно сообщение обрабатывается за 10 секунд (из-за преднамеренного включения режима ожидания 10 секунд).
Я опубликую новый вопрос по этому вопросу.
Проблема была решена путем правильного просмотра Qos, я отредактировал эту функцию, чтобы включить вызов функции basic_qos.
use lapin::{options::*, Connection, ConnectionProperties, Result};
use futures_util::stream::StreamExt;
//use std::future::Future;
use tracing::info;
use slog::Drain;
pub fn lapin_test_consumer()->std::result::Result<i64, Box<std::io::Error>> {
//env_logger::init();
let log_file_name:amp;str="/tmp/lapin_test_consumer.log";
let log_file_path=std::path::Path::new(amp;log_file_name);
let dir_file_path=log_file_path.parent().unwrap();
std::fs::create_dir_all(dir_file_path).unwrap();
let log_file_handler_option = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(log_file_name)
//.unwrap()
;
let log_file_handler=match log_file_handler_option
{
Ok(f)=>f
,Err(err)=>{
println!("{:?}", err);
panic!("Unable to open the log file '{}', '{:?}'",log_file_name,err);
}
};
let my_log_drain = slog_async::Async::new(
slog::Duplicate::new(
slog::Filter::new(
slog_term::FullFormat::new(
slog_term::PlainSyncDecorator::new(log_file_handler,)
)
.use_file_location()
.build()
,
|record: amp;slog::Record|
{
record.level().is_at_least(slog::Level::Debug)
}
)
//,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
,slog::Duplicate::new(
slog::Filter::new(
slog_term::FullFormat::new(
slog_term::PlainSyncDecorator::new(std::io::stderr(),)
)
.use_file_location()
.build()
,
//|record: amp;slog::Record| record.level().is_at_least(slog::Level::Warning)
|record: amp;slog::Record|
{
record.level().is_at_least(slog::Level::Debug)
}
)
//,slog_term::FullFormat::new(slog_term::PlainSyncDecorator::new(std::io::stdout())).build()
,slog_term::FullFormat::new(slog_term::TermDecorator::new().build()).use_file_location().build()
)
).fuse()
)
.build()
.fuse()
;
let my_slog_logger=slog::Logger::root(my_log_drain, slog::o!("n" => env!("CARGO_PKG_NAME"),"v" => env!("CARGO_PKG_VERSION")));
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
let addr:String = std::env::var("AMQP_ADDR").unwrap_or_else(
|_|{
format!("amqp://{}:{}@{}:{}/{}?heartbeat=0"
,"abcd"//aMQPJobUser
,"abcd"//aMQPJobPasswd
,"somewhere.com"//aMQPJobHost
,5672//aMQPJobPort
,"lapin_test.test"//aMQPJobVirtualHost
).into()
}
);
let amqp_conn_url:amp;str=amp;addr.as_str();
//see "https://docs.rs/lapin/1.8.0/lapin/struct.Consumer.html"
let res: std::result::Result<i64, Box<std::io::Error>> = async_global_executor::block_on(async {
let sleep_duration_ms:u64=10000u64;
let conn_result:std::result::Result<lapin::Connection, lapin::Error> = Connection::connect(
amp;amqp_conn_url,
ConnectionProperties::default().with_default_executor(2),//set the number of threads
//ConnectionProperties::default().with_default_executor(8),
)
.await;
let conn:lapin::Connection=match conn_result{
Err(err)=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At lapin_test_publisher(), pos 1b, some error has been encountered while trying to establish AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",amp;amqp_conn_url,amp;err,amp;bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, amp;log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
}
Ok(conn2)=>{info!("CONNECTED");conn2}
};
//set basic_qos so each consumer may only have at most one message at at time
channel_a.basic_qos(
1
,BasicQosOptions{global:true}
);
let mut message_cnt:i64=0i64;let _some_i64:i64=message_cnt;
let channel_a_result:Result<lapin::Channel>=conn.create_channel().await;
let channel_a:lapin::Channel=match channel_a_result{
Err(err)=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a channel from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",amp;amqp_conn_url,amp;err,amp;bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, amp;log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
//return Err(err);
}
Ok(channel)=>{channel}
};
channel_a
.exchange_declare(
"my_direct_exchange"
,lapin::ExchangeKind::Direct
,lapin::options::ExchangeDeclareOptions{
passive:false
,durable:true
,auto_delete:false
,internal:false
,nowait:false
}
,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
)
;
let queue = channel_a
.queue_declare(
"hello.persistent"//:amp;str queue name
,lapin::options::QueueDeclareOptions{
passive:false,
durable:true,
exclusive:false,
auto_delete:false,
nowait:false,
}
,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
)
.await
.expect("queue_declare")
;
channel_a
.queue_bind(
"hello.persistent"
,"my_direct_exchange"
,"hello.persistent"
, lapin::options::QueueBindOptions{
nowait:false
}
,lapin::types::FieldTable::default()//see "https://docs.rs/amq-protocol-types/6.1.0/amq_protocol_types/struct.FieldTable.html"
)
;
let consumer_a_result:Result<lapin::Consumer>=channel_a
.basic_consume(
"hello.persistent",
"my_consumer",
lapin::options::BasicConsumeOptions{
no_local: true,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-local"
no_ack: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-ack" "If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application."
exclusive: false,
nowait: false,//see "https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-wait" "If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception."
},
lapin::types::FieldTable::default(),
)
.await;
let mut consumer_a:lapin::Consumer=match consumer_a_result{
Err(err)=>{
let bt=backtrace::Backtrace::new();
let log_message=format!(">>>>>At lapin_test_consumer(), pos 1b, some error has been encountered while trying to obtain a consumer from AMQP connection '{:?}', error is:'{:?}', backtrace is '{:?}'",amp;amqp_conn_url,amp;err,amp;bt);
slog::error!(my_slog_logger,"{}",log_message);
let custom_error=std::io::Error::new(std::io::ErrorKind::Other, amp;log_message.to_string()[..]);
return std::result::Result::Err(Box::new(custom_error));
//return Err(err);
}
Ok(consumer)=>{consumer}
};
while let Some(delivery) = consumer_a.next().await {
let (channel2, delivery2) = delivery.expect("error in consumer");
message_cnt =1;
slog::info!(my_slog_logger,"------------------------------------------------------------------, message_cnt is:{}",amp;message_cnt);
let s:String = match String::from_utf8(delivery2.data.to_owned()) {//delivery.data is of type Vec<u8>
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
let log_message:String=format!("message_cnt is:{}, delivery_tag is:{}, exchange is:{}, routing_key is:{}, redelivered is:{}, properties is:'{:?}', received data is:'{:?}'"
,amp;message_cnt
,amp;delivery2.delivery_tag
,amp;delivery2.exchange
,amp;delivery2.routing_key
,amp;delivery2.redelivered
,amp;delivery2.properties
,amp;s
);
slog::info!(my_slog_logger,"{}",log_message);
std::thread::sleep(std::time::Duration::from_millis(sleep_duration_ms));
slog::info!(my_slog_logger,"After {}ms sleep.",sleep_duration_ms);
channel2
.basic_ack(delivery2.delivery_tag, BasicAckOptions::default())
.await
.expect("ack")
;
}
Ok(message_cnt)
}
);
res
}