Skip to content

Commit e7ee993

Browse files
committed
fixing SocketModelCommunicationTest and support for large messages
1 parent e3b0f27 commit e7ee993

File tree

3 files changed

+81
-26
lines changed

3 files changed

+81
-26
lines changed

source/P4VFS.Console/P4VFS.Notes.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ Version [1.29.0.0]
1111
virtual machine with including kernel mode debugging. Includes support for
1212
RestartPerforceServer script to quick spin-up of existing server on override drive
1313
* Reconfiguring our 32 bit driver allocation pool tags to 4-byte string "Pvs*"
14+
* Fixing bug recieving large messages to/from the P4VFS.Service. This was typically
15+
encountered when using global "-x <file>" with a file containing many arguments.
16+
There's a new ReflectPackage message which is now used by SocketModelCommunicationTest
17+
to stress test a wide range of message sizes
1418

1519
Version [1.28.4.0]
1620
* Addition of optional service commandline argument to launch with debugger attached.

source/P4VFS.Extensions/Source/Common/SocketModel.cs

Lines changed: 75 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public void Shutdown()
4848
try
4949
{
5050
if (_Cancellation != null)
51+
{
5152
_Cancellation.Cancel();
53+
}
5254

5355
if (_TcpListener != null)
5456
{
@@ -66,7 +68,9 @@ public void Shutdown()
6668
if (_ListenThread != null)
6769
{
6870
if (_ListenThread.IsAlive)
71+
{
6972
_ListenThread.Join(500);
73+
}
7074
_ListenThread = null;
7175
}
7276
}
@@ -96,7 +100,9 @@ private void ListenForClients()
96100
catch (Exception e)
97101
{
98102
if (_Cancellation.IsCancellationRequested == false)
103+
{
99104
VirtualFileSystemLog.Error("SocketModelServer.ListenForClients exiting with exception: {0}", e.Message);
105+
}
100106
}
101107
}
102108

