Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure LogDataWorkerThread::interruptRequested_ atomicity. #220

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/data/logdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void LogData::PartialIndexOperation::doStart(
// Constructs an empty log file.
// It must be displayed without error.
LogData::LogData() : AbstractLogData(), indexing_data_(),
fileMutex_(), workerThread_( &indexing_data_ )
fileMutex_(), workerThread_( indexing_data_ )
{
// Start with an "empty" log
attached_file_ = nullptr;
Expand Down
64 changes: 32 additions & 32 deletions src/data/logdataworkerthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* along with glogg. If not, see <http://www.gnu.org/licenses/>.
*/

#include <atomic>
#include <QFile>

#include "log.h"
Expand Down Expand Up @@ -84,17 +85,14 @@ void IndexingData::clear()
encoding_ = EncodingSpeculator::Encoding::ASCII7;
}

LogDataWorkerThread::LogDataWorkerThread( IndexingData* indexing_data )
: QThread(), mutex_(), operationRequestedCond_(),
nothingToDoCond_(), fileName_(), indexing_data_( indexing_data )
LogDataWorkerThread::LogDataWorkerThread( IndexingData& indexing_data )
: indexing_data_( indexing_data )
{
terminate_ = false;
interruptRequested_ = false;
operationRequested_ = NULL;
}

LogDataWorkerThread::~LogDataWorkerThread()
{
interruptRequested_.store( true, std::memory_order_relaxed );
{
QMutexLocker locker( &mutex_ );
terminate_ = true;
Expand All @@ -120,9 +118,9 @@ void LogDataWorkerThread::indexAll()
while ( (operationRequested_ != NULL) )
nothingToDoCond_.wait( &mutex_ );

interruptRequested_ = false;
interruptRequested_.store( false , std::memory_order_relaxed );
operationRequested_ = new FullIndexOperation( fileName_,
indexing_data_, &interruptRequested_, &encodingSpeculator_ );
indexing_data_, interruptRequested_, encodingSpeculator_ );
operationRequestedCond_.wakeAll();
}

Expand All @@ -136,18 +134,17 @@ void LogDataWorkerThread::indexAdditionalLines()
while ( (operationRequested_ != NULL) )
nothingToDoCond_.wait( &mutex_ );

interruptRequested_ = false;
interruptRequested_.store( false, std::memory_order_relaxed );
operationRequested_ = new PartialIndexOperation( fileName_,
indexing_data_, &interruptRequested_, &encodingSpeculator_ );
indexing_data_, interruptRequested_, encodingSpeculator_ );
operationRequestedCond_.wakeAll();
}

void LogDataWorkerThread::interrupt()
{
LOG(logDEBUG) << "Load interrupt requested";

// No mutex here, setting a bool is probably atomic!
interruptRequested_ = true;
interruptRequested_.store( true, std::memory_order_relaxed );
}

// This is the thread's main loop
Expand Down Expand Up @@ -195,17 +192,16 @@ void LogDataWorkerThread::run()
//

IndexOperation::IndexOperation( const QString& fileName,
IndexingData* indexingData, bool* interruptRequest,
EncodingSpeculator* encodingSpeculator )
IndexingData& indexingData, std::atomic_bool& interruptRequest,
EncodingSpeculator& encodingSpeculator )
: fileName_( fileName )
, interruptRequest_( interruptRequest )
, indexing_data_( indexingData )
, encoding_speculator_( encodingSpeculator )
{
interruptRequest_ = interruptRequest;
indexing_data_ = indexingData;
encoding_speculator_ = encodingSpeculator;
}

