Как отправить сообщение из eventhub в другой eventhub?

#c# #azure #azure-eventhub

#c# #azure #azure-eventhub

Вопрос:

Я хотел бы отправить клиенту eventhub, а затем загрузить его для выборки данных, таких как погода, и отправить другой eventhub. Мой код не работает должным образом. Ошибки нет, но данные не отправляются в базу данных.

 public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
                try
            {
                foreach (EventData message in messages)
                {
                        string data = Encoding.UTF8.GetString(message.GetBytes());
                        NewClient Client = JsonConvert.DeserializeObject<NewClient>(data);
                    if (Client.City != null amp;amp; Client.Street != null )
                    {

                        GoogleGeoApi GeoClient = new GoogleGeoApi();
                        GeoClient.SetCoordinates(Client.City, Client.Street);
                        WeatherApi WeatherApiobject = new WeatherApi();
                        WeatherApiobject.GetJson(GeoClient.convertlat, GeoClient.convertlng);
                        string weatherdata = WeatherApiobject.sendEvent;
                        SenderEvent NewSenderEvent = new SenderEvent();
                        NewSenderEvent.DataSender(weatherdata, ConstFile.WeatherEventHubName);
                        //StartH(ConstFile.WeatherEventHubName).Wait();
                    }

                    Interlocked.Increment(ref this.totalMessages);
                    this.LastMessageOffset = message.Offset;
                }

                if (this.IsClosed)
                {
                    this.IsReceivedMessageAfterClose = true;
                }

                if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(1))
                {
                    lock (this)
                    {
                        this.checkpointStopWatch.Reset();
                        return context.CheckpointAsync();
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("{0} > Event Hub Exception: {1}", DateTime.Now, ex.Message);
            }

            return Task.FromResult<object>(null);
        }
  

Я добавлю, что мой приемник eventhub выглядит следующим образом:
https://code.msdn.microsoft.com/Service-Bus-Event-Hub-45f43fc3/view/SourceCode#content

Комментарии:

1. Что это за класс SenderEvent , отвечает ли этот класс за отправку данных другому EventHub ? Если да, пожалуйста, опубликуйте этот код.

2. Вставляем приведенный ниже код. SenderEvent одинаков для обоих сообщений (client и weatherdata)

3. И достигает ли приложение строки eventhubclient.Send(data1); вставленного кода без исключений? И какой процесс прослушивает второй eventhub?

4. Надеюсь, вы хорошо поняли. У меня нет никаких исключений. В комментарии ниже я добавляю прослушивание кода eventhub.

Ответ №1:

Вы можете использовать Stream Analytics, чтобы сделать это очень легко! Ваш первый концентратор событий — это входные данные в Stream Analytics. Затем вы можете написать запрос к потоку (Выберите * из [Input1] … это даст вам все). Вы можете вывести поток обратно в другой Eventhub.

Комментарии:

1. Это кажется лучшим способом достижения! Очень просто.

Ответ №2:

  public class SenderEvent
    {
        public void DataSender(string data, string eventhubname)
        {
            var eventhubclient = EventHubClient.CreateFromConnectionString(ConstFile.eventHubConnectionString, eventhubname);       
            EventData data1 = new EventData(Encoding.UTF8.GetBytes(data));
            eventhubclient.Send(data1);
        }
  

Ответ №3:

 private static async Task StartHost(string eventHubName)
        {
            string eventProcessorHostName = "1";
            string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", ConstFile.storageAccountName, ConstFile.storageAccountKey);
            host = new EventProcessorHost(
                eventProcessorHostName,
                eventHubName,
                ConstFile.ConsumerGroup,
                ConstFile.eventHubConnectionString,
                storageConnectionString, eventHubName.ToLowerInvariant());

            factory = new DemoEventProcessorFactory(eventProcessorHostName);

            try
            {
                var options = new EventProcessorOptions();
                options.ExceptionReceived  = (sender, e) => { Console.WriteLine(e.Exception); };
                await host.RegisterEventProcessorFactoryAsync(factory);
            }
            catch (Exception exception)
            {
                Console.ForegroundColor = ConsoleColor.Red;
                Console.WriteLine("{0} > Exception: {1}", DateTime.Now.ToString(), exception.Message);
                Console.ResetColor();
            }
        }
    }
}
  

Ответ №4:

Это зависит:

  1. если в вашем центре событий низкая задержка — вы можете использовать триггер Event Hub (функция Azure, которая будет запускаться всякий раз, когда центр событий получает новое событие). В вашей функции Azure вы используете код, как в примере
  2. Если у вас много событий — вы можете попробовать Azure stream Analytics отфильтровать ваши данные о погоде и отправить их в другой eventhub. Вы строите конвейерную линию следующим образом:

    EventHub1 -> AzureStreamAnalytics -> EventHub2