Ржавчина: Получение сообщений AMQP из центра событий Azure

#azure #rust #amqp #azure-eventhub

Вопрос:

Я хотел бы использовать концентратор событий с помощью Rust.

Я нашел несколько многообещающих ящиков, но ничего вполне удовлетворительного:

Я не нашел никакого приложения в реальном мире, использующего эти ящики.

Я создал the-hub-namespace , the-hub , the-consumer-group и the-policy с разрешения прослушивания и получил the-key .

До сих пор я добивался наибольшего успеха с ntex-amqp. Мне пришлось исправить разъем rustls, так как он не работал с недопустимым DNS — имя хоста включало номер порта (имя:порт), когда ожидается только имя DNS.

Груз.томл:

 [package]
name = "tmp"
version = "0.1.0"
authors = ["me"]
edition = "2018"

[dependencies]
env_logger = "0.8"
ntex-amqp = { version="0.4", git="https://github.com/BrightOpen/ntex-amqp", branch="master" }
ntex = { version="0.3", features=["rustls"], git="https://github.com/BrightOpen/ntex", branch="master" }
rustls = "0.19"
futures = "0.3"

 

src/main.rs:

 use ntex::connect::rustls::RustlsConnector;
use ntex_amqp::client::{self, SaslAuth};
use ntex_amqp::codec::types::{Descriptor, Symbol, Variant};
use rustls::ClientConfig;
use std::sync::Arc;
use futures::StreamExt;

