001package com.hfg.bio.seq.format; 002 003import java.io.BufferedReader; 004import java.io.ByteArrayInputStream; 005import java.io.CharArrayReader; 006import java.io.FilterInputStream; 007import java.io.IOException; 008import java.io.InputStream; 009import java.io.InputStreamReader; 010import java.util.ArrayList; 011import java.util.Collection; 012import java.util.List; 013 014import com.hfg.bio.seq.BioSequence; 015import com.hfg.exception.ProgrammingException; 016import com.hfg.util.StringBuilderPlus; 017import com.hfg.util.collection.CollectionUtil; 018import com.hfg.util.io.GZIP; 019 020//------------------------------------------------------------------------------ 021/** 022 Buffered sequence reader. 023 <div> 024 @author J. Alex Taylor, hairyfatguy.com 025 </div> 026 */ 027//------------------------------------------------------------------------------ 028// com.hfg Library 029// 030// This library is free software; you can redistribute it and/or 031// modify it under the terms of the GNU Lesser General Public 032// License as published by the Free Software Foundation; either 033// version 2.1 of the License, or (at your option) any later version. 034// 035// This library is distributed in the hope that it will be useful, 036// but WITHOUT ANY WARRANTY; without even the implied warranty of 037// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 038// Lesser General Public License for more details. 039// 040// You should have received a copy of the GNU Lesser General Public 041// License along with this library; if not, write to the Free Software 042// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 043// 044// J. Alex Taylor, President, Founder, CEO, COO, CFO, OOPS hairyfatguy.com 045// jataylor@hairyfatguy.com 046//------------------------------------------------------------------------------ 047 048public class BufferedSeqReader<T extends BioSequence> 049{ 050 private ReadableSeqFormat<T> mSeqFormatObj; 051 private BufferedReader mBufferedReader; 052 private boolean mEndOfContentReached; 053 private String mRecordStartLine; 054 private int mNumRecordsParsed; 055 // How long the record should be before compression is used. 056 private int mCompressionThreshold = sDefaultCompressionThreshold; 057 058 private StringBuilderPlus mUncompressedRecordChunk = new StringBuilderPlus(mCompressionThreshold + 500).setDelimiter("\n"); 059 private List<byte[]> mCompressedRecordChunks = new ArrayList<>(50); 060 private int mCurrentRecordLength = 0; 061 062 // Default value for how long the record should be before compression is used. 063 private static final int sDefaultCompressionThreshold = 8 * 1024; 064 065 //########################################################################### 066 // CONSTRUCTORS 067 //########################################################################### 068 069 //--------------------------------------------------------------------------- 070 public BufferedSeqReader(BufferedReader inReader, ReadableSeqFormat<T> inSeqFormatObj) 071 { 072 mBufferedReader = inReader; 073 mSeqFormatObj = inSeqFormatObj; 074 } 075 076 //########################################################################### 077 // PUBLIC METHODS 078 //########################################################################### 079 080 //--------------------------------------------------------------------------- 081 public static <T extends BioSequence> ReadableSeqFormat<T> determineSeqFormat(BufferedReader inReader, Collection<ReadableSeqFormat<T>> inSeqFormatObjects) 082 throws IOException 083 { 084 if (! inReader.markSupported()) 085 { 086 throw new ProgrammingException("The passed reader must support setting a mark!"); 087 } 088 089 // Grab a chunk of content from the start of the stream that will hopefully be enough 090 // to let us determine the format. 091 // We need to be careful to put the reader back in its original state when we are done. 092 int readAheadLimit = 10 * 1024; 093 inReader.mark(readAheadLimit); 094 // Put the chunk into a buffer we can reuse 095 char[] buffer = new char[readAheadLimit]; 096 inReader.read(buffer); 097 inReader.reset(); 098 099 ReadableSeqFormat<T> successfulFormat = null; 100 101 // LimitedBufferedReader reader = new LimitedBufferedReader(new NoCloseBufferedReader(inReader), readAheadLimit); 102 for (ReadableSeqFormat<T> seqFormat : inSeqFormatObjects) 103 { 104 try 105 { 106 BufferedReader reader = new BufferedReader(new CharArrayReader(buffer)); 107// reader.mark(readAheadLimit); 108 BufferedSeqReader<T> seqReader = new BufferedSeqReader<>(reader, seqFormat); 109 seqReader.readNextRecord((int) (0.9 * readAheadLimit)); 110 T seq = seqReader.next(); 111 successfulFormat = seqFormat; 112 break; 113 } 114 catch (SeqIOException e) 115 { 116 // Ignore. Try the next format 117 } 118 finally 119 { 120// reader.reset(); 121 } 122 } 123 124 inReader.reset(); 125 126 return successfulFormat; 127 } 128 129 //--------------------------------------------------------------------------- 130 public BufferedSeqReader<T> setCompressionThreshold(int inNumBytes) 131 { 132 mCompressionThreshold = inNumBytes; 133 return this; 134 } 135 136 //--------------------------------------------------------------------------- 137 public void close() 138 throws IOException 139 { 140 mBufferedReader.close(); 141 } 142 143 //--------------------------------------------------------------------------- 144 public ReadableSeqFormat<T> getSeqFormat() 145 { 146 return mSeqFormatObj; 147 } 148 149 //--------------------------------------------------------------------------- 150 public synchronized boolean hasNext() 151 { 152 boolean result = false; 153 if (! endOfContentReached()) 154 { 155 if (0 == mCurrentRecordLength) 156 { 157 readNextRecord(); 158 } 159 160 result = mCurrentRecordLength > 0; 161 } 162 163 return result; 164 } 165 166 //--------------------------------------------------------------------------- 167 public synchronized T next() 168 { 169 T nextSeq = null; 170 if (0 == mCurrentRecordLength) 171 { 172 readNextRecord(); 173 } 174 175 if (mCurrentRecordLength > 0) 176 { 177 nextSeq = mSeqFormatObj.readRecord(getBufferedRecordReader()); 178 mCurrentRecordLength = 0; 179 } 180 181 return nextSeq; 182 } 183 184 //--------------------------------------------------------------------------- 185 public List<T> readAll() 186 { 187 List<T> seqs = new ArrayList<T>(); 188 while (hasNext()) 189 { 190 seqs.add(next()); 191 } 192 193 return seqs; 194 } 195 196 //--------------------------------------------------------------------------- 197 protected boolean endOfContentReached() 198 { 199 return mEndOfContentReached; 200 } 201 202 //--------------------------------------------------------------------------- 203 private void readNextRecord() 204 { 205 readNextRecord(null); 206 } 207 208 //--------------------------------------------------------------------------- 209 private synchronized void readNextRecord(Integer inReadLimit) 210 { 211 if (! endOfContentReached()) 212 { 213 // Start w/ a fresh record 214 mUncompressedRecordChunk.setLength(0); 215 mCompressedRecordChunks.clear(); 216 mCurrentRecordLength = 0; 217 218 if (mRecordStartLine != null) 219 { 220 appendLineToCurrentRecord(mRecordStartLine); 221 mRecordStartLine = null; 222 } 223 224 try 225 { 226 String line; 227 while ((line = mBufferedReader.readLine()) != null) 228 { 229 if (line.length() > 0) 230 { 231 if (mSeqFormatObj.isEndOfRecord(line)) 232 { 233 if (mSeqFormatObj.hasJanusDelimiter()) 234 { 235 // Line is the end of one record and the start of another 236 if (0 == mNumRecordsParsed 237 && 0 == mCurrentRecordLength) 238 { 239 // Line is the start of the first record 240 appendLineToCurrentRecord(line); 241 } 242 else 243 { 244 // Save the line for the next record 245 mRecordStartLine = line; 246 break; 247 } 248 } 249 else 250 { 251 // Line is the end of the record 252 appendLineToCurrentRecord(line); 253 break; 254 } 255 } 256 else 257 { 258 // Line is not the end of the record 259 appendLineToCurrentRecord(line); 260 } 261 } 262 263 // We might just be testing formats and need to quit before we go too far 264 if (inReadLimit != null 265 && mCurrentRecordLength > inReadLimit) 266 { 267 break; 268 } 269 } 270 271 if (null == line) 272 { 273 mEndOfContentReached = true; 274 } 275 } 276 catch (IOException e) 277 { 278 throw new SeqIOException(e); 279 } 280 } 281 282 if (mCurrentRecordLength > 0) 283 { 284 mNumRecordsParsed++; 285 } 286 } 287 288 //-------------------------------------------------------------------------- 289 // Note: inLine will not have a return at the end 290 private void appendLineToCurrentRecord(String inLine) 291 throws SeqIOException 292 { 293 mCurrentRecordLength += inLine.length() + 1; 294 295 mUncompressedRecordChunk.appendln(inLine); 296 if (mUncompressedRecordChunk.length() > mCompressionThreshold) 297 { 298 mCompressedRecordChunks.add(GZIP.compress(mUncompressedRecordChunk.toString())); 299 mUncompressedRecordChunk.setLength(0); 300 } 301 } 302 303 //-------------------------------------------------------------------------- 304 private BufferedReader getBufferedRecordReader() 305 { 306 InputStream seqStream = null; 307 308 if (CollectionUtil.hasValues(mCompressedRecordChunks)) 309 { 310 // Compress any leftover lines and add them to the rest of the chunks 311 if (mUncompressedRecordChunk.length() > 0) 312 { 313 mCompressedRecordChunks.add(GZIP.compress(mUncompressedRecordChunk.toString())); 314 } 315 316 seqStream = new GZIPRecordStreamer(); 317 } 318 else if (mUncompressedRecordChunk.length() > 0) 319 { 320 seqStream = new ByteArrayInputStream(mUncompressedRecordChunk.toString().getBytes()); 321 } 322 323 return new BufferedReader(new InputStreamReader(seqStream)); 324 } 325 326 //########################################################################## 327 // INNER CLASS 328 //########################################################################## 329 330 private class GZIPRecordStreamer extends FilterInputStream 331 { 332 private byte[] mBuffer; 333 private int mBufferLimit; 334 private int mCurrentChunkIndex; 335 private int mCharIndex; 336 private boolean mDone = false; 337 private boolean mEndOfStreamReached; 338 private int mBytesStreamed; 339 340 //----------------------------------------------------------------------- 341 public GZIPRecordStreamer() 342 { 343 super(null); 344 mCurrentChunkIndex = 0; 345 } 346 347 //--------------------------------------------------------------------------- 348 @Override 349 public int read(byte[] inBuffer, int inOffset, int inMaxReadLength) 350 throws IOException 351 { 352 byte theChar; 353 int numCharsRead = 0; 354 do 355 { 356 if (mCharIndex >= mBufferLimit) 357 { 358 fillBuffer(); 359 } 360 361 theChar = (mEndOfStreamReached ? -1 : mBuffer[mCharIndex++]); 362 363 if (theChar > 0) 364 { 365 inBuffer[inOffset++] = theChar; 366 numCharsRead++; 367 } 368 } 369 while (theChar >= 0 370 && numCharsRead < inMaxReadLength); 371 372 mBytesStreamed += numCharsRead; 373 374 return (theChar < 0 && 0 == numCharsRead ? -1 : numCharsRead); 375 } 376 377 //--------------------------------------------------------------------------- 378 private void fillBuffer() 379 throws IOException 380 { 381 if (mCurrentChunkIndex < mCompressedRecordChunks.size()) 382 { 383 mBuffer = GZIP.uncompress(mCompressedRecordChunks.get(mCurrentChunkIndex++)); 384 mBufferLimit = mBuffer.length; 385 386 // Reset the index 387 mCharIndex = 0; 388 } 389 else 390 { 391 mEndOfStreamReached = true; 392 } 393 } 394 395 //----------------------------------------------------------------------- 396 @Override 397 public int available() 398 throws IOException 399 { 400 return mCurrentRecordLength - mBytesStreamed; 401 } 402 403 //----------------------------------------------------------------------- 404 @Override 405 public void close() 406 throws IOException 407 { 408 // Do nothing 409 } 410 411 //----------------------------------------------------------------------- 412 @Override 413 public int read() 414 { 415 int nextChar = -1; 416 if (! mDone) 417 { 418 if (null == mBuffer) 419 { 420 mBuffer = GZIP.uncompress(mCompressedRecordChunks.get(mCurrentChunkIndex)); 421 mBufferLimit = mBuffer.length; 422 423 // Reset the index 424 mCharIndex = 0; 425 } 426 427 nextChar = mBuffer[mCharIndex++]; 428 429 if (mCharIndex >= mBufferLimit) 430 { 431 // This is the last char in this chunk. 432 mBuffer = null; 433 mCurrentChunkIndex++; 434 if (mCurrentChunkIndex < 0 || mCurrentChunkIndex == mCompressedRecordChunks.size()) 435 { 436 // This was the last chunk. 437 mDone = true; 438 } 439 } 440 } 441 442 return nextChar; 443 } 444 } 445 446}