1 package wjhk.jupload2.upload;
2
3 import java.util.concurrent.BlockingQueue;
4
5 import wjhk.jupload2.exception.JUploadException;
6 import wjhk.jupload2.policies.UploadPolicy;
7
8
9
10
11 public class PacketConstructionThread extends Thread {
12
13
14 UploadPolicy uploadPolicy = null;
15
16
17 FileUploadManagerThread fileUploadManagerThread = null;
18
19
20
21
22
23 BlockingQueue<UploadFileData> preparedFileQueue = null;
24
25
26
27
28 BlockingQueue<UploadFilePacket> packetQueue = null;
29
30
31
32
33 UploadFilePacket packetInProgress = null;
34
35
36
37
38 boolean lastFileReceived = false;
39
40 PacketConstructionThread(BlockingQueue<UploadFileData> preparedFileQueue,
41 BlockingQueue<UploadFilePacket> packetQueue, FileUploadManagerThread fileUploadManagerThread,
42 UploadPolicy uploadPolicy) {
43
44 super("PacketConstructionThread");
45
46 this.preparedFileQueue = preparedFileQueue;
47 this.packetQueue = packetQueue;
48 this.fileUploadManagerThread = fileUploadManagerThread;
49 this.uploadPolicy = uploadPolicy;
50
51
52 this.packetInProgress = new UploadFilePacket(this.uploadPolicy);
53 }
54
55
56
57
58
59
60 @Override
61 final public void run() {
62 int nbFiles = 0;
63 this.uploadPolicy.displayDebug("Start of PacketConstructionThread", 80);
64 try {
65
66
67 try {
68 while (!lastFileReceived && !this.fileUploadManagerThread.isUploadFinished()) {
69 UploadFileData ufd = preparedFileQueue.take();
70 receiveNewFile(ufd);
71 nbFiles += 1;
72 }
73
74 this.uploadPolicy.displayDebug("PacketConstructionThread: end of loop, the thread is about to finish",
75 30);
76
77
78 if (this.packetInProgress.size() > 0) {
79 this.uploadPolicy.displayDebug("Last file received: the current packet is not empty, we send it",
80 30);
81 sendCurrentPacket();
82 }
83 } catch (InterruptedException e) {
84 this.uploadPolicy.displayWarn("packetConstructionThread received InterruptedException, exiting");
85 }
86
87
88
89
90
91 if (!this.preparedFileQueue.isEmpty()) {
92 if (!this.fileUploadManagerThread.isUploadFinished()) {
93
94 this.uploadPolicy.displayWarn("The preparedFileQueue is not empty, at the end of "
95 + this.getClass().getName());
96 }
97
98
99 this.uploadPolicy
100 .displayDebug(
101 "The PacketConstructionThread is about to finish, but the preparedFileQueue is not empty. Let's clear it.",
102 30);
103 while (!this.preparedFileQueue.isEmpty()) {
104 this.preparedFileQueue.poll();
105 }
106 }
107 } catch (JUploadException e) {
108 this.fileUploadManagerThread.setUploadException(e);
109 } finally {
110
111
112 try {
113 this.packetQueue.put(new UploadFilePacketPoisonned(this.uploadPolicy));
114 } catch (InterruptedException e) {
115 this.uploadPolicy
116 .displayWarn("packetConstructionThread received InterruptedException (while checking if packetQueue is empty), exiting");
117 }
118 }
119 this.uploadPolicy.displayDebug("End of PacketConstructionThread (" + nbFiles
120 + " files have been taken from the queue, including the 'poisoned' one)", 10);
121 }
122
123
124
125
126
127
128
129
130 private void receiveNewFile(UploadFileData uploadFileData) throws JUploadException, InterruptedException {
131
132 if (uploadFileData.isPoisonned()) {
133 lastFileReceived = true;
134 this.uploadPolicy.displayDebug(
135 "Poisonned UploadFileData received, PacketContructionThread will exit normally", 50);
136 } else {
137
138
139
140 if (!this.packetInProgress.add(uploadFileData)) {
141
142 this.uploadPolicy.displayDebug(
143 "The file can't be added to the current packet. Let's send this packet first.", 80);
144 sendCurrentPacket();
145 if (!this.packetInProgress.add(uploadFileData)) {
146 throw new JUploadException("Could not add file to packet! (filename: "
147 + uploadFileData.getFileName() + ")");
148 }
149 }
150
151
152 if (this.packetInProgress.isFull()) {
153 sendCurrentPacket();
154 }
155 }
156 }
157
158 private void sendCurrentPacket() throws InterruptedException {
159 if (this.packetInProgress == null) {
160 throw new java.lang.AssertionError(this.getClass().getName()
161 + ".sendCurrentPacket(): this.packetInProgress may not be null");
162 } else if (this.packetInProgress.size() == 0) {
163 throw new java.lang.AssertionError(this.getClass().getName()
164 + ".sendCurrentPacket(): this.packetInProgress.size() may not be 0");
165 }
166
167 this.packetQueue.put(packetInProgress);
168
169
170 this.packetInProgress = new UploadFilePacket(this.uploadPolicy);
171 }
172 }