void IndexOperation::doIndex( IndexingData* indexing_data,
EncodingSpeculator* encoding_speculator, qint64 initialPosition )
void IndexOperation::doIndex( qint64 initialPosition )
{
qint64 pos = initialPosition; // Absolute position of the start of current line
qint64 end = 0; // Absolute position of the end of current line
Expand All @@ -220,7 +216,7 @@ void IndexOperation::doIndex( IndexingData* indexing_data,
FastLinePositionArray line_positions;
int max_length = 0;

if ( *interruptRequest_ ) // a bool is always read/written atomically isn't it?
if ( interruptRequest_.load( std::memory_order_relaxed ) )
break;

// Read a chunk of 5MB
Expand All @@ -235,7 +231,7 @@ void IndexOperation::doIndex( IndexingData* indexing_data,
do {
if ( pos_within_block < block.length() ) {
const char c = block.at(pos_within_block);
encoding_speculator->inject_byte( c );
encoding_speculator_.inject_byte( c );
if ( c == '\n' )
break;
else if ( c == '\t' )
Expand Down Expand Up @@ -263,8 +259,8 @@ void IndexOperation::doIndex( IndexingData* indexing_data,
}

// Update the shared data
indexing_data->addAll( block.length(), max_length, line_positions,
encoding_speculator->guess() );
indexing_data_.addAll( block.length(), max_length, line_positions,
encoding_speculator_.guess() );

// Update the caller for progress indication
int progress = ( file.size() > 0 ) ? pos*100 / file.size() : 100;
Expand All @@ -273,15 +269,15 @@ void IndexOperation::doIndex( IndexingData* indexing_data,

// Check if there is a non LF terminated line at the end of the file
qint64 file_size = file.size();
if ( !*interruptRequest_ && file_size > pos ) {
if ( !interruptRequest_.load( std::memory_order_relaxed ) && file_size > pos ) {
LOG( logWARNING ) <<
"Non LF terminated file, adding a fake end of line";

FastLinePositionArray line_position;
line_position.append( file_size + 1 );
line_position.setFakeFinalLF();

indexing_data->addAll( 0, 0, line_position, encoding_speculator->guess() );
indexing_data_.addAll( 0, 0, line_position, encoding_speculator_.guess() );
}
}
else {
Expand All @@ -304,31 +300,35 @@ bool FullIndexOperation::start()
emit indexingProgressed( 0 );

// First empty the index
indexing_data_->clear();
indexing_data_.clear();

doIndex( indexing_data_, encoding_speculator_, 0 );
doIndex( 0 );

bool interruptRequest = interruptRequest_.load( std::memory_order_relaxed );

LOG(logDEBUG) << "FullIndexOperation: ... finished counting."
"interrupt = " << *interruptRequest_;
"interrupt = " << interruptRequest;

return ( *interruptRequest_ ? false : true );
return !interruptRequest;
}

bool PartialIndexOperation::start()
{
LOG(logDEBUG) << "PartialIndexOperation::start(), file "
<< fileName_.toStdString();

qint64 initial_position = indexing_data_->getSize();
qint64 initial_position = indexing_data_.getSize();

LOG(logDEBUG) << "PartialIndexOperation: Starting the count at "
<< initial_position << " ...";

emit indexingProgressed( 0 );

doIndex( indexing_data_, encoding_speculator_, initial_position );
doIndex( initial_position );

bool interruptRequest = interruptRequest_.load( std::memory_order_relaxed );

LOG(logDEBUG) << "PartialIndexOperation: ... finished counting.";

return ( *interruptRequest_ ? false : true );
return !interruptRequest;
}
49 changes: 17 additions & 32 deletions src/data/logdataworkerthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifndef LOGDATAWORKERTHREAD_H
#define LOGDATAWORKERTHREAD_H

#include <atomic>
#include <QObject>
#include <QThread>
#include <QMutex>
Expand All @@ -35,9 +36,6 @@
class IndexingData
{
public:
IndexingData() : dataMutex_(), linePosition_(), maxLength_(0),
indexedSize_(0), encoding_(EncodingSpeculator::Encoding::ASCII7) { }

// Get the total indexed size
qint64 getSize() const;

Expand Down Expand Up @@ -67,19 +65,19 @@ class IndexingData
mutable QMutex dataMutex_;

LinePositionArray linePosition_;
int maxLength_;
qint64 indexedSize_;
int maxLength_{0};
qint64 indexedSize_{0};

EncodingSpeculator::Encoding encoding_;
EncodingSpeculator::Encoding encoding_{EncodingSpeculator::Encoding::ASCII7};
};

class IndexOperation : public QObject
{
Q_OBJECT
public:
IndexOperation( const QString& fileName,
IndexingData* indexingData, bool* interruptRequest,
EncodingSpeculator* encodingSpeculator );
IndexingData& indexingData, std::atomic_bool& interruptRequest,
EncodingSpeculator& encodingSpeculator );

virtual ~IndexOperation() { }

Expand All @@ -93,35 +91,26 @@ class IndexOperation : public QObject
protected:
static const int sizeChunk;

// Returns the total size indexed
// Modify the passed linePosition and maxLength
void doIndex( IndexingData* linePosition, EncodingSpeculator* encodingSpeculator,
qint64 initialPosition );
void doIndex( qint64 initialPosition );

QString fileName_;
bool* interruptRequest_;
IndexingData* indexing_data_;
std::atomic_bool& interruptRequest_;
IndexingData& indexing_data_;

EncodingSpeculator* encoding_speculator_;
EncodingSpeculator& encoding_speculator_;
};

class FullIndexOperation : public IndexOperation
{
public:
FullIndexOperation( const QString& fileName,
IndexingData* indexingData, bool* interruptRequest,
EncodingSpeculator* speculator )
: IndexOperation( fileName, indexingData, interruptRequest, speculator ) { }
using IndexOperation::IndexOperation;
virtual bool start();
};

class PartialIndexOperation : public IndexOperation
{
public:
PartialIndexOperation( const QString& fileName,
IndexingData* indexingData, bool* interruptRequest,
EncodingSpeculator* speculator )
: IndexOperation( fileName, indexingData, interruptRequest, speculator ) { }
using IndexOperation::IndexOperation;
virtual bool start();
};

Expand All @@ -137,7 +126,7 @@ class LogDataWorkerThread : public QThread
public:
// Pass a pointer to the IndexingData (initially empty)
// This object will change it when indexing (IndexingData must be thread safe!)
LogDataWorkerThread( IndexingData* indexing_data );
LogDataWorkerThread( IndexingData& indexing_data );
~LogDataWorkerThread();

// Attaches to a file on disk. Attaching to a non existant file
Expand All @@ -152,10 +141,6 @@ class LogDataWorkerThread : public QThread
// Interrupts the indexing if one is in progress
void interrupt();

// Returns a copy of the current indexing data
void getIndexingData( qint64* indexedSize,
int* maxLength, LinePositionArray* linePosition );

signals:
// Sent during the indexing process to signal progress
// percent being the percentage of completion.
Expand All @@ -177,12 +162,12 @@ class LogDataWorkerThread : public QThread
QString fileName_;

// Set when the thread must die
bool terminate_;
bool interruptRequested_;
IndexOperation* operationRequested_;
bool terminate_{false};
std::atomic_bool interruptRequested_{false};
IndexOperation* operationRequested_{nullptr};

// Pointer to the owner's indexing data (we modify it)
IndexingData* indexing_data_;
IndexingData& indexing_data_;

// To guess the encoding
EncodingSpeculator encodingSpeculator_;
Expand Down