#websocket #customization #apache-nifi #processor
Вопрос:
Я пытаюсь разработать приложение Nifi, которое предоставляет интерфейс WebSocket для Kakfa. Я не смог этого сделать, используя стандартные компоненты Nifi, как я пытался сделать ниже (возможно, это не имеет смысла, но интуитивно я хочу этого добиться).:
Теперь я создал пользовательский процессор «ReadFromKafka», который я намерен использовать, как показано на рисунке ниже. «ReadFromKafka «будет использовать ту же реализацию, что и стандартный компонент» PutWebSocket», но будет считывать сообщения из темы Кафки и отправлять в качестве ответа клиенту WebSocket.
Я предоставил фрагмент кода реализации ниже:
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class ReadFromKafka extends AbstractProcessor {
public static final PropertyDescriptor PROP_WS_SESSION_ID = new PropertyDescriptor.Builder()
.name("websocket-session-id")
.displayName("WebSocket Session Id")
.description("A NiFi Expression to retrieve the session id. If not specified, a message will be "
"sent to all connected WebSocket peers for the WebSocket controller service endpoint.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${" ATTR_WS_SESSION_ID "}")
.build();
public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ID = new PropertyDescriptor.Builder()
.name("websocket-controller-service-id")
.displayName("WebSocket ControllerService Id")
.description("A NiFi Expression to retrieve the id of a WebSocket ControllerService.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${" ATTR_WS_CS_ID "}")
.build();
public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ENDPOINT = new PropertyDescriptor.Builder()
.name("websocket-endpoint-id")
.displayName("WebSocket Endpoint Id")
.description("A NiFi Expression to retrieve the endpoint id of a WebSocket ControllerService.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${" ATTR_WS_ENDPOINT_ID "}")
.build();
public static final PropertyDescriptor PROP_WS_MESSAGE_TYPE = new PropertyDescriptor.Builder()
.name("websocket-message-type")
.displayName("WebSocket Message Type")
.description("The type of message content: TEXT or BINARY")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue(WebSocketMessage.Type.TEXT.toString())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are transferred to this relationship.")
.build();
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
static{
final List<PropertyDescriptor> innerDescriptorsList = new ArrayList<>();
innerDescriptorsList.add(PROP_WS_SESSION_ID);
innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ID);
innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ENDPOINT);
innerDescriptorsList.add(PROP_WS_MESSAGE_TYPE);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<>();
innerRelationshipsSet.add(REL_SUCCESS);
innerRelationshipsSet.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
final FlowFile flowfile = processSession.get();
if (flowfile == null) {
return;
}
final String sessionId = context.getProperty(PROP_WS_SESSION_ID)
.evaluateAttributeExpressions(flowfile).getValue();
final String webSocketServiceId = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ID)
.evaluateAttributeExpressions(flowfile).getValue();
final String webSocketServiceEndpoint = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ENDPOINT)
.evaluateAttributeExpressions(flowfile).getValue();
final String messageTypeStr = context.getProperty(PROP_WS_MESSAGE_TYPE)
.evaluateAttributeExpressions(flowfile).getValue();
final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr);
if (StringUtils.isEmpty(sessionId)) {
getLogger().debug("Specific SessionID not specified. Message will be broadcast to all connected clients.");
}
if (StringUtils.isEmpty(webSocketServiceId)
|| StringUtils.isEmpty(webSocketServiceEndpoint)) {
transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found.");
return;
}
final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(webSocketServiceId);
if (controllerService == null) {
getLogger().debug("ControllerService is NULL");
transferToFailure(processSession, flowfile, "WebSocket ControllerService was not found.");
return;
} else if (!(controllerService instanceof WebSocketService)) {
getLogger().debug("ControllerService is not instance of WebSocketService");
transferToFailure(processSession, flowfile, "The ControllerService found was not a WebSocket ControllerService but a "
controllerService.getClass().getName());
return;
}
...
processSession.getProvenanceReporter().send(updatedFlowFile, transitUri.get(), transmissionMillis);
processSession.transfer(updatedFlowFile, REL_SUCCESS);
processSession.commit();
} catch (WebSocketConfigurationException|IllegalStateException|IOException e) {
// WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped.
// IllegalStateException: Session is already closed or not found.
// IOException: other IO error.
getLogger().error("Failed to send message via WebSocket due to " e, e);
transferToFailure(processSession, flowfile, e.toString());
}
}
private FlowFile transferToFailure(final ProcessSession processSession, FlowFile flowfile, final String value) {
flowfile = processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, value);
processSession.transfer(flowfile, REL_FAILURE);
return flowfile;
}
}
Я развернул пользовательский процессор, и когда я подключаюсь к нему с помощью Chrome «Простой клиент веб-сокета», я вижу следующее сообщение в журналах:
Обнаруженный сервис ControllerService был не сервисом управления WebSocket, а com.sun.proxy.$Proxy75
Я использую тот же код, что и в PutWebSocket, и не могу понять, почему он будет вести себя иначе, когда я использую свой пользовательский процессор. Я настроил «JettyWebSocketServer» в качестве службы управления в разделе «ListenWebSocket», как показано на рисунке ниже.
Дополнительные сведения об исключениях, отображаемые в журнале, приведены ниже:
java.lang.Исключение ClassCastException: класс com.sun.proxy.$Proxy75 не может быть передан классу org.apache.nifi.websocket.Сервис WebSocketService (com.sun.proxy.$Proxy75 находится в безымянном модуле загрузчика org.apache.nifi.nar.InstanceClassLoader @35c646b5; org.apache.nifi.websocket.WebSocketService находится в безымянном модуле загрузчика org.apache.nifi.nar.NarClassLoader @361abd01)
Комментарии:
1. Я пытаюсь подобрать эту резервную копию — основная проблема, похоже, заключается в следующем исключении: java.lang. ClassCastException: класс com.sun.proxy.$Proxy75 не может быть приведен к классу org.apache.nifi.websocket. Сервис WebSocketService (com.sun.proxy.$Proxy75 находится в безымянном модуле загрузчика org.apache.nifi.nar.InstanceClassLoader @35c646b5; org.apache.nifi.websocket. WebSocketService находится в безымянном модуле загрузчика org.apache.nifi.nar.NarClassLoader @361abd01)