-
Notifications
You must be signed in to change notification settings - Fork 0
/
ParforProgressbar.m
343 lines (321 loc) · 14.9 KB
/
ParforProgressbar.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
% ParforProgressbar Progress monitor for `parfor` loops
% ppm = ParforProgressbar(numIterations) constructs a ParforProgressbar object.
% 'numIterations' is an integer with the total number of
% iterations in the parfor loop.
%
% ppm = ParforProgressbar(___, 'showWorkerProgress', true) will display
% the progress of all workers (default: false).
%
% ppm = ParforProgressbar(___, 'progressBarUpdatePeriod', 1.5) will
% update the progressbar every 1.5 seconds (default: 1.0 seconds).
%
% ppm = ParforProgressbar(___, 'title', 'my fancy title') will
% show 'my fancy title' on the progressbar.
%
% ppm = ParforProgressbar(___, 'parpool', 'local') will
% start the parallel pool (parpool) using the 'local' profile.
%
% ppm = ParforProgressbar(___, 'parpool', {profilename, poolsize, Name, Value})
% will start the parallel pool (parpool) using the profilename profile with
% poolsize workers and any Name Value pair supported by function parpool.
%
%
% <strong>Usage:</strong>
% % 'numIterations' is an integer with the total number of iterations in the loop.
% numIterations = 100000;
%
% % Then construct a ParforProgressbar object:
% ppm = ParforProgressbar(numIterations);
%
% parfor i = 1:numIterations
% % do some parallel computation
% pause(100/numIterations);
% % increment counter to track progress
% ppm.increment();
% end
%
% % Delete the progress handle when the parfor loop is done.
% delete(ppm);
%
%
% Based on <a href="https://de.mathworks.com/matlabcentral/fileexchange/60135-parfor-progress-monitor-progress-bar-v3">ParforProgMonv3</a>.
% Uses the progressbar from: <a href="https://de.mathworks.com/matlabcentral/fileexchange/6922-progressbar">progressbar</a>.
classdef ParforProgressbar < handle
    % Implementation overview:
    %   * The object constructed by the user acts as the "server". It opens
    %     a UDP socket whose DatagramReceivedFcn is the local function
    %     receiver, and starts a timer whose TimerFcn is the local function
    %     draw_progress_bar.
    %   * saveobj/loadobj re-create the object as a "worker" object, which
    %     connects to the server socket and sends [workerID, iteration]
    %     messages from increment().

    % These properties are the same for the server and worker and are not
    % subject to change
    properties ( GetAccess = private, SetAccess = private )
        ServerPort          % Port of the server connection
        ServerName          % Host name of the server connection
        totalIterations     % Total number of iterations (entire parfor)
        numWorkersPossible  % Total number of possible workers
        stepSize uint64     % Number of steps before update
    end

    % These properties are completely different between server and each
    % worker.
    properties (Transient, GetAccess = private, SetAccess = private)
        it uint64                  % worker: iteration
        UserData                   % Anything the user wants to store temporarily in the worker
        workerTable                % server: total progress with ip and port of each worker
        showWorkerProgress logical % server: show not only total progress but also the estimated progress of each worker
        timer                      % server: timer object
        progressTotalOld           % server: Current total progress (float between 0 and 1).
        isWorker logical           % server/worker: This identifies a worker/server
        connection                 % server/worker: udp connection
    end

    properties (Transient, GetAccess = public, SetAccess = private)
        workerID uint64 % worker: unique id for each worker
    end

    methods ( Static )
        function o = loadobj( X )
            % import libUtil.ParforProgressbar;
            % loadobj - METHOD Reconstruct a ParforProgressbar object
            % Once we've been loaded, we need to reconstruct ourselves
            % correctly as a worker-side object. X is the struct produced
            % by saveobj; its fields are forwarded to the constructor as a
            % cell array, which selects the worker branch.
            debug('LoadObj');
            o = ParforProgressbar( {X.ServerName, X.ServerPort, X.totalIterations, X.numWorkersPossible, X.stepSize, X.UserData} );
        end
    end

    methods
        function o = ParforProgressbar( numIterations, varargin )
            % import libUtil.progressbar;
            % ParforProgressbar - CONSTRUCTOR Create a ParforProgressbar object
            %
            % ppb = ParforProgressbar(numIterations)
            % numIterations is an integer with the total number of
            % iterations in the parfor loop.
            %
            % ppm = ParforProgressbar(___, 'showWorkerProgress', true) will display
            % the progress of all workers (default: false).
            %
            % ppm = ParforProgressbar(___, 'progressBarUpdatePeriod', 1.5) will
            % update the progressbar every 1.5 seconds (default: 1.0 seconds).
            %
            % ppm = ParforProgressbar(___, 'title', 'my fancy title') will
            % show 'my fancy title' on the progressbar.
            %
            % ppm = ParforProgressbar(___, 'parpool', 'local') will
            % start the parallel pool (parpool) using the 'local' profile.
            %
            % ppm = ParforProgressbar(___, 'parpool', {profilename, poolsize, Name, Value})
            % will start the parallel pool (parpool) using the profilename profile with
            % poolsize workers and any Name Value pair supported by function parpool.
            %
            % Internal use: when numIterations is a cell array
            % {ServerName, ServerPort, totalIterations, numWorkersPossible,
            % stepSize, UserData} (as built by loadobj) a worker-side
            % object is created instead.
            if iscell(numIterations) % worker
                debug('Start worker.');
                workerArgs = numIterations;
                o.ServerName = workerArgs{1};
                o.ServerPort = workerArgs{2};
                o.totalIterations = workerArgs{3};
                o.numWorkersPossible = workerArgs{4};
                o.stepSize = workerArgs{5};
                o.UserData = workerArgs{6};
                t = getCurrentTask();
                o.workerID = t.ID;
                % Connect the worker to the server, so that we can send the
                % progress to the server.
                o.connection = udp(o.ServerName, o.ServerPort);
                fopen(o.connection);
                o.isWorker = true;
                o.it = 0; % This is the number of iterations this worker is called.
                debug('Send login cmd');
                % Send a login request to the server, so that the ip and
                % port can be saved by the server. This is necessary to
                % close each worker when the parfor loop is finished.
                % An iteration count of 0 marks the message as a login.
                fwrite(o.connection,[o.workerID, 0],'ulong'); % login to server
            else % server
                % - Server constructor
                p = inputParser;
                showWorkerProgressDefault = false;
                progressBarUpdatePeriodDefault = 1.0;
                titleDefault = '';
                poolDefault = '';
                validScalarPosNum = @(x) isnumeric(x) && isscalar(x) && (x > 0);
                is_valid_profile = @(x) ischar(x) || iscell(x);
                addRequired(p,'numIterations', validScalarPosNum );
                addParameter(p,'showWorkerProgress', showWorkerProgressDefault, @isscalar);
                addParameter(p,'progressBarUpdatePeriod', progressBarUpdatePeriodDefault, validScalarPosNum);
                addParameter(p,'title',titleDefault,@ischar);
                addParameter(p,'parpool',poolDefault,is_valid_profile);
                parse(p,numIterations, varargin{:});
                o.showWorkerProgress = p.Results.showWorkerProgress;
                o.totalIterations = p.Results.numIterations;
                o.progressTotalOld = 0;
                poolSpec = p.Results.parpool; % user-supplied 'parpool' option
                debug('Start server.');
                pPool = gcp('nocreate');
                if isempty(pPool)
                    if isempty(poolSpec)
                        pPool = parpool; % Create new parallel pool with standard setting
                    elseif ischar(poolSpec)
                        pPool = parpool(poolSpec); % Create parallel pool with given profilename
                    elseif iscell(poolSpec)
                        pPool = parpool(poolSpec{:}); % Create parallel pool with given input arguments.
                    end
                end
                % Otherwise a parallel pool is still running. Let's keep it
                % (the 'parpool' option is not applied in that case).
                o.numWorkersPossible = pPool.NumWorkers;
                % We don't send each progress step to the server because
                % this will slow down each worker. Instead, we send the
                % progress each stepSize iterations.
                if (o.totalIterations / o.numWorkersPossible) > 200
                    % We only need to resolve 1% gain in worker progress
                    % progressStepSize = worker workload/100
                    progressStepSize = floor(o.totalIterations/o.numWorkersPossible/100);
                else
                    % We will transmit the progress each step.
                    progressStepSize = 1;
                end
                o.stepSize = progressStepSize;
                pct = pctconfig;
                o.ServerName = pct.hostname;
                % Create server connection to receive the updates from each
                % worker via udp. receiver is called each time a data
                % package is received with this class object handle to keep
                % track of the progress.
                o.connection = udp(o.ServerName, ...
                    'DatagramReceivedFcn', {@receiver, o}, ...
                    'DatagramTerminateMode', 'on', ...
                    'EnablePortSharing', 'on');
                fopen(o.connection);
                % This new connection uses a free port, which we have to
                % provide to each worker to connect to.
                o.ServerPort = o.connection.LocalPort;
                o.workerTable = table('Size',[pPool.NumWorkers, 4], ...
                    'VariableTypes',{'uint64','string','uint32','logical'}, ...
                    'VariableNames',{'progress','ip','port','connected'});
                o.isWorker = false;
                % Open a progressbar with 0% progress and optionally
                % initialize also the progress of each worker with 0%.
                % Also optionally, provide a title to the main progress
                hasTitle = ~any(contains(p.UsingDefaults,'title'));
                if o.showWorkerProgress
                    titles = cell(pPool.NumWorkers + 1, 1);
                    if hasTitle
                        titles{1} = p.Results.title;
                    else
                        titles{1} = 'Total progress';
                    end
                    for i = 1 : pPool.NumWorkers
                        titles{i+1} = sprintf('Worker %d', i);
                    end
                    progressbar(titles{:});
                else
                    if hasTitle
                        progressbar(p.Results.title);
                    else
                        progressbar;
                    end
                end
                % Start a timer and update the progressbar periodically
                o.timer = timer('BusyMode','drop', ...
                    'ExecutionMode','fixedSpacing', ...
                    'StartDelay',p.Results.progressBarUpdatePeriod*2, ...
                    'Period',p.Results.progressBarUpdatePeriod, ...
                    'TimerFcn',{@draw_progress_bar, o});
                start(o.timer);
                o.UserData = {};
            end
        end

        function o = saveobj( X )
            % saveobj - METHOD Pack the fields that loadobj needs to
            % rebuild a worker-side object into a plain struct.
            debug('SaveObj');
            o.ServerPort = X.ServerPort;
            o.ServerName = X.ServerName;
            o.totalIterations = X.totalIterations;
            o.numWorkersPossible = X.numWorkersPossible;
            o.stepSize = X.stepSize;
            o.UserData = X.UserData;
        end

        function delete( o )
            % delete - DESTRUCTOR Releases the connection (and, on the
            % server, the timer and progressbar) via close().
            debug('Delete object');
            o.close();
        end

        function increment( o )
            % increment - METHOD Count one finished iteration. Every
            % stepSize iterations the pair [workerID, it] is sent over the
            % udp connection.
            o.it = o.it + 1;
            if mod(o.it, o.stepSize) == 0
                debug('Send it=%d',o.it);
                fwrite(o.connection,[o.workerID, o.it], 'ulong');
            end
        end

        function UserData = getUserData( o )
            % getUserData - METHOD Return the stored user data.
            UserData = o.UserData;
        end

        function setUserData( o, UserData )
            % setUserData - METHOD Store arbitrary user data in the object.
            o.UserData = UserData;
        end

        function close( o )
            % import libUtil.progressbar;
            % close - METHOD Close worker/server connection; on the server
            % also stop the timer and finish the progressbar.
            %
            % Safe to call more than once (e.g. an explicit close()
            % followed by delete()): the udp object is only touched while
            % it is still valid, and the progressbar is only finalized by
            % the call that actually shut the connection down.
            conn = o.connection;
            hadLiveConnection = isa(conn, 'udp') && isvalid(conn);
            if hadLiveConnection
                if strcmp(conn.Status, 'open')
                    debug('Close worker/server connection');
                    fclose(conn);
                end
                debug('Delete worker/server connection');
                delete(conn);
            end
            % isWorker is empty when construction did not get far enough
            % to set it; in that case nothing server-side exists yet.
            if isequal(o.isWorker, false)
                if isa(o.timer,'timer') && isvalid(o.timer)
                    debug('Stop and delete timer');
                    stop(o.timer);
                    delete(o.timer);
                end
                % Let's close the progressbar after we are sure that no more
                % data will be collected
                if hadLiveConnection
                    progressbar(1.0);
                end
            end
        end
    end
