Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kishorikonwar committed Sep 22, 2020
1 parent f69a90c commit 046cefa
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 30,704 deletions.
1 change: 1 addition & 0 deletions fastqpreprocessing/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
dox/
dox_errors.txt
*#
*nohup.txt
1 change: 0 additions & 1 deletion fastqpreprocessing/src/big-run.sh

This file was deleted.

10 changes: 0 additions & 10 deletions fastqpreprocessing/src/create_fastq.sh

This file was deleted.

1 change: 0 additions & 1 deletion fastqpreprocessing/src/example-run.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/bin/bash

./fastqprocess --verbose \
--bam-size 0.001 \
--barcode-length 16 \
Expand Down
57 changes: 25 additions & 32 deletions fastqpreprocessing/src/fastqprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ SAM_RECORD_BINS * create_samrecord_holders(int16_t nthreads,
return 0;
}

// for each thread we allocate an array of indices (to final output files(
// for each thread we allocate an array of indices (to final output files)
if ((samrecord_data->file_index = new vector<int> *[nthreads]) == 0) {
std::cerr << "Failed to allocate memory for the pointer for "
"array of vectors" << std::endl;
Expand Down Expand Up @@ -130,7 +130,7 @@ void process_inputs(const INPUT_OPTIONS &options,
// set the stop flag for the writers
samrecord_data->stop = true;

// ask the writers to make one more loop in the whilte loop
// ask the writers to make one more loop in the while loop
for (int j = 0; j < samrecord_data->num_files; j++) {
if (sem_post(&semaphores[j]) == -1)
error("sem_post: semaphores");
Expand All @@ -150,7 +150,10 @@ void process_inputs(const INPUT_OPTIONS &options,
for (int i = 0; i < samrecord_data->num_files; i++) {
sem_destroy(&semaphores_workers[i]);
}


// delete the records
delete [] samrecord_data->num_records;

// delete reader and writer threads
delete [] readers;
delete [] writers;
Expand All @@ -162,9 +165,7 @@ void bam_writers(int windex, SAM_RECORD_BINS *samrecord_data) {
std::string outputfile;

// name of the output file
char buf[MAX_FILE_LENGTH];
sprintf(buf, "subfile_%d.bam", windex);
outputfile = buf;
outputfile = std::format("subfile_{}.bam", windex);

// open to write the outputfile
samOut.OpenForWrite(outputfile.c_str());
Expand All @@ -185,19 +186,19 @@ void bam_writers(int windex, SAM_RECORD_BINS *samrecord_data) {
// add the header to the output bam
samOut.WriteHeader(samHeader);

// keep writing foever, until there is a flag to stop
// keep writing forever, until there is a flag to stop
while (true) {
// wait untile some data is ready from a reader thread
// wait until some data is ready from a reader thread
if (sem_wait(&semaphores[windex]) == -1)
error("sem_wait:semaphores");

// write out the record buffers for the reader thread "active_thread_no"
// write out the record buffers for the reader thread "active_thread_num"
// that signalled that buffer is ready to be written
SamRecord *samRecord =
samrecord_data->samrecords[samrecord_data->active_thread_no];
samrecord_data->samrecords[samrecord_data->active_thread_num];
// go through the index of the samrecords that are stored for the current
// writer, i.e., "windex" or the corresponding BAM file
for (auto index : samrecord_data->file_index[samrecord_data->active_thread_no][windex]) {
for (auto index : samrecord_data->file_index[samrecord_data->active_thread_num][windex]) {
samOut.WriteRecord(samHeader, samRecord[index]);
}

Expand Down Expand Up @@ -227,29 +228,29 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,
FastQFile fastQFileR2(4, 4);

// open the I1 file
bool empty_I1_file_list = false;
bool has_I1_file_list = true;
if (!filenameI1.empty()) {
if (fastQFileI1.openFile(String(filenameI1.c_str()), BaseAsciiMap::UNKNOWN) !=
FastQStatus::FASTQ_SUCCESS) {
std::cerr << "Failed to open file: " << filenameI1.c_str();
return;
abort();
}
} else {
empty_I1_file_list = true;
has_I1_file_list = false;
}

// open the R1 file
if (fastQFileR1.openFile(filenameR1, BaseAsciiMap::UNKNOWN) !=
FastQStatus::FASTQ_SUCCESS) {
std::cerr << "Failed to open file: " << filenameR1.c_str();
return;
abort();
}

// open the R2 file
if (fastQFileR2.openFile(filenameR2, BaseAsciiMap::UNKNOWN) !=
FastQStatus::FASTQ_SUCCESS) {
std::cerr << "Failed to open file: " << filenameR2.c_str();
return;
abort();
}

// point to the array of records already allocated for this reader
Expand All @@ -266,9 +267,9 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,


while (fastQFileR1.keepReadingFile()) {
if ((empty_I1_file_list == true ||
if ((!has_I1_file_list ||
(
empty_I1_file_list == false &&
has_I1_file_list &&
fastQFileI1.readFastQSequence() == FastQStatus::FASTQ_SUCCESS
)
) &&
Expand Down Expand Up @@ -309,7 +310,7 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,
samRecord[r].addTag("UY", 'Z', UMIQString.c_str());

// add raw sequence and quality sequence for the index
if (empty_I1_file_list == false) {
if (has_I1_file_list) {
std::string indexseq = std::string(fastQFileI1.myRawSequence.c_str());
std::string indexSeqQual = std::string(fastQFileI1.myQualityString.c_str());
samRecord[r].addTag("SR", 'Z', indexseq.c_str());
Expand All @@ -321,7 +322,8 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,
// This is done because in the case of incorrigible barcodes
// we need a mechanism to uniformly distribute the alignments
// so that no bam is oversized to putting all such barcode less
// sequences into one particular
// sequences into one particular. Incorregible barcodes are simply
// added withouth the CB tag
std::string bucket_barcode;
if (white_list_data->mutations.find(barcode) !=
white_list_data->mutations.end()) {
Expand Down Expand Up @@ -370,7 +372,7 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,

// it sets itself as the active thread who wants the
// readers to clear the data
samrecord_data->active_thread_no = tindex;
samrecord_data->active_thread_num = tindex;

// send a signal to every writer thread, i.e., write out data
// data to any file where the samheader should be written to
Expand All @@ -379,7 +381,7 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,
error("sem_post: semaphores");
}

// there is where I wait white the writers are writing
// there is where I wait while the writers are writing
for (int32_t j = 0; j < samrecord_data->num_files; j++) {
if (sem_wait(&semaphores_workers[j]) == -1)
error("sem_wait: semaphores_workers");
Expand All @@ -400,24 +402,15 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,
if (i % 10000000 == 0) {
printf("%d\n", i);
std::string a = std::string(fastQFileR1.myRawSequence.c_str());
//printf("%s\n", fastQFileI1.mySequenceIdLine.c_str());
printf("%s\n", fastQFileR1.mySequenceIdLine.c_str());
printf("%s\n", fastQFileR2.mySequenceIdLine.c_str());
}
/*
if(i== 5000000) {
for(int j = 0; j < samrecord_data->num_files; j++) {
sem_destroy(&semaphores[j]);
}
break;
}
*/
} // if successful read of a sequence
}

// Finished processing all of the sequences in the file.
// Close the input files.
if(empty_I1_file_list == false) fastQFileI1.closeFile();
if (has_I1_file_list) fastQFileI1.closeFile();

fastQFileR1.closeFile();
fastQFileR2.closeFile();
Expand Down
8 changes: 3 additions & 5 deletions fastqpreprocessing/src/fastqprocess.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,20 @@ typedef struct SamRecordBins {
std::string sample_id;
int32_t block_size;

/// number of output bam files, and one writer thread per bam file
/// number of output bam files, and one writer thread per bam file
int16_t num_files;
/// flag to stop the writer
bool stop;
/// number of thread equal to the number of set of R1/R2
int16_t num_threads;
/// the thread (reader) that is currently wanting to write
int32_t active_thread_no;
int32_t active_thread_num;
} SAM_RECORD_BINS;


/**
* @brief Processes the input fastq files
*
* @detail
* This function creates a set of readers (as there are files),
* This function creates a set of readers (as many as there are files),
* a set of writers to write the individual bam files, a set of
* semaphores for readers to signal to writers when buffer of records
* are ready, and another set of semaphores for writers to signal
Expand Down
2 changes: 0 additions & 2 deletions fastqpreprocessing/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ int main (int argc, char **argv)
std::cout << "done" << std::endl;

process_inputs(options, white_list_data);

return 0;

}

Loading

0 comments on commit 046cefa

Please sign in to comment.