странное поведение фрейминга.разделителя

#java #akka-stream

#java #akka-поток

Вопрос:

Я пытаюсь использовать Akka Stream для обработки большого файла (источник — это файл, приемник — простой println приемник), и вот мой код:

 final Path file = Paths.get("f:/someFile");

// sink, do something interesting later
Sink<ByteString, CompletionStage<Done>> printlnSink = Sink.<ByteString>foreach(chunk -> {
    String line = chunk.utf8String();  // turn each chunk to a String
    System.out.println(line);
});

// flow, which has a chunk size
ByteString separator1 = ByteString.fromString(".");    // this will work
ByteString separator2 = ByteString.fromString(".n");  // this will NOT work
final Flow<ByteString, ByteString, NotUsed> flow = Framing.delimiter(separator1, 1000, FramingTruncation.ALLOW);

// put them together and let it run
CompletionStage<IOResult> ioResult = FileIO
        .fromPath(file)
        .via(flow)
        .to(printlnSink)
        .run(system);
 

поэтому, если я использую separator1 to Framing.delimiter(...) , он будет работать нормально, но если я использую separator2 to Framing.delimiter(...) , он вообще не будет работать.

Вот часть файла, который я хочу обработать:

 @prefix ns1: <http://www.w3.org/2004/02/skos/core#> .
@prefix ns2: <http://hadatac.org/ont/hasco/> .
@prefix ns3: <http://purl.org/dc/terms/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix sio: <http://semanticscience.org/resource/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

ns2:Cohort a owl:Class ;
    rdfs:subClassOf <http://purl.org/twc/HHEAR_00050> .

ns2:wasApprovedBy a owl:AnnotationProperty .
 

Как вы можете видеть, я не могу использовать single . в качестве разделителя, потому что это происходит в середине предложения во многих местах. Вот почему мне нужно использовать .n в качестве разделителя, который не работает.

Может кто-нибудь помочь мне проверить, что я сделал не так?

Обновить:

вот мой импорт:

 import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.util.ByteString;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
 

Я использую Java 8, и вот мой файл сборки:

 scalaVersion := "2.13.3"
val AkkaVersion = "2.6.10"
libraryDependencies  = "com.typesafe.akka" %% "akka-stream" % AkkaVersion
 

Комментарии:

1. это очень странно. Я скопировал ваш код и использовал оба разделителя. Оба работают. Теперь я не уверен, вставляю ли я свой код здесь или прошу вас добавить свой импорт и версию akka.

2. @Felipe, большое вам спасибо за то, что изучили это. Пожалуйста, посмотрите Мое обновление в исходном вопросе — я вставил свой импорт и версию akka.

3. @Felipe, я заставил это работать, единственное, что я изменил, было здесь, Framing.delimiter(separator2, 1000, FramingTruncation.ALLOW) , изменение с 1000 на 10000 заставило его работать просто отлично. Еще раз спасибо, что изучили это.

4. с другой стороны, трудно поверить, что даже исходных 1000 недостаточно… первая строка в файле не имеет длины 1000…. Кроме того, для усечения установлено значение ALLOW , не уверен, как Akka может просто остановить обработку. Я думаю, нужно прочитать его исходный код, чтобы понять это.