Skip to content

Commit

Permalink
refactored process_file into four functions
Browse files Browse the repository at this point in the history
  • Loading branch information
kishorikonwar committed Sep 24, 2020
1 parent 046cefa commit 14a22c7
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 101 deletions.
231 changes: 137 additions & 94 deletions fastqpreprocessing/src/fastqprocess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ void bam_writers(int windex, SAM_RECORD_BINS *samrecord_data) {
std::string outputfile;

// name of the output file
outputfile = std::format("subfile_{}.bam", windex);
char buf[MAX_FILE_LENGTH];
sprintf(buf, "subfile_%d.bam", windex);
outputfile = buf;

// open to write the outputfile
samOut.OpenForWrite(outputfile.c_str());
Expand Down Expand Up @@ -215,6 +217,124 @@ void bam_writers(int windex, SAM_RECORD_BINS *samrecord_data) {
samOut.Close();
}

void fillSamRecord(SamRecord *samRecord, FastQFile &fastQFileI1,
FastQFile &fastQFileR1, FastQFile &fastQFileR2,
unsigned int barcode_length, unsigned int umi_length,
bool has_I1_file_list) {
// check the sequence names matching
std::string a = std::string(fastQFileR1.myRawSequence.c_str());
std::string b = std::string(fastQFileR1.myQualityString.c_str());

// extract the raw barcode and UMI
std::string barcode = a.substr(0, barcode_length);
std::string UMI = a.substr(barcode_length, umi_length);

// extract raw barcode and UMI quality string
std::string barcodeQString = b.substr(0, barcode_length);
std::string UMIQString = b.substr(barcode_length, umi_length);

// reset the samrecord
samRecord->resetRecord();

// add read group and the sam flag
samRecord->addTag("RG", 'Z', "A");
samRecord->setFlag(4);

// add idenfifier, sequence and quality score of the alignments
samRecord->setReadName(fastQFileR2.mySequenceIdentifier.c_str());
samRecord->setSequence(fastQFileR2.myRawSequence.c_str());
samRecord->setQuality(fastQFileR2.myQualityString.c_str());

// add barcode and quality
samRecord->addTag("CR", 'Z', barcode.c_str());
samRecord->addTag("CY", 'Z', barcodeQString.c_str());

// add UMI
samRecord->addTag("UR", 'Z', UMI.c_str());
samRecord->addTag("UY", 'Z', UMIQString.c_str());

// add raw sequence and quality sequence for the index
if (has_I1_file_list) {
std::string indexseq = std::string(fastQFileI1.myRawSequence.c_str());
std::string indexSeqQual = std::string(fastQFileI1.myQualityString.c_str());
samRecord->addTag("SR", 'Z', indexseq.c_str());
samRecord->addTag("SY", 'Z', indexSeqQual.c_str());
}
}

int32_t getBucketIndex(const std::string &barcode, SamRecord *samRecord,
const WHITE_LIST_DATA* white_list_data, SAM_RECORD_BINS *samrecord_data,
int *n_barcode_corrected, int *n_barcode_correct, int *n_barcode_errors) {

std::string correct_barcode;
// bucket barcode is used to pick the target bam file
// This is done because in the case of incorrigible barcodes
// we need a mechanism to uniformly distribute the alignments
// so that no bam is oversized to putting all such barcode less
// sequences into one particular. Incorregible barcodes are simply
// added withouth the CB tag
std::string bucket_barcode;
if (white_list_data->mutations.find(barcode) !=
white_list_data->mutations.end()) {

// if -1 then the raw barcode is the correct barcode
if (white_list_data->mutations.at(barcode) == -1) {
correct_barcode = barcode;
*n_barcode_correct += 1;
} else {
// it is a 1-mutation of some whitelist barcode so get the
// barcode by indexing into the vector of whitelist barcodes
correct_barcode =
white_list_data->barcodes.at(white_list_data->mutations.at(barcode));
*n_barcode_corrected += 1;
}
// is used for computing the file index
bucket_barcode = correct_barcode;

// corrected barcode should be added to the samrecord
samRecord->addTag("CB", 'Z', correct_barcode.c_str());
} else { // not possible to correct the raw barcode
*n_barcode_errors += 1;
bucket_barcode = barcode;
}
// destination bam file index computed based on the bucket_barcode
int32_t bucket = std::hash<std::string>{}(bucket_barcode.c_str()) %
samrecord_data->num_files;

return bucket;
}

void submit_block_tobe_written(SAM_RECORD_BINS *samrecord_data, int tindex) {
mtx.lock();

// it sets itself as the active thread who wants the
// readers to clear the data
samrecord_data->active_thread_num = tindex;

// send a signal to every writer thread, i.e., write out data
// data to any file where the samheader should be written to
for (int32_t j = 0; j < samrecord_data->num_files; j++) {
if (sem_post(&semaphores[j]) == -1)
error("sem_post: semaphores");
}

// there is where I wait while the writers are writing
for (int32_t j = 0; j < samrecord_data->num_files; j++) {
if (sem_wait(&semaphores_workers[j]) == -1)
error("sem_wait: semaphores_workers");
}

// they are done writing
for (int j = 0; j < samrecord_data->num_files; j++) {
samrecord_data->file_index[tindex][j].clear();
}

// not records to write in the current
samrecord_data->num_records[tindex] = 0;

// release the mutex, so that other readers might want to write
mtx.unlock();
}

void process_file(int tindex, std::string filenameI1, String filenameR1,
String filenameR2, unsigned int barcode_length,
Expand Down Expand Up @@ -277,83 +397,33 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,
fastQFileR2.readFastQSequence() == FastQStatus::FASTQ_SUCCESS) {
i = i + 1;

// check the sequence names matching
std::string a = std::string(fastQFileR1.myRawSequence.c_str());
std::string b = std::string(fastQFileR1.myQualityString.c_str());
// prepare the rth samrecord with the sequence, sequence quality
SamRecord *samrec = samRecord + r;
// barcode and UMI and their quality sequences
fillSamRecord(samrec, fastQFileI1, fastQFileR1, fastQFileR2,
barcode_length, umi_length, has_I1_file_list);

// extract the raw barcode and UMI
std::string a = std::string(fastQFileR1.myRawSequence.c_str());
std::string barcode = a.substr(0, barcode_length);
std::string UMI = a.substr(barcode_length, umi_length);

// extract raw barcode and UMI quality string
std::string barcodeQString = b.substr(0, barcode_length);
std::string UMIQString = b.substr(barcode_length, umi_length);

// reset the samrecord
samRecord[r].resetRecord();

// add read group and the sam flag
samRecord[r].addTag("RG", 'Z', "A");
samRecord[r].setFlag(4);

// add idenfifier, sequence and quality score of the alignments
samRecord[r].setReadName(fastQFileR2.mySequenceIdentifier.c_str());
samRecord[r].setSequence(fastQFileR2.myRawSequence.c_str());
samRecord[r].setQuality(fastQFileR2.myQualityString.c_str());

// add barcode and quality
samRecord[r].addTag("CR", 'Z', barcode.c_str());
samRecord[r].addTag("CY", 'Z', barcodeQString.c_str());

// add UMI
samRecord[r].addTag("UR", 'Z', UMI.c_str());
samRecord[r].addTag("UY", 'Z', UMIQString.c_str());

// add raw sequence and quality sequence for the index
if (has_I1_file_list) {
std::string indexseq = std::string(fastQFileI1.myRawSequence.c_str());
std::string indexSeqQual = std::string(fastQFileI1.myQualityString.c_str());
samRecord[r].addTag("SR", 'Z', indexseq.c_str());
samRecord[r].addTag("SY", 'Z', indexSeqQual.c_str());
}

std::string correct_barcode;
// bucket barcode is used to pick the target bam file
// This is done because in the case of incorrigible barcodes
// we need a mechanism to uniformly distribute the alignments
// so that no bam is oversized to putting all such barcode less
// sequences into one particular. Incorregible barcodes are simply
// added withouth the CB tag
std::string bucket_barcode;
if (white_list_data->mutations.find(barcode) !=
white_list_data->mutations.end()) {

// if -1 then the raw barcode is the correct barcode
if (white_list_data->mutations.at(barcode) == -1) {
correct_barcode = barcode;
n_barcode_correct += 1;
} else {
// it is a 1-mutation of some whitelist barcode so get the
// barcode by indexing into the vector of whitelist barcodes
correct_barcode =
white_list_data->barcodes.at(white_list_data->mutations.at(barcode));
n_barcode_corrected += 1;
}
// is used for computing the file index
bucket_barcode = correct_barcode;
samRecord[r].addTag("CB", 'Z', correct_barcode.c_str());
} else { // not possible to correct the raw barcode
n_barcode_errors += 1;
bucket_barcode = barcode;
}
int32_t bucket = getBucketIndex(barcode, samrec, white_list_data,
samrecord_data, &n_barcode_corrected,
&n_barcode_correct, &n_barcode_errors);

samrecord_data->num_records[tindex]++;
// destination bam file index computed based on the bucket_barcode
int32_t bucket = std::hash<std::string>{}(bucket_barcode.c_str()) %
samrecord_data->num_files;
// store the index of the record in the right vector that is done
// to serve as a bucket B to hold the indices of samrecords that are
// going to bamfile B
samrecord_data->file_index[tindex][bucket].push_back(r);

samrecord_data->num_records[tindex]++;
// write a block of samrecords
r = r + 1;

Expand All @@ -367,37 +437,10 @@ void process_file(int tindex, std::string filenameI1, String filenameR1,
// sam records in its buffer. This is the same behavior across all
// reader threads
if (r == block_size || !fastQFileR1.keepReadingFile()) {
// mutex makes sure only one reader thread can lock
mtx.lock();

// it sets itself as the active thread who wants the
// readers to clear the data
samrecord_data->active_thread_num = tindex;

// send a signal to every writer thread, i.e., write out data
// data to any file where the samheader should be written to
for (int32_t j = 0; j < samrecord_data->num_files; j++) {
if (sem_post(&semaphores[j]) == -1)
error("sem_post: semaphores");
}

// there is where I wait while the writers are writing
for (int32_t j = 0; j < samrecord_data->num_files; j++) {
if (sem_wait(&semaphores_workers[j]) == -1)
error("sem_wait: semaphores_workers");
}

// they are done writing
for (int j = 0; j < samrecord_data->num_files; j++) {
samrecord_data->file_index[tindex][j].clear();
}
// start reading a new block of FASTQ sequences
r = 0;
// not records to write in the current
samrecord_data->num_records[tindex] = 0;

// release the mutex, so that other readers might want to write
mtx.unlock();
submit_block_tobe_written(samrecord_data, tindex);

// start reading a new block of FASTQ sequences
r = 0;
}
if (i % 10000000 == 0) {
printf("%d\n", i);
Expand Down
14 changes: 7 additions & 7 deletions fastqpreprocessing/src/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ void read_options(int argc, char **argv, INPUT_OPTIONS &options) {

int verbose_flag;

while (true) {
static struct option long_options[] = {
static struct option long_options[] = {
/* These options set a flag. */
{"verbose", no_argument, &verbose_flag, 1},
/* These options don’t set a flag.
Expand All @@ -95,15 +94,15 @@ void read_options(int argc, char **argv, INPUT_OPTIONS &options) {
{"umi-length", required_argument, 0, 'u'},
{"bam-size", required_argument, 0, 'B'},
{"sample-id", required_argument, 0, 's'},
{"I1", optional_argument, 0, 'I'},
{"I1", required_argument, 0, 'I'},
{"R1", required_argument, 0, 'R'},
{"R2", required_argument, 0, 'r'},
{"white-list", required_argument, 0, 'w'},
{0, 0, 0, 0}
};
};

// help messages when the user types -h
const char *help_messages[] = {
// help messages when the user types -h
const char *help_messages[] = {
"verbose messages ",
"barcode length [required]",
"UMI length [required]",
Expand All @@ -113,9 +112,10 @@ void read_options(int argc, char **argv, INPUT_OPTIONS &options) {
"R1 [required]",
"R2 [required]",
"whitelist (from cellranger) of barcodes [required]",
};
};


while (true) {
/* getopt_long stores the option index here. */
int option_index = 0;

Expand Down

0 comments on commit 14a22c7

Please sign in to comment.