Ошибки пользовательского процессора Nifi с «Обнаружена служба управления, которая была не службой управления WebSocket, а com.sun.proxy.$Proxy75»

#websocket #customization #apache-nifi #processor

Вопрос:

Я пытаюсь разработать приложение Nifi, которое предоставляет интерфейс WebSocket для Kakfa. Я не смог этого сделать, используя стандартные компоненты Nifi, как я пытался сделать ниже (возможно, это не имеет смысла, но интуитивно я хочу этого добиться).:

Использование Стандартных Компонентов

Теперь я создал пользовательский процессор «ReadFromKafka», который я намерен использовать, как показано на рисунке ниже. «ReadFromKafka «будет использовать ту же реализацию, что и стандартный компонент» PutWebSocket», но будет считывать сообщения из темы Кафки и отправлять в качестве ответа клиенту WebSocket.

введите описание изображения здесь

Я предоставил фрагмент кода реализации ниже:

 @SystemResourceConsideration(resource = SystemResource.MEMORY)
public class ReadFromKafka extends AbstractProcessor {


    public static final PropertyDescriptor PROP_WS_SESSION_ID = new PropertyDescriptor.Builder()
            .name("websocket-session-id")
            .displayName("WebSocket Session Id")
            .description("A NiFi Expression to retrieve the session id. If not specified, a message will be "  
                    "sent to all connected WebSocket peers for the WebSocket controller service endpoint.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("${"   ATTR_WS_SESSION_ID   "}")
            .build();

    public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ID = new PropertyDescriptor.Builder()
            .name("websocket-controller-service-id")
            .displayName("WebSocket ControllerService Id")
            .description("A NiFi Expression to retrieve the id of a WebSocket ControllerService.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("${"   ATTR_WS_CS_ID   "}")
            .build();

    public static final PropertyDescriptor PROP_WS_CONTROLLER_SERVICE_ENDPOINT = new PropertyDescriptor.Builder()
            .name("websocket-endpoint-id")
            .displayName("WebSocket Endpoint Id")
            .description("A NiFi Expression to retrieve the endpoint id of a WebSocket ControllerService.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("${"   ATTR_WS_ENDPOINT_ID   "}")
            .build();

    public static final PropertyDescriptor PROP_WS_MESSAGE_TYPE = new PropertyDescriptor.Builder()
            .name("websocket-message-type")
            .displayName("WebSocket Message Type")
            .description("The type of message content: TEXT or BINARY")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .defaultValue(WebSocketMessage.Type.TEXT.toString())
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles that failed to send to the destination are transferred to this relationship.")
            .build();

    private static final List<PropertyDescriptor> descriptors;
    private static final Set<Relationship> relationships;

    static{
        final List<PropertyDescriptor> innerDescriptorsList = new ArrayList<>();
        innerDescriptorsList.add(PROP_WS_SESSION_ID);
        innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ID);
        innerDescriptorsList.add(PROP_WS_CONTROLLER_SERVICE_ENDPOINT);
        innerDescriptorsList.add(PROP_WS_MESSAGE_TYPE);
        descriptors = Collections.unmodifiableList(innerDescriptorsList);

        final Set<Relationship> innerRelationshipsSet = new HashSet<>();
        innerRelationshipsSet.add(REL_SUCCESS);
        innerRelationshipsSet.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(innerRelationshipsSet);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
        final FlowFile flowfile = processSession.get();
        if (flowfile == null) {
            return;
        }

        final String sessionId = context.getProperty(PROP_WS_SESSION_ID)
                .evaluateAttributeExpressions(flowfile).getValue();
        final String webSocketServiceId = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ID)
                .evaluateAttributeExpressions(flowfile).getValue();
        final String webSocketServiceEndpoint = context.getProperty(PROP_WS_CONTROLLER_SERVICE_ENDPOINT)
                .evaluateAttributeExpressions(flowfile).getValue();
        final String messageTypeStr = context.getProperty(PROP_WS_MESSAGE_TYPE)
                .evaluateAttributeExpressions(flowfile).getValue();
        final WebSocketMessage.Type messageType = WebSocketMessage.Type.valueOf(messageTypeStr);

        if (StringUtils.isEmpty(sessionId)) {
            getLogger().debug("Specific SessionID not specified. Message will be broadcast to all connected clients.");
        }

        if (StringUtils.isEmpty(webSocketServiceId)
                || StringUtils.isEmpty(webSocketServiceEndpoint)) {
            transferToFailure(processSession, flowfile, "Required WebSocket attribute was not found.");
            return;
        }

        final ControllerService controllerService = context.getControllerServiceLookup().getControllerService(webSocketServiceId);
        if (controllerService == null) {
            getLogger().debug("ControllerService is NULL");
            transferToFailure(processSession, flowfile, "WebSocket ControllerService was not found.");
            return;
        } else if (!(controllerService instanceof WebSocketService)) {
            getLogger().debug("ControllerService is not instance of WebSocketService");
            transferToFailure(processSession, flowfile, "The ControllerService found was not a WebSocket ControllerService but a "
                      controllerService.getClass().getName());
            return;
        }

        ...

            processSession.getProvenanceReporter().send(updatedFlowFile, transitUri.get(), transmissionMillis);

            processSession.transfer(updatedFlowFile, REL_SUCCESS);
            processSession.commit();

        } catch (WebSocketConfigurationException|IllegalStateException|IOException e) {
            // WebSocketConfigurationException: If the corresponding WebSocketGatewayProcessor has been stopped.
            // IllegalStateException: Session is already closed or not found.
            // IOException: other IO error.
            getLogger().error("Failed to send message via WebSocket due to "   e, e);
            transferToFailure(processSession, flowfile, e.toString());
        }

    }

