Skip to content

Commit

Permalink
Merge pull request #39 from onoyuuya/sendBufferedLogs
Browse files Browse the repository at this point in the history
Introduce BufferedOutput.sendBufferedLog
  • Loading branch information
mii-chan authored Oct 4, 2024
2 parents d4a6c37 + 7d896ab commit 35ff14b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 13 deletions.
6 changes: 6 additions & 0 deletions Sources/Puree/Logger.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public final class Logger {
}
}

public func sendBufferedLogs() {
dispatchQueue.sync {
outputs.forEach { ($0 as? BufferedOutput)?.sendBufferedLogs() }
}
}

public func suspend() {
dispatchQueue.sync {
outputs.forEach { $0.suspend() }
Expand Down
24 changes: 13 additions & 11 deletions Sources/Puree/Output/BufferedOutput.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,13 @@ open class BufferedOutput: InstantiatableOutput {

public func start() {
reloadLogStore()
readWriteQueue.sync {
flush()
}
sendBufferedLogs()
setUpTimer()
}

public func resume() {
reloadLogStore()
readWriteQueue.sync {
flush()
}
sendBufferedLogs()
setUpTimer()
}

Expand All @@ -100,19 +96,25 @@ open class BufferedOutput: InstantiatableOutput {
logStore.add(log, for: storageGroup, completion: nil)

if buffer.count >= logLimit {
flush()
writeBufferedLogs()
} else if let logSizeLimit = configuration.chunkDataSizeLimit {
let currentBufferedLogSize = buffer.reduce(0, { (size, log) -> Int in
size + (log.userData?.count ?? 0)
})

if currentBufferedLogSize >= logSizeLimit {
flush()
writeBufferedLogs()
}
}
}
}

public func sendBufferedLogs() {
readWriteQueue.sync {
writeBufferedLogs()
}
}

open func write(_ chunk: Chunk, completion: @escaping (Bool) -> Void) {
completion(false)
}
Expand All @@ -138,12 +140,12 @@ open class BufferedOutput: InstantiatableOutput {
if let lastFlushDate = lastFlushDate {
if currentDate.timeIntervalSince(lastFlushDate) > flushInterval {
readWriteQueue.async {
self.flush()
self.writeBufferedLogs()
}
}
} else {
readWriteQueue.async {
self.flush()
self.writeBufferedLogs()
}
}
}
Expand All @@ -165,7 +167,7 @@ open class BufferedOutput: InstantiatableOutput {
}
}

private func flush() {
private func writeBufferedLogs() {
dispatchPrecondition(condition: .onQueue(readWriteQueue))

lastFlushDate = currentDate
Expand Down
40 changes: 38 additions & 2 deletions Tests/PureeTests/Output/BufferedOutputTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ class BufferedOutputTests: XCTestCase {
XCTAssertEqual(output.calledWriteCount, 1)
}

func testBufferedOutputSendBufferedLogs() {
output.configuration.logEntryCountLimit = 10

XCTAssertEqual(logStore.logs(for: "pv_TestingBufferedOutput").count, 0)
XCTAssertEqual(output.calledWriteCount, 0)

output.emit(log: makeLog())
output.sendBufferedLogs()
output.waitUntilCurrentQueuedJobFinished()

XCTAssertEqual(logStore.logs(for: "pv_TestingBufferedOutput").count, 0)
XCTAssertEqual(output.calledWriteCount, 1)
}

func testBufferedOutputFlushedByInterval() {
output.configuration.logEntryCountLimit = 10
output.configuration.flushInterval = 1
Expand All @@ -87,7 +101,7 @@ class BufferedOutputTests: XCTestCase {
XCTAssertEqual(output.calledWriteCount, 0)

output.writeCallback = {
XCTFail("flush should not be called")
XCTFail("writeBufferedLogs should not be called")
}
sleep(2)
}
Expand Down Expand Up @@ -291,6 +305,28 @@ class BufferedOutputAsyncTests: XCTestCase {
XCTAssertEqual(output.calledWriteCount, 1)
}

func testBufferedOutputSendBufferedLogs() {
output.configuration.logEntryCountLimit = 10

let expectation = self.expectation(description: "async writing")
output.writeCallback = {
expectation.fulfill()
}
output.waitUntilCurrentCompletionBlock = { [weak self] in
self?.wait(for: [expectation], timeout: 1.0)
}

XCTAssertEqual(logStore.logs(for: "pv_TestingBufferedOutput").count, 0)
XCTAssertEqual(output.calledWriteCount, 0)

output.emit(log: makeLog())
output.sendBufferedLogs()
output.waitUntilCurrentQueuedJobFinished()

XCTAssertEqual(logStore.logs(for: "pv_TestingBufferedOutput").count, 0)
XCTAssertEqual(output.calledWriteCount, 1)
}

func testBufferedOutputFlushedByInterval() {
output.configuration.logEntryCountLimit = 10
output.configuration.flushInterval = 1
Expand All @@ -315,7 +351,7 @@ class BufferedOutputAsyncTests: XCTestCase {
XCTAssertEqual(output.calledWriteCount, 0)

output.writeCallback = {
XCTFail("flush should not be called")
XCTFail("writeBufferedLogs should not be called")
}
sleep(2)
}
Expand Down

0 comments on commit 35ff14b

Please sign in to comment.