#avro #spring-cloud-stream #spring-cloud-stream-binder-kafka
#avro #spring-cloud-stream #spring-cloud-stream-binder-kafka
Вопрос:
Я пытаюсь использовать преобразователь в потоке spring cloud для использования json и создания записи схемы avro. Можете ли вы дать мне представление о том, как правильно это исправить? После выполнения всей документации я написал следующий трансформатор
@Bean
public Function<String, GenericRecord> transformMaestroDirectSearchRequest() {
return value -> {
try {
GenericRecord genericRecord = TransformJsonToAvro.convertJsonToAvro(
value, transformJsonToAvro.getMaestroDirectSearchSchema());
return genericRecord;
} catch (Exception e){
e.printStackTrace();
return null;
}
};
}
Но я продолжаю получать эту ошибку при запуске интеграционного теста
nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an enum:
@Test
public void maestroSearchRequestTransformTest() throws IOException, URISyntaxException {
GenericMessage<byte[]> inputMessage =
new GenericMessage<>(Utils.getJsonFile("Maestro_direct_Req.json").getBytes());
this.input.send(inputMessage);
Message<byte[]> receive = this.output.receive();
System.out.println(receive.getPayload());
}
Есть идеи о том, как я могу это решить?
Я уже проверил, что логика преобразования для создания GenericRecord работает хорошо, и в этом нет ничего плохого (т. Е. Создается действительный объект GenericRecord)
Вот полная трассировка стека
2020-11-18 13:36:31.112 ERROR 89354 --- [ main] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@16fc5622]; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an enum: {"type":"record","default":"null"}]}],"default":"null"}]} (through reference chain: org.apache.avro.generic.GenericData$Record["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:289)
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:198)
at org.springframework.cloud.function.context.config.NegotiatingMessageConverterWrapper.toMessage(NegotiatingMessageConverterWrapper.java:125)
at org.springframework.cloud.function.context.config.NegotiatingMessageConverterWrapper.toMessage(NegotiatingMessageConverterWrapper.java:139)
at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertValueToMessage(SimpleFunctionRegistry.java:705)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(SimpleFunctionRegistry.java:667)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:600)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:443)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:431)
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:71)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:609)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:387)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:637)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:630)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:614)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:585)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108)
... 93 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an enum: {"type":"record",..... removed real json ,"default":"null"}]}],"default":"null"}]} (through reference chain: org.apache.avro.generic.GenericData$Record["schema"]->org.apache.avro.Schema$RecordSchema["enumSymbols"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:397)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:356)
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:763)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:728)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3058)
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:271)
... 125 more
Caused by: org.apache.avro.AvroRuntimeException: Not an enum: {"type":"record",... removed real json...."default":"null"}]}],"default":"null"}]}
at org.apache.avro.Schema.getEnumSymbols(Schema.java:206)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:689)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:755)
... 133 more
Комментарии:
1. Показать полную трассировку стека; усечение, вызванное трассировкой, делает его бесполезным.
2. @GaryRussell — добавлена полная трассировка стека. Я удалил реальные значения json, так как это сделало бы это слишком длинным.