001package com.hfg.bio.seq.format;
002
003import java.io.BufferedReader;
004import java.io.ByteArrayInputStream;
005import java.io.CharArrayReader;
006import java.io.FilterInputStream;
007import java.io.IOException;
008import java.io.InputStream;
009import java.io.InputStreamReader;
010import java.util.ArrayList;
011import java.util.Collection;
012import java.util.List;
013
014import com.hfg.bio.seq.BioSequence;
015import com.hfg.exception.ProgrammingException;
016import com.hfg.util.StringBuilderPlus;
017import com.hfg.util.collection.CollectionUtil;
018import com.hfg.util.io.GZIP;
019
020//------------------------------------------------------------------------------
021/**
022 Buffered sequence reader.
023 <div>
024 @author J. Alex Taylor, hairyfatguy.com
025 </div>
026 */
027//------------------------------------------------------------------------------
028// com.hfg Library
029//
030// This library is free software; you can redistribute it and/or
031// modify it under the terms of the GNU Lesser General Public
032// License as published by the Free Software Foundation; either
033// version 2.1 of the License, or (at your option) any later version.
034//
035// This library is distributed in the hope that it will be useful,
036// but WITHOUT ANY WARRANTY; without even the implied warranty of
037// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
038// Lesser General Public License for more details.
039//
040// You should have received a copy of the GNU Lesser General Public
041// License along with this library; if not, write to the Free Software
042// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
043//
044// J. Alex Taylor, President, Founder, CEO, COO, CFO, OOPS hairyfatguy.com
045// jataylor@hairyfatguy.com
046//------------------------------------------------------------------------------
047
048public class BufferedSeqReader<T extends BioSequence>
049{
050   private ReadableSeqFormat<T> mSeqFormatObj;
051   private BufferedReader       mBufferedReader;
052   private boolean              mEndOfContentReached;
053   private String               mRecordStartLine;
054   private int                  mNumRecordsParsed;
055   // How long the record should be before compression is used.
056   private int                  mCompressionThreshold = sDefaultCompressionThreshold;
057
058   private StringBuilderPlus mUncompressedRecordChunk = new StringBuilderPlus(mCompressionThreshold + 500).setDelimiter("\n");
059   private List<byte[]>      mCompressedRecordChunks  = new ArrayList<>(50);
060   private int               mCurrentRecordLength     = 0;
061
062   // Default value for how long the record should be before compression is used.
063   private static final int   sDefaultCompressionThreshold = 8 * 1024;
064
065   //###########################################################################
066   // CONSTRUCTORS
067   //###########################################################################
068
069   //---------------------------------------------------------------------------
070   public BufferedSeqReader(BufferedReader inReader, ReadableSeqFormat<T> inSeqFormatObj)
071   {
072      mBufferedReader = inReader;
073      mSeqFormatObj   = inSeqFormatObj;
074   }
075
076   //###########################################################################
077   // PUBLIC METHODS
078   //###########################################################################
079
080   //---------------------------------------------------------------------------
081   public static <T extends BioSequence> ReadableSeqFormat<T> determineSeqFormat(BufferedReader inReader, Collection<ReadableSeqFormat<T>> inSeqFormatObjects)
082         throws IOException
083   {
084      if (! inReader.markSupported())
085      {
086         throw new ProgrammingException("The passed reader must support setting a mark!");
087      }
088
089      // Grab a chunk of content from the start of the stream that will hopefully be enough
090      // to let us determine the format.
091      // We need to be careful to put the reader back in its original state when we are done.
092      int readAheadLimit = 10 * 1024;
093      inReader.mark(readAheadLimit);
094      // Put the chunk into a buffer we can reuse
095      char[] buffer = new char[readAheadLimit];
096      inReader.read(buffer);
097      inReader.reset();
098
099      ReadableSeqFormat<T> successfulFormat = null;
100
101 //     LimitedBufferedReader reader = new LimitedBufferedReader(new NoCloseBufferedReader(inReader), readAheadLimit);
102      for (ReadableSeqFormat<T> seqFormat : inSeqFormatObjects)
103      {
104         try
105         {
106            BufferedReader reader = new BufferedReader(new CharArrayReader(buffer));
107//            reader.mark(readAheadLimit);
108            BufferedSeqReader<T> seqReader = new BufferedSeqReader<>(reader, seqFormat);
109            seqReader.readNextRecord((int) (0.9 * readAheadLimit));
110            T seq = seqReader.next();
111            successfulFormat = seqFormat;
112            break;
113         }
114         catch (SeqIOException e)
115         {
116            // Ignore. Try the next format
117         }
118         finally
119         {
120//            reader.reset();
121         }
122      }
123
124      inReader.reset();
125
126      return successfulFormat;
127   }
128
129   //---------------------------------------------------------------------------
130   public BufferedSeqReader<T> setCompressionThreshold(int inNumBytes)
131   {
132      mCompressionThreshold = inNumBytes;
133      return this;
134   }
135
136   //---------------------------------------------------------------------------
137   public void close()
138         throws IOException
139   {
140      mBufferedReader.close();
141   }
142
143   //---------------------------------------------------------------------------
144   public ReadableSeqFormat<T> getSeqFormat()
145   {
146      return mSeqFormatObj;
147   }
148
149   //---------------------------------------------------------------------------
150   public synchronized boolean hasNext()
151   {
152      boolean result = false;
153      if (! endOfContentReached())
154      {
155         if (0 == mCurrentRecordLength)
156         {
157            readNextRecord();
158         }
159
160         result = mCurrentRecordLength > 0;
161      }
162
163      return result;
164   }
165
166   //---------------------------------------------------------------------------
167   public synchronized T next()
168   {
169      T nextSeq = null;
170      if (0 == mCurrentRecordLength)
171      {
172         readNextRecord();
173      }
174
175      if (mCurrentRecordLength > 0)
176      {
177         nextSeq = mSeqFormatObj.readRecord(getBufferedRecordReader());
178         mCurrentRecordLength = 0;
179      }
180
181      return nextSeq;
182   }
183
184   //---------------------------------------------------------------------------
185   public List<T> readAll()
186   {
187      List<T> seqs = new ArrayList<T>();
188      while (hasNext())
189      {
190         seqs.add(next());
191      }
192
193      return seqs;
194   }
195
196   //---------------------------------------------------------------------------
197   protected boolean endOfContentReached()
198   {
199      return mEndOfContentReached;
200   }
201
202   //---------------------------------------------------------------------------
203   private void readNextRecord()
204   {
205      readNextRecord(null);
206   }
207
208   //---------------------------------------------------------------------------
209   private synchronized void readNextRecord(Integer inReadLimit)
210   {
211      if (! endOfContentReached())
212      {
213         // Start w/ a fresh record
214         mUncompressedRecordChunk.setLength(0);
215         mCompressedRecordChunks.clear();
216         mCurrentRecordLength = 0;
217
218         if (mRecordStartLine != null)
219         {
220            appendLineToCurrentRecord(mRecordStartLine);
221            mRecordStartLine = null;
222         }
223
224         try
225         {
226            String line;
227            while ((line = mBufferedReader.readLine()) != null)
228            {
229               if (line.length() > 0)
230               {
231                  if (mSeqFormatObj.isEndOfRecord(line))
232                  {
233                     if (mSeqFormatObj.hasJanusDelimiter())
234                     {
235                        // Line is the end of one record and the start of another
236                        if (0 == mNumRecordsParsed
237                            && 0 == mCurrentRecordLength)
238                        {
239                           // Line is the start of the first record
240                           appendLineToCurrentRecord(line);
241                        }
242                        else
243                        {
244                           // Save the line for the next record
245                           mRecordStartLine = line;
246                           break;
247                        }
248                     }
249                     else
250                     {
251                        // Line is the end of the record
252                        appendLineToCurrentRecord(line);
253                        break;
254                     }
255                  }
256                  else
257                  {
258                     // Line is not the end of the record
259                     appendLineToCurrentRecord(line);
260                  }
261               }
262
263               // We might just be testing formats and need to quit before we go too far
264               if (inReadLimit != null
265                   && mCurrentRecordLength > inReadLimit)
266               {
267                  break;
268               }
269            }
270
271            if (null == line)
272            {
273               mEndOfContentReached = true;
274            }
275         }
276         catch (IOException e)
277         {
278            throw new SeqIOException(e);
279         }
280      }
281
282      if (mCurrentRecordLength > 0)
283      {
284         mNumRecordsParsed++;
285      }
286   }
287
288   //--------------------------------------------------------------------------
289   // Note: inLine will not have a return at the end
290   private void appendLineToCurrentRecord(String inLine)
291         throws SeqIOException
292   {
293      mCurrentRecordLength += inLine.length() + 1;
294
295      mUncompressedRecordChunk.appendln(inLine);
296      if (mUncompressedRecordChunk.length() > mCompressionThreshold)
297      {
298         mCompressedRecordChunks.add(GZIP.compress(mUncompressedRecordChunk.toString()));
299         mUncompressedRecordChunk.setLength(0);
300      }
301   }
302
303   //--------------------------------------------------------------------------
304   private BufferedReader getBufferedRecordReader()
305   {
306      InputStream seqStream = null;
307
308      if (CollectionUtil.hasValues(mCompressedRecordChunks))
309      {
310         // Compress any leftover lines and add them to the rest of the chunks
311         if (mUncompressedRecordChunk.length() > 0)
312         {
313            mCompressedRecordChunks.add(GZIP.compress(mUncompressedRecordChunk.toString()));
314         }
315
316         seqStream = new GZIPRecordStreamer();
317      }
318      else if (mUncompressedRecordChunk.length() > 0)
319      {
320         seqStream = new ByteArrayInputStream(mUncompressedRecordChunk.toString().getBytes());
321      }
322
323      return new BufferedReader(new InputStreamReader(seqStream));
324   }
325
326   //##########################################################################
327   // INNER CLASS
328   //##########################################################################
329
330   private class GZIPRecordStreamer extends FilterInputStream
331   {
332      private byte[]    mBuffer;
333      private int       mBufferLimit;
334      private int       mCurrentChunkIndex;
335      private int       mCharIndex;
336      private boolean   mDone = false;
337      private boolean   mEndOfStreamReached;
338      private int       mBytesStreamed;
339
340      //-----------------------------------------------------------------------
341      public GZIPRecordStreamer()
342      {
343         super(null);
344         mCurrentChunkIndex = 0;
345      }
346
347      //---------------------------------------------------------------------------
348      @Override
349      public int read(byte[] inBuffer, int inOffset, int inMaxReadLength)
350            throws IOException
351      {
352         byte theChar;
353         int numCharsRead = 0;
354         do
355         {
356            if (mCharIndex >= mBufferLimit)
357            {
358               fillBuffer();
359            }
360
361            theChar = (mEndOfStreamReached ? -1 : mBuffer[mCharIndex++]);
362
363            if (theChar > 0)
364            {
365               inBuffer[inOffset++] = theChar;
366               numCharsRead++;
367            }
368         }
369         while (theChar >= 0
370                && numCharsRead < inMaxReadLength);
371
372         mBytesStreamed += numCharsRead;
373
374         return (theChar < 0 && 0 == numCharsRead ? -1 : numCharsRead);
375      }
376
377      //---------------------------------------------------------------------------
378      private void fillBuffer()
379            throws IOException
380      {
381         if (mCurrentChunkIndex < mCompressedRecordChunks.size())
382         {
383            mBuffer = GZIP.uncompress(mCompressedRecordChunks.get(mCurrentChunkIndex++));
384            mBufferLimit = mBuffer.length;
385
386            // Reset the index
387            mCharIndex = 0;
388         }
389         else
390         {
391            mEndOfStreamReached = true;
392         }
393      }
394
395      //-----------------------------------------------------------------------
396      @Override
397      public int available()
398            throws IOException
399      {
400         return mCurrentRecordLength - mBytesStreamed;
401      }
402
403      //-----------------------------------------------------------------------
404      @Override
405      public void close()
406            throws IOException
407      {
408         // Do nothing
409      }
410
411      //-----------------------------------------------------------------------
412      @Override
413      public int read()
414      {
415         int nextChar = -1;
416         if (! mDone)
417         {
418            if (null == mBuffer)
419            {
420               mBuffer = GZIP.uncompress(mCompressedRecordChunks.get(mCurrentChunkIndex));
421               mBufferLimit = mBuffer.length;
422
423               // Reset the index
424               mCharIndex = 0;
425            }
426
427            nextChar = mBuffer[mCharIndex++];
428
429            if (mCharIndex >= mBufferLimit)
430            {
431               // This is the last char in this chunk.
432               mBuffer = null;
433               mCurrentChunkIndex++;
434               if (mCurrentChunkIndex < 0 || mCurrentChunkIndex == mCompressedRecordChunks.size())
435               {
436                  // This was the last chunk.
437                  mDone = true;
438               }
439            }
440         }
441
442         return nextChar;
443      }
444   }
445
446}