StreamConsumer.java
/*
* Licensed to The Apereo Foundation under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
*
* The Apereo Foundation licenses this file to you under the Educational
* Community License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License
* at:
*
* http://opensource.org/licenses/ecl2.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/
package org.opencastproject.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
/**
* A StreamConsumer helps to asynchronously consume a text input stream line by line.
* The consumer guarantees the closing of the stream.
*/
public class StreamConsumer implements Runnable {
private final CountDownLatch running = new CountDownLatch(1);
private final CountDownLatch ready = new CountDownLatch(1);
private final CountDownLatch finished = new CountDownLatch(1);
private final Function<String, Boolean> consumer;
private boolean stopped = false;
private InputStream stream;
private BufferedReader reader;
/**
* Create a new stream consumer.
*
* @param consumer
* a predicate function that may stop reading further lines by returning <code>false</code>
*/
public StreamConsumer(Function<String, Boolean> consumer) {
this.consumer = consumer;
}
@Override public void run() {
try {
running.countDown();
ready.await();
// also save a reference to the reader to able to close it in stopReading
// otherwise the read loop may continue reading from the buffer
reader = new BufferedReader(new InputStreamReader(stream));
IoSupport.<Void, BufferedReader>withResource(reader, consumeBufferedFunction);
finished.countDown();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/** Wait for the executing thread to run. */
public void waitUntilRunning() {
try {
running.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/** Wait until the stream has been fully consumed. */
public void waitUntilFinished() {
try {
finished.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/** Forcibly stop consuming the stream. */
public void stopConsuming() {
if (stream != null) {
stopped = true;
IoSupport.closeQuietly(stream);
IoSupport.closeQuietly(reader);
}
}
/** Start consuming <code>stream</code>. It is guaranteed that the stream gets closed. */
public void consume(InputStream stream) {
waitUntilRunning();
this.stream = stream;
ready.countDown();
}
private final Function<BufferedReader, Void> consumeBufferedFunction = new Function<BufferedReader, Void>() {
@Override
public Void apply(BufferedReader bufferedReader) {
consumeBuffered(bufferedReader);
return null; // Void return
}
};
private void consumeBuffered(BufferedReader reader) {
try {
String line;
while ((line = reader.readLine()) != null) {
if (!consumer.apply(line)) {
stopConsuming();
break;
}
}
} catch (IOException e) {
if (!stopped) {
throw new RuntimeException(e);
}
}
}
}