Skip to content

Commit

Permalink
Implement nullability in streams Buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Feb 5, 2025
1 parent 8ea9d3b commit 6fe8437
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 64 deletions.
6 changes: 5 additions & 1 deletion src/core/Akka.Streams/Attributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using Akka.Event;
Expand Down Expand Up @@ -325,8 +326,11 @@ public IEnumerable<TAttr> GetAttributeList<TAttr>() where TAttr : IAttribute
/// <typeparam name="TAttr">TBD</typeparam>
/// <param name="defaultIfNotFound">TBD</param>
/// <returns>TBD</returns>
public TAttr GetAttribute<TAttr>(TAttr defaultIfNotFound) where TAttr : class, IAttribute
#nullable enable
[return: NotNullIfNotNull("defaultIfNotFound")]
public TAttr? GetAttribute<TAttr>(TAttr? defaultIfNotFound) where TAttr : class, IAttribute
=> GetAttribute<TAttr>() ?? defaultIfNotFound;
#nullable restore

/// <summary>
/// Get the first (least specific) attribute of a given type or subtype thereof.
Expand Down
18 changes: 9 additions & 9 deletions src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Event;
using Akka.Streams.Actors;

#nullable enable
namespace Akka.Streams.Implementation
{
/// <summary>
Expand Down Expand Up @@ -40,7 +41,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
/// <summary>
/// TBD
/// </summary>
protected readonly IBuffer<T> Buffer;
protected readonly IBuffer<T>? Buffer;

/// <summary>
/// TBD
Expand All @@ -50,7 +51,6 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
/// TBD
/// </summary>
public readonly OverflowStrategy OverflowStrategy;
private ILoggingAdapter _log;

/// <summary>
/// TBD
Expand All @@ -63,13 +63,13 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
{
BufferSize = bufferSize;
OverflowStrategy = overflowStrategy;
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
Buffer = bufferSize > 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
}

/// <summary>
/// TBD
/// </summary>
protected ILoggingAdapter Log => _log ??= Context.GetLogger();
protected ILoggingAdapter Log { get; } = Context.GetLogger();

/// <summary>
/// TBD
Expand All @@ -90,7 +90,7 @@ protected bool DefaultReceive(object message)
Context.Stop(Self);
else if (message is Status.Success)
{
if (BufferSize == 0 || Buffer.IsEmpty)
if (Buffer is null || Buffer.IsEmpty)
OnCompleteThenStop(); // will complete the stream successfully
else
Context.Become(DrainBufferThenComplete);
Expand All @@ -112,7 +112,7 @@ protected virtual bool RequestElement(object message)
if (message is Request)
{
// totalDemand is tracked by base
if (BufferSize != 0)
if (Buffer is not null)
while (TotalDemand > 0L && !Buffer.IsEmpty)
OnNext(Buffer.Dequeue());

Expand All @@ -133,7 +133,7 @@ protected virtual bool ReceiveElement(T message)
{
if (TotalDemand > 0L)
OnNext(message);
else if (BufferSize == 0)
else if (Buffer is null)
Log.Debug("Dropping element because there is no downstream demand: [{0}]", message);
else if (!Buffer.IsFull)
Buffer.Enqueue(message);
Expand Down Expand Up @@ -189,7 +189,7 @@ private bool DrainBufferThenComplete(object message)
// even if previously valid completion was requested via Status.Success
OnErrorThenStop(failure.Cause);
}
else if (message is Request)
else if (message is Request && Buffer is not null)
{
// totalDemand is tracked by base
while (TotalDemand > 0L && !Buffer.IsEmpty)
Expand All @@ -201,7 +201,7 @@ private bool DrainBufferThenComplete(object message)
else if (IsActive)
Log.Debug(
"Dropping element because Status.Success received already, only draining already buffered elements: [{0}] (pending: [{1}])",
message, Buffer.Used);
message, Buffer?.Used ?? 0);
else
return false;

Expand Down
32 changes: 24 additions & 8 deletions src/core/Akka.Streams/Implementation/Buffers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Runtime.CompilerServices;
using Akka.Annotations;

#nullable enable
namespace Akka.Streams.Implementation
{
/// <summary>
Expand Down Expand Up @@ -54,7 +55,7 @@ internal interface IBuffer<T>
/// TBD
/// </summary>
/// <returns>TBD</returns>
T Peek();
T? Peek();
/// <summary>
/// TBD
/// </summary>
Expand Down Expand Up @@ -161,7 +162,7 @@ internal abstract class FixedSizeBuffer<T> : IBuffer<T>
/// </summary>
protected long WriteIndex;

private readonly T[] _buffer;
private readonly T?[] _buffer;

/// <summary>
/// TBD
Expand Down Expand Up @@ -222,14 +223,20 @@ public void Enqueue(T element)
/// <param name="element">TBD</param>
/// <param name="maintenance">TBD</param>
[MethodImpl(MethodImplOptions.NoInlining)]
public void Put(long index, T element, bool maintenance) => _buffer[ToOffset(index, maintenance)] = element;
public void Put(long index, T? element, bool maintenance) => _buffer[ToOffset(index, maintenance)] = element;

/// <summary>
/// TBD
/// </summary>
/// <param name="index">TBD</param>
/// <returns>TBD</returns>
public T Get(long index) => _buffer[ToOffset(index, false)];
public T Get(long index)
{
var elem = _buffer[ToOffset(index, false)];
if(elem == null)
throw new IndexOutOfRangeException($"Invalid buffer element at index {index}, element is null");
return elem;
}

/// <summary>
/// TBD
Expand Down Expand Up @@ -354,7 +361,7 @@ private sealed class FixedQueue : IBuffer<T>
private const int Size = 16;
private const int Mask = 15;

private readonly T[] _queue = new T[Size];
private readonly T?[] _queue = new T[Size];
private readonly BoundedBuffer<T> _boundedBuffer;
private int _head;
private int _tail;
Expand All @@ -373,6 +380,9 @@ public FixedQueue(BoundedBuffer<T> boundedBuffer)

public void Enqueue(T element)
{
if(element is null)
throw new ArgumentNullException(nameof(element));

if (_tail - _head == Size)
{
var queue = new DynamicQueue(Capacity);
Expand All @@ -392,12 +402,15 @@ public T Dequeue()
{
var pos = _head & Mask;
var ret = _queue[pos];
if(ret is null)
throw new IndexOutOfRangeException();

_queue[pos] = default(T);
_head += 1;
return ret;
}

public T Peek() => _tail == _head ? default(T) : _queue[_head & Mask];
public T? Peek() => _tail == _head ? default(T) : _queue[_head & Mask];

public void Clear()
{
Expand Down Expand Up @@ -431,12 +444,15 @@ public DynamicQueue(int capacity)

public T Dequeue()
{
if(First is null)
throw new IndexOutOfRangeException();

var result = First.Value;
RemoveFirst();
return result;
}

public T Peek() => First.Value;
public T? Peek() => First is null ? default : First.Value;

public void DropHead() => RemoveFirst();

Expand Down Expand Up @@ -498,7 +514,7 @@ public BoundedBuffer(int capacity)
/// TBD
/// </summary>
/// <returns>TBD</returns>
public T Peek() => _q.Peek();
public T? Peek() => _q.Peek();

/// <summary>
/// TBD
Expand Down
Loading

0 comments on commit 6fe8437

Please sign in to comment.