@@ -109,7 +115,7 @@ private void HandleClientConnection(object param)
109115
{
110116
using (NetworkStream stream = client.GetStream())
111117
{
112-
SocketModelMessage msg = SocketModelProtocol.ReceiveMessage<SocketModelMessage>(stream);
118+
SocketModelMessage msg = SocketModelProtocol.ReceiveMessage<SocketModelMessage>(stream, _Cancellation.Token);
113119
if (msg == null)
114120
{
115121
VirtualFileSystemLog.Info("SocketModelServer.HandleClientConnection exiting from null message");
@@ -133,7 +139,7 @@ private void HandleSocketModelMessage(NetworkStream stream, SocketModelMessage m
133139
{
134140
SocketModelRequestSync request = msg.GetData<SocketModelRequestSync>();
135141
SocketModelReplySync reply = Sync(stream, request);
136-
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply));
142+
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply), _Cancellation.Token);
137143
}
138144
else if (msg.Type == typeof(SocketModelRequestServiceStatus).Name)
139145
{
@@ -144,36 +150,36 @@ private void HandleSocketModelMessage(NetworkStream stream, SocketModelMessage m
144150
reply.LastModifiedTime = VirtualFileSystem.ServiceHost.GetLastModifiedTime();
145151
reply.LastRequestTime = VirtualFileSystem.ServiceHost.GetLastRequestTime();
146152
}
147-
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply));
153+
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply), _Cancellation.Token);
148154
}
149155
else if (msg.Type == typeof(SocketModelRequestSetServiceSetting).Name)
150156
{
151157
SocketModelRequestSetServiceSetting request = msg.GetData<SocketModelRequestSetServiceSetting>();
152158
SocketModelReply reply = new SocketModelReply{ Success = ServiceSettings.SetProperty(request.Value, request.Name) };
153-
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply));
159+
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply), _Cancellation.Token);
154160
}
155161
else if (msg.Type == typeof(SocketModelRequestGetServiceSetting).Name)
156162
{
157163
SocketModelRequestGetServiceSetting request = msg.GetData<SocketModelRequestGetServiceSetting>();
158164
SocketModelReplyGetServiceSetting reply = new SocketModelReplyGetServiceSetting{ Value = ServiceSettings.GetProperty(request.Name) };
159-
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply));
165+
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply), _Cancellation.Token);
160166
}
161167
else if (msg.Type == typeof(SocketModelRequestGetServiceSettings).Name)
162168
{
163169
SocketModelReplyGetServiceSettings reply = new SocketModelReplyGetServiceSettings{ Settings = SettingManager.GetProperties() };
164-
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply));
170+
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply), _Cancellation.Token);
165171
}
166172
else if (msg.Type == typeof(SocketModelRequestGarbageCollect).Name)
167173
{
168174
SocketModelRequestGarbageCollect request = msg.GetData<SocketModelRequestGarbageCollect>();
169175
SocketModelReply reply = new SocketModelReply{ Success = VirtualFileSystem.ServiceHost != null ? VirtualFileSystem.ServiceHost.GarbageCollect(request.Timeout) : false };
170-
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply));
176+
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply), _Cancellation.Token);
171177
}
172178
else if (msg.Type == typeof(SocketModelRequestReflectPackage).Name)
173179
{
174180
SocketModelRequestReflectPackage request = msg.GetData<SocketModelRequestReflectPackage>();
175181
SocketModelReplyReflectPackage reply = new SocketModelReplyReflectPackage{ Package = request.Package };
176-
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply));
182+
SocketModelProtocol.SendMessage(stream, new SocketModelMessage(reply), _Cancellation.Token);
177183
}
178184
else
179185
{
@@ -238,7 +244,7 @@ public override void Write(LogElement element)
238244
SocketModelMessage msg = new SocketModelMessage(new SocketModelRequestLog{ Element = element });
239245
lock (_StreamMutex)
240246
{
241-
SocketModelProtocol.SendMessage(_NetworkStream, msg);
247+
SocketModelProtocol.SendMessage(_NetworkStream, msg, _CancelationToken);
242248
}
243249
}
244250
}
@@ -256,7 +262,10 @@ public DepotSyncStatus Sync(DepotConfig config, DepotSyncOptions syncOptions)
256262
{
257263
List<SocketModelMessage> result = new List<SocketModelMessage>();
258264
if (SendCommand(new SocketModelRequestSync{ Config=config, SyncOptions=syncOptions }, result) == false)
265+
{
259266
return DepotSyncStatus.Error;
267+
}
268+
260269
SocketModelReplySync reply = SocketModelMessage.GetData<SocketModelReplySync>(result);
261270
return reply != null ? reply.Status : DepotSyncStatus.Error;
262271
}
@@ -265,15 +274,21 @@ public SocketModelReplyServiceStatus GetServiceStatus()
265274
{
266275
List<SocketModelMessage> result = new List<SocketModelMessage>();
267276
if (SendCommand(new SocketModelRequestServiceStatus(), result) == false)
277+
{
268278
return null;
279+
}
280+
269281
return SocketModelMessage.GetData<SocketModelReplyServiceStatus>(result);
270282
}
271283