end
% receiver  DatagramReceivedFcn callback of the server-side udp connection.
% It runs whenever a datagram from a worker arrives. A valid message holds
% two values: the worker id and the worker's iteration count.
%   * iteration count == 0 : login sent from the worker constructor; the
%     sender's ip address and port are recorded and the worker is marked
%     as connected.
%   * otherwise            : progress update sent from increment().
% h is the udp object that received the data, o is the server-side
% ParforProgressbar handle whose workerTable is updated.
function receiver(h, ~, o)
[payload, numValues, readMsg, senderIp, senderPort] = fread(h, 1, 'ulong');

% Anything that is not exactly (id, iteration) is ignored.
if numValues ~= 2 % error
    debug('Unkown received data from %s:%d with count = %d and fread msg = %s', senderIp, senderPort, numValues, readMsg);
    return
end

workerIdx = payload(1);
iterationCount = payload(2);

if iterationCount == 0 % log in request in worker constructor
    o.workerTable.progress(workerIdx) = 0;
    o.workerTable.ip(workerIdx) = senderIp;
    o.workerTable.port(workerIdx) = senderPort;
    o.workerTable.connected(workerIdx) = true;
    debug('login worker id=%02d with ip:port=%s:%d',workerIdx,senderIp,senderPort);
else % from worker increment call
    o.workerTable.progress(workerIdx) = iterationCount;
    debug('Set progress for worker id=%02d to %d',workerIdx,iterationCount);
