Coverage Report - wjhk.jupload2.upload.PacketConstructionThread
 
Classes in this File Line Coverage Branch Coverage Complexity
PacketConstructionThread
0 %
0/66
0 %
0/24
5,5
 
 1  
 package wjhk.jupload2.upload;
 2  
 
 3  
 import java.util.concurrent.BlockingQueue;
 4  
 
 5  
 import wjhk.jupload2.exception.JUploadException;
 6  
 import wjhk.jupload2.policies.UploadPolicy;
 7  
 
 8  
 /**
 9  
  * @author etienne_sf
 10  
  */
 11  
 public class PacketConstructionThread extends Thread {
 12  
 
 13  
     /** The current upload policy. */
 14  0
     UploadPolicy uploadPolicy = null;
 15  
 
 16  
     /** The thread which globally manages the upload */
 17  0
     FileUploadManagerThread fileUploadManagerThread = null;
 18  
 
 19  
     /**
 20  
      * The queue where each prepared file are stored, for further processing. This class picks files here, and post them
 21  
      * to the packetQueue.
 22  
      */
 23  0
     BlockingQueue<UploadFileData> preparedFileQueue = null;
 24  
 
 25  
     /**
 26  
      * The queue where each prepared file will be stored, for further processing
 27  
      */
 28  0
     BlockingQueue<UploadFilePacket> packetQueue = null;
 29  
 
 30  
     /**
 31  
      * The packet this instance is working on.
 32  
      */
 33  0
     UploadFilePacket packetInProgress = null;
 34  
 
 35  
     /**
 36  
      * Indicates when the last file has been received. The last file is the poisonned
 37  
      */
 38  0
     boolean lastFileReceived = false;
 39  
 
 40  
     PacketConstructionThread(BlockingQueue<UploadFileData> preparedFileQueue,
 41  
             BlockingQueue<UploadFilePacket> packetQueue, FileUploadManagerThread fileUploadManagerThread,
 42  
             UploadPolicy uploadPolicy) {
 43  
         // A thread name is very useful, when debugging...
 44  0
         super("PacketConstructionThread");
 45  
 
 46  0
         this.preparedFileQueue = preparedFileQueue;
 47  0
         this.packetQueue = packetQueue;
 48  0
         this.fileUploadManagerThread = fileUploadManagerThread;
 49  0
         this.uploadPolicy = uploadPolicy;
 50  
 
 51  
         // Let's construct the first packet...
 52  0
         this.packetInProgress = new UploadFilePacket(this.uploadPolicy);
 53  0
     }
 54  
 
 55  
     /**
 56  
      * The actual command to generate packets.
 57  
      * 
 58  
      * @see java.lang.Thread#run()
 59  
      */
 60  
     @Override
 61  
     final public void run() {
 62  0
         int nbFiles = 0;
 63  0
         this.uploadPolicy.displayDebug("Start of PacketConstructionThread", 80);
 64  
         try { // catch (JUploadException e)
 65  
 
 66  
             // We loop, and wait for the 'poisonned' UploadFileData to finish.
 67  
             try {
 68  0
                 while (!lastFileReceived && !this.fileUploadManagerThread.isUploadFinished()) {
 69  0
                     UploadFileData ufd = preparedFileQueue.take();
 70  0
                     receiveNewFile(ufd);
 71  0
                     nbFiles += 1;
 72  0
                 }
 73  
 
 74  0
                 this.uploadPolicy.displayDebug("PacketConstructionThread: end of loop, the thread is about to finish",
 75  
                         30);
 76  
 
 77  
                 // We may have some file left to send...
 78  0
                 if (this.packetInProgress.size() > 0) {
 79  0
                     this.uploadPolicy.displayDebug("Last file received: the current packet is not empty, we send it",
 80  
                             30);
 81  0
                     sendCurrentPacket();
 82  
                 }
 83  0
             } catch (InterruptedException e) {
 84  0
                 this.uploadPolicy.displayWarn("packetConstructionThread received InterruptedException, exiting");
 85  0
             }
 86  
 
 87  
             // In standard mode, we should have no more file to manage. The
 88  
             // following test is meaningful only if the FilePreparationThread
 89  
             // has been finished before (otherwise, other files could enter the
 90  
             // queue after this test)
 91  0
             if (!this.preparedFileQueue.isEmpty()) {
 92  0
                 if (!this.fileUploadManagerThread.isUploadFinished()) {
 93  
                     // Hum, hum. This should not happen.
 94  0
                     this.uploadPolicy.displayWarn("The preparedFileQueue is not empty, at the end of "
 95  0
                             + this.getClass().getName());
 96  
                 }
 97  
                 // This can happen, if we are interrupted while working. Let's
 98  
                 // empty this.
 99  0
                 this.uploadPolicy
 100  0
                         .displayDebug(
 101  
                                 "The PacketConstructionThread is about to finish, but the preparedFileQueue is not empty. Let's clear it.",
 102  
                                 30);
 103  0
                 while (!this.preparedFileQueue.isEmpty()) {
 104  0
                     this.preparedFileQueue.poll();
 105  
                 }
 106  
             }
 107  0
         } catch (JUploadException e) {
 108  0
             this.fileUploadManagerThread.setUploadException(e);
 109  0
         } finally {
 110  
             // To properly finish the job, we send a 'poisonned' packet, so that
 111  
             // the FileUploadThread knows it's finished.
 112  0
             try {
 113  0
                 this.packetQueue.put(new UploadFilePacketPoisonned(this.uploadPolicy));
 114  0
             } catch (InterruptedException e) {
 115  0
                 this.uploadPolicy
 116  0
                         .displayWarn("packetConstructionThread received InterruptedException (while checking if packetQueue is empty), exiting");
 117  0
             }
 118  0
         }
 119  0
         this.uploadPolicy.displayDebug("End of PacketConstructionThread (" + nbFiles
 120  
                 + " files have been taken from the queue, including the 'poisoned' one)", 10);
 121  0
     }
 122  
 
 123  
     /**
 124  
      * Called when a new file is received
 125  
      * 
 126  
      * @param uploadFileData
 127  
      * @throws JUploadException
 128  
      * @throws InterruptedException
 129  
      */
 130  
     private void receiveNewFile(UploadFileData uploadFileData) throws JUploadException, InterruptedException {
 131  
         // Are we finished ?
 132  0
         if (uploadFileData.isPoisonned()) {
 133  0
             lastFileReceived = true;
 134  0
             this.uploadPolicy.displayDebug(
 135  
                     "Poisonned UploadFileData received, PacketContructionThread will exit normally", 50);
 136  
         } else {
 137  
             // We try to add the file to the current packet. If it doesn't work,
 138  
             // this packet is probably full. We sent it, and add the packet to
 139  
             // the new one.
 140  0
             if (!this.packetInProgress.add(uploadFileData)) {
 141  
                 // The packet was refused. We send the current one, and retry.
 142  0
                 this.uploadPolicy.displayDebug(
 143  
                         "The file can't be added to the current packet. Let's send this packet first.", 80);
 144  0
                 sendCurrentPacket();
 145  0
                 if (!this.packetInProgress.add(uploadFileData)) {
 146  0
                     throw new JUploadException("Could not add file to packet! (filename: "
 147  0
                             + uploadFileData.getFileName() + ")");
 148  
                 }
 149  
             }
 150  
 
 151  
             // If the current packet is finished, we send it immediatly.
 152  0
             if (this.packetInProgress.isFull()) {
 153  0
                 sendCurrentPacket();
 154  
             }
 155  
         }
 156  0
     }
 157  
 
 158  
     private void sendCurrentPacket() throws InterruptedException {
 159  0
         if (this.packetInProgress == null) {
 160  0
             throw new java.lang.AssertionError(this.getClass().getName()
 161  
                     + ".sendCurrentPacket(): this.packetInProgress may not be null");
 162  0
         } else if (this.packetInProgress.size() == 0) {
 163  0
             throw new java.lang.AssertionError(this.getClass().getName()
 164  
                     + ".sendCurrentPacket(): this.packetInProgress.size() may not be 0");
 165  
         }
 166  
         // If a packet is ready, we post it into the relevant queue.
 167  0
         this.packetQueue.put(packetInProgress);
 168  
 
 169  
         // And we start a new one.
 170  0
         this.packetInProgress = new UploadFilePacket(this.uploadPolicy);
 171  0
     }
 172  
 }