From 14a22c73ee127448ea148b3bbf51481d98386f3f Mon Sep 17 00:00:00 2001 From: Kishori Konwar Date: Thu, 24 Sep 2020 10:52:37 +0000 Subject: [PATCH] refactored process_file into four functions --- fastqpreprocessing/src/fastqprocess.cpp | 231 ++++++++++++++---------- fastqpreprocessing/src/utilities.cpp | 14 +- 2 files changed, 144 insertions(+), 101 deletions(-) diff --git a/fastqpreprocessing/src/fastqprocess.cpp b/fastqpreprocessing/src/fastqprocess.cpp index f3834c0..00e473b 100644 --- a/fastqpreprocessing/src/fastqprocess.cpp +++ b/fastqpreprocessing/src/fastqprocess.cpp @@ -165,7 +165,9 @@ void bam_writers(int windex, SAM_RECORD_BINS *samrecord_data) { std::string outputfile; // name of the output file - outputfile = std::format("subfile_{}.bam", windex); + char buf[MAX_FILE_LENGTH]; + sprintf(buf, "subfile_%d.bam", windex); + outputfile = buf; // open to write the outputfile samOut.OpenForWrite(outputfile.c_str()); @@ -215,6 +217,124 @@ void bam_writers(int windex, SAM_RECORD_BINS *samrecord_data) { samOut.Close(); } +void fillSamRecord(SamRecord *samRecord, FastQFile &fastQFileI1, + FastQFile &fastQFileR1, FastQFile &fastQFileR2, + unsigned int barcode_length, unsigned int umi_length, + bool has_I1_file_list) { + // check the sequence names matching + std::string a = std::string(fastQFileR1.myRawSequence.c_str()); + std::string b = std::string(fastQFileR1.myQualityString.c_str()); + + // extract the raw barcode and UMI + std::string barcode = a.substr(0, barcode_length); + std::string UMI = a.substr(barcode_length, umi_length); + + // extract raw barcode and UMI quality string + std::string barcodeQString = b.substr(0, barcode_length); + std::string UMIQString = b.substr(barcode_length, umi_length); + + // reset the samrecord + samRecord->resetRecord(); + + // add read group and the sam flag + samRecord->addTag("RG", 'Z', "A"); + samRecord->setFlag(4); + + // add idenfifier, sequence and quality score of the alignments + samRecord->setReadName(fastQFileR2.mySequenceIdentifier.c_str()); + samRecord->setSequence(fastQFileR2.myRawSequence.c_str()); + samRecord->setQuality(fastQFileR2.myQualityString.c_str()); + + // add barcode and quality + samRecord->addTag("CR", 'Z', barcode.c_str()); + samRecord->addTag("CY", 'Z', barcodeQString.c_str()); + + // add UMI + samRecord->addTag("UR", 'Z', UMI.c_str()); + samRecord->addTag("UY", 'Z', UMIQString.c_str()); + + // add raw sequence and quality sequence for the index + if (has_I1_file_list) { + std::string indexseq = std::string(fastQFileI1.myRawSequence.c_str()); + std::string indexSeqQual = std::string(fastQFileI1.myQualityString.c_str()); + samRecord->addTag("SR", 'Z', indexseq.c_str()); + samRecord->addTag("SY", 'Z', indexSeqQual.c_str()); + } +} + +int32_t getBucketIndex(const std::string &barcode, SamRecord *samRecord, + const WHITE_LIST_DATA* white_list_data, SAM_RECORD_BINS *samrecord_data, + int *n_barcode_corrected, int *n_barcode_correct, int *n_barcode_errors) { + + std::string correct_barcode; + // bucket barcode is used to pick the target bam file + // This is done because in the case of incorrigible barcodes + // we need a mechanism to uniformly distribute the alignments + // so that no bam is oversized to putting all such barcode less + // sequences into one particular. Incorregible barcodes are simply + // added withouth the CB tag + std::string bucket_barcode; + if (white_list_data->mutations.find(barcode) != + white_list_data->mutations.end()) { + + // if -1 then the raw barcode is the correct barcode + if (white_list_data->mutations.at(barcode) == -1) { + correct_barcode = barcode; + *n_barcode_correct += 1; + } else { + // it is a 1-mutation of some whitelist barcode so get the + // barcode by indexing into the vector of whitelist barcodes + correct_barcode = + white_list_data->barcodes.at(white_list_data->mutations.at(barcode)); + *n_barcode_corrected += 1; + } + // is used for computing the file index + bucket_barcode = correct_barcode; + + // corrected barcode should be added to the samrecord + samRecord->addTag("CB", 'Z', correct_barcode.c_str()); + } else { // not possible to correct the raw barcode + *n_barcode_errors += 1; + bucket_barcode = barcode; + } + // destination bam file index computed based on the bucket_barcode + int32_t bucket = std::hash{}(bucket_barcode.c_str()) % + samrecord_data->num_files; + + return bucket; +} + +void submit_block_tobe_written(SAM_RECORD_BINS *samrecord_data, int tindex) { + mtx.lock(); + + // it sets itself as the active thread who wants the + // readers to clear the data + samrecord_data->active_thread_num = tindex; + + // send a signal to every writer thread, i.e., write out data + // data to any file where the samheader should be written to + for (int32_t j = 0; j < samrecord_data->num_files; j++) { + if (sem_post(&semaphores[j]) == -1) + error("sem_post: semaphores"); + } + + // there is where I wait while the writers are writing + for (int32_t j = 0; j < samrecord_data->num_files; j++) { + if (sem_wait(&semaphores_workers[j]) == -1) + error("sem_wait: semaphores_workers"); + } + + // they are done writing + for (int j = 0; j < samrecord_data->num_files; j++) { + samrecord_data->file_index[tindex][j].clear(); + } + + // not records to write in the current + samrecord_data->num_records[tindex] = 0; + + // release the mutex, so that other readers might want to write + mtx.unlock(); +} void process_file(int tindex, std::string filenameI1, String filenameR1, String filenameR2, unsigned int barcode_length, @@ -277,83 +397,33 @@ void process_file(int tindex, std::string filenameI1, String filenameR1, fastQFileR2.readFastQSequence() == FastQStatus::FASTQ_SUCCESS) { i = i + 1; - // check the sequence names matching - std::string a = std::string(fastQFileR1.myRawSequence.c_str()); - std::string b = std::string(fastQFileR1.myQualityString.c_str()); + // prepare the rth samrecord with the sequence, sequence quality + SamRecord *samrec = samRecord + r; + // barcode and UMI and their quality sequences + fillSamRecord(samrec, fastQFileI1, fastQFileR1, fastQFileR2, + barcode_length, umi_length, has_I1_file_list); // extract the raw barcode and UMI + std::string a = std::string(fastQFileR1.myRawSequence.c_str()); std::string barcode = a.substr(0, barcode_length); - std::string UMI = a.substr(barcode_length, umi_length); - - // extract raw barcode and UMI quality string - std::string barcodeQString = b.substr(0, barcode_length); - std::string UMIQString = b.substr(barcode_length, umi_length); - - // reset the samrecord - samRecord[r].resetRecord(); - - // add read group and the sam flag - samRecord[r].addTag("RG", 'Z', "A"); - samRecord[r].setFlag(4); - - // add idenfifier, sequence and quality score of the alignments - samRecord[r].setReadName(fastQFileR2.mySequenceIdentifier.c_str()); - samRecord[r].setSequence(fastQFileR2.myRawSequence.c_str()); - samRecord[r].setQuality(fastQFileR2.myQualityString.c_str()); - - // add barcode and quality - samRecord[r].addTag("CR", 'Z', barcode.c_str()); - samRecord[r].addTag("CY", 'Z', barcodeQString.c_str()); - - // add UMI - samRecord[r].addTag("UR", 'Z', UMI.c_str()); - samRecord[r].addTag("UY", 'Z', UMIQString.c_str()); - - // add raw sequence and quality sequence for the index - if (has_I1_file_list) { - std::string indexseq = std::string(fastQFileI1.myRawSequence.c_str()); - std::string indexSeqQual = std::string(fastQFileI1.myQualityString.c_str()); - samRecord[r].addTag("SR", 'Z', indexseq.c_str()); - samRecord[r].addTag("SY", 'Z', indexSeqQual.c_str()); - } - std::string correct_barcode; // bucket barcode is used to pick the target bam file // This is done because in the case of incorrigible barcodes // we need a mechanism to uniformly distribute the alignments // so that no bam is oversized to putting all such barcode less // sequences into one particular. Incorregible barcodes are simply // added withouth the CB tag - std::string bucket_barcode; - if (white_list_data->mutations.find(barcode) != - white_list_data->mutations.end()) { - - // if -1 then the raw barcode is the correct barcode - if (white_list_data->mutations.at(barcode) == -1) { - correct_barcode = barcode; - n_barcode_correct += 1; - } else { - // it is a 1-mutation of some whitelist barcode so get the - // barcode by indexing into the vector of whitelist barcodes - correct_barcode = - white_list_data->barcodes.at(white_list_data->mutations.at(barcode)); - n_barcode_corrected += 1; - } - // is used for computing the file index - bucket_barcode = correct_barcode; - samRecord[r].addTag("CB", 'Z', correct_barcode.c_str()); - } else { // not possible to correct the raw barcode - n_barcode_errors += 1; - bucket_barcode = barcode; - } + int32_t bucket = getBucketIndex(barcode, samrec, white_list_data, + samrecord_data, &n_barcode_corrected, + &n_barcode_correct, &n_barcode_errors); + samrecord_data->num_records[tindex]++; - // destination bam file index computed based on the bucket_barcode - int32_t bucket = std::hash{}(bucket_barcode.c_str()) % - samrecord_data->num_files; // store the index of the record in the right vector that is done // to serve as a bucket B to hold the indices of samrecords that are // going to bamfile B samrecord_data->file_index[tindex][bucket].push_back(r); + + samrecord_data->num_records[tindex]++; // write a block of samrecords r = r + 1; @@ -367,37 +437,10 @@ void process_file(int tindex, std::string filenameI1, String filenameR1, // sam records in its buffer. This is the same behavior across all // reader threads if (r == block_size || !fastQFileR1.keepReadingFile()) { - // mutex makes sure only one reader thread can lock - mtx.lock(); - - // it sets itself as the active thread who wants the - // readers to clear the data - samrecord_data->active_thread_num = tindex; - - // send a signal to every writer thread, i.e., write out data - // data to any file where the samheader should be written to - for (int32_t j = 0; j < samrecord_data->num_files; j++) { - if (sem_post(&semaphores[j]) == -1) - error("sem_post: semaphores"); - } - - // there is where I wait while the writers are writing - for (int32_t j = 0; j < samrecord_data->num_files; j++) { - if (sem_wait(&semaphores_workers[j]) == -1) - error("sem_wait: semaphores_workers"); - } - - // they are done writing - for (int j = 0; j < samrecord_data->num_files; j++) { - samrecord_data->file_index[tindex][j].clear(); - } - // start reading a new block of FASTQ sequences - r = 0; - // not records to write in the current - samrecord_data->num_records[tindex] = 0; - - // release the mutex, so that other readers might want to write - mtx.unlock(); + submit_block_tobe_written(samrecord_data, tindex); + + // start reading a new block of FASTQ sequences + r = 0; } if (i % 10000000 == 0) { printf("%d\n", i); diff --git a/fastqpreprocessing/src/utilities.cpp b/fastqpreprocessing/src/utilities.cpp index 567a4cd..6b1a48e 100644 --- a/fastqpreprocessing/src/utilities.cpp +++ b/fastqpreprocessing/src/utilities.cpp @@ -85,8 +85,7 @@ void read_options(int argc, char **argv, INPUT_OPTIONS &options) { int verbose_flag; - while (true) { - static struct option long_options[] = { + static struct option long_options[] = { /* These options set a flag. */ {"verbose", no_argument, &verbose_flag, 1}, /* These options don’t set a flag. @@ -95,15 +94,15 @@ void read_options(int argc, char **argv, INPUT_OPTIONS &options) { {"umi-length", required_argument, 0, 'u'}, {"bam-size", required_argument, 0, 'B'}, {"sample-id", required_argument, 0, 's'}, - {"I1", optional_argument, 0, 'I'}, + {"I1", required_argument, 0, 'I'}, {"R1", required_argument, 0, 'R'}, {"R2", required_argument, 0, 'r'}, {"white-list", required_argument, 0, 'w'}, {0, 0, 0, 0} - }; + }; - // help messages when the user types -h - const char *help_messages[] = { + // help messages when the user types -h + const char *help_messages[] = { "verbose messages ", "barcode length [required]", "UMI length [required]", @@ -113,9 +112,10 @@ void read_options(int argc, char **argv, INPUT_OPTIONS &options) { "R1 [required]", "R2 [required]", "whitelist (from cellranger) of barcodes [required]", - }; + }; + while (true) { /* getopt_long stores the option index here. */ int option_index = 0;