272284
public SettingNodeMap GetServiceSettings()
273285
{
274286
List<SocketModelMessage> result = new List<SocketModelMessage>();
275287
if (SendCommand(new SocketModelRequestGetServiceSettings(), result) == false)
288+
{
276289
return null;
290+
}
291+
277292
SocketModelReplyGetServiceSettings reply = SocketModelMessage.GetData<SocketModelReplyGetServiceSettings>(result);
278293
return reply?.Settings;
279294
}
@@ -282,7 +297,10 @@ public bool SetServiceSetting(string name, SettingNode value)
282297
{
283298
List<SocketModelMessage> result = new List<SocketModelMessage>();
284299
if (SendCommand(new SocketModelRequestSetServiceSetting{ Name=name, Value=value }, result) == false)
300+
{
285301
return false;
302+
}
303+
286304
SocketModelReply reply = SocketModelMessage.GetData<SocketModelReply>(result);
287305
return reply != null && reply.Success;
288306
}
@@ -291,7 +309,10 @@ public SettingNode GetServiceSetting(string name)
291309
{
292310
List<SocketModelMessage> result = new List<SocketModelMessage>();
293311
if (SendCommand(new SocketModelRequestGetServiceSetting{ Name=name }, result) == false)
312+
{
294313
return null;
314+
}
315+
295316
SocketModelReplyGetServiceSetting reply = SocketModelMessage.GetData<SocketModelReplyGetServiceSetting>(result);
296317
return reply?.Value;
297318
}
@@ -300,7 +321,10 @@ public bool GarbageCollect(Int64 timeout = 0)
300321
{
301322
List<SocketModelMessage> result = new List<SocketModelMessage>();
302323
if (SendCommand(new SocketModelRequestGarbageCollect{ Timeout = timeout }, result) == false)
324+
{
303325
return false;
326+
}
327+
304328
SocketModelReply reply = SocketModelMessage.GetData<SocketModelReply>(result);
305329
return reply != null && reply.Success;
306330
}
@@ -309,12 +333,15 @@ public byte[] ReflectPackage(byte[] package)
309333
{
310334
List<SocketModelMessage> result = new List<SocketModelMessage>();
311335
if (SendCommand(new SocketModelRequestReflectPackage{ Package = package }, result) == false)
336+
{
312337
return null;
338+
}
339+
313340
SocketModelReplyReflectPackage reply = SocketModelMessage.GetData<SocketModelReplyReflectPackage>(result);
314341
return reply?.Package;
315342
}
316343

