#java #akka-stream
#java #akka-поток
Вопрос:
Я пытаюсь использовать Akka Stream для обработки большого файла (источник — это файл, приемник — простой println
приемник), и вот мой код:
final Path file = Paths.get("f:/someFile");
// sink, do something interesting later
Sink<ByteString, CompletionStage<Done>> printlnSink = Sink.<ByteString>foreach(chunk -> {
String line = chunk.utf8String(); // turn each chunk to a String
System.out.println(line);
});
// flow, which has a chunk size
ByteString separator1 = ByteString.fromString("."); // this will work
ByteString separator2 = ByteString.fromString(".n"); // this will NOT work
final Flow<ByteString, ByteString, NotUsed> flow = Framing.delimiter(separator1, 1000, FramingTruncation.ALLOW);
// put them together and let it run
CompletionStage<IOResult> ioResult = FileIO
.fromPath(file)
.via(flow)
.to(printlnSink)
.run(system);
поэтому, если я использую separator1
to Framing.delimiter(...)
, он будет работать нормально, но если я использую separator2
to Framing.delimiter(...)
, он вообще не будет работать.
Вот часть файла, который я хочу обработать:
@prefix ns1: <http://www.w3.org/2004/02/skos/core#> .
@prefix ns2: <http://hadatac.org/ont/hasco/> .
@prefix ns3: <http://purl.org/dc/terms/> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix sio: <http://semanticscience.org/resource/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
ns2:Cohort a owl:Class ;
rdfs:subClassOf <http://purl.org/twc/HHEAR_00050> .
ns2:wasApprovedBy a owl:AnnotationProperty .
Как вы можете видеть, я не могу использовать single .
в качестве разделителя, потому что это происходит в середине предложения во многих местах. Вот почему мне нужно использовать .n
в качестве разделителя, который не работает.
Может кто-нибудь помочь мне проверить, что я сделал не так?
Обновить:
вот мой импорт:
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.util.ByteString;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
Я использую Java 8, и вот мой файл сборки:
scalaVersion := "2.13.3"
val AkkaVersion = "2.6.10"
libraryDependencies = "com.typesafe.akka" %% "akka-stream" % AkkaVersion
Комментарии:
1. это очень странно. Я скопировал ваш код и использовал оба разделителя. Оба работают. Теперь я не уверен, вставляю ли я свой код здесь или прошу вас добавить свой импорт и версию akka.
2. @Felipe, большое вам спасибо за то, что изучили это. Пожалуйста, посмотрите Мое обновление в исходном вопросе — я вставил свой импорт и версию akka.
3. @Felipe, я заставил это работать, единственное, что я изменил, было здесь,
Framing.delimiter(separator2, 1000, FramingTruncation.ALLOW)
, изменение с1000
на10000
заставило его работать просто отлично. Еще раз спасибо, что изучили это.4. с другой стороны, трудно поверить, что даже исходных 1000 недостаточно… первая строка в файле не имеет длины 1000…. Кроме того, для усечения установлено значение
ALLOW
, не уверен, как Akka может просто остановить обработку. Я думаю, нужно прочитать его исходный код, чтобы понять это.