#python-3.x #amazon-web-services #websocket #mqtt #aws-iot
Вопрос:
Я следую api aws для подключения к mqtt через веб-сайты. Ниже приведен мой код:
credentials_provider = AwsCredentialsProvider.new_static(
access_key_id = auth_response_dictionary['user']['accessKeyId'],
secret_access_key = auth_response_dictionary['user']['secretKey'],
session_token = auth_response_dictionary['user']['sessionToken']
)
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint=auth_response_dictionary['user']['iotEndpoint'],
region=auth_response_dictionary['user']['region'],
credentials_provider=credentials_provider,
client_bootstrap=client_bootstrap,
client_id=clientId
)
print("Connecting to aws")
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
print('connect_future ' str(connect_future))
x= connect_future.result()
print('connect_future ' str(x))
print("Connected!")
future, packet_id = mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
future, packet_id = mqtt_connection.publish(topic='test/po', payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
print('future ' str(future))
print('future ' str(packet_id))
print('Publish End')
Я не получаю никаких ошибок при подключении и публикации, но я не получаю никаких сообщений msg от своего брокера aws mqtt, когда я подписываюсь на эту тему в разделе «Тестирование».
Я думаю, что я настроил что-то не так в одном credentials_provider
client_bootstrap
или обоих, но не знаю, в чем.
Вот распечатанные журналы
Connecting to aws
connect_future<Future at 0x7f605f942af0 state=pending>
connect_future{'session_present': False}
Connected!
future <Future at 0x7f605f8e54f0 state=pending>
future 3
Publish End
Может кто-нибудь, пожалуйста, помочь?
Ответ №1:
mqtt_connection.subscribe(...)
используется для подписки на тему MQTT для сообщений AWS IoT, которую я нигде не вижу в вашем коде.
mqtt_connection.subscribe
вызывается, как показано ниже, с указанием названия темы, уровня качества обслуживания и обратного вызова.
received_count = 0
received_all_event = threading.Event()
...
topic='test/po'
print("Subscribing to topic '{}'...".format(topic))
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message_received)
subscribe_result = subscribe_future.result()
print("Subscribed with {}".format(str(subscribe_result['qos'])))
on_message_received
может выглядеть так:
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
print("Received message from topic '{}': {}".format(topic, payload))
global received_count
received_count = 1
# Number of messages to wait for
if received_count = 10:
received_all_event.set()
Затем в вашем основном методе вы можете подождать, пока не получите 10 сообщений:
# Wait for all messages to be received.
# This waits forever if count was set to 0.
if not received_all_event.is_set():
print("Waiting for all messages to be received...")
received_all_event.wait()
print("{} message(s) received.".format(received_count))
Есть действительно хороший пример кода, предоставленный AWS, который я бы рекомендовал вам проверить.
Комментарии:
1. Я подписался на свою опубликованную тему в консоли aws iot в разделе «тестирование». проблема в том, что я там ничего не получаю
2. Куда? Вам нужно подписаться на тему — где
test
бы она ни была, ее нет в вашем вопросе