View Javadoc
1   package wjhk.jupload2.upload;
2   
3   import java.util.concurrent.BlockingQueue;
4   
5   import wjhk.jupload2.exception.JUploadException;
6   import wjhk.jupload2.policies.UploadPolicy;
7   
8   /**
9    * @author etienne_sf
10   */
11  public class PacketConstructionThread extends Thread {
12  
13      /** The current upload policy. */
14      UploadPolicy uploadPolicy = null;
15  
16      /** The thread which globally manages the upload */
17      FileUploadManagerThread fileUploadManagerThread = null;
18  
19      /**
20       * The queue where each prepared file are stored, for further processing. This class picks files here, and post them
21       * to the packetQueue.
22       */
23      BlockingQueue<UploadFileData> preparedFileQueue = null;
24  
25      /**
26       * The queue where each prepared file will be stored, for further processing
27       */
28      BlockingQueue<UploadFilePacket> packetQueue = null;
29  
30      /**
31       * The packet this instance is working on.
32       */
33      UploadFilePacket packetInProgress = null;
34  
35      /**
36       * Indicates when the last file has been received. The last file is the poisonned
37       */
38      boolean lastFileReceived = false;
39  
40      PacketConstructionThread(BlockingQueue<UploadFileData> preparedFileQueue,
41              BlockingQueue<UploadFilePacket> packetQueue, FileUploadManagerThread fileUploadManagerThread,
42              UploadPolicy uploadPolicy) {
43          // A thread name is very useful, when debugging...
44          super("PacketConstructionThread");
45  
46          this.preparedFileQueue = preparedFileQueue;
47          this.packetQueue = packetQueue;
48          this.fileUploadManagerThread = fileUploadManagerThread;
49          this.uploadPolicy = uploadPolicy;
50  
51          // Let's construct the first packet...
52          this.packetInProgress = new UploadFilePacket(this.uploadPolicy);
53      }
54  
55      /**
56       * The actual command to generate packets.
57       * 
58       * @see java.lang.Thread#run()
59       */
60      @Override
61      final public void run() {
62          int nbFiles = 0;
63          this.uploadPolicy.displayDebug("Start of PacketConstructionThread", 80);
64          try { // catch (JUploadException e)
65  
66              // We loop, and wait for the 'poisonned' UploadFileData to finish.
67              try {
68                  while (!lastFileReceived && !this.fileUploadManagerThread.isUploadFinished()) {
69                      UploadFileData ufd = preparedFileQueue.take();
70                      receiveNewFile(ufd);
71                      nbFiles += 1;
72                  }
73  
74                  this.uploadPolicy.displayDebug("PacketConstructionThread: end of loop, the thread is about to finish",
75                          30);
76  
77                  // We may have some file left to send...
78                  if (this.packetInProgress.size() > 0) {
79                      this.uploadPolicy.displayDebug("Last file received: the current packet is not empty, we send it",
80                              30);
81                      sendCurrentPacket();
82                  }
83              } catch (InterruptedException e) {
84                  this.uploadPolicy.displayWarn("packetConstructionThread received InterruptedException, exiting");
85              }
86  
87              // In standard mode, we should have no more file to manage. The
88              // following test is meaningful only if the FilePreparationThread
89              // has been finished before (otherwise, other files could enter the
90              // queue after this test)
91              if (!this.preparedFileQueue.isEmpty()) {
92                  if (!this.fileUploadManagerThread.isUploadFinished()) {
93                      // Hum, hum. This should not happen.
94                      this.uploadPolicy.displayWarn("The preparedFileQueue is not empty, at the end of "
95                              + this.getClass().getName());
96                  }
97                  // This can happen, if we are interrupted while working. Let's
98                  // empty this.
99                  this.uploadPolicy
100                         .displayDebug(
101                                 "The PacketConstructionThread is about to finish, but the preparedFileQueue is not empty. Let's clear it.",
102                                 30);
103                 while (!this.preparedFileQueue.isEmpty()) {
104                     this.preparedFileQueue.poll();
105                 }
106             }
107         } catch (JUploadException e) {
108             this.fileUploadManagerThread.setUploadException(e);
109         } finally {
110             // To properly finish the job, we send a 'poisonned' packet, so that
111             // the FileUploadThread knows it's finished.
112             try {
113                 this.packetQueue.put(new UploadFilePacketPoisonned(this.uploadPolicy));
114             } catch (InterruptedException e) {
115                 this.uploadPolicy
116                         .displayWarn("packetConstructionThread received InterruptedException (while checking if packetQueue is empty), exiting");
117             }
118         }
119         this.uploadPolicy.displayDebug("End of PacketConstructionThread (" + nbFiles
120                 + " files have been taken from the queue, including the 'poisoned' one)", 10);
121     }
122 
123     /**
124      * Called when a new file is received
125      * 
126      * @param uploadFileData
127      * @throws JUploadException
128      * @throws InterruptedException
129      */
130     private void receiveNewFile(UploadFileData uploadFileData) throws JUploadException, InterruptedException {
131         // Are we finished ?
132         if (uploadFileData.isPoisonned()) {
133             lastFileReceived = true;
134             this.uploadPolicy.displayDebug(
135                     "Poisonned UploadFileData received, PacketContructionThread will exit normally", 50);
136         } else {
137             // We try to add the file to the current packet. If it doesn't work,
138             // this packet is probably full. We sent it, and add the packet to
139             // the new one.
140             if (!this.packetInProgress.add(uploadFileData)) {
141                 // The packet was refused. We send the current one, and retry.
142                 this.uploadPolicy.displayDebug(
143                         "The file can't be added to the current packet. Let's send this packet first.", 80);
144                 sendCurrentPacket();
145                 if (!this.packetInProgress.add(uploadFileData)) {
146                     throw new JUploadException("Could not add file to packet! (filename: "
147                             + uploadFileData.getFileName() + ")");
148                 }
149             }
150 
151             // If the current packet is finished, we send it immediatly.
152             if (this.packetInProgress.isFull()) {
153                 sendCurrentPacket();
154             }
155         }
156     }
157 
158     private void sendCurrentPacket() throws InterruptedException {
159         if (this.packetInProgress == null) {
160             throw new java.lang.AssertionError(this.getClass().getName()
161                     + ".sendCurrentPacket(): this.packetInProgress may not be null");
162         } else if (this.packetInProgress.size() == 0) {
163             throw new java.lang.AssertionError(this.getClass().getName()
164                     + ".sendCurrentPacket(): this.packetInProgress.size() may not be 0");
165         }
166         // If a packet is ready, we post it into the relevant queue.
167         this.packetQueue.put(packetInProgress);
168 
169         // And we start a new one.
170         this.packetInProgress = new UploadFilePacket(this.uploadPolicy);
171     }
172 }