Skip to content

Commit

Permalink
Optimized ValueLinkedQueue<T> to only store 1 reference.
Browse files Browse the repository at this point in the history
  • Loading branch information
timcassell committed Nov 11, 2024
1 parent 48aa866 commit ad9ccd2
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 214 deletions.
190 changes: 133 additions & 57 deletions Package/Core/Channels/Internal/BoundedChannelInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,16 @@ internal override ChannelReadResult<T> TryRead(int id)
else
{
// Otherwise, notify waiting writers.
var waitToWriters = _waitToWriters.MoveElementsToStack();
var waitToWriters = _waitToWriters.TakeElements();
_smallFields._locker.Exit();

while (waitToWriters.IsNotEmpty)
if (waitToWriters.IsNotEmpty)
{
waitToWriters.Pop().Resolve(true);
var stack = waitToWriters.MoveElementsToStackUnsafe();
do
{
stack.Pop().Resolve(true);
} while (stack.IsNotEmpty);
}
}
return new ChannelReadResult<T>(item, ChannelReadResult.Success);
Expand Down Expand Up @@ -197,13 +201,17 @@ internal override ChannelWriteResult<T> TryWrite(in T item, int id)
if (_queue.Count < _capacity)
{
// Notify waiting readers.
var waitToReaders = _waitToReaders.MoveElementsToStack();
var waitToReaders = _waitToReaders.TakeElements();
_queue.EnqueueTail(item);
_smallFields._locker.Exit();

while (waitToReaders.IsNotEmpty)
if (waitToReaders.IsNotEmpty)
{
waitToReaders.Pop().Resolve(true);
var stack = waitToReaders.MoveElementsToStackUnsafe();
do
{
stack.Pop().Resolve(true);
} while (stack.IsNotEmpty);
}
return new ChannelWriteResult<T>(default, ChannelWriteResult.Success);
}
Expand Down Expand Up @@ -316,13 +324,17 @@ internal override Promise<ChannelWriteResult<T>> WriteAsync(in T item, int id, C
if (_queue.Count < _capacity)
{
// Notify waiting readers.
var waitToReaders = _waitToReaders.MoveElementsToStack();
var waitToReaders = _waitToReaders.TakeElements();
_queue.EnqueueTail(item);
_smallFields._locker.Exit();

while (waitToReaders.IsNotEmpty)
if (waitToReaders.IsNotEmpty)
{
waitToReaders.Pop().Resolve(true);
var stack = waitToReaders.MoveElementsToStackUnsafe();
do
{
stack.Pop().Resolve(true);
} while (stack.IsNotEmpty);
}
return Promise.Resolved(new ChannelWriteResult<T>(default, ChannelWriteResult.Success));
}
Expand Down Expand Up @@ -458,27 +470,43 @@ internal override bool TryReject(object reason, int id)

var rejection = CreateRejectContainer(reason, 1, null, this);
_closedReason = rejection;
var readers = _readers.MoveElementsToStack();
var writers = _writers.MoveElementsToStack();
var waitToReaders = _waitToReaders.MoveElementsToStack();
var waitToWriters = _waitToWriters.MoveElementsToStack();
var readers = _readers.TakeElements();
var writers = _writers.TakeElements();
var waitToReaders = _waitToReaders.TakeElements();
var waitToWriters = _waitToWriters.TakeElements();
_smallFields._locker.Exit();

while (readers.IsNotEmpty)
if (readers.IsNotEmpty)
{
readers.Pop().Reject(rejection);
var stack = readers.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
while (writers.IsNotEmpty)
if (writers.IsNotEmpty)
{
writers.Pop().Reject(rejection);
var stack = writers.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
while (waitToReaders.IsNotEmpty)
if (waitToReaders.IsNotEmpty)
{
waitToReaders.Pop().Reject(rejection);
var stack = waitToReaders.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
while (waitToWriters.IsNotEmpty)
if (waitToWriters.IsNotEmpty)
{
waitToWriters.Pop().Reject(rejection);
var stack = waitToWriters.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
}
return true;
Expand All @@ -497,27 +525,43 @@ internal override bool TryCancel(int id)
}

_closedReason = ChannelSmallFields.ClosedCanceledReason;
var readers = _readers.MoveElementsToStack();
var writers = _writers.MoveElementsToStack();
var waitToReaders = _waitToReaders.MoveElementsToStack();
var waitToWriters = _waitToWriters.MoveElementsToStack();
var readers = _readers.TakeElements();
var writers = _writers.TakeElements();
var waitToReaders = _waitToReaders.TakeElements();
var waitToWriters = _waitToWriters.TakeElements();
_smallFields._locker.Exit();

while (readers.IsNotEmpty)
if (readers.IsNotEmpty)
{
readers.Pop().CancelDirect();
var stack = readers.MoveElementsToStackUnsafe();
do
{
stack.Pop().CancelDirect();
} while (stack.IsNotEmpty);
}
while (writers.IsNotEmpty)
if (writers.IsNotEmpty)
{
writers.Pop().CancelDirect();
var stack = writers.MoveElementsToStackUnsafe();
do
{
stack.Pop().CancelDirect();
} while (stack.IsNotEmpty);
}
while (waitToReaders.IsNotEmpty)
if (waitToReaders.IsNotEmpty)
{
waitToReaders.Pop().CancelDirect();
var stack = waitToReaders.MoveElementsToStackUnsafe();
do
{
stack.Pop().CancelDirect();
} while (stack.IsNotEmpty);
}
while (waitToWriters.IsNotEmpty)
if (waitToWriters.IsNotEmpty)
{
waitToWriters.Pop().CancelDirect();
var stack = waitToWriters.MoveElementsToStackUnsafe();
do
{
stack.Pop().CancelDirect();
} while (stack.IsNotEmpty);
}
}
return true;
Expand All @@ -536,27 +580,43 @@ internal override bool TryClose(int id)
}

_closedReason = ChannelSmallFields.ClosedResolvedReason;
var readers = _readers.MoveElementsToStack();
var writers = _writers.MoveElementsToStack();
var waitToReaders = _waitToReaders.MoveElementsToStack();
var waitToWriters = _waitToWriters.MoveElementsToStack();
var readers = _readers.TakeElements();
var writers = _writers.TakeElements();
var waitToReaders = _waitToReaders.TakeElements();
var waitToWriters = _waitToWriters.TakeElements();
_smallFields._locker.Exit();

while (readers.IsNotEmpty)
if (readers.IsNotEmpty)
{
readers.Pop().Resolve(new ChannelReadResult<T>(default, ChannelReadResult.Closed));
var stack = readers.MoveElementsToStackUnsafe();
do
{
stack.Pop().Resolve(new ChannelReadResult<T>(default, ChannelReadResult.Closed));
} while (stack.IsNotEmpty);
}
while (writers.IsNotEmpty)
if (writers.IsNotEmpty)
{
writers.Pop().Resolve(new ChannelWriteResult<T>(default, ChannelWriteResult.Closed));
var stack = writers.MoveElementsToStackUnsafe();
do
{
stack.Pop().Resolve(new ChannelWriteResult<T>(default, ChannelWriteResult.Closed));
} while (stack.IsNotEmpty);
}
while (waitToReaders.IsNotEmpty)
if (waitToReaders.IsNotEmpty)
{
waitToReaders.Pop().Resolve(false);
var stack = waitToReaders.MoveElementsToStackUnsafe();
do
{
stack.Pop().Resolve(false);
} while (stack.IsNotEmpty);
}
while (waitToWriters.IsNotEmpty)
if (waitToWriters.IsNotEmpty)
{
waitToWriters.Pop().Resolve(false);
var stack = waitToWriters.MoveElementsToStackUnsafe();
do
{
stack.Pop().Resolve(false);
} while (stack.IsNotEmpty);
}
}
return true;
Expand All @@ -576,32 +636,48 @@ internal override void Dispose(int id)
unchecked { _smallFields._id = id + 1; }
_closedReason = ChannelSmallFields.DisposedReason;
_queue.Dispose();
var readers = _readers.MoveElementsToStack();
var writers = _writers.MoveElementsToStack();
var waitToReaders = _waitToReaders.MoveElementsToStack();
var waitToWriters = _waitToWriters.MoveElementsToStack();
var readers = _readers.TakeElements();
var writers = _writers.TakeElements();
var waitToReaders = _waitToReaders.TakeElements();
var waitToWriters = _waitToWriters.TakeElements();
_smallFields._locker.Exit();

ObjectPool.MaybeRepool(this);

if (readers.IsNotEmpty | writers.IsNotEmpty | waitToReaders.IsNotEmpty | waitToWriters.IsNotEmpty)
{
var rejection = CreateRejectContainer(new System.ObjectDisposedException(nameof(Channel<T>)), 3, null, this);
while (readers.IsNotEmpty)
if (readers.IsNotEmpty)
{
readers.Pop().Reject(rejection);
var stack = readers.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
while (writers.IsNotEmpty)
if (writers.IsNotEmpty)
{
writers.Pop().Reject(rejection);
var stack = writers.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
while (waitToReaders.IsNotEmpty)
if (waitToReaders.IsNotEmpty)
{
waitToReaders.Pop().Reject(rejection);
var stack = waitToReaders.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
while (waitToWriters.IsNotEmpty)
if (waitToWriters.IsNotEmpty)
{
waitToWriters.Pop().Reject(rejection);
var stack = waitToWriters.MoveElementsToStackUnsafe();
do
{
stack.Pop().Reject(rejection);
} while (stack.IsNotEmpty);
}
}
}
Expand Down
Loading

0 comments on commit ad9ccd2

Please sign in to comment.