Unified Information Access Blog

Welcome to Attivio's Unified Information Access Blog. Join us for discussions on topics ranging from enterprise search solutions and information access insights to Agile software development methodology and programming with Java. We hope you'll find the articles informative and participate in the discussions by leaving a comment.



The Problem

If you've been a Java programmer long enough, you've probably come across the situation where you have one API that writes to an OutputStream and you need to stitch it to another API that wants to read from an InputStream. No problem, right? The Java API is large; there must be a class that implements InputStream and can be constructed from an OutputStream. What? No? Well, then a quick internet search should reveal a utility class that will do the trick.

If you do a Google search for the terms OutputStream InputStream, you will quickly find Stephen Ostermiller's excellent article that outlines three approaches to solving the problem. The PipedInputStream and PipedOutputStream approach is one I've used in the past, but it falls short in the area of exception handling. What happens if an exception occurs while the output is being written? The exception is thrown in the writer thread, where nothing passes it on, so the thread reading the InputStream never sees its cause. With this in mind, and with the desire to make the piped solution easy to reuse, we created the static method and the supporting class and interface shown below.
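The naive piped approach described above can be sketched as follows to show where the exception goes. This is a minimal illustration, not Ostermiller's code or ours; `failingWrite` is a hypothetical writer that emits a few bytes and then fails. The reader ends with a pipe-level IOException (the JDK reports "Write end dead" or "Pipe broken", depending on timing, possibly after a short polling delay) that carries no cause; the original "disk full" message only reaches the writer thread's stderr.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

public class NaivePipeDemo {
    // Hypothetical writer: emits some bytes, then fails (e.g. the disk filled up).
    static void failingWrite(OutputStream os) throws IOException {
        os.write("partial".getBytes(StandardCharsets.UTF_8));
        throw new IOException("disk full");
    }

    // Naive piping: the writer runs in its own thread and nothing forwards its failure.
    static InputStream naivePipe() throws IOException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);
        new Thread(() -> {
            try {
                failingWrite(out);
            } catch (IOException e) {
                // The original cause ends up here; the reader never learns about it.
                System.err.println("writer thread: " + e.getMessage());
            }
        }).start();
        return in;
    }

    // Reads until end of stream or error; returns the exception the reader saw (null on clean EOF).
    static IOException readUntilFailure(InputStream in) {
        try {
            while (in.read() != -1) {
                // discard the data; only the way the stream ends matters here
            }
            return null;
        } catch (IOException e) {
            return e;
        }
    }

    public static void main(String[] args) throws IOException {
        IOException seen = readUntilFailure(naivePipe());
        // A pipe error such as "Write end dead" or "Pipe broken", with no cause attached.
        System.out.println(seen == null ? "clean EOF" : seen.getMessage() + ", cause=" + seen.getCause());
    }
}
```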

Using the new method requires only a few lines. Any IOException thrown by class2.write is rethrown, wrapped in a new IOException, from the read methods of the returned InputStream, and so surfaces in class1.read. If class2.write throws other types of exceptions, these can be caught and wrapped in an IOException.

class1.read(IOUtils.toInputStream(new OutputStreamConsumer() {
    public void write(OutputStream os) throws IOException {
        class2.write(os);
    }
}));
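The wrapping of non-IO exceptions mentioned above can look like the following minimal sketch. `Renderer` and `RenderException` are hypothetical stand-ins for class2 and a checked exception it might throw; the interface mirrors the post's `OutputStreamConsumer`. The original exception is kept as the cause, so the reader can still inspect it after it travels through the pipe.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class WrappingConsumerDemo {
    // Same shape as the post's OutputStreamConsumer interface.
    interface OutputStreamConsumer {
        void write(OutputStream os) throws IOException;
    }

    // Hypothetical checked exception that is not an IOException.
    static class RenderException extends Exception {
        RenderException(String message) { super(message); }
    }

    // Hypothetical stand-in for class2: writes some output, optionally fails.
    static class Renderer {
        private final boolean fail;

        Renderer(boolean fail) { this.fail = fail; }

        void write(OutputStream os) throws IOException, RenderException {
            os.write("<report/>".getBytes(StandardCharsets.UTF_8));
            if (fail) {
                throw new RenderException("template missing");
            }
        }
    }

    // Adapts the renderer to the IOException-only consumer interface.
    static OutputStreamConsumer wrap(final Renderer renderer) {
        return os -> {
            try {
                renderer.write(os);
            } catch (RenderException e) {
                // Keep the original exception as the cause so the reader can inspect it.
                throw new IOException("rendering failed", e);
            }
        };
    }
}
```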

The Code


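/**
 * Creates an InputStream which reads data written by the {@link OutputStreamConsumer}.
 * An OutputStreamConsumer is an interface which provides a single write method that takes
 * an OutputStream. The consumer runs in a separate thread; an IOException it throws is
 * rethrown (wrapped in a new IOException) by the returned stream's read, available and
 * close methods.
 *
 * <p>Sample use:
 *
 * <pre>{@code
 * class1.read(IOUtils.toInputStream(new OutputStreamConsumer() {
 *     public void write(OutputStream os) throws IOException {
 *         class2.write(os);
 *     }
 * }));
 * }</pre>
 *
 * @param ss the consumer that writes the data to be read from the returned stream
 * @return an InputStream which reads data written by the OutputStreamConsumer.
 * @throws IOException if the pipe cannot be created
 */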
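public static InputStream toInputStream(final OutputStreamConsumer ss) throws IOException {
    final ExceptionPassingPipedInputStream in = new ExceptionPassingPipedInputStream();
    final PipedOutputStream out = new PipedOutputStream(in);
    new Thread(
        new Runnable() {
            public void run() {
                try {
                    ss.write(out);
                } catch (IOException e) {
                    // Record the failure before closing, so the reader sees it at end of stream.
                    in.setException(e);
                } catch (RuntimeException e) {
                    // Unchecked failures would otherwise look like a normal end of stream.
                    in.setException(new IOException(e));
                } finally {
                    // Always close the writing end; without this the reader eventually fails
                    // with "Write end dead" or "Pipe broken" instead of reaching end of stream.
                    try {
                        out.close();
                    } catch (IOException ignored) {
                        // nothing more to report
                    }
                }
            }
        }
    ).start();
    return in;
}

/**
 * A class used to pass IOExceptions generated while writing to an associated output stream
 * to the reader.
 */
private static class ExceptionPassingPipedInputStream extends PipedInputStream {
    private IOException ex = null;

    /** Called by the writer thread; synchronized so the reading thread is guaranteed to see it. */
    synchronized void setException(IOException e) {
        ex = e;
    }

    @Override
    public synchronized int available() throws IOException {
        throwException();
        return super.available();
    }
    @Override
    public synchronized int read() throws IOException {
        throwException();
        int b = super.read();
        // The writer records its failure before closing, so check again at end of stream.
        if (b == -1) throwException();
        return b;
    }
    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        throwException();
        int n = super.read(b, off, len);
        if (n == -1) throwException();
        return n;
    }
    @Override
    public void close() throws IOException {
        super.close();
        throwException();
    }

    /** Rethrows a recorded writer failure once, wrapped so the reader's stack trace is kept. */
    public synchronized void throwException() throws IOException {
        if (ex == null) return;
        try { throw new IOException(ex); }
        finally { ex = null; }
    }

}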

/**
 * Used for copying an OutputStream into an InputStream.
 * @see IOUtils#toInputStream(OutputStreamConsumer)
 */
public static interface OutputStreamConsumer {
    /** Write contents to OutputStream. */
    void write(OutputStream s) throws IOException;
}


Author Bio

Since graduating from MIT with a computer science degree, Martin Serrano has spent most of his career developing high-performance distributed systems with a concentration in analytic algorithms. In 1993 he was one of the first engineers to develop commercial data processing software for a parallel computer (the CM-5 from Thinking Machines), including efficient algorithms for multiplying large floating-point vectors and a port of the popular gzip compression program. As the first engineer hired by the high-performance computing systems integrator Tessera Enterprise Systems (later acquired by iXL), Martin was responsible for developing data visualization software, and in 1996 he led the team that designed AT&T's next-generation business intelligence data warehouse, which held 17 TB of data. Martin then joined Ab Initio Software, the de facto leader in high-performance parallel business data processing. As an early employee at Ab Initio, Martin was responsible for the design and development of the SAS Analyzer™, a system that automatically analyzes programs written for the SAS Institute's analytic software suite and schedules them for parallel execution. Turning his attention to bioinformatics, Martin became a Group Leader at the Broad Institute, leading the team that develops one of the largest search websites available for chemical compounds and biological experiment results. Martin is currently one of the Chief Architects at Attivio.

Comments (2)

Davide Simonetti said:

Hi, I came across your article and I was really surprised, because I had the same problem and the first code I wrote was very similar to yours. After using it for two years in a production environment, I had the chance to improve it. (In your code the reading thread can lock up if the internal thread throws an exception after the reading thread has checked ex!=null. I can explain in more detail if you like. I think a finally block with out.close() inside the thread should solve the problem.)

I made an open source library with my code (and other utilities for streams). I'm sure you'll find it interesting. Have a look at the io-tools project on Google Code. Some of the features are:

A simpler usage (no need for the OutputStreamConsumer implementation, just subclass InputStreamFromOutputStream).
It allows you to return a result of the processing, not only an exception.
Any exception thrown by the internal thread is reported when getResult() is invoked.
It uses the Java 1.5 ExecutorService to create threads (for better performance).
It allows you to set the size of the pipe's circular buffer (the 1 KB default doesn't perform well for large streams).

Here is how you can use it:


final InputStreamFromOutputStream<String> isos = new InputStreamFromOutputStream<String>() {
    @Override
    public String produce(final OutputStream dataSink) throws Exception {
        /*
         * Call your application function that produces the data here.
         * WARNING: this runs in another thread, so this method shouldn't
         * write any class field or make assumptions about the state of the class.
         */
        return produceMyData(dataId, dataSink);
    }
};
try {
    // Now you can read from the InputStream the data that was written to the
    // dataSink OutputStream.
    byte[] data = IOUtils.toByteArray(isos);
    // Use data here
} catch (final IOException e) {
    // Handle exception here
} finally {
    isos.close();
}
// You can get the result of produceMyData after the stream has been closed.
String resultOfProduction = isos.getResult();

I also made a class that works the opposite way: it is an OutputStream that lets you read the data written to it from an InputStream (OutputStreamToInputStream).

My code is distributed under the BSD software license, but if you need other licensing please contact me.

Best regards and congratulations on the article!
Davide.
March 28, 2009

Davide Simonetti said:

Oops, I forgot the link: http://code.google.com/p/io-to...EasyStream
Davide.
March 28, 2009
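Davide's comment describes a design that runs the producer on an ExecutorService, lets the caller choose the pipe buffer size, and reports the producer's result or exception through a getResult() call. The following is a minimal sketch of that idea using only the JDK; it is not the io-tools API, and `Producer`, `Piped`, `start` and `drain` are hypothetical names. The buffer size goes to the `PipedInputStream(int pipeSize)` constructor (Java 6 and later), and the producer's outcome travels through a `Future`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class FuturePipeSketch {
    // Hypothetical producer contract: writes to the sink and returns a result.
    interface Producer<T> {
        T produce(OutputStream sink) throws Exception;
    }

    // The reader end of the pipe plus a handle on the producer's outcome.
    static final class Piped<T> {
        final InputStream in;
        final Future<T> result;

        Piped(InputStream in, Future<T> result) {
            this.in = in;
            this.result = result;
        }
    }

    // Starts the producer on the executor, writing into a pipe with the given buffer size.
    static <T> Piped<T> start(ExecutorService executor, int pipeSize, Producer<T> producer)
            throws IOException {
        PipedInputStream in = new PipedInputStream(pipeSize);
        PipedOutputStream out = new PipedOutputStream(in);
        Future<T> result = executor.submit(() -> {
            try {
                return producer.produce(out);
            } finally {
                // Close even on failure so the reader reaches end of stream instead of hanging.
                out.close();
            }
        });
        return new Piped<>(in, result);
    }

    // Reads the stream to the end, then closes it.
    static byte[] drain(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        try {
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        } finally {
            in.close();
        }
        return buffer.toByteArray();
    }
}
```

Because pooled threads stay alive after a task finishes, PipedInputStream cannot detect a dead writer here, so closing the output end in the `finally` block is what ends the stream. The reader therefore sees a normal end of stream even when the producer fails, and has to call `result.get()` to learn whether the data is complete; the resulting `ExecutionException` carries the producer's exception as its cause.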
