Необходимо использовать из темы Kakfa и динамически предоставлять выражение cron (полученное из темы kafka) в quartz2

#dynamic #apache-kafka #cron #apache-camel #quartz-scheduler

#динамический #apache-kafka #cron #apache-camel #quartz-планировщик

Вопрос:

Постановка проблемы — мне нужно использовать из темы kafka, где я получаю выражение cron в качестве заголовка, и мне нужно запустить QUARTZ и динамически предоставлять выражение cron.

Вот мой фрагмент кода —

 from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}......")
    .routeId(routeId)
    //accessing all the header value and then forming a valid cron expression
    .process(new Processor() {
        @Override 
        public void process(Exchange exchange) throws Exception {
            String sec=exchange.getIn().getHeader("second",String.class);
            String min=exchange.getIn().getHeader("minute",String.class);
            String hours=exchange.getIn().getHeader("hours",String.class);       
            body=exchange.getIn().getBody();
            log.info("The original Body : {}",body);
            // Method call to prepare the cron expression based on the headers value
            cronExp=cb.getCronExperssion(sec,min,hours,"?","*","*","*");
            //Setting up the final cron expression to the header    
            exchange.getIn().setHeader("timerObject", timerObj); 
            log.info("The timer object values are: {}", timerObj);
            log.info("The original scheduled time is : {}",startPolicy.getRouteStartTime());
        }
    })
    // because the quartz2 route can’t be at the to side, it should always be a from() that's why i am using the mock:result here, Any better way ???
    .to("mock:result");

// Here I need to give the cron expression which is being stored in the headers.
from("quartz2://group1/trigger1?cron=0/20 * * ? * * *amp;stateful=true")
    .routeId(“Scheduler Route”)
    .log(“Scheduler routed started …”)
    .to(“direct:httpRoute”);
 

Я пробовал несколько вещей, таких как enrich (), pollEnrich, но не смог применить выражение cron из значения заголовков. Пожалуйста, помогите….

Спасибо Дипак

Комментарии:

1. Что вы устанавливаете cronExp в заголовках? exchange.getIn().setHeader("cronExp", cronExp);

2. Это внутри метода .process(), вот код … Поскольку оно находится в заголовке, я не могу получить к нему доступ при формировании раздела from(«quartz2: // group1 / trigger1?cron =» timerInfo «amp; stateful = true»), поскольку мне нужно указать там CronExpression .

3. Вот код, в котором я настраиваю заголовки …..process(new Processor() { @Override public void process(Exchange exchange) выдает исключение { // Получение строки заголовков exchange sec=exchange.getIn().getHeader(«второй»,String.class ); Строка min=exchange.getIn().getHeader(«минута»,String.class ); cronExp=cb.getcronexpersion(сек, мин, часы,»?»,» «,» «,»*»);

4. Я предполагаю, что проблема здесь заключается в обновлении динамического значения cron во внешней переменной класса, которое также из анонимного внутреннего класса, т.е. .process(new processor() { ……