package jperipheral; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Matcher; import java.util.regex.Pattern; import jperipheral.nio.channels.AsynchronousByteChannel; import jperipheral.nio.channels.CompletionHandler; import jperipheral.nio.channels.ReadPendingException; import jperipheral.nio.channels.ShutdownChannelGroupException; import jperipheral.nio.channels.WritePendingException; /** * An adapter that maps an AsynchronousCharChannel on top of an existing AsynchronousByteChannel. * * @author Gili Tzabari */ public class AsynchronousByteCharChannel implements AsynchronousCharChannel { /** * The underlying AsynchronousByteChannel. */ private final AsynchronousByteChannel channel; /** * Used by write() to encode bytes. */ private final CharsetEncoder encoder; /** * Used by read() and readLine() to decodes characters. */ private final CharsetDecoder decoder; /** * Searches for line delimiters. */ private final Pattern delimiters = Pattern.compile("\\r|\\n"); /** * Indicates if there is an ongoing read operation. */ private boolean readPending; /** * Indicates if there is an ongoing write operation. */ private boolean writePending; /** * Indicates if the subsequent newline character should be disregarded by readLine(). */ private boolean consumeNewline; /** * The buffer containing characters decoded from readBytes. */ private StringBuilder readString = new StringBuilder(); /** * The buffer that the underlying AsynchronousByteChannel writes into. */ private ByteBuffer readBytes = ByteBuffer.allocate(1024); /** * Creates a new AsynchronousCharChannel. * * @param channel * The underlying AsynchronousByteChannel * @param charset * The character set */ private AsynchronousByteCharChannel(AsynchronousByteChannel channel, Charset charset) { this.channel = channel; this.encoder = charset.newEncoder(); this.decoder = charset.newDecoder(); } /** * Opens an asynchronous character channel. * * @param channel * The underlying AsynchronousByteChannel * @param charset * The character set * @return A new asynchronous socket channel * @throws IOException * If an I/O error occurs */ public static AsynchronousByteCharChannel open(AsynchronousByteChannel channel, Charset charset) throws IOException { return new AsynchronousByteCharChannel(channel, charset); } public void read(CharBuffer target, A attachment, CompletionHandler handler) throws IllegalArgumentException, ReadPendingException, ShutdownChannelGroupException { if (target.isReadOnly()) throw new IllegalArgumentException("target may not be read-only"); if (target.remaining() <= 0) { handler.completed(0, attachment); return; } read(attachment, handler, new CharBufferWriter(target)); } /** * @see #read(CharBuffer,Object,CompletionHandler) */ private synchronized void read(A attachment, CompletionHandler handler, CharSequenceWriter target) throws IllegalArgumentException, ReadPendingException, ShutdownChannelGroupException { if (readPending) throw new ReadPendingException(); if (readString.length() > 0) { try { int charactersTransferred = target.write(readString); readString.delete(0, charactersTransferred); handler.completed(charactersTransferred, attachment); } catch (IOException e) { handler.failed(e, attachment); } return; } readPending = true; CompletionHandler readHandler = new ReadHandler(attachment, handler, target); channel.read(readBytes, readBytes, readHandler); } public Future read(CharBuffer target) throws IllegalArgumentException, ReadPendingException, ShutdownChannelGroupException { if (target.isReadOnly()) throw new IllegalArgumentException("target may not be read-only"); if (target.remaining() <= 0) return new CompletedFuture(0); return read(new CharBufferWriter(target)); } /** * @see #read(CharBuffer) */ private synchronized Future read(CharSequenceWriter target) throws IllegalArgumentException, ReadPendingException, ShutdownChannelGroupException { if (readPending) throw new ReadPendingException(); if (readString.length() > 0) { try { int charactersTransferred = target.write(readString); readString.delete(0, charactersTransferred); return new CompletedFuture(charactersTransferred); } catch (IOException e) { return new CompletedFuture(e); } } readPending = true; return new ReadFuture(channel.read(readBytes), readBytes, target); } public void readLine(StringBuilder target, A attachment, CompletionHandler handler) throws IllegalArgumentException, ReadPendingException, ShutdownChannelGroupException { read(attachment, new ReadLineHandler(handler, target), new LineWriter(target)); } public Future readLine(StringBuilder target) throws IllegalArgumentException, ReadPendingException, ShutdownChannelGroupException { return new ReadLineFuture(read(new LineWriter(target)), target); } public synchronized void write(CharBuffer source, A attachment, CompletionHandler handler, boolean endOfInput) throws WritePendingException, ShutdownChannelGroupException { if (writePending) throw new WritePendingException(); ByteBuffer bytesWritten = ByteBuffer.allocate((int) Math.ceil(encoder.maxBytesPerChar() * source.remaining())); // duplicate source to avoid modifying its position CharBuffer sourceCopy = source.duplicate(); CoderResult encodingResult = encoder.encode(sourceCopy, bytesWritten, endOfInput); if (encodingResult.isError()) { delegateError(encodingResult, attachment, handler); return; } writePending = true; CompletionHandler writeHandler = new WriteHandler(attachment, handler, source, endOfInput); channel.write(bytesWritten, bytesWritten, writeHandler); } public Future write(CharBuffer source, boolean endOfInput) throws WritePendingException, ShutdownChannelGroupException { if (writePending) throw new WritePendingException(); ByteBuffer bytesWritten = ByteBuffer.allocate((int) Math.ceil(encoder.maxBytesPerChar() * source.remaining())); // duplicate source to avoid modifying its position CharBuffer sourceCopy = source.duplicate(); CoderResult encodingResult = encoder.encode(sourceCopy, bytesWritten, endOfInput); if (encodingResult.isError()) { // TODO: do we really need PollableCompletionHandler here? PollableCompletionHandler handler = new PollableCompletionHandler(); delegateError(encodingResult, null, handler); return new CompletedFuture(handler.throwable); } writePending = true; bytesWritten.flip(); return new WriteFuture(channel.write(bytesWritten), source, bytesWritten, endOfInput); } @Override public void close() throws IOException { channel.close(); } @Override public boolean isOpen() { return channel.isOpen(); } /** * A Future for an operation that has already completed. * * @author Gili Tzabari */ private static class CompletedFuture implements Future { private final int charactersTransfered; private final Throwable throwable; /** * Creates a new CompletedFuture. * * @param charactersTransfered * The number of characters transfered */ public CompletedFuture(int charactersTransfered) { this.charactersTransfered = charactersTransfered; this.throwable = null; } /** * Creates a new CompletedFuture. * * @param throwable the Throwable thrown by the operation */ public CompletedFuture(Throwable throwable) { this.throwable = throwable; this.charactersTransfered = 0; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return true; } @Override public Integer get() throws InterruptedException, ExecutionException { if (throwable != null) throw new ExecutionException(throwable); return charactersTransfered; } @Override public Integer get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (throwable != null) throw new ExecutionException(throwable); return charactersTransfered; } } /** * Invoked when a byte reading operation completes. * * @param * The attachment type * @param bytesRead * The buffer from which bytes are to be retrieved * @param endOfInput * true if the end of stream has been reached * @param target * The buffer into which characters are to be transferred * @param attachment * The object to attach to the I/O operation; can be null * @param handler * The CompletionHandler to delegate to * @return true if the read operation completed */ private boolean onBytesRead(ByteBuffer bytesRead, boolean endOfInput, CharSequenceWriter target, A attachment, CompletionHandler handler) { try { decodeBytesForRead(decoder, bytesRead, endOfInput, readString); int charactersTransferred = target.write(readString); if (endOfInput && charactersTransferred == 0) charactersTransferred = -1; if (charactersTransferred != 0) { if (charactersTransferred > 0) readString.delete(0, charactersTransferred); handler.completed(charactersTransferred, attachment); return true; } } catch (IOException e) { handler.failed(e, attachment); return true; } return false; } /** * Decodes a ByteBuffer. * * @param decoder * The Charset decoder * @param source * The ByteBuffer to decode * @param endOfInput * true if the end of the stream has been reached * @param target * The buffer into which characters are to be transferred * @throws CharacterCodingException * If a decoding error occured */ private static void decodeBytesForRead(CharsetDecoder decoder, ByteBuffer source, boolean endOfInput, StringBuilder target) throws CharacterCodingException { final CharBuffer charBuffer = CharBuffer.allocate((int) Math.ceil(decoder.maxCharsPerByte() * source.remaining())); CoderResult decodingResult = decoder.decode(source, charBuffer, endOfInput); if (decodingResult.isError()) decodingResult.throwException(); charBuffer.flip(); target.append(charBuffer.toString()); if (endOfInput) flushDecoderForRead(decoder, charBuffer, target); } /** * Flush the CharsetDecoder. * * @param decoder * The Charset decoder * @param temp * A temporary buffer * @param target * The buffer into which characters are to be transferred */ private static void flushDecoderForRead(CharsetDecoder decoder, CharBuffer temp, StringBuilder target) { CoderResult decodingResult; do { temp.clear(); decodingResult = decoder.flush(temp); temp.flip(); target.append(temp.toString()); } while (!decodingResult.isUnderflow()); } /** * A handler for consuming the result of an asynchronous character reading operation. * * @author Gili Tzabari */ private class ReadHandler implements CompletionHandler { private final CompletionHandler handler; private final A attachment; private final CharSequenceWriter target; /** * Creates a new ReadHandler. * * @param attachment * The object to attach to the I/O operation; can be null * @param handler * The CompletionHandler to delegate to * @param target * The buffer into which characters are to be transferred */ public ReadHandler(A attachment, CompletionHandler handler, CharSequenceWriter target) { this.handler = handler; this.attachment = attachment; this.target = target; } @Override public void completed(Integer numBytesRead, ByteBuffer bytesRead) { if (onBytesRead(bytesRead, numBytesRead == -1, target, attachment, handler)) { synchronized (this) { readPending = false; } return; } // ask for more bytes bytesRead.compact(); channel.read(bytesRead, bytesRead, this); } @Override public void failed(Throwable throwable, ByteBuffer source) { handler.failed(throwable, attachment); synchronized (AsynchronousByteCharChannel.this) { readPending = false; } } } /** * A CompletionHandler that retains its results for future examination. * * @author Gili Tzabari */ private static class PollableCompletionHandler implements CompletionHandler { public Integer result; public Throwable throwable; @Override public void completed(Integer result, Void attachment) { this.result = result; } @Override public void failed(Throwable throwable, Void attachment) { this.throwable = throwable; } } /** * The result of an asynchronous character reading operation. * * @author Gili Tzabari */ private class ReadFuture implements Future { private final ByteBuffer source; private final CharSequenceWriter target; private Future future; private ExecutionException exception; /** * Creates a new ReadFuture. * * @param future * The result of an asynchronous byte reading operation * @param source * The buffer from which bytes are to be retrieved * @param target * The buffer into which characters are to be transferred */ private ReadFuture(Future future, ByteBuffer source, CharSequenceWriter target) { this.future = future; this.source = source; this.target = target; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return future.isCancelled(); } @Override public boolean isDone() { return future.isDone(); } @Override public Integer get() throws CancellationException, InterruptedException, ExecutionException { if (exception != null) { // Prevent CharsetDecoder from being invoked again throw exception; } while (true) { boolean endOfInput = future.get() == -1; PollableCompletionHandler handler = new PollableCompletionHandler(); source.flip(); if (onBytesRead(source, endOfInput, target, null, handler)) { synchronized (AsynchronousByteCharChannel.this) { readPending = false; } if (handler.throwable != null) { exception = new ExecutionException(handler.throwable); throw exception; } return handler.result; } // ask for more bytes source.compact(); future = channel.read(source); } } @Override public Integer get(long timeout, TimeUnit unit) throws CancellationException, InterruptedException, ExecutionException, TimeoutException { if (exception != null) { // Prevent CharsetDecoder from being invoked again throw exception; } while (true) { boolean endOfInput = future.get(timeout, unit) == -1; PollableCompletionHandler handler = new PollableCompletionHandler(); if (onBytesRead(source, endOfInput, target, null, handler)) { synchronized (AsynchronousByteCharChannel.this) { readPending = false; } if (handler.throwable != null) { exception = new ExecutionException(handler.throwable); throw exception; } return handler.result; } // ask for more bytes source.compact(); future = channel.read(source); } } } /** * A handler for consuming the result of an asynchronous line reading operation. * * @author Gili Tzabari */ private class ReadLineHandler implements CompletionHandler { private final CompletionHandler handler; private final StringBuilder target; /** * The length of the StringBuilder before the operation began. */ private final int originalLength; /** * Creates a new ReadLineHandler. * * @param attachment * The object to attach to the I/O operation; can be null * @param handler * The CompletionHandler to delegate to * @param target * The buffer into which characters are to be transferred */ public ReadLineHandler(CompletionHandler handler, StringBuilder target) { this.handler = handler; this.target = target; this.originalLength = target.length(); } @Override public void completed(Integer numBytesRead, A attachment) { handler.completed(target.length() - originalLength, attachment); } @Override public void failed(Throwable throwable, A attachment) { handler.failed(throwable, attachment); } } /** * The result of an asynchronous line reading operation. * * @author Gili Tzabari */ private class ReadLineFuture implements Future { private Future future; private final StringBuilder target; /** * The length of the StringBuilder before the operation began. */ private final int originalLength; /** * Creates a new ReadLineFuture. * * @param future * The result of an asynchronous byte reading operation * @param target * The buffer into which characters are to be transferred */ private ReadLineFuture(Future future, StringBuilder target) { this.future = future; this.target = target; this.originalLength = target.length(); } @Override public boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return future.isCancelled(); } @Override public boolean isDone() { return future.isDone(); } @Override public Integer get() throws CancellationException, InterruptedException, ExecutionException { future.get(); return target.length() - originalLength; } @Override public Integer get(long timeout, TimeUnit unit) throws CancellationException, InterruptedException, ExecutionException, TimeoutException { future.get(timeout, unit); return target.length() - originalLength; } } /** * Invoked when a byte reading operation completes. * * @param * The attachment type * @param source * The buffer from which characters are to be retrieved * @param sourceOffset * Source's initial position when the write operation began * @param bytesWritten * The bytes that were written * @param bytesRead * The buffer from which bytes are to be retrieved * @param target * The buffer into which characters are to be transferred * @param attachment * The object to attach to the I/O operation; can be null * @param handler * The CompletionHandler to delegate to * @param endOfInput * true if the end of the stream has been reached * @return true if the write operation completed */ private boolean onBytesWritten(CharBuffer source, int sourceOffset, ByteBuffer bytesWritten, A attachment, CompletionHandler handler, boolean endOfInput) { // Decode the bytes we sent out final CharBuffer charBuffer = CharBuffer.allocate((int) Math.ceil(decoder.maxCharsPerByte() * bytesWritten.remaining())); CoderResult decodingResult = decoder.decode(bytesWritten, charBuffer, endOfInput); if (decodingResult.isUnmappable()) { delegateError(decodingResult, attachment, handler); return true; } if (decodingResult.isMalformed()) { // The write operation stopped in the middle of a codepoint skipWellFormedBytes(source, bytesWritten, decoder); bytesWritten.clear(); putCodepoint(bytesWritten, source.toString().codePointAt(0), encoder); if (bytesWritten.position() > decodingResult.length()) { // Send the rest of the codepoint bytesWritten.position(decodingResult.length()); return false; } delegateError(decodingResult, attachment, handler); return true; } // We sent a whole number of codepoints source.position(source.position() + charBuffer.position()); if (endOfInput) flushDecoderForWrite(decoder, charBuffer, source); handler.completed(source.position() - sourceOffset, attachment); return true; } /** * Skips any well-formed bytes that were written. * * @param source * The buffer from which characters are to be retrieved * @param malformedBytes * The buffer containing the malformed bytes */ private static void skipWellFormedBytes(CharBuffer source, ByteBuffer malformedBytes, CharsetDecoder decoder) { malformedBytes.flip(); CharBuffer charBuffer = CharBuffer.allocate((int) Math.ceil(decoder.maxCharsPerByte() * malformedBytes.remaining())); CoderResult decodingResult = decoder.decode(malformedBytes, charBuffer, true); assertCoderResult(decodingResult); source.position(source.position() + charBuffer.position()); } /** * Puts a codepoint into a ByteBuffer. * * @param target * The buffer to write into * @param codepoint * The codepoint */ private static void putCodepoint(ByteBuffer target, int codepoint, CharsetEncoder encoder) { // Encode one codepoint int charCount = Character.charCount(codepoint); CharBuffer codepointBuffer = CharBuffer.allocate(charCount); codepointBuffer.put(Character.toChars(codepoint)); codepointBuffer.flip(); CoderResult result = encoder.encode(codepointBuffer, target, true); assertCoderResult(result); } /** * Throws AssertionError if an CoderResult error has occured. * * @param coderResult * The result of an coding operation * @throws AssertionError * If coderResult.isError() */ private static void assertCoderResult(CoderResult coderResult) throws AssertionError { if (coderResult.isError()) { try { coderResult.throwException(); } catch (CharacterCodingException e) { throw new AssertionError(e); } } } /** * Delegate a decoding error if necessary. * * @param * The attachment type * @param coderResult * The CoderResult * @param attachment * The object to attach to the I/O operation; can be null * @param handler * The CompletionHandler to delegate to */ private static void delegateError(CoderResult coderResult, A attachment, CompletionHandler handler) { try { coderResult.throwException(); } catch (CharacterCodingException e) { handler.failed(e, attachment); } } /** * Flush the CharsetDecoder. * * @param decoder * The Charset decoder * @param temp * A temporary buffer * @param source * The buffer from which bytes are to be retrieved */ private static void flushDecoderForWrite(CharsetDecoder decoder, CharBuffer charBuffer, CharBuffer source) { CoderResult decodingResult; do { charBuffer.clear(); decodingResult = decoder.flush(charBuffer); source.position(source.position() + charBuffer.position()); } while (!decodingResult.isUnderflow()); } /** * A handler for consuming the result of an asynchronous character writing operation. * * @author Gili Tzabari */ private class WriteHandler implements CompletionHandler { private final CompletionHandler handler; private final A attachment; private final CharBuffer source; /** * Source's initial position. */ private final int sourceOffset; private final boolean endOfInput; /** * Creates a new WriteHandler. * * @param attachment * The object to attach to the I/O operation; can be null * @param handler * The CompletionHandler to delegate to * @param source * The buffer from which bytes are to be retrieved * @param endOfInput * true if the end of stream has been reached */ public WriteHandler(A attachment, CompletionHandler handler, CharBuffer source, boolean endOfInput) { this.handler = handler; this.attachment = attachment; this.source = source; this.sourceOffset = source.position(); this.endOfInput = endOfInput; } @Override public void completed(Integer unused, ByteBuffer bytesWritten) { // Ensure we wrote a whole number of characters bytesWritten.flip(); if (onBytesWritten(source, sourceOffset, bytesWritten, attachment, handler, endOfInput)) { synchronized (AsynchronousByteCharChannel.this) { writePending = false; } return; } // Write the remaining bytes channel.write(bytesWritten, bytesWritten, this); } @Override public void failed(Throwable throwable, ByteBuffer source) { handler.failed(throwable, attachment); synchronized (AsynchronousByteCharChannel.this) { readPending = false; } } } /** * The result of an asynchronous character writing operation. * * @author Gili Tzabari */ private class WriteFuture implements Future { private final CharBuffer source; private final ByteBuffer bytesWritten; /** * Source's initial position. */ private final int sourceOffset; private final boolean endOfInput; private Future future; private ExecutionException throwable; /** * Creates a new WriteFuture. * * @param future * The result of an asynchronous byte writing operation * @param source * The buffer from which characters are to be retrieved * @param bytesWritten * The buffer from which bytes are to be retrieved * @param endOfInput * true if the end of stream has been reached */ private WriteFuture(Future future, CharBuffer source, ByteBuffer bytesWritten, boolean endOfInput) { this.future = future; this.source = source; this.bytesWritten = bytesWritten; this.sourceOffset = source.position(); this.endOfInput = endOfInput; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return future.isCancelled(); } @Override public boolean isDone() { return future.isDone(); } @Override public Integer get() throws CancellationException, InterruptedException, ExecutionException { if (throwable != null) { // Prevent CharsetDecoder from being invoked again throw throwable; } while (true) { future.get(); // Ensure we wrote a whole number of characters bytesWritten.flip(); PollableCompletionHandler handler = new PollableCompletionHandler(); if (onBytesWritten(source, sourceOffset, bytesWritten, null, handler, endOfInput)) { synchronized (AsynchronousByteCharChannel.this) { writePending = false; } if (handler.throwable != null) { throwable = new ExecutionException(handler.throwable); throw throwable; } return handler.result; } // Write the remaining bytes future = channel.write(bytesWritten); } } @Override public Integer get(long timeout, TimeUnit unit) throws CancellationException, InterruptedException, ExecutionException, TimeoutException { if (throwable != null) { // Prevent CharsetDecoder from being invoked again throw throwable; } while (true) { future.get(timeout, unit); // Ensure we wrote a whole number of characters source.flip(); PollableCompletionHandler handler = new PollableCompletionHandler(); if (onBytesWritten(source, sourceOffset, bytesWritten, null, handler, endOfInput)) { synchronized (AsynchronousByteCharChannel.this) { writePending = false; } if (handler.throwable != null) { throwable = new ExecutionException(handler.throwable); throw throwable; } return handler.result; } // Write the remaining bytes future = channel.write(bytesWritten); } } } /** * An interface for writing CharSequences. * * @author Gili Tzabari */ private interface CharSequenceWriter { /** * Writes a sequence of characters. * * @param source * The character sequence to write * @return The number of characters that were consumed, * 0 <= n < source.length(). * Note that there is no implied correlation between the number of characters that are consumed * and the number of characters that are written. Writers may choose to skip an arbitrary number * of bytes. * @throws IOException * If an I/O error occurs */ int write(CharSequence source) throws IOException; } /** * A CharSequenceWriter for writing into CharBuffers. * * @author Gili Tzabari */ private static class CharBufferWriter implements CharSequenceWriter { private final CharBuffer target; /** * Creates a new CharBufferWriter. * * @param target * The buffer into which characters are to be transferred */ public CharBufferWriter(CharBuffer target) { this.target = target; } @Override public int write(CharSequence source) { int result = Math.min(source.length(), target.remaining()); // WORKAROUND: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4860681 StringBuilder sourceString = (StringBuilder) source; target.put(sourceString.substring(0, result)); return result; } } /** * A CharSequenceWriter for writing lines of text. * * @author Gili Tzabari */ private class LineWriter implements CharSequenceWriter { private final StringBuilder target; /** * Creates a new StringBuilderWriter. * * @param target * The buffer into which characters are to be transferred */ public LineWriter(StringBuilder target) { this.target = target; } @Override public int write(CharSequence source) throws IOException { Matcher matcher = delimiters.matcher(source); while (true) { if (!matcher.find()) return 0; if (matcher.group().equals("\r")) consumeNewline = true; else if (matcher.group().equals("\n")) { if (consumeNewline && matcher.start() == 0) continue; consumeNewline = false; } target.append(source.subSequence(0, matcher.start())); // Add 1 to strip out the line delimiter return matcher.start() + 1; } } } }