end
end
% draw_progress_bar  TimerFcn callback of the server-side timer.
% Computes the overall progress as the sum of all reported worker
% iterations divided by the total number of iterations and redraws the
% progressbar, but only if the value grew since the last redraw.
% If showWorkerProgress is true, an estimated progress per worker is drawn
% as well (assuming the workload is evenly split between the connected
% workers).
% o is the server-side ParforProgressbar handle.
function draw_progress_bar(~, ~, o)
% import libUtil.progressbar;
overallFraction = sum(o.workerTable.progress) / o.totalIterations;

% Nothing new to show: leave the bar untouched.
if ~(overallFraction > o.progressTotalOld)
    return
end
o.progressTotalOld = overallFraction;

if(o.showWorkerProgress)
    connectedCount = sum(o.workerTable.connected);
    sharePerWorker = o.totalIterations / connectedCount;
    % One scalar cell per worker so they can be passed as separate
    % arguments to progressbar.
    workerFractions = num2cell(double(o.workerTable.progress) / sharePerWorker);
    progressbar(overallFraction, workerFractions{:});
else
    progressbar(overallFraction);
end
end
% debug  Optional file logger; currently a no-op.
% Workers within the parfor loop can sometimes display output using
% printf or disp. However, if you start a timer or udp connection and want
% to display anything after an interrupt occurred, it is simply impossible
% to print anything. Unfortunately error messages also don't get shown...
% I used this method to just print stuff to a file with the info about
% the current worker/server (main thread).
%
% The arguments are the same as for fprintf (format string followed by
% values). The logging code below is commented out, so calling debug has
% no effect; uncomment it (and adjust the hard-coded file path) to enable
% logging.
function debug(varargin)
% fid = fopen('E:/tmp/debugParforProgressbar.txt', 'a');
% t = getCurrentTask();
% if isempty(t)
% fprintf(fid, 'Server: ');
% else
% fprintf(fid, 'Worker ID=%02d: ', t.ID);
% end
% fprintf(fid, varargin{:});
% fprintf(fid, '\n');
% fclose(fid);
end