    private FlowFile transferToFailure(final ProcessSession processSession, FlowFile flowfile, final String value) {
        flowfile = processSession.putAttribute(flowfile, ATTR_WS_FAILURE_DETAIL, value);
        processSession.transfer(flowfile, REL_FAILURE);
        return flowfile;
    }
}
 

Я развернул пользовательский процессор, и когда я подключаюсь к нему с помощью Chrome «Простой клиент веб-сокета», я вижу следующее сообщение в журналах:

Обнаруженный сервис ControllerService был не сервисом управления WebSocket, а com.sun.proxy.$Proxy75

Я использую тот же код, что и в PutWebSocket, и не могу понять, почему он будет вести себя иначе, когда я использую свой пользовательский процессор. Я настроил «JettyWebSocketServer» в качестве службы управления в разделе «ListenWebSocket», как показано на рисунке ниже. введите описание изображения здесь

Дополнительные сведения об исключениях, отображаемые в журнале, приведены ниже:

java.lang.Исключение ClassCastException: класс com.sun.proxy.$Proxy75 не может быть передан классу org.apache.nifi.websocket.Сервис WebSocketService (com.sun.proxy.$Proxy75 находится в безымянном модуле загрузчика org.apache.nifi.nar.InstanceClassLoader @35c646b5; org.apache.nifi.websocket.WebSocketService находится в безымянном модуле загрузчика org.apache.nifi.nar.NarClassLoader @361abd01)

Комментарии:

1. Я пытаюсь подобрать эту резервную копию — основная проблема, похоже, заключается в следующем исключении: java.lang. ClassCastException: класс com.sun.proxy.$Proxy75 не может быть приведен к классу org.apache.nifi.websocket. Сервис WebSocketService (com.sun.proxy.$Proxy75 находится в безымянном модуле загрузчика org.apache.nifi.nar.InstanceClassLoader @35c646b5; org.apache.nifi.websocket. WebSocketService находится в безымянном модуле загрузчика org.apache.nifi.nar.NarClassLoader @361abd01)