#[ntex::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "ntex=trace,ntex_amqp=trace,basic=trace");
    env_logger::init();

    let mut tlsconfig = ClientConfig::new();
    let certs = tlsconfig
        .root_store
        .add_pem_file(
            // This is the Azure CA cert
            amp;mut "-----BEGIN CERTIFICATE-----
MIIF8zCCBNugAwIBAgIQCq mxcpjxFFB6jvh98dTFzANBgkqhkiG9w0BAQwFADBh
MQswCQYDVQQGEwJVUzEVMBMGA1UEChMMRGlnaUNlcnQgSW5jMRkwFwYDVQQLExB3
d3cuZGlnaWNlcnQuY29tMSAwHgYDVQQDExdEaWdpQ2VydCBHbG9iYWwgUm9vdCBH
MjAeFw0yMDA3MjkxMjMwMDBaFw0yNDA2MjcyMzU5NTlaMFkxCzAJBgNVBAYTAlVT
MR4wHAYDVQQKExVNaWNyb3NvZnQgQ29ycG9yYXRpb24xKjAoBgNVBAMTIU1pY3Jv
c29mdCBBenVyZSBUTFMgSXNzdWluZyBDQSAwMTCCAiIwDQYJKoZIhvcNAQEBBQAD
ggIPADCCAgoCggIBAMedcDrkXufP7pxVm1FHLDNA9IjwHaMoaY8arqqZ4Gff4xyr
RygnavXL7g12MPAx8Q6Dd9hfBzrfWxkF0Br2wIvlvkzW01naNVSkHp OS3hL3W6n
l/jYvZnVeJXjtsKYcXIf/6WtspcF5awlQ9LZJcjwaH7KoZuK THpXCMtzD8XNVdm
GW/JI0C/7U/E7evXn9XDio8SYkGSM63aLO5BtLCv092 1d4GGBSQYolRq 7Pd1kR
EkWBPm0ywZ2Vb8GIS5DLrjelEkBnKCyy3B0yQud9dpVsiUeE7F5sY8Me96WVxQcb
OyYdEY/j/9UpDlOG vA YgOvBhkKEjiqygVpP8EZoMMijephzg43b5Qi9r5UrvYo
o19oR/8pf4HJNDPF0/FJwFVMW8PmCBLGstin3NE1 NeWTkGt0TzpHjgKyfaDP2tO
4bCk1G7pP2kDFT7SYfc8xbgCkFQ2UCEXsaH/f5YmpLn4YPiNFCeeIida7xnfTvc4
7IxyVccHHq1FzGygOqemrxEETKh8hvDR6eBdrBwmCHVgZrnAqnn93JtGyPLi6 cj
WGVGtMZHwzVvX1HvSFG771sskcEjJxiQNQDQRWHEh3NxvNb7kFlAXnVdRkkvhjpR
GchFhTAzqmwltdWhWDEyCMKC2x/mSZvZtlZGY g37Y72qHzidwtyW7rBetZJAgMB
AAGjggGtMIIBqTAdBgNVHQ4EFgQUDyBd16FXlduSzyvQx8J3BM5ygHYwHwYDVR0j
BBgwFoAUTiJUIBiV5uNu5g/6 rkS7QYXjzkwDgYDVR0PAQH/BAQDAgGGMB0GA1Ud
JQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjASBgNVHRMBAf8ECDAGAQH/AgEAMHYG
CCsGAQUFBwEBBGowaDAkBggrBgEFBQcwAYYYaHR0cDovL29jc3AuZGlnaWNlcnQu
Y29tMEAGCCsGAQUFBzAChjRodHRwOi8vY2FjZXJ0cy5kaWdpY2VydC5jb20vRGln
aUNlcnRHbG9iYWxSb290RzIuY3J0MHsGA1UdHwR0MHIwN6A1oDOGMWh0dHA6Ly9j
cmwzLmRpZ2ljZXJ0LmNvbS9EaWdpQ2VydEdsb2JhbFJvb3RHMi5jcmwwN6A1oDOG
MWh0dHA6Ly9jcmw0LmRpZ2ljZXJ0LmNvbS9EaWdpQ2VydEdsb2JhbFJvb3RHMi5j
cmwwHQYDVR0gBBYwFDAIBgZngQwBAgEwCAYGZ4EMAQICMBAGCSsGAQQBgjcVAQQD
AgEAMA0GCSqGSIb3DQEBDAUAA4IBAQAlFvNh7QgXVLAZSsNR2XRmIn9iS8OHFCBA
WxKJoi8YYQafpMTkMqeuzoL3HWb1pYEipsDkhiMnrpfeYZEA7Lz7yqEEtfgHcEBs
K9KcStQGGZRfmWU07hPXHnFz 5gTXqzCE2PBMlRgVUYJiA25mJPXfB00gDvGhtYa
 mENwM9Bq1B9YYLyLjRtUz8cyGsdyTIG/bBM/Q9jcV8JGqMU/UjAdh1pFyTnnHEl
Y59Npi7F87ZqYYJEHJM2LGD le8VsHjgeWX2CJQko7klXvcizuZvUEDTjHaQcs2J
 kPgfyMIOY1DMJ21NxOJ2xPRC/wAh/hzSBRVtoAnyuxtkZ4VjIOh
-----END CERTIFICATE-----"
                .as_bytes(),
        )
        .unwrap();
    assert_eq!(certs, (1, 0));

    let driver = client::Connector::new()
        .connector(RustlsConnector::new(Arc::new(tlsconfig)))
        .hostname("...the-hub-namespace...servicebus.windows.net")
        .connect_sasl(
            "...the-hub-namespace....servicebus.windows.net:5671",
            SaslAuth {
                authz_id: "".into(),
                authn_id: "...the-policy...".into(),
                password: "...the-key...".into(),
            },
        )
        .await
        .unwrap();
    let sink = driver.sink();

    ntex::rt::spawn(driver.start_default());

    let mut session = sink.open_session().await.unwrap();

    let mut links = vec![];

    // I have 32 partitions
    for i in 0..32u8 {
        let link = session
            .build_receiver_link(
                format!("mylink{}", i),
                format!("...the-hub.../ConsumerGroups/...the-consumer-group.../Partitions/{}", i),
            )
            .max_message_size(65535)
.property(
                Symbol::from_slice("com.microsoft:entity-type"),
                Some("8".into()),
            )
            .property(
                Symbol::from_slice("apache.org:selector-filter:string"),
                Some(Variant::Described((
                    Descriptor::Symbol(Symbol::from_slice("apache.org:selector-filter:string")),
                    Box::new(Variant::from("amqp.annotation.x-opt-offset > '@latest'")),
                ))),
            )
            .open()
            .await
            .unwrap();
        link.set_link_credit(20);
        links.push(link);
    }

    let mut links = futures::stream::select_all(links);

    use futures::StreamExt;
    while let Some(msg) = links.next().await {
        eprintln!("Message: {:#?}", msg);
    }

    Ok(())
}
 