317-
private bool SendCommand<CommandType>(CommandType cmd, List<SocketModelMessage> result = null)
344+
private bool SendCommand<CommandType>(CommandType cmd, List<SocketModelMessage> result = null, CancellationToken cancelationToken = default)
318345
{
319346
try
320347
{
@@ -324,15 +351,18 @@ private bool SendCommand<CommandType>(CommandType cmd, List<SocketModelMessage>
324351
using (NetworkStream stream = client.GetStream())
325352
{
326353
SocketModelMessage request = new SocketModelMessage(cmd);
327-
SocketModelProtocol.SendMessage(stream, request);
354+
SocketModelProtocol.SendMessage(stream, request, cancelationToken);
328355
while (client.Connected)
329356
{
330-
SocketModelMessage msg = SocketModelProtocol.ReceiveMessage<SocketModelMessage>(stream);
357+
SocketModelMessage msg = SocketModelProtocol.ReceiveMessage<SocketModelMessage>(stream, cancelationToken);
331358
if (msg == null)
359+
{
332360
break;
361+
}
333362
if (result != null)
363+
{
334364
result.Add(msg);
335-
365+
}
336366
HandleSocketModelMessage(stream, msg);
337367
}
338368
}
@@ -372,16 +402,18 @@ static SocketModelProtocol()
372402
_Encoding = new UTF8Encoding(false, true);
373403
}
374404

375-
public static void SendMessage<MessageType>(NetworkStream stream, MessageType msg)
405+
public static void SendMessage<MessageType>(NetworkStream stream, MessageType msg, CancellationToken cancelationToken)
376406
{
377-
SendMessageAsync(stream, msg).Wait();
407+
SendMessageAsync(stream, msg, cancelationToken).Wait(cancelationToken);
378408
}
379409

380-
public static async Task SendMessageAsync<MessageType>(NetworkStream stream, MessageType msg)
410+
public static async Task SendMessageAsync<MessageType>(NetworkStream stream, MessageType msg, CancellationToken cancelationToken)
381411
{
382412
string content = Serialize(msg);
383413
if (content == null)
414+
{
384415
throw new Exception("Failed to serialize message");
416+
}
385417

386418
byte[] contentBytes = _Encoding.GetBytes(content);
387419

@@ -395,35 +427,52 @@ public static async Task SendMessageAsync<MessageType>(NetworkStream stream, Mes
395427
Array.Copy(headerBytes, 0, packetBytes, 0, headerBytes.Length);
396428
Array.Copy(contentBytes, 0, packetBytes, headerBytes.Length, contentBytes.Length);
397429

398-
await stream.WriteAsync(packetBytes, 0, packetBytes.Length);
430+
await stream.WriteAsync(packetBytes, 0, packetBytes.Length, cancelationToken);
399431
}
400432

401-
public static MessageType ReceiveMessage<MessageType>(NetworkStream stream)
433+
public static MessageType ReceiveMessage<MessageType>(NetworkStream stream, CancellationToken cancelationToken)
402434
{
403-
Task<MessageType> msg = ReceiveMessageAsync<MessageType>(stream);
404-
msg.Wait();
435+
Task<MessageType> msg = ReceiveMessageAsync<MessageType>(stream, cancelationToken);
436+
msg.Wait(cancelationToken);
405437
return msg.Result;
406438
}
407439

408-
public static async Task<MessageType> ReceiveMessageAsync<MessageType>(NetworkStream stream)
440+
public static async Task<MessageType> ReceiveMessageAsync<MessageType>(NetworkStream stream, CancellationToken cancelationToken)
409441
{
410442
int headerSize = Marshal.SizeOf(typeof(Header));
411443
byte[] headerBytes = new byte[headerSize];
412-
int headerRead = await stream.ReadAsync(headerBytes, 0, headerBytes.Length);
444+
int headerRead = await stream.ReadAsync(headerBytes, 0, headerBytes.Length, cancelationToken);
413445
if (headerRead == 0)
446+
{
414447
return default(MessageType);
448+
}
415449
if (headerRead != headerBytes.Length)
450+
{
416451
throw new Exception("Failed to read header bytes");
452+
}
417453

418454
Header header = BytesToStruct<Header>(headerBytes);
419455
if (header.MagicNumber != _MagicNumber)
456+
{
420457
throw new Exception(String.Format("Invalid header format 0x{0:X8} expected 0x{1:X8}", header.MagicNumber, _MagicNumber));
458+
}
421459
if (header.Version != _Version)
460+
{
422461
throw new Exception(String.Format("Invalid header version {0} expected {1}", header.Version, _Version));
462+
}
423463

424464
byte[] contentBytes = new byte[header.ContentLength];
425-
if (await stream.ReadAsync(contentBytes, 0, contentBytes.Length) != contentBytes.Length)
426-
throw new Exception("Failed to read content bytes");
465+
for (int contentIndex = 0; contentIndex < contentBytes.Length;)
466+
{
467+
cancelationToken.ThrowIfCancellationRequested();
468+
int remainingCount = contentBytes.Length - contentIndex;
469+
int readCount = await stream.ReadAsync(contentBytes, contentIndex, remainingCount, cancelationToken);
470+
if (readCount <= 0)
471+
{
472+
throw new Exception($"Failed to read expected number of content bytes {contentBytes.Length}");
473+
}
474+
contentIndex += readCount;
475+
}
427476

428477
string contentString = _Encoding.GetString(contentBytes);
429478
return Deserialize<MessageType>(contentString);
@@ -460,7 +509,9 @@ public static T BytesToStruct<T>(byte[] valueBytes) where T : struct
460509
{
461510
int valueSize = Marshal.SizeOf(typeof(T));
462511
if (valueBytes.Length < valueBytes.Length)
512+
{
463513
throw new Exception("Invalid buffer size");
514+
}
464515

465516
T value = default(T);
466517
IntPtr valuePtr = Marshal.AllocHGlobal(valueSize);

source/P4VFS.UnitTest/Source/UnitTestCommon.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1445,8 +1445,8 @@ public void SocketModelCommunicationTest()
14451445
data >>= 8;
14461446
}
14471447
byte[] receiveBytes = client.ReflectPackage(packageBytes);
1448-
//Assert(receiveBytes?.Length == packageBytes.Length);
1449-
//Assert(receiveBytes.SequenceEqual(packageBytes));
1448+
Assert(receiveBytes?.Length == packageBytes.Length);
1449+
Assert(receiveBytes.SequenceEqual(packageBytes));
14501450
}
14511451

14521452
ServiceRestart();

0 commit comments

Comments
 (0)