I have no clue where to set the-hub and the-consumer-group, let alone the partition number. And how do I actually receive, or react to messages? How do I handle my offset?

Update! @JesseSquire helped figure out the appropriate resource path. A little more research finally revealed the Stream implementation on ReceiverLink — where was I looking? So now I actually get some messages. Some of the above questions out of the way.

When I run this code, I get the output:

 [2021-06-22T18:21:49Z TRACE ntex::connect::resolve] DNS resolver: resolving host "...the-hub-namespace....servicebus.windows.net:5671"
[2021-06-22T18:21:49Z TRACE ntex::connect::resolve] DNS resolver: host "...the-hub-namespace....servicebus.windows.net:5671" resolved to [13.69.64.2:5671]
[2021-06-22T18:21:49Z TRACE ntex::connect::service] TCP connector - connecting to "...the-hub-namespace....servicebus.windows.net:5671" port:5671
[2021-06-22T18:21:49Z TRACE ntex::connect::service] TCP connector - successfully connected to connecting to "...the-hub-namespace....servicebus.windows.net:5671" - Ok(13.69.64.2:5671)
[2021-06-22T18:21:49Z TRACE ntex::connect::rustls] SSL Handshake start for: "...the-hub-namespace....servicebus.windows.net"
[2021-06-22T18:21:49Z TRACE ntex::connect::rustls] SSL Handshake success: DNSNameRef("...the-hub-namespace....servicebus.windows.net")
[2021-06-22T18:21:49Z TRACE ntex_amqp::client::connector] Negotiation client protocol id: AmqpSasl
[2021-06-22T18:21:49Z TRACE ntex_amqp::client::connector] Negotiation client protocol id: Amqp
[2021-06-22T18:21:50Z TRACE ntex_amqp::client::connector] Open client amqp connection: Open { container_id: "42b4f39ce51f46bf85cf8631f64d8cca", hostname: Some("...the-hub-namespace....servicebus.windows.net"), max_frame_size: 65535, channel_max: 1024, idle_time_out: Some(120000), outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }
[2021-06-22T18:21:50Z TRACE ntex_amqp::client::connector] Open confirmed: Open { container_id: "f11d7639f6454b819bbf7b1e1150c101_G6", hostname: None, max_frame_size: 65535, channel_max: 1024, idle_time_out: Some(120000), outgoing_locales: None, incoming_locales: None, offered_capabilities: None, desired_capabilities: None, properties: None }
[2021-06-22T18:21:50Z TRACE ntex_amqp::connection] Session opened: local 0 remote 0
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink0" 0 -> 0
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink1" 1 -> 1
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink2" 2 -> 2
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink3" 3 -> 3
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink4" 4 -> 4
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink5" 5 -> 5
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink6" 6 -> 6
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink7" 7 -> 7
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink8" 8 -> 8
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink9" 9 -> 9
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:50Z TRACE ntex_amqp::session] Receiver link opened: "mylink10" 10 -> 10
[2021-06-22T18:21:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink11" 11 -> 11
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink12" 12 -> 12
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink13" 13 -> 13
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink14" 14 -> 14
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink15" 15 -> 15
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink16" 16 -> 16
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink17" 17 -> 17
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink18" 18 -> 18
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink19" 19 -> 19
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink20" 20 -> 20
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink21" 21 -> 21
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink22" 22 -> 22
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink23" 23 -> 23
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:51Z TRACE ntex_amqp::session] Receiver link opened: "mylink24" 24 -> 24
[2021-06-22T18:21:51Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink25" 25 -> 25
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink26" 26 -> 26
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink27" 27 -> 27
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink28" 28 -> 28
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink29" 29 -> 29
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink30" 30 -> 30
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:21:52Z TRACE ntex_amqp::session] Receiver link opened: "mylink31" 31 -> 31
[2021-06-22T18:21:52Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
Message: Ok(
    Transfer {
        handle: 0,
        delivery_id: Some(
            0,
        ),
        delivery_tag: Some(
            b"",
        ),
        message_format: Some(
            0,
        ),
        settled: Some(
            true,
        ),
        more: false,
        rcv_settle_mode: None,
        state: None,
        resume: false,
        aborted: false,
        batchable: true,
        body: Some(
            Data(
                b"Srxc1Ix06xa3x15x-opt-sequence-numberUxa3x0cx-opt-offsetxa1x010xa3x13x-opt-enqueued-timex83x01z4xcdxfc Suxa0x14s6df54s65df4s6d5f4sf",
            ),
        ),
    },
)
...
[2021-06-22T18:23:37Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:24:09Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
Message: Ok(
    Transfer {
        handle: 13,
        delivery_id: Some(
            27,
        ),
        delivery_tag: Some(
            b"",
        ),
        message_format: Some(
            0,
        ),
        settled: Some(
            true,
        ),
        more: false,
        rcv_settle_mode: None,
        state: None,
        resume: false,
        aborted: false,
        batchable: true,
        body: Some(
            Data(
                b"Srxc1Jx06xa3x15x-opt-sequence-numberUx01xa3x0cx-opt-offsetxa1x0256xa3x13x-opt-enqueued-timex83x01z4xf7"xfbSuxa0x15999999999999999999999",
            ),
        ),
    },
)
[2021-06-22T18:24:50Z TRACE ntex_amqp::session] Session received credit None. window: 4294967295, pending: 0
[2021-06-22T18:24:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:26:35Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task
[2021-06-22T18:27:50Z TRACE ntex_amqp::session] Session received credit None. window: 4294967295, pending: 0
[2021-06-22T18:27:50Z TRACE ntex::framed::dispatcher] not enough data to decode next frame, register dispatch task


 

Хорошо, основная цель выполнена: я могу получать сообщения концентратора событий в Rust с помощью AMQP. Хотя это оставляет желать многого:

  1. Фильтрация, похоже, не оказывает никакого эффекта, как мне управлять смещением?
  2. Более надежный, ухоженный, испытанный в боях ящик?
  3. Документы, примеры?
  4. По крайней мере, один раз доставка (в настоящее время выглядит как не более одного раза)

Комментарии:

1.Я не могу помочь с частями ржавчины, но путь ссылки, который вам понадобится для чтения сообщений, представлен в форме [ namespace endpoint ][ event hub ]ConsumerGroups[ consumer group ]Partitions[ partition ] . Вам также потребуется добавить свойство связи com.microsoft:entity-type со значением 8 (группа потребителей). Начальную позицию необходимо установить в качестве фильтра apache.org:selector-filter:string со значением в форме amqp.annotation.x-opt-offset >= [ offset value ] (вы можете удалить значения, если хотите, чтобы фильтр не включал смещение).

2. Если это поможет, источник C# из официального пакета Azure SDK для создания ссылки на потребителя можно найти здесь , хотя для исходных значений, по которым вам нужно будет перемещаться, существует несколько уровней абстракции. Большинство из них можно найти в папке amqp .

3. @JesseSquire, я добавил свойства (не уверен, что это работает) и исправил путь к ссылке, как было предложено. Я не вижу линии, по которой связь с приемником была удаленно закрыта, так что, возможно, мы на что-то наткнулись! Нужно будет протестировать некоторые сообщения. До сих пор не ясно, как их на самом деле получить 🙂 но большое спасибо!

4. @JesseSquire, сейчас я получаю сообщения. Однако фильтрация не имеет никакого эффекта. Я отправляю это через: ` Символ( B:»apache.org:селектор-фильтр:строка», ): Описано( ( Ulong( 83483426826, ), Строка( B:»amqp.аннотация.x-opt-смещение > «@последняя'», ), ), ),`

5. Рад слышать, что вы, по крайней мере, можете получать сообщения. Символ ключа фильтра кажется мне правильным. Значение фильтра выглядит так, как будто оно может быть отключено, хотя это может быть просто языковая деталь. Described Тип, похоже, не ссылается на символ, который он описывает, только на код типа. Я также вижу одиночные кавычки, вокруг @latest которых, по моему мнению, не нужны. В C# это значение фильтра будет выглядеть так: new AmqpDescribed("apache.org:selector-filter:string", (ulong)0x00000137000000A) { Value = "amqp.annotation.x-opt-offset > @latest" };