diff --git a/Dockerfile b/Dockerfile index e664493..06d2f56 100644 --- a/Dockerfile +++ b/Dockerfile @@ -12,26 +12,32 @@ RUN pip3 install -r requirements.txt RUN mkdir /sctools/ -COPY . /sctools +COPY . /sctools RUN pip3 install /sctools ARG libStatGen_version="1.0.14" - +ARG htslib_version="1.13" RUN cd /sctools/fastqpreprocessing &&\ wget https://github.com/statgen/libStatGen/archive/v${libStatGen_version}.tar.gz &&\ + wget https://github.com/samtools/htslib/releases/download/${htslib_version}/htslib-${htslib_version}.tar.bz2 &&\ tar -zxvf v${libStatGen_version}.tar.gz &&\ - mv libStatGen-${libStatGen_version} libStatGen &&\ - patch libStatGen/fastq/FastQFile.cpp patches/FastQFile.cpp.patch &&\ - patch libStatGen/general/BgzfFileType.cpp patches/BgzfFileType.cpp.patch &&\ + tar -jxvf htslib-${htslib_version}.tar.bz2 &&\ + mv libStatGen-${libStatGen_version} libStatGen + +RUN cd /sctools/fastqpreprocessing &&\ + patch -f libStatGen/fastq/FastQFile.cpp patches/FastQFile.cpp.patch &&\ + patch -f libStatGen/general/BgzfFileType.cpp patches/BgzfFileType.cpp.patch &&\ patch libStatGen/Makefile patches/Makefile.patch &&\ patch libStatGen/general/Makefile patches/general.Makefile.patch &&\ - make -C libStatGen &&\ - mkdir src/obj &&\ - make -C src/ + make -C libStatGen + +RUN cd /sctools/fastqpreprocessing && make -C htslib-${htslib_version}/ + +RUN cd /sctools/fastqpreprocessing && mkdir bin src/obj && make -C src/ install -RUN cp /sctools/fastqpreprocessing/src/fastqprocess /usr/local/bin/ +RUN cp /sctools/fastqpreprocessing/bin/* /usr/local/bin/ WORKDIR usr/local/bin/sctools diff --git a/fastqpreprocessing/src/Makefile b/fastqpreprocessing/src/Makefile index 2328064..d109ed5 100644 --- a/fastqpreprocessing/src/Makefile +++ b/fastqpreprocessing/src/Makefile @@ -1,28 +1,46 @@ IDIR =../libStatGen/include +IDIR2 =../htslib-1.13 -CC = g++ -std=c++17 -O4 -CFLAGS = -I$(IDIR) -L../libStatGen +CC = g++ -std=c++17 -fPIC -DHTSLIB -Wall -O4 -Wwrite-strings + +CFLAGS = -I$(IDIR) -L../libStatGen ODIR=obj LDIR =../libStatGen/ -TARGET = fastqprocess -LIBS = -lStatGen -lz -lpthread -lstdc++fs +LIBS = -lStatGen -lz -lpthread -lstdc++fs + +_DEPS = fastqprocess.h utilities.h input_options.h + +TARGET1 = fastqprocess +_TARGET1_OBJ = fastqprocess.o +TARGET1_OBJ = $(patsubst %,$(ODIR)/%,$(_TARGET1_OBJ)) + +TARGET2 = TagSort +_TARGET2_OBJ = tagsort.o htslib_tagsort.o globals.o sort_write.o metricgatherer.o +TARGET2_OBJ = $(patsubst %,$(ODIR)/%,$(_TARGET2_OBJ)) -_DEPS = fastqprocess.h utilities.h -#DEPS = $(patsubst %,$(IDIR)/%,$(_DEPS)) -_OBJ = fastqprocess.o utilities.o main.o -OBJ = $(patsubst %,$(ODIR)/%,$(_OBJ)) +install: $(TARGET1) $(TARGET2) + cp $(TARGET1) ../bin/ + cp $(TARGET2) ../bin/ + cp ../htslib-1.13/*.so.? ../bin/ +all: $(TARGET1) $(TARGET2) -$(ODIR)/%.o: %.cpp $(DEPS) - $(CC) -c -o $@ $< $(CFLAGS) +_COMMON_OBJ = utilities.o input_options.o +OBJ = $(patsubst %,$(ODIR)/%,$(_COMMON_OBJ)) -$(TARGET): $(OBJ) $(_DEPS) +$(ODIR)/%.o: %.cpp $(_DEPS) + $(CC) -c -o $@ $< -I$(IDIR) -I. -I$(IDIR2) + +$(TARGET1): $(OBJ) $(TARGET1_OBJ) $(CC) -o $@ $^ $(CFLAGS) $(LIBS) -.PHONY: clean +$(TARGET2): $(OBJ) $(TARGET2_OBJ) + $(CC) -Wl,-rpath,/usr/local/bin:fastqpreprocessing/bin:bin:. -o $@ $(OBJ) $(TARGET2_OBJ) -I. -L. -lstdc++fs -lz -L../htslib-1.13 -lhts -lpthread +.PHONY: clean clean: - rm -f $(ODIR)/*.o *~ core $(INCDIR)/*~ + rm -f $(ODIR)/*.o *~ core $(INCDIR)/*~ *.o *.so *.a + rm -rf $(TARGET1) $(TARGET2) diff --git a/fastqpreprocessing/src/datatypes.h b/fastqpreprocessing/src/datatypes.h new file mode 100644 index 0000000..bd943a3 --- /dev/null +++ b/fastqpreprocessing/src/datatypes.h @@ -0,0 +1,189 @@ +#ifndef __DATA_TYPES__ +#define __DATA_TYPES__ + +/** + * @file input_options.h + * @brief Utility functions for input options processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ + +#include +#include +#include +#include +#include "globals.h" + +using namespace std; + +typedef std::tuple TRIPLET; + +typedef struct TagCounter { + TagCounter() { + prev_tag = ""; + } + std::unordered_map data; + std::string prev_tag; + int count = 0; + + void clear() { data.clear(); } + + void update(const std::string &tag) { + if (tag.compare(prev_tag)!=0) { + count++; + prev_tag = tag; + } + } +} TAG_COUNTER; + +typedef std::tuple*/, + std::string /* reference */, + std::string /* biotype */, + int /* pos */, + int /*rev strand 1 for yes, 0 otherwise*/, + float /*avg barcode qual score */, + float /* frac of barcode qual score >30 */, + float /*avg qual seq */ , + float /*fract of >30 score qual seq*/, + int /*NH*/, + int /*perfect molecule barcode, 1 is yes, 0 otherwise*/, + int /*spliced reads 1 yes, 0 otherwise*/, + int /*is duplicate */, + int /*perfect cell barcode 1 is yes, 0 otherwise*/, + float /* fraction of umi qual score > 30 */ + > TAGTUPLE; + +typedef std::tuple QUEUETUPLE; + +typedef std::pair STRING_BOOL_PAIR; + +typedef std::vector STRING_VECTOR; + +typedef std::unordered_map STRING_INT64_MAP; + + +typedef struct _tags { + char **tags; +} TAGS; + +typedef struct _tags_holder { + int num_tags; + TAGS *tags; + char *memorypool; + + char *allocated_memory(int size) { + return 0; + } + + char *double_memory() { + return 0; + } +} TAGS_HOLDER; + + +// structure for correcting the barcodes +typedef struct _white_list_data { + // an unordered map from whitelist barcodes and 1-mutations + // to the index of the correct barcode + STRING_INT64_MAP mutations; + // vector of whitelist barcodes + STRING_VECTOR barcodes; +} WHITE_LIST_DATA; + + +// Structure to hold input options for fastqprocess +typedef struct _input_options_fastqprocess { + // Initialize some of the values + _input_options_fastqprocess() { + barcode_length = -1; + umi_length = -1; + sample_id = ""; + bam_size = 1.0; + verbose_flag = 0; + } + // verbose flag + unsigned int verbose_flag; + + // I1, R1 and R2 files name + std::vector I1s, R1s, R2s; + // Barcode white list file + + std::string white_list_file; + // chemistry dependent (V2/V3) barcode and UMI length + int barcode_length, umi_length; + + // Bam file size to split by (in GB) + double bam_size; + + // sample name + std::string sample_id; +} INPUT_OPTIONS_FASTQPROCESS; + + +/** + * @brief Reads the options to the fastqprocess program + * + * @param argc no of arguments to the main function + * @param argv arguments array to the main function + * @param options the structure for holding the options for getopt +*/ +void read_options_fastqprocess(int, char **, INPUT_OPTIONS_FASTQPROCESS &); + +enum METRIC_TYPE {CELL, GENE}; + +// Structure to hold input options for tagsort +typedef struct _input_options_tagsort { + // Initialize some of the values + _input_options_tagsort() { + bam_input = ""; + gtf_file = ""; + temp_folder = std::string("/tmp/"); + alignments_per_thread = NUM_ALNS_PER_THREAD; + nthreads = 1; + compute_metric = 0; + output_sorted_info = 0; + metric_type = ""; + } + // metric type + std::string metric_type; + + // output sorted info + unsigned int output_sorted_info; + // compute metric + unsigned int compute_metric; + // name of the bam file + std::string bam_input; + // name of the gtf file + std::string gtf_file; + // temp folder for disk sorting + std::string temp_folder; + // metric_output file + std::string metric_output_file; + // sorted tsv output file + std::string sorted_output_file; + // number of alignment per thread + unsigned int alignments_per_thread; + // number of threads + unsigned int nthreads; + // barcode tag + std::string barcode_tag; + // umi tag + std::string umi_tag; + // gene tag + std::string gene_tag; + // order of the tags to sort by + std::unordered_map tag_order; + +} INPUT_OPTIONS_TAGSORT; + + +/** + * @brief Reads the options to the tagsort program + * + * @param argc no of arguments to the main function + * @param argv arguments array to the main function + * @param options the structure for holding the options for getopt +*/ +void read_options_tagsort(int, char **, INPUT_OPTIONS_TAGSORT &); + +#endif diff --git a/fastqpreprocessing/src/fastqprocess.cpp b/fastqpreprocessing/src/fastqprocess.cpp index 0d4a1e2..01a9d5e 100644 --- a/fastqpreprocessing/src/fastqprocess.cpp +++ b/fastqpreprocessing/src/fastqprocess.cpp @@ -79,7 +79,7 @@ SAM_RECORD_BINS * create_samrecord_holders(int16_t nthreads, } /** @copydoc process_inputs */ -void process_inputs(const INPUT_OPTIONS &options, +void process_inputs(const INPUT_OPTIONS_FASTQPROCESS &options, const WHITE_LIST_DATA *white_list_data) { int block_size = SAMRECORD_BUFFER_SIZE; @@ -109,7 +109,7 @@ void process_inputs(const INPUT_OPTIONS &options, // execute the fastq readers threads std::thread *readers = new std::thread[options.R1s.size()]; - for (int i = 0; i < options.R1s.size(); i++) { + for (unsigned int i = 0; i < options.R1s.size(); i++) { std::string I1; // if there is no I1 file then send an empty file name if (options.I1s.size() > 0) { @@ -124,7 +124,7 @@ void process_inputs(const INPUT_OPTIONS &options, } // every reader thread joins. - for (int i = 0; i < options.R1s.size(); i++) { + for (unsigned int i = 0; i < options.R1s.size(); i++) { readers[i].join(); } // set the stop flag for the writers @@ -494,3 +494,20 @@ void process_file(int tindex, std::string filenameI1, String filenameR1, i, n_barcode_correct, n_barcode_corrected, n_barcode_errors, n_barcode_errors/static_cast(i) *100); } + +/* Flag set by ‘--verbose’. */ +int main (int argc, char **argv) +{ + + INPUT_OPTIONS_FASTQPROCESS options; + + read_options_fastqprocess(argc, argv, options); + + std::cout << "reading whitelist file " << options.white_list_file << "..."; + WHITE_LIST_DATA *white_list_data = read_white_list(options.white_list_file); + std::cout << "done" << std::endl; + + process_inputs(options, white_list_data); + return 0; +} + diff --git a/fastqpreprocessing/src/fastqprocess.h b/fastqpreprocessing/src/fastqprocess.h index 2e167a5..0fa25f2 100644 --- a/fastqpreprocessing/src/fastqprocess.h +++ b/fastqpreprocessing/src/fastqprocess.h @@ -1,13 +1,11 @@ +#ifndef __FASTQ_PROCESS_H__ +#define __FASTQ_PROCESS_H__ /** * @file fastqprocess.h * @brief functions for file processing * @author Kishori Konwar * @date 2020-08-27 ***********************************************/ - -#ifndef __FASTQ_PROCESS_H__ -#define __FASTQ_PROCESS_H__ - #include #include "FastQStatus.h" #include "BaseAsciiMap.h" @@ -27,7 +25,9 @@ #include #include #include + #include "utilities.h" +#include "input_options.h" /// Samrecord bins to be accessed by all threads @@ -79,7 +79,7 @@ typedef struct SamRecordBins { * @params white_list_data data-structure to store barcode correction * map and vector of correct barcodes */ -void process_inputs(const INPUT_OPTIONS & options, \ +void process_inputs(const INPUT_OPTIONS_FASTQPROCESS & options, \ const WHITE_LIST_DATA * white_list_data); /** diff --git a/fastqpreprocessing/src/globals.cpp b/fastqpreprocessing/src/globals.cpp new file mode 100644 index 0000000..0c8d115 --- /dev/null +++ b/fastqpreprocessing/src/globals.cpp @@ -0,0 +1,15 @@ +/** + * @file globals.cpp + * @brief Utility functions for file processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ +#include "globals.h" + +sem_t semaphore; +std::mutex mtx; +std::vector partial_files; + +std::set busy_buffers, idle_buffers, threads_to_join; + + diff --git a/fastqpreprocessing/src/globals.h b/fastqpreprocessing/src/globals.h new file mode 100644 index 0000000..0a0b4e6 --- /dev/null +++ b/fastqpreprocessing/src/globals.h @@ -0,0 +1,30 @@ +#ifndef __SCTOOL_GLOBALS__ +#define __SCTOOL_GLOBALS__ + +/** + * @file globals.h + * @brief Utility functions for file processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ + +#include +#include +#include +#include +#include + +#define THRESHOLD 30.0 +#define NUM_THREADS 10 +#define MAX_THREADS 30 +#define NUM_ALNS_PER_THREAD 1000000 + +#define DATA_BUFFER_SIZE 1000 + +extern sem_t semaphore; +extern std::mutex mtx; +extern std::vector partial_files; + +extern std::set busy_buffers, idle_buffers, threads_to_join; + +#endif diff --git a/fastqpreprocessing/src/htslib_tagsort.cpp b/fastqpreprocessing/src/htslib_tagsort.cpp new file mode 100644 index 0000000..2f03d9c --- /dev/null +++ b/fastqpreprocessing/src/htslib_tagsort.cpp @@ -0,0 +1,422 @@ +/** + * @file htslib_tagsort.cpp + * @brief functions for file processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ + +#include "htslib_tagsort.h" + +unsigned int num_thread_deallocations = 0; +extern sem_t semaphore; +extern std::mutex mtx; +extern std::set busy_buffer, idle_buffer, threads_to_join; + +char* error_sem_wait = "sem_wait: semaphore"; +char* error_sem_post = "sem_post: semaphore"; + +#define SEM_PRINT_VAL(X,Y) \ + ({ \ + sem_getvalue(&X, &Y); \ + std::cout << "SEM_VALUE " << Y << std::endl; \ + }) \ + +#define SEM_INIT(X, n) (sem_init(&X, 0, n) ) + +#define SEM_DESTROY(X) (sem_destroy(&X) ) + +#define SEM_WAIT(X) \ + ({ \ + if (sem_wait(&X) == -1) \ + error(error_sem_wait); \ + }) + +#define SEM_POST(X) \ + ({ \ + if (sem_post(&X) == -1) \ + error(error_sem_wait); \ + }) + + +extern "C" { + bam_hdr_t * sam_hdr_read(samFile *); //read header + htsFile *hts_open(const char *fn, const char *mode); +} + + +/* + @brief get the int tag or -1 + +*/ +inline int get_itag_or_default(bam1_t *aln, const char *tagname, int default_value) { + uint8_t *p; + int tag_value = -1; + if ((p=bam_aux_get(aln, tagname))==NULL) { + tag_value = default_value; + } else { + tag_value = bam_aux2i(p); + } + return tag_value; +} + +/* + @brief get the string tag or the default + +*/ +inline char *get_Ztag_or_default(bam1_t *aln, const char *tagname, char *default_value) { + uint8_t *p; + char *tag_value = NULL; + if ((p=bam_aux_get(aln, tagname))==NULL) { + tag_value = default_value; + } else { + tag_value = bam_aux2Z(p); + if (strcmp(tag_value, "-")==0) { + tag_value = default_value; + } + } + return tag_value; +} + +using namespace std; + +void process_alignments(INPUT_OPTIONS_TAGSORT &options, bam1_t **aln, bam_hdr_t *bamHdr, unsigned int buf_no, unsigned int n) { + int threshold = THRESHOLD; //qual score threshold + vector tuple_records; + std::unordered_map string_map; + std::string tags[3]; + + char empty[] = {'\0'}; + char none[] = {'N', 'o', 'n', 'e', '\0'}; + char nochr[] = {'*', '\0'}; + + char *barcode, *gene_id, *umi, *umi_raw, *umi_qual, *barcode_qual, *barcode_raw; + char *location_tag; + int nh_num; + char *chr; + + uint32_t len = 0; //length of qual seq. + + for (unsigned int i = 0; i < n; i++) { + // extract the barcodes corrected and corrected + barcode = (char *)get_Ztag_or_default(aln[i], options.barcode_tag.c_str(), none); + barcode_raw = (char *)get_Ztag_or_default(aln[i], "CR", empty); + + // to be called perfect the corrected and raw barcodes should match + int perfect_cell_barcode = 1; + unsigned int barcode_len = strlen(barcode); + + if (strcmp(barcode, "None")==0) { + // empty barcodes are not perfect + perfect_cell_barcode = 0; + } else { + // perfect barcodes have the same corrected and raw barcodes + for (unsigned int k=0; k < barcode_len; k++) + if (barcode[k] != barcode_raw[k]) { + perfect_cell_barcode = 0; + break; + } + } + + // barcode quality score + barcode_qual = (char *)get_Ztag_or_default(aln[i], "CY", empty); + + //average barcode across the query and the fraction of barcodes above threshold + float sum_barcode_qual = 0; + float num_bp_above_threshold = 0; + len = strlen(barcode_qual); + + for (unsigned int k = 0; k < len; k++) { + // barcodes qual strings are in ASCII symbols subtracting 33 gives the phred qual score + sum_barcode_qual += ((uint8_t)barcode_qual[k] -33); + if (((uint8_t)barcode_qual[k] - 33) > threshold) num_bp_above_threshold += 1; + } + int avg_cell_barcode_qual = sum_barcode_qual/(float)len; + float cell_barcode_qual_above_threshold = (float)num_bp_above_threshold/(float)len; + + + // corrected molecule barcodes (UMIs) + umi = (char *)get_Ztag_or_default(aln[i], options.umi_tag.c_str(), none); + // raw molecule barcodes + umi_raw = (char *)get_Ztag_or_default(aln[i], "UR", empty); + + // to be called perfect, the corrected and raw molecular barcodes should match + unsigned int umi_len = strlen(umi); + int perfect_molecule_barcode = 1; + if (strcmp(umi, "None")==0) { // not equal length case + perfect_molecule_barcode = 0; + } else { + // if equal then compare char by char + for (unsigned int k=0; k < umi_len; k++) + if (umi[k] != umi_raw[k]) { + perfect_molecule_barcode = 0; + break; + } + } + // qual score for molecular barcodes + umi_qual = (char *)get_Ztag_or_default(aln[i], "UY", empty); + + float sum_umi_qual = 0; + float num_umi_above_threshold = 0; + len = strlen(umi_qual); + for (unsigned int k = 0; k < len; k++) { + // molecular barcodes qual strings are in ASCII symbols subtracting 33 gives the phred qual score + sum_umi_qual += ((uint8_t)umi_qual[k] -33); + if (((uint8_t)umi_qual[k] - 33) > threshold) num_umi_above_threshold += 1; + } + float frac_umi_qual_above_threshold = (float)num_umi_above_threshold/(float)len; + + gene_id = (char *)get_Ztag_or_default(aln[i], options.gene_tag.c_str(), none); + location_tag = (char *)get_Ztag_or_default(aln[i], "XF", empty); + + nh_num = get_itag_or_default(aln[i], "NH", -1); + + chr = bamHdr->target_name[aln[i]->core.tid]; + if (aln[i]->core.tid==-1){ + chr = nochr; + } else { + chr = bamHdr->target_name[aln[i]->core.tid]; + } + + uint32_t pos = aln[i]->core.pos; // position. + uint32_t isrev = bam_is_rev(aln[i]) ? 1 : 0; // is reverse stand + + uint32_t is_duplicate = 0; // is duplicate + if ((aln[i]->core.flag & BAM_FDUP) !=0) { + is_duplicate = 1; + } + + // sequence quality score + uint8_t *qual_seq = bam_get_qual(aln[i]); // pointer to the qual data + len = aln[i]->core.l_qseq; //length of qual seq. + + float avg_sequence_qual = 0, sum_qual = 0; + float qual_above_threshold = 0; + + for (unsigned int k = 0; k < len; k++) { + // the qual string are already in phred scores + sum_qual += (qual_seq[k]); + if ((qual_seq[k]) > threshold) qual_above_threshold += 1; + } + avg_sequence_qual = sum_qual/(float)len; + qual_above_threshold = qual_above_threshold/(float)len; + + uint32_t *cigar = bam_get_cigar(aln[i]); + // see if it is spliced, i.e., N appears in the CIGAR string + uint32_t spliced_read = 0; + for (unsigned int k = 0; k < aln[i]->core.n_cigar; ++k) { + uint32_t op = cigar[k] & BAM_CIGAR_MASK; + if (op==3 && (cigar[k] >> BAM_CIGAR_SHIFT)!=0) { + spliced_read = 1; + break; + } + } + + // the order of the three tags are define by the order of the supplied input arguments + // tag.order [tag_name] -> order map + tags[options.tag_order[options.barcode_tag]] = barcode; + tags[options.tag_order[options.umi_tag]] = umi; + tags[options.tag_order[options.gene_tag]] = gene_id; + + if (string_map.find(barcode)==string_map.end()) { + string_map[std::string(barcode)] = new std::string(barcode); + } + + if (string_map.find(umi)==string_map.end()) { + string_map[std::string(umi)] = new std::string(umi); + } + + if (string_map.find(gene_id)==string_map.end()) { + string_map[gene_id] = new std::string(gene_id); + } + + TRIPLET* triplet = new TRIPLET(string_map[tags[0]], string_map[tags[1]], string_map[tags[2]]); + + tuple_records.push_back( + std::make_tuple( + triplet, /* triplet of tags pointers */ + std::string(chr), /* record[0] */ + std::string(location_tag), /* record[1] */ + pos, /* record [2] */ + isrev, /* record[3] */ + avg_cell_barcode_qual, /* record[4] */ + cell_barcode_qual_above_threshold, /* record[5] */ + avg_sequence_qual, /* record[6] */ + qual_above_threshold, /* record[7] */ + nh_num, /* record[8] */ + perfect_molecule_barcode, /* record[9] */ + spliced_read, /* record[10] */ + is_duplicate, /* record[11] */ + perfect_cell_barcode, /* record[12] */ + frac_umi_qual_above_threshold /* record[13] */ + ) + ); + }//for loop + + write_out_partial_txt_file(tuple_records, options.temp_folder); + + // delete the triplet + for (auto it=tuple_records.begin(); it!=tuple_records.end(); it++) { + delete get<0>(*it); + } + tuple_records.clear(); + freeStlContainer(tuple_records); + + // free the memory for the strings + for (auto it=string_map.begin(); it!=string_map.end(); it++) { + delete it->second; + } + + string_map.clear(); + freeStlContainer(string_map); + + mtx.lock(); + threads_to_join.insert(buf_no); + mtx.unlock(); + + num_thread_deallocations += 1; + SEM_POST(semaphore); +} + +/** + * @brief From the input bam create a list of txt files with the records (lines) + * sorted according to the * tags + * + * @details + * The input bam file is read chunk by chunk, sorted by the tags and the written + * out as a text file in the sorted manner. + * + * @param options: INPUT_OPTIONS_TAGSORT the inputs to the program + * @return a vector containing the file paths of the partial files +*/ +void create_sorted_file_splits_htslib(INPUT_OPTIONS_TAGSORT &options) { + + string input_bam = options.bam_input; + string tmp_folder = options.temp_folder; + + // size of individual chunks to sort in memory as an approx 20 mil alignments makes 1 GB bam + //open bam file + samFile *fp_in=0; + if ((fp_in = hts_open(options.bam_input.c_str(),"r"))==0) { + std::cerr << "ERROR Cannot open the file " << options.bam_input << std::endl; + exit(1); + } + + bam_hdr_t *bamHdr = sam_hdr_read(fp_in); //read header + bam1_t ***aln_arr; + + // allocated memory for the alignments for the various threads + if ((aln_arr = (bam1_t ***)malloc(sizeof(bam1_t**)*options.nthreads))==NULL) { + std::cerr << "ERROR Failed to allocate memory " << std::endl; + exit(1); + } + for (unsigned int i = 0; i < options.nthreads; i++) { + if ((aln_arr[i] = (bam1_t **)malloc(sizeof(bam1_t *)*options.alignments_per_thread))==NULL) { + std::cerr << "ERROR Failed to allocate memory for alignments " << std::endl; + } + for (unsigned int k = 0; k < options.alignments_per_thread; k++) { + aln_arr[i][k]= bam_init1(); //initialize an alignment + } + } + + std::cout << "Running htslib" << std::endl; + // Keep reading records until ReadRecord returns false. + long int i =0; + unsigned int batch = 0; + long int num_alignments =0; + std::string tags[3]; + + std::thread thread_id[MAX_THREADS]; + + unsigned int buf_i=0; + + for (unsigned int j = 0; j < options.nthreads; j++) { + idle_buffers.insert(j); + } + + std::set::iterator buf_it; + unsigned int buf_no; + + SEM_INIT(semaphore, options.nthreads); + + int sem_value; + SEM_WAIT(semaphore); + + mtx.lock(); + buf_no = *idle_buffers.begin(); + idle_buffers.erase(buf_no); + busy_buffers.insert(buf_no); + mtx.unlock(); + + while(sam_read1(fp_in, bamHdr,aln_arr[buf_no][buf_i]) > 0) { + buf_i++; + num_alignments++; + + if (buf_i!=0 && buf_i==options.alignments_per_thread) { + std::cout << "Batch number : " << batch << std::endl; + batch++; + + // thread begins with the new buf no + thread_id[buf_no] = std::thread(process_alignments, std::ref(options), aln_arr[buf_no], bamHdr, buf_no, buf_i); + + // move forward only if there is room for new buffer + SEM_WAIT(semaphore); + + // join any completed thread + mtx.lock(); + for (buf_it=threads_to_join.begin(); buf_it!=threads_to_join.end(); buf_it++) { + thread_id[*buf_it].join(); + busy_buffers.erase(*buf_it); + idle_buffers.insert(*buf_it); + } + // clear the threads + threads_to_join.clear(); + mtx.unlock(); + + // now find an idle buffer and make it busy so that we can load new data + mtx.lock(); + // this is the new buffer to fill + buf_no = *idle_buffers.begin(); + idle_buffers.erase(buf_no); + busy_buffers.insert(buf_no); + //std::cout << buf_no << " busy" << std::endl; + mtx.unlock(); + buf_i = 0; + } + i = i + 1; + } // while loop + + thread_id[buf_no] = std::thread(process_alignments, std::ref(options), aln_arr[buf_no], bamHdr, buf_no, buf_i); + + // make sure you consume all the counts, the remaining semaphore is the value + sem_getvalue(&semaphore, &sem_value); + for (unsigned int k=0; k< options.nthreads; k++) { + SEM_WAIT(semaphore); + } + SEM_DESTROY(semaphore); + + // join any completed thread that are ready to be joined + mtx.lock(); + for (buf_it=threads_to_join.begin(); buf_it!=threads_to_join.end(); buf_it++) { + thread_id[*buf_it].join(); + } + threads_to_join.clear(); + mtx.unlock(); + + // release the memory and clear data-structures + for (unsigned int i = 0; i < options.nthreads; i++) { + std::cout << "releasing memory for thread " << i << std::endl; + for (unsigned int k = 0; k < options.alignments_per_thread; k++) { + bam_destroy1(aln_arr[i][k]); + } + free(aln_arr[i]); + } + free(aln_arr); + + sam_hdr_destroy(bamHdr); + hts_close(fp_in); + + std::cout << "Deallocate threads " << num_thread_deallocations << std::endl; + std::cout << std::endl << "Read " << i << " records" << std::endl; + std::cout << "Read " << num_alignments << " records as batches" << std::endl; +} // function + diff --git a/fastqpreprocessing/src/htslib_tagsort.h b/fastqpreprocessing/src/htslib_tagsort.h new file mode 100644 index 0000000..bb10674 --- /dev/null +++ b/fastqpreprocessing/src/htslib_tagsort.h @@ -0,0 +1,49 @@ +#ifndef __HTSLIB_TAG_SORT__ +#define __HTSLIB_TAG_SORT__ + +/** + * @file htslib_tagsort.h + * @brief Utility functions for input options processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include "input_options.h" +#include "datatypes.h" +#include "sort_write.h" +#include "utilities.h" +#include "globals.h" + + +/** + * @brief From the input bam create a list of txt files with the records (lines) + * sorted according to the * tags + * + * @details + * The input bam file is read chunk by chunk, sorted by the tags and the written + * out as a text file in the sorted manner. + * + * @param options: INPUT_OPTIONS_TAGSORT the inputs to the program + * @return a vector containing the file paths of the partial files +*/ +void create_sorted_file_splits_htslib(INPUT_OPTIONS_TAGSORT &options); + +#endif diff --git a/fastqpreprocessing/src/input_options.cpp b/fastqpreprocessing/src/input_options.cpp new file mode 100644 index 0000000..f120254 --- /dev/null +++ b/fastqpreprocessing/src/input_options.cpp @@ -0,0 +1,409 @@ +/** + * @file input_options.cpp + * @brief functions for optons and input checking + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ +#include "input_options.h" +#include + +using namespace std; +namespace fs = std::experimental::filesystem; + +void error_message(const char *msg) { + std::cerr << msg; +} + +/** @copydoc read_options_tagsort */ +void read_options_tagsort(int argc, char **argv, INPUT_OPTIONS_TAGSORT &options) { + int c; + int i; + + static struct option long_options[] = { + /* These options set a flag. */ + {"compute-metric", no_argument, 0, 'm'}, + {"output-sorted-info", no_argument, 0, 'n'}, + /* These options don’t set a flag. + We distinguish them by their indices. */ + {"bam-input", required_argument, 0, 'b'}, + {"gtf-file", required_argument, 0, 'a'}, + {"temp-folder", required_argument, 0, 't'}, + {"sorted-output", required_argument, 0, 'o'}, + {"metric-output", required_argument, 0, 'M'}, + {"alignments-per-thread", required_argument, 0, 'p'}, + {"nthreads", required_argument, 0, 'T'}, + {"barcode-tag", required_argument, 0, 'C'}, + {"umi-tag", required_argument, 0, 'U'}, + {"gene-tag", required_argument, 0, 'G'}, + {"metric-type", required_argument, 0, 'K'}, + {0, 0, 0, 0} + }; + + // help messages when the user types -h + const char *help_messages[] = { + "compute metric, metrics are computed if this option is provided [optional]", + "sorted output file is produced if this option is provided [optional]", + "input bam file [required]", + "gtf file (unzipped) required then metric type is cell [required with metric cell]", + "temp folder for disk sorting [options: default /tmp]", + "sorted output file [optional]", + "metric file, the metrics are output in this file [optional]", + "number of alignments per thread [optional: default 1000000], if this number is increased then more RAM is required but reduces the number of file splits", + "number of threads [optional: default 1]", + "barcode-tag the call barcode tag [required]", + "umi-tag the umi tag [required]: the tsv file output is sorted according the tags in the options barcode-tag, umi-tag or gene-tag", + "gene-tag the gene tag [required]", + "metric type, either \"cell\" or \"gene\" [required]" + }; + + + /* getopt_long stores the option index here. */ + int option_index = 0; + int curr_size = 0; + while ((c = getopt_long(argc, argv, + "b:a:t:no:mM:p:T:C:U:G:K:", + long_options, + &option_index)) !=- 1 + ) + { + // process the option or arguments + switch (c) { + case 'm': + options.compute_metric = 1; + break; + case 'n': + options.output_sorted_info = 1; + break; + case 0: + /* If this option set a flag, do nothing else now. */ + if (long_options[option_index].flag != 0) + break; + printf("option %s", long_options[option_index].name); + if (optarg) + printf(" with arg %s", optarg); + printf("\n"); + break; + case 'b': + options.bam_input = string(optarg); + break; + case 'a': + options.gtf_file = string(optarg); + break; + case 't': + options.temp_folder = string(optarg); + break; + case 'o': + options.sorted_output_file = string(optarg); + break; + case 'M': + options.metric_output_file = string(optarg); + break; + case 'p': + options.alignments_per_thread = atoi(optarg); + break; + case 'T': + options.nthreads = atoi(optarg); + break; + case 'C': + options.barcode_tag = string(optarg); + curr_size = options.tag_order.size(); + options.tag_order[string(optarg)] = curr_size; + break; + case 'U': + options.umi_tag = string(optarg); + curr_size = options.tag_order.size(); + options.tag_order[string(optarg)] = curr_size; + break; + case 'G': + options.gene_tag = string(optarg); + curr_size = options.tag_order.size(); + options.tag_order[string(optarg)] = curr_size; + break; + case 'K': + options.metric_type = string(optarg); + break; + case '?': + case 'h': + i = 0; + printf("Usage: %s [options] \n", argv[0]); + while (long_options[i].name != 0) { + printf("\t--%-20s %-25s %-35s\n", long_options[i].name, + long_options[i].has_arg == no_argument? + "no argument" : "required_argument", + help_messages[i]); + i = i + 1; + } + /* getopt_long already printed an error message. */ + exit(0); + break; + default: + abort(); + } + } + + // Check the options + // either metric computation or the sorted tsv file must be produced + if (options.output_sorted_info!=1 && options.compute_metric!=1) { + error_message("ERROR: The choice of either the sorted alignment info or metric computation must be specified\n"); + exit(1); + } else { + if ( + !(options.metric_output_file.size()!=0 && options.compute_metric==1) && + !(options.sorted_output_file.size()!=0 && options.output_sorted_info==1) + ) { + error_message("ERROR: --compute-metric and --metric-output should be both specified together\n"); + exit(1); + } + } + + // metric type must be either of type cell or gene + if (options.metric_type.size() == 0 || + (options.metric_type.compare("cell")!=0 && options.metric_type.compare("gene")!=0) + ) { + error_message("ERROR: Metric type must either be \"cell\" or \"gene\"\n"); + exit(1); + } + + // if metric type is cell then the gtf file must be provided + if (options.metric_type.compare("cell")==0 && options.gtf_file.size()==0) { + error_message("ERROR: The gtf file name must be provided with metric_type \"cell\"\n"); + exit(1); + } + + // the gtf file should not be gzipped + std::regex reg1(".gz$", regex_constants::icase); + if (std::regex_search(options.gtf_file, reg1)) { + error_message("ERROR: The gtf file must not be gzipped\n"); + exit(1); + } + + // bam input file must be there + if (options.bam_input.size() == 0) { + error_message("ERROR: Must specify a input file name\n"); + exit(1); + } + + std::stringstream ss; + // check for input file + if (not fs::exists(options.bam_input.c_str())) { + ss.str(""); + ss << "ERROR " << "bam_input" << options.bam_input << " is missing!\n"; + error_message(ss.str().c_str()); + exit(1); + } + + // check for the temp folder + if (not fs::exists(options.temp_folder.c_str())) { + ss.str(""); + ss << "ERROR " << "temp folder " << options.temp_folder << " is missing!\n"; + error_message(ss.str().c_str()); + exit(1); + } + + // check for three distinct tags, barcode, umi and gene_id tags + if (options.tag_order.size()!=3) { + error_message("ERROR: Must have three distinct tags\n"); + exit(1); + } + + // The size of a set of aligments for in-memory sorting must be positive + if (options.alignments_per_thread < 1000) { + error_message("ERROR: The number of alignments per thread must be at least 1000\n"); + exit(1); + } + + // The number of threads must be between 1 and MAX_THREADS + if (options.nthreads > MAX_THREADS or options.nthreads < 1) { + ss << "ERROR: The number of threads must be between 1 and " << MAX_THREADS << "\n"; + error_message(ss.str().c_str()); + exit(1); + } +} + + +/** @copydoc read_options_fastqprocess */ +void read_options_fastqprocess(int argc, char **argv, INPUT_OPTIONS_FASTQPROCESS &options) { + int c; + int i; + + int verbose_flag = 0; + + static struct option long_options[] = { + /* These options set a flag. */ + {"verbose", no_argument, 0, 'v'}, + /* These options don’t set a flag. + We distinguish them by their indices. */ + {"barcode-length", required_argument, 0, 'b'}, + {"umi-length", required_argument, 0, 'u'}, + {"bam-size", required_argument, 0, 'B'}, + {"sample-id", required_argument, 0, 's'}, + {"I1", required_argument, 0, 'I'}, + {"R1", required_argument, 0, 'R'}, + {"R2", required_argument, 0, 'r'}, + {"white-list", required_argument, 0, 'w'}, + {0, 0, 0, 0} + }; + + // help messages when the user types -h + const char *help_messages[] = { + "verbose messages ", + "barcode length [required]", + "UMI length [required]", + "output BAM file in GB [optional: default 1 GB]", + "sample id [required]", + "I1 [optional]", + "R1 [required]", + "R2 [required]", + "whitelist (from cellranger) of barcodes [required]", + }; + + + /* getopt_long stores the option index here. */ + int option_index = 0; + while ((c = getopt_long(argc, argv, + "b:u:B:s:I:R:r:w:v", + long_options, + &option_index)) !=- 1 + ) + { + // process the option or arguments + switch (c) { + case 'v': + options.verbose_flag = 1; + break; + case 0: + /* If this option set a flag, do nothing else now. */ + if (long_options[option_index].flag != 0) + break; + printf("option %s", long_options[option_index].name); + if (optarg) + printf(" with arg %s", optarg); + printf("\n"); + break; + case 'b': + options.barcode_length = atoi(optarg); + break; + case 'u': + options.umi_length = atoi(optarg); + break; + case 'B': + options.bam_size = atof(optarg); + break; + case 's': + options.sample_id = string(optarg); + break; + case 'I': + options.I1s.push_back(string(optarg)); + break; + case 'R': + options.R1s.push_back(string(optarg)); + break; + case 'r': + options.R2s.push_back(string(optarg)); + break; + case 'w': + options.white_list_file = string(optarg); + break; + case '?': + case 'h': + i = 0; + printf("Usage: %s [options] \n", argv[0]); + while (long_options[i].name != 0) { + printf("\t--%-20s %-25s %-35s\n", long_options[i].name, + long_options[i].has_arg == no_argument? + "no argument" : "required_argument", + help_messages[i]); + i = i + 1; + } + /* getopt_long already printed an error message. */ + return; + break; + default: + abort(); + } + } + + // Check the options + // number of R1 and R2 files should be equal + bool exit_with_error = false; + if ((options.R1s.size() != options.R2s.size())) { + std::cout << "ERROR: Unequal number of R1 and R2 fastq files in input: " + << "R1 : " << options.R1s.size() + << "R2 : " << options.R2s.size() + << std::endl; + + std::cerr << "ERROR: Unequal number of R1 and R2 fastq files in input: " + << "R1 : " << options.R1s.size() + << "R2 : " << options.R2s.size() + << std::endl; + + exit_with_error = true; + } + + if (options.R1s.size() == 0) { + std::cout << "ERROR: No R1 file provided\n"; + std::cerr << "ERROR: No R1 file provided\n"; + + exit_with_error = true; + } + + + if ((options.I1s.size() != options.R1s.size()) && (options.I1s.size() != 0)) { + std::cout << "ERROR: Either the number of I1 input files are equal\n" + " to the number of R1 input files, or no I1 input files\n" + " should not be provided at all.\n"; + std::cerr << "ERROR: Either the number of I1 input files are equal\n" + " to the number of R1 input files, or no I1 input files\n" + " should not be provided at all.\n"; + + exit_with_error = true; + } + + // Bam file size must be positive + if (options.bam_size <= 0) { + std::cout << "ERROR: Size of a bam file (in GB) cannot be negative\n"; + std::cerr << "ERROR: Size of a bam file (in GB) cannot be negative\n"; + exit_with_error = true; + } + + // must have a sample id + if (options.sample_id.size() == 0) { + std::cout << "ERROR: Must provide a sample id or name\n"; + std::cerr << "ERROR: Must provide a sample id or name\n"; + exit_with_error = true; + } + + // barcode length must be positive + if (options.barcode_length <= 0) { + std::cout << "ERROR: Barcode length must be a positive integer\n"; + std::cerr << "ERROR: Barcode length must be a positive integer\n"; + exit_with_error = true; + } + + // UMI length must be positive + if (options.umi_length <= 0) { + std::cout << "ERROR: UMI length must be a positive integer\n"; + std::cerr << "ERROR: UMI length must be a positive integer\n"; + exit_with_error = true; + } + + // just prints out the files + if (verbose_flag) { + if (options.I1s.size()) { + _print_file_info(options.I1s, std::string("I1")); + } + + if (options.R1s.size()) { + _print_file_info(options.R1s, std::string("R1")); + } + + if (options.R2s.size()) { + _print_file_info(options.R2s, std::string("R2")); + } + } + + if (exit_with_error) { + exit(1); + } +} + diff --git a/fastqpreprocessing/src/input_options.h b/fastqpreprocessing/src/input_options.h new file mode 100644 index 0000000..fb8fa6a --- /dev/null +++ b/fastqpreprocessing/src/input_options.h @@ -0,0 +1,40 @@ +#ifndef __INPUT_OPTIONS__ +#define __INPUT_OPTIONS__ +/** + * @file input_options.h + * @brief Utility functions for input options processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ + +#include "datatypes.h" +#include "utilities.h" +#include +#include +#include +#include +#include + +using namespace std; + + +/** + * @brief Reads the options to the fastqprocess program + * + * @param argc no of arguments to the main function + * @param argv arguments array to the main function + * @param options the structure for holding the options for getopt +*/ +void read_options_fastqprocess(int, char **, INPUT_OPTIONS_FASTQPROCESS &); + + +/** + * @brief Reads the options to the tagsort program + * + * @param argc no of arguments to the main function + * @param argv arguments array to the main function + * @param options the structure for holding the options for getopt +*/ +void read_options_tagsort(int, char **, INPUT_OPTIONS_TAGSORT &); + +#endif diff --git a/fastqpreprocessing/src/main.cpp b/fastqpreprocessing/src/main.cpp deleted file mode 100644 index 551a05b..0000000 --- a/fastqpreprocessing/src/main.cpp +++ /dev/null @@ -1,26 +0,0 @@ -/** - * @file main.cpp - * @brief The main function for the fastqprocessing step - * @author Kishori Konwar - * @date 2020-08-27 - ***********************************************/ - -#include "fastqprocess.h" -#include "utilities.h" - -/* Flag set by ‘--verbose’. */ -int main (int argc, char **argv) -{ - - INPUT_OPTIONS options; - - read_options(argc, argv, options); - - std::cout << "reading whitelist file " << options.white_list_file << "..."; - WHITE_LIST_DATA *white_list_data = read_white_list(options.white_list_file); - std::cout << "done" << std::endl; - - process_inputs(options, white_list_data); - return 0; -} - diff --git a/fastqpreprocessing/src/metricgatherer.cpp b/fastqpreprocessing/src/metricgatherer.cpp new file mode 100644 index 0000000..7a36fbc --- /dev/null +++ b/fastqpreprocessing/src/metricgatherer.cpp @@ -0,0 +1,431 @@ +/** + * @file metricgatherer.cpp + * @brief functions for file processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ +#include "metricgatherer.h" + +#define ASSERT(condition) { if(!(condition)){ std::cerr << "ASSERT FAILED: " << #condition << " @ " << __FILE__ << " (" << __LINE__ << ")" << std::endl; } } +using namespace std; + +namespace consts { + const std::string INTRONIC_ALIGNMENT_LOCATION_TAG_VALUE = "INTRONIC"; + const std::string CODING_ALIGNMENT_LOCATION_TAG_VALUE = "CODING"; + const std::string UTR_ALIGNMENT_LOCATION_TAG_VALUE = "UTR"; + const std::string INTERGENIC_ALIGNMENT_LOCATION_TAG_VALUE = "INTERGENIC"; +} + +template +inline void freeContainer(T& p_container) +{ + T empty; + using std::swap; + swap(p_container, empty); +} + + +std::string to_nan(float x) { + stringstream s; + s << std::setprecision(10) << x; + return x==-1 ? "nan" : s.str(); +} + +void Metrics::clear() { + n_reads = 0; + noise_reads = 0; //# long polymers, N-sequences; NotImplemented + freeContainer(_fragment_histogram); + freeContainer(_molecule_histogram); + + freeContainer(_molecule_barcode_fraction_bases_above_30); + perfect_molecule_barcodes = 0; + freeContainer(_genomic_reads_fraction_bases_quality_above_30); + freeContainer(_genomic_read_quality); + + reads_mapped_exonic = 0; + reads_mapped_intronic = 0; + reads_mapped_utr = 0; + + // alignment uniqueness information + reads_mapped_uniquely = 0; + reads_mapped_multiple = 0; + duplicate_reads = 0; + + // alignment splicing information + spliced_reads = 0; + antisense_reads = 0; + plus_strand_reads = 0; // strand balance + + // higher-order methods, filled in by finalize() when all data is extracted + molecule_barcode_fraction_bases_above_30_mean = -1; + molecule_barcode_fraction_bases_above_30_variance = -1; + genomic_reads_fraction_bases_quality_above_30_mean = -1; + genomic_reads_fraction_bases_quality_above_30_variance = -1; + genomic_read_quality_mean = -1; + genomic_read_quality_variance = -1; + n_molecules = -1; + n_fragments = -1; + reads_per_molecule = -1; + reads_per_fragment = -1; + fragments_per_molecule = -1; + fragments_with_single_read_evidence = -1; + molecules_with_single_read_evidence = -1; +} + + +void Metrics::output_metrics(ofstream &fmetric_out) { + fmetric_out << std::setprecision(10) << prev_tag << "," + << this->n_reads << "," + << this->noise_reads << "," + << this->perfect_molecule_barcodes << "," + << this->reads_mapped_exonic << "," + << this->reads_mapped_intronic << "," + << this->reads_mapped_utr << "," + << this->reads_mapped_uniquely << "," + << this->reads_mapped_multiple << "," + << this->duplicate_reads << "," + << this->spliced_reads << "," + << this->antisense_reads << "," + << this->molecule_barcode_fraction_bases_above_30_mean << "," + << to_nan(this->molecule_barcode_fraction_bases_above_30_variance) << "," + << this->genomic_reads_fraction_bases_quality_above_30_mean << "," + << to_nan(this->genomic_reads_fraction_bases_quality_above_30_variance) << "," + << to_nan(this->genomic_read_quality_mean) << "," + << to_nan(this->genomic_read_quality_variance) << "," + << this->n_molecules << "," + << this->n_fragments << "," + << this->reads_per_molecule << "," + << this->reads_per_fragment << "," + << this->fragments_per_molecule << "," + << this->fragments_with_single_read_evidence << "," + << this->molecules_with_single_read_evidence; +} + + +unsigned int offset = 3; +void Metrics::parse_line(std::string &str, ofstream &fmetric_out, + std::set &mitochondrial_genes, + METRIC_TYPE metric_type + ) { + char line[1000]; + std::string NONE("None"); + std::size_t len = str.copy(line, str.size(), 0); + line[str.size()]='\0'; + + assert(len < 1000); + + char *c = line; + + unsigned int k = 0; + record[k] = c; + while (*c!='\0') { + if (*c == '\t') { + *c='\0'; + record[++k] = c + 1; + } + c++; + } + + assert(k==16); + + std::string current_tag = record[0]; + + // ignore the None gene + if (current_tag.compare(NONE)==0) return; + + // load the tags + std::string first_tag, second_tag, third_tag; + first_tag = record[0]; + second_tag = record[1]; + third_tag = record[2]; + if (metric_type == GENE && first_tag.find(",")!=std::string::npos) return; + + std::string tags = first_tag + std::string("-") + second_tag + std::string("-") + third_tag; + if (prev_tag.compare(current_tag)!=0 && prev_tag.size()!=0) { + this->finalize(mitochondrial_genes); + this->output_metrics(fmetric_out); + this->output_metrics_extra(fmetric_out); + this->clear(); + } + + this->parse_extra_fields(first_tag, second_tag, third_tag, record); + + this->n_reads += 1; + + // the tags passed to this function define a molecule, this increments the counter, + // identifying a new molecule only if a new tag combination is observed + + /* updating the molecule histogram with tags */ + if (this->_molecule_histogram.find(tags)==this->_molecule_histogram.end()) + this->_molecule_histogram[tags] = 0; + this->_molecule_histogram[tags] += 1; + + this->_molecule_barcode_fraction_bases_above_30.update(std::stof(record[offset+13])); + this->perfect_molecule_barcodes += std::stoi(record[offset+9]); + + this->_genomic_reads_fraction_bases_quality_above_30.update(std::stof(record[offset + 7])); + this->_genomic_read_quality.update(std::stof(record[offset + 6])); + + // the remaining portions deal with aligned reads, so if the read is not mapped, we are + // done with it + if (std::string(record[offset + 0]).compare("*")==0) + return; + + // get components that define a unique sequence fragment and increment the histogram + std::string position_str = record[offset + 2]; + std::string strand = std::stoi(std::string(record[offset + 3]))==1 ? "true" : "false"; + string reference = record[offset + 0]; + + std::string _ref_pos_str_tags = reference + std::string("\t") + \ + position_str + std::string("\t") + \ + strand + std::string("\t") + tags; + std::string ref_pos_str_tags = std::to_string(std::hash{}(_ref_pos_str_tags)); + + /* updating the fragment histogram with tag, strand and pos */ + if (this->_fragment_histogram.find(ref_pos_str_tags)==this->_fragment_histogram.end()) + this->_fragment_histogram[ref_pos_str_tags] = 0; + this->_fragment_histogram[ref_pos_str_tags] += 1; + + std::string alignment_location = std::string(record[offset + 1]); + if (alignment_location.compare(consts::CODING_ALIGNMENT_LOCATION_TAG_VALUE)==0) { + reads_mapped_exonic += 1; + } else if (alignment_location.compare(consts::INTRONIC_ALIGNMENT_LOCATION_TAG_VALUE)==0) { + reads_mapped_intronic += 1; + } else if (alignment_location.compare(consts::UTR_ALIGNMENT_LOCATION_TAG_VALUE)==0) { + reads_mapped_utr += 1; + } + + // in futher check if read maps outside window (when we add a gene model) + // and create distances from terminate side (needs gene model) uniqueness + int number_mappings = std::stoi(std::string(record[offset + 8])); + + if (number_mappings==1) + this->reads_mapped_uniquely += 1; + else + this->reads_mapped_multiple += 1; // without multi-mapping, this number is zero! + + this->duplicate_reads += std::stoi(std::string(record[offset + 11])); + + // cigar N field (3) indicates a read is spliced if the value is non-zero + this->spliced_reads += std::stoi(std::string(record[offset + 10])); + + prev_tag = current_tag; +} + + +// Calculate metrics that require information from all molecules of an entity +// ``finalize()`` replaces attributes in-place that were initialized by the constructor as +// ``None`` with a value calculated across all molecule data that has been aggregated. + +void Metrics::finalize(std::set &mitochondrial_genes) { + this->molecule_barcode_fraction_bases_above_30_mean = this->_molecule_barcode_fraction_bases_above_30.getMean(); + + this->molecule_barcode_fraction_bases_above_30_variance = this->_molecule_barcode_fraction_bases_above_30.calculate_variance(); + + this->genomic_reads_fraction_bases_quality_above_30_mean = this->_genomic_reads_fraction_bases_quality_above_30.getMean(); + + this->genomic_reads_fraction_bases_quality_above_30_variance = this->_genomic_reads_fraction_bases_quality_above_30.calculate_variance(); + + this->genomic_read_quality_mean = this->_genomic_read_quality.getMean(); + + this->genomic_read_quality_variance = this->_genomic_read_quality.calculate_variance(); + + this->n_molecules = this->_molecule_histogram.size(); + + this->n_fragments = this->_fragment_histogram.size(); + + try { + this->reads_per_molecule = this->n_reads / this->n_molecules; + } catch (runtime_error& e) { + this->reads_per_molecule = -1; // float("nan") + } + + try { + this->reads_per_fragment = this->n_reads / this->n_fragments; + } catch (runtime_error& e) { + this->reads_per_fragment = -1; //float("nan") + } + + try { + this->fragments_per_molecule = this->n_fragments / this->n_molecules; + } catch (runtime_error& e) { + this->fragments_per_molecule = -1; // float("nan") + } + + this->fragments_with_single_read_evidence = 0; + for (auto local_cit = this->_fragment_histogram.cbegin(); + local_cit!=this->_fragment_histogram.cend(); local_cit++) + if (local_cit->second==1) + this->fragments_with_single_read_evidence++; + + + this->molecules_with_single_read_evidence = 0; + for (auto local_cit = this->_molecule_histogram.cbegin(); + local_cit!=this->_molecule_histogram.cend(); local_cit++) + if (local_cit->second==1) + this->molecules_with_single_read_evidence++; +} + +//////////////// CellMetrics //////////////////////// +std::string CellMetrics::getHeader() { + std::string s=""; + + for (int i=0; i<24; i++) { + s += std::string(",") + headers[i]; + } + + for (int i=0; i<11; i++) { + s += std::string(",") + specific_headers[i]; + } + + return s; +} + +// Parses a record to extract gene-specific information +void CellMetrics::parse_extra_fields(const std::string &first_tag, + const std::string &second_tag, + const std::string &third_tag, + char** record) { + + this->_cell_barcode_fraction_bases_above_30.update(std::stof(record[offset + 5])); + + this->perfect_cell_barcodes += std::stoi(record[offset + 12]); + + if (std::string(record[offset + 1]).size()!= 0) { + if (std::string(record[offset + 1]).compare(consts::INTERGENIC_ALIGNMENT_LOCATION_TAG_VALUE)==0) + this->reads_mapped_intergenic += 1; + } else + this->reads_unmapped += 1; + + /* updating the genes histogram with tags */ + if (this->_genes_histogram.find(third_tag)==this->_genes_histogram.end()) + this->_genes_histogram[third_tag] = 0; + this->_genes_histogram[third_tag] += 1; +} + +void CellMetrics::output_metrics_extra(ofstream &fmetric_out) { + fmetric_out << std::setprecision(10) + << "," << this->perfect_cell_barcodes + << "," << this->reads_mapped_intergenic + << "," << this->reads_unmapped + << "," << this->reads_mapped_too_many_loci + << std::setprecision(10) << "," << to_nan(this->cell_barcode_fraction_bases_above_30_variance) + << "," << this->cell_barcode_fraction_bases_above_30_mean + << "," << this->n_genes + << "," << this->genes_detected_multiple_observations + << "," << this->n_mitochondrial_genes + << "," << this->n_mitochondrial_molecules + << "," << this->pct_mitochondrial_molecules + << std::endl; +} + +void CellMetrics::finalize(std::set &mitochondrial_genes) { + // call the finalize function in the parent class + Metrics::finalize(mitochondrial_genes); + + this->cell_barcode_fraction_bases_above_30_mean = this->_cell_barcode_fraction_bases_above_30.getMean(); + + this->cell_barcode_fraction_bases_above_30_variance = this->_cell_barcode_fraction_bases_above_30.calculate_variance(); + + this->n_genes = this->_genes_histogram.size(); + + this->genes_detected_multiple_observations = 0; + for (auto local_cit = this->_genes_histogram.cbegin(); local_cit!=this->_genes_histogram.cend(); local_cit++) + if (local_cit->second > 1) + this->genes_detected_multiple_observations++; + + + this->n_mitochondrial_genes = 0; + for (auto local_cit = this->_genes_histogram.cbegin(); local_cit!=this->_genes_histogram.cend(); local_cit++) { + if (mitochondrial_genes.find(local_cit->first)!=mitochondrial_genes.end()) + this->n_mitochondrial_genes++; + } + + + this->n_mitochondrial_molecules = 0; + for (auto local_cit = this->_genes_histogram.cbegin(); local_cit!=this->_genes_histogram.cend(); local_cit++) + if (mitochondrial_genes.find(local_cit->first)!=mitochondrial_genes.end()) + this->n_mitochondrial_molecules += local_cit->second; + + if (this->n_mitochondrial_molecules > 0) { + int tot_molecules = 0; + for (auto local_cit = this->_genes_histogram.cbegin(); local_cit!=this->_genes_histogram.cend(); local_cit++) { + tot_molecules += local_cit->second; + } + this->pct_mitochondrial_molecules = (this->n_mitochondrial_molecules/tot_molecules * 100.0); + } + else + this->pct_mitochondrial_molecules = 0.0; +} + +void CellMetrics::clear() { + // call the clear function in the parent class + Metrics::clear(); + + this->_cell_barcode_fraction_bases_above_30.clear(); + this->perfect_cell_barcodes = 0; + this->reads_mapped_intergenic = 0; + this->reads_unmapped = 0; + this->reads_mapped_too_many_loci = 0; + this->_genes_histogram.clear(); + + this->n_genes = 0; + this->genes_detected_multiple_observations = 0; + this->n_mitochondrial_genes = 0; + this->n_mitochondrial_molecules = 0; + this->pct_mitochondrial_molecules = 0; +} + + +//////////////// GeneMetrics //////////////////////// +std::string GeneMetrics::getHeader() { + std::string s=""; + + for (int i=0; i<24; i++) { + s += std::string(",") + headers[i]; + } + + for (int i=0; i<2; i++) { + s += std::string(",") + specific_headers[i]; + } + + return s; +} + +void GeneMetrics::parse_extra_fields(const std::string &first_tag, + const std::string &second_tag, + const std::string &third_tag, + char** record) { + + /* updating the cell histogram with tags */ + if (this->_cells_histogram.find(second_tag)==this->_cells_histogram.end()) + this->_cells_histogram[second_tag] = 0; + + this->_cells_histogram[second_tag] += 1; +} + +void GeneMetrics::output_metrics_extra(ofstream &fmetric_out) { + fmetric_out << "," << number_cells_detected_multiple + << "," << number_cells_expressing + << std::endl; +} + +void GeneMetrics::finalize(std::set &mitochondrial_genes) { + // call the finalize function in the parent class + Metrics::finalize(mitochondrial_genes); + this->number_cells_expressing = this->_cells_histogram.size(); + + this->number_cells_detected_multiple = 0; + for (auto local_cit = this->_cells_histogram.cbegin(); + local_cit!=this->_cells_histogram.cend(); local_cit++) + if (local_cit->second > 1) + this->number_cells_detected_multiple++; +} + +void GeneMetrics::clear() { + // call the clear function in the parent class + Metrics::clear(); + number_cells_detected_multiple = 0; + number_cells_expressing = 0; + freeContainer(_cells_histogram); +} diff --git a/fastqpreprocessing/src/metricgatherer.h b/fastqpreprocessing/src/metricgatherer.h new file mode 100644 index 0000000..582229c --- /dev/null +++ b/fastqpreprocessing/src/metricgatherer.h @@ -0,0 +1,276 @@ +#ifndef __METRIC_GATHERER__ +#define __METRIC_GATHERER__ +/** + * @file metricgatherer.h + * @brief functions for file processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "datatypes.h" + +using namespace std; + +/* + Methods + ------- + update(new_value: float) + incorporate new_value into the online estimate of mean and variance + getMean() + return the mean value + calculate_variance() + calculate and return the variance + mean_and_variance() + return both mean and variance +*/ +class OnlineGaussianSufficientStatistic { + private: + double _mean_squared_error = 0.0; + double sum_EX2 = 0.0; + double _mean = 0.0; + double _sum = 0.0; + double _count = 0.0; + + public: + void update(double new_value) { + _count += 1.0; + _sum += new_value; + sum_EX2 += (new_value*new_value); + } + + // return the mean value + double getMean() { + _mean = _sum/_count; + return _mean; + } + + // calculate and return the variance + double calculate_variance() { + if (_count < 2) { + //return double("nan") + return -1.0; + } + return sum_EX2/(_count -1) - (_sum/_count)*(_sum/(_count -1)); + } + + void clear() { + _mean_squared_error = 0.0; + _mean = 0.0; + _count = 0; + _sum = 0; + sum_EX2 = 0.0; + } +}; + +class Metrics { + private: + // count information + int n_reads = 0; + int noise_reads = 0; //# long polymers, N-sequences; NotImplemented + + //std::unordered_map _fragment_histogram; + std::map _fragment_histogram; + + //self._molecule_histogram: Counter[str] = Counter() + std::unordered_map _molecule_histogram; + + // molecule information + OnlineGaussianSufficientStatistic _molecule_barcode_fraction_bases_above_30; + + int perfect_molecule_barcodes = 0; + + OnlineGaussianSufficientStatistic _genomic_reads_fraction_bases_quality_above_30; + + OnlineGaussianSufficientStatistic _genomic_read_quality; + + // alignment location information + int reads_mapped_exonic = 0; + int reads_mapped_intronic = 0; + int reads_mapped_utr = 0; + + // in future we can implement this when we have a gene model + // self.reads_mapped_outside_window = 0 # reads should be within 1000 bases of UTR + // self._read_distance_from_termination_site = OnlineGaussianSufficientStatistic() + + // alignment uniqueness information + int reads_mapped_uniquely = 0; + int reads_mapped_multiple = 0; + int duplicate_reads = 0; + + // alignment splicing information + int spliced_reads = 0; + int antisense_reads = 0; + int plus_strand_reads = 0; // strand balance + + // higher-order methods, filled in by finalize() when all data is extracted + float molecule_barcode_fraction_bases_above_30_mean = -1; + float molecule_barcode_fraction_bases_above_30_variance = -1; + float genomic_reads_fraction_bases_quality_above_30_mean = -1; + float genomic_reads_fraction_bases_quality_above_30_variance = -1; + float genomic_read_quality_mean = -1; + float genomic_read_quality_variance = -1; + float n_molecules = -1; + float n_fragments = -1; + float reads_per_molecule = -1; + float reads_per_fragment = -1; + float fragments_per_molecule = -1; + int fragments_with_single_read_evidence = -1; + int molecules_with_single_read_evidence = -1; + private: + std::regex rgx; + std::sregex_token_iterator end; + std::string prev_tag; + char *record[20]; + + protected: + std::string headers[24] = { + "n_reads", + "noise_reads", + "perfect_molecule_barcodes", + "reads_mapped_exonic", + "reads_mapped_intronic", + "reads_mapped_utr", + "reads_mapped_uniquely", + "reads_mapped_multiple", + "duplicate_reads", + "spliced_reads", + "antisense_reads", + "molecule_barcode_fraction_bases_above_30_mean", + "molecule_barcode_fraction_bases_above_30_variance", + "genomic_reads_fraction_bases_quality_above_30_mean", + "genomic_reads_fraction_bases_quality_above_30_variance", + "genomic_read_quality_mean", + "genomic_read_quality_variance", + "n_molecules", + "n_fragments", + "reads_per_molecule", + "reads_per_fragment", + "fragments_per_molecule", + "fragments_with_single_read_evidence", + "molecules_with_single_read_evidence" + }; + + + public: + Metrics() { + rgx.assign("\t"); + prev_tag = std::string(""); + } + virtual ~Metrics() {} + // get the headers + virtual std::string getHeader() { return std::string(""); }; + + void parse_line(std::string &str, ofstream &fmetric_out, + std::set &mitochondrial_genes, + METRIC_TYPE metric_type); + + void output_metrics(ofstream &fmetric_out); + virtual void output_metrics_extra(ofstream &fmetric_out) {} + virtual void parse_extra_fields(const std::string &first_tag, + const std::string &second_tag, + const std::string &third_tag, + char** record) {} + virtual void finalize(std::set &mitochondrial_genes); + virtual void clear(); +}; + +class CellMetrics: public Metrics { + private: + int perfect_cell_barcodes; // The number of reads whose cell barcodes contain no errors (tag ``CB`` == ``CR``) + int reads_mapped_intergenic; // The number of reads mapped to an intergenic region for this cell + + // reads unmapped + int reads_unmapped; + // The number of reads that were mapped to too many loci across the genome and as a + // consequence, are reported unmapped by the aligner + int reads_mapped_too_many_loci; + + // The variance of the fraction of Illumina base calls for the cell barcode sequence that + // are greater than 30, across molecules + float cell_barcode_fraction_bases_above_30_variance; + + // The average fraction of Illumina base calls for the cell barcode sequence that + // are greater than 30, across molecules + float cell_barcode_fraction_bases_above_30_mean; + + int n_genes; //The number of genes detected by this cell + + int genes_detected_multiple_observations; // The number of genes that are observed by more than one read in this cell + int n_mitochondrial_genes; // The number of mitochondrial genes detected by this cell + int n_mitochondrial_molecules; // The number of molecules from mitochondrial genes detected for this cell + int pct_mitochondrial_molecules; // The percentage of molecules from mitoc + + OnlineGaussianSufficientStatistic _cell_barcode_fraction_bases_above_30; + std::unordered_map _genes_histogram; + + std::string specific_headers[11] = { + "perfect_cell_barcodes", + "reads_mapped_intergenic", + "reads_unmapped", + "reads_mapped_too_many_loci", + "cell_barcode_fraction_bases_above_30_variance", + "cell_barcode_fraction_bases_above_30_mean", + "n_genes", + "genes_detected_multiple_observations", + "n_mitochondrial_genes", + "n_mitochondrial_molecules", + "pct_mitochondrial_molecules" + }; + + public: + std::string getHeader(); + void output_metrics_extra(ofstream &fmetric_out); + void parse_extra_fields(const std::string &first_tag, + const std::string &second_tag, + const std::string &third_tag, + char** record); + + void finalize(std::set &mitochondrial_genes); + + void clear(); +}; + + +class GeneMetrics: public Metrics { + private: + int number_cells_detected_multiple; + int number_cells_expressing; + + std::unordered_map _cells_histogram; + std::string specific_headers[2] = { + "number_cells_detected_multiple", + "number_cells_expressing" + }; + + public: + GeneMetrics() { + number_cells_detected_multiple = 0; + number_cells_expressing = 0; + } + + public: + std::string getHeader(); + void output_metrics_extra(ofstream &fmetric_out); + void parse_extra_fields(const std::string &first_tag, + const std::string &second_tag, + const std::string &third_tag, + char** record); + + void finalize(std::set &mitochondrial_genes); + void clear(); +}; + + + + +#endif diff --git a/fastqpreprocessing/src/sort_write.cpp b/fastqpreprocessing/src/sort_write.cpp new file mode 100644 index 0000000..652dc0f --- /dev/null +++ b/fastqpreprocessing/src/sort_write.cpp @@ -0,0 +1,94 @@ +#include "sort_write.h" +#include +/** + * @file sort_write.cpp + * @brief functions for file processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ +#define STRING_LEN 40 + +extern sem_t semaphore; +extern std::vector partial_files; +extern std::mutex mtx; + +inline bool sortbyfirst(const std::pair& a, + const std::pair& b) { + if ((*get<0>(*a.first)).compare(*get<0>(*b.first)) !=0) { + return ((*get<0>(*a.first)).compare(*get<0>(*b.first)) < 0); + } else { + if ((*get<1>(*a.first)).compare(*get<1>(*b.first)) !=0) { + return ((*get<1>(*a.first)).compare(*get<1>(*b.first)) < 0); + } + } + return ((*get<2>(*a.first)).compare(*get<2>(*b.first)) < 0); +} + + +using namespace std; +/** @copydoc write_out_partial_txt_file */ +void write_out_partial_txt_file(const vector &tuple_records, \ + std::string const & tmp_folder) {; + + std::string tempfile = tmp_folder + string("/") + random_string(STRING_LEN) + std::string(".txt"); + + ofstream output_fp; + + output_fp.open(tempfile.c_str()); + + std::vector> index_pairs; + int k = 0; + + for (auto it=tuple_records.begin(); it!=tuple_records.end(); it++, k++) { + index_pairs.push_back(std::make_pair(get<0>(*it) , k)); + } + + // sort using the lambda function + std::sort(index_pairs.begin(), index_pairs.end(), sortbyfirst); + + stringstream str(stringstream::out|stringstream::binary); + + for (auto it=index_pairs.begin(); it!=index_pairs.end(); it++, k++) { + // what if you ran out of disk space ???? NEED TO add logic + if (k%10000==0) { + output_fp.write(str.str().c_str(), str.str().length()); + str.str(""); + str.clear(); + } + + str << *get<0>(*it->first) /* frist tag */ << "\t" + << *get<1>(*it->first) /* second tag */ << "\t" + << *get<2>(*it->first) /* third tag */ << "\t" + << get<1>(tuple_records[it->second]) /* record[0] */ << "\t" + << get<2>(tuple_records[it->second]) /* record[1] */ << "\t" + << get<3>(tuple_records[it->second]) /* record[2] */ << "\t" + << get<4>(tuple_records[it->second]) /* record[3] */ << "\t" + << get<5>(tuple_records[it->second]) /* record[4] */ << "\t" + << get<6>(tuple_records[it->second]) /* record[5] */ << "\t" + << get<7>(tuple_records[it->second]) /* record[6] */ << "\t" + << get<8>(tuple_records[it->second]) /* record[7] */ << "\t" + << get<9>(tuple_records[it->second]) /* record[8] */ << "\t" + << get<10>(tuple_records[it->second]) /* record[9] */ << "\t" + << get<11>(tuple_records[it->second]) /* record[10] */ << "\t" + << get<12>(tuple_records[it->second]) /* record[11] */ << "\t" + << get<13>(tuple_records[it->second]) /* record[12] */ << "\t" + << get<14>(tuple_records[it->second]) /* record[13] */ + << std::endl; + + + } + output_fp.write(str.str().c_str(), str.str().length()); + + str.str(""); + str.clear(); + + output_fp.close(); + mtx.lock(); + partial_files.push_back(tempfile); + mtx.unlock(); + + freeStlContainer(index_pairs); + index_pairs.clear(); +} + + diff --git a/fastqpreprocessing/src/sort_write.h b/fastqpreprocessing/src/sort_write.h new file mode 100644 index 0000000..b264146 --- /dev/null +++ b/fastqpreprocessing/src/sort_write.h @@ -0,0 +1,33 @@ +#ifndef __SORT_WRITE_H__ +#define __SORT_WRITE_H__ +/** + * @file sort_write.h + * @brief functions for file processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ +#include +#include +#include + +#include "datatypes.h" +#include "utilities.h" +#include "globals.h" + +/** + * @brief This function takes a vector of tuples of the tags, sorts them + * in the dictionary order of the tags and then writes these in the same + * order to a txt file + * + * @details + * The function take the vector of tags tuples, writes the sorted tuples into + * a file. The filename is generated randomly (enought to avoid collision with other files) + * in the temp folder specified. + * + * @param tuple_records: vector &, reference to a vector of TAGTUPLES + * @return a string for the random file name +*/ +void write_out_partial_txt_file(const vector &tuple_records, \ + std::string const & tmp_folder); + +#endif diff --git a/fastqpreprocessing/src/tagsort.cpp b/fastqpreprocessing/src/tagsort.cpp new file mode 100644 index 0000000..e0eead8 --- /dev/null +++ b/fastqpreprocessing/src/tagsort.cpp @@ -0,0 +1,388 @@ +/** + * @file tagsort.cpp + * @brief functions for file processing + * @author Kishori M. Konwar + * @date 2021-08-11 + ***********************************************/ + +#include "htslib_tagsort.h" +#include "tagsort.h" +#include +#include +#include + + +extern std::vector partial_files; +int filling_counter = 0; + +inline string ltrim(std::string &s) +{ + s.erase(s.begin(),find_if_not(s.begin(),s.end(),[](int c){return isspace(c);})); + return s; +} + +inline string rtrim(string &s) { + s.erase(find_if_not(s.rbegin(),s.rend(),[](int c){return isspace(c);}).base(), s.end()); + return s; +} + + +unsigned int split_buffer_to_fields(const std::string &str, char *line, char **fields, char delim) { + // copy the string to a buffer to split by tab + str.copy(line, str.size(), 0); + line[str.size()]='\0'; + char *c = line; + + unsigned int k = 0; + fields[k] = c; + while (*c!='\0') { + if (*c == delim) { + *c='\0'; + fields[++k] = c + 1; + } + c++; + } + return k+1; +} + + +/* + * @brief retuns the set of mitochondrial gene names + * + * @param gtf file name, unzipped + * @return std::set +*/ +std::set get_mitochondrial_gene_names( + const std::string >f_file) { + + char field_buffer[1000]; + char *fields[20]; + + char attrib_buffer[1000]; + char *attribs[20]; + + char keyval_buffer[1000]; + char *keyvals[20]; + + std::set mitochondrial_gene_ids; + ifstream *input_fp = new ifstream; + + input_fp->open(gtf_file.c_str(), std::ifstream::in); + if(!input_fp->is_open()) { + std::cerr << "ERROR failed to open the GTF file " << gtf_file<< std::endl; + exit(1); + } + + for (std::string line; std::getline(*(input_fp), line); ) { + // skip comment lines + if (std::regex_search(line, std::regex("^#"))) continue; + + int num_fields = split_buffer_to_fields(line, field_buffer, fields, '\t'); + // must have at least 8 fields + assert(num_fields >= 8); + + // skip the line unless it is a gene + if (std::string(fields[2]).compare(std::string("gene"))!=0) continue; + + // split the attributes field separated by ";" + int num_attribs = split_buffer_to_fields(std::string(fields[8]), attrib_buffer, attribs, ';'); + + std::string gene_name(""); + std::string gene_id(""); + // now examine each of the attribute name-value pairs + for (int k=0; k < num_attribs; k++) { + // splied each attribute name-value pair by the space + if (std::string(attribs[k]).size()==0) continue; + std::string attrib = std::move(std::string(attribs[k])); + attrib = ltrim(attrib); + + int n = split_buffer_to_fields(attrib, keyval_buffer, keyvals, ' '); + if(n!=2) { + throw std::runtime_error("Expect 2 field but found " + std::to_string(n) + " fields"); + } + + // the second element in the pair is the value string + std::string value = std::string(keyvals[1]); + // remove the " (quotes) from the beginning and end of the value string + value.erase(std::remove_if(value.begin(), value.end(), [](unsigned char c) { return c=='\"';}), value.end()); + + if (std::string(keyvals[0]).compare("gene_id")==0) gene_id = value; + if (std::string(keyvals[0]).compare("gene_name")==0) gene_name = value; + } + if (gene_name.compare("")==0) { + throw("Malformed GTF file detected. Record is of type gene but does not have a gene_name in line" + line); + } + + if (std::regex_search(gene_name, std::regex("^mt-", std::regex_constants::icase))) { + mitochondrial_gene_ids.insert(gene_id); + } + + } + std::cout << "Number of mitochondrial genes found " << mitochondrial_gene_ids.size() << std::endl; + return mitochondrial_gene_ids; +} + + +/* + * @brief fills the buffer for the files + * + * @param contx is the context of the file +*/ +void fill_buffer(Context &contx) { + contx.data[contx.i].clear(); + int k = 0; + + ifstream *input_fp = new ifstream; + + input_fp->open(partial_files[contx.i].c_str(), std::ifstream::in); + if(!input_fp->is_open()) { + std::cerr << "ERROR failed to open the file " << partial_files[contx.i] << std::endl; + exit(1); + } + + input_fp->seekg(contx.file_offset[contx.i]); + contx.file_handles[contx.i] = input_fp; + + // the order of the loop condition is iportant first make sure if you can accomodate then try to read, + // otherwise it might create a read but never processed + for (std::string line; k < contx.BUF_SIZE && std::getline(*(contx.file_handles[contx.i]), line); k++) { + contx.data[contx.i].push_back(line); + filling_counter += 1; + } + assert(contx.data[contx.i].size() <= contx.BUF_SIZE); + + contx.file_offset[contx.i] = input_fp->tellg(); + input_fp->close(); + delete input_fp; + + contx.data_size[contx.i] = contx.data[contx.i].size(); + + if (contx.data_size[contx.i] != 0) { + contx.ptrs[contx.i] = 0; + contx.isempty[contx.i] = false; + } else { + contx.ptrs[contx.i] = contx.BUF_SIZE; + contx.isempty[contx.i] = true; + } + + +#ifdef DEBUG + std::cout << "-->" << std::endl; + for (int m = 0; m < contx.NUM_PARTS; m++) { + std::cout << "\t" << m << " : " << contx.data_size[m] << " : " << contx.ptrs[m] << std::endl; + } +#endif + +} + +/* + * @brief Merges the files that are already sorted + * + * @param INPUT_OPTIONS_TAGSORT +*/ +bool process_partial_files(const INPUT_OPTIONS_TAGSORT &options) { + const std::string &sorted_output_file = options.sorted_output_file; + bool compute_metric = (options.compute_metric==1); + bool output_sorted_info = (options.output_sorted_info==1); + const std::string &metric_type = options.metric_type; + const std::string &metric_output_file = options.metric_output_file; + + std::set mitochondrial_genes; + if (options.gtf_file.size()>0) + mitochondrial_genes = get_mitochondrial_gene_names(options.gtf_file); + + // input the buffer size and partial files + Context contx(partial_files.size(), DATA_BUFFER_SIZE); + auto cmp = [](const QUEUETUPLE &a, const QUEUETUPLE &b) { + return get<0>(a) > get<0>(b); + }; + + std::priority_queue, decltype(cmp) > heap(cmp); + + // open the file files + //fill the buffers + for (auto i=0; i < contx.NUM_PARTS; i++) { + contx.i = i; + fill_buffer(contx); + } + + std::regex rgx("\t"); + std::sregex_token_iterator end; + + // create the heap from the first batch loaded data + contx.num_active_files = 0; + for (auto i=0; i< contx.NUM_PARTS; i++){ + contx.i = i; + if (contx.ptrs[i] != contx.BUF_SIZE) { + std::sregex_token_iterator iter(contx.data[i][contx.ptrs[i]].begin(), contx.data[i][contx.ptrs[i]].end(), rgx, -1); + std::stringstream comp_tag; + for (auto k = 0; k < 3 && iter!=end; ++iter, k++) { + if (k>0) comp_tag << "\t"; + comp_tag << *iter; + } + heap.push(QUEUETUPLE(comp_tag.str(), i, contx.ptrs[i])); + contx.ptrs[i]++; + contx.num_active_files += 1; + } + } + + // now merge by pop an push + ofstream fout; + if (compute_metric==true) fout.open(sorted_output_file.c_str()); + + // pop and push from the heap + int num_alignments = 0; + int i, j; + + Metrics *metric_gatherer; + METRIC_TYPE metric_type_enum; + if (metric_type.compare("cell")==0) { + metric_gatherer = new CellMetrics; + metric_type_enum = CELL; + } + if (metric_type.compare("gene")==0) { + metric_gatherer = new GeneMetrics; + metric_type_enum = GENE; + } + + metric_gatherer->clear(); + + ofstream fmetric_out; + + if (compute_metric) { + fmetric_out.open(metric_output_file.c_str()); + fmetric_out << metric_gatherer->getHeader() << std::endl; + } + + stringstream str(stringstream::out|stringstream::binary); + std::string prev_comp_tag = ""; + while (!heap.empty()) { + // read the top + QUEUETUPLE qtuple = heap.top(); + std::string curr_comp_tag = get<0>(qtuple); + assert(prev_comp_tag.compare(curr_comp_tag) <= 0); + +#ifdef DEBUG + contx.print_status(); + if (prev_comp_tag.compare(curr_comp_tag) <= 0 ) { + std::cout << "Expected " << prev_comp_tag << "\n\t\t" << curr_comp_tag << std::endl; + } else { + std::cout << "Anomaly " << prev_comp_tag << "\n\t\t" << curr_comp_tag << std::endl; + exit(0); + } +#endif + i = get<1>(qtuple); //buffer no + j = get<2>(qtuple); //the pointer into the ith buffer array + + // pop it now + heap.pop(); + + // start writing in chunks from the stream buffer + if (num_alignments%contx.BUF_SIZE==0) { + if (output_sorted_info==true) { + fout.write(str.str().c_str(), str.str().length()); + str.clear(); + str.str(""); + } + } + + // load into stream buffer + string field = contx.data[i][j]; + if (output_sorted_info==true) str << field << std::endl; + + if (compute_metric) { + metric_gatherer->parse_line(field, fmetric_out, mitochondrial_genes, metric_type_enum); + } + num_alignments += 1; + + // if ismpty is true means the file has been fully read + if (contx.isempty[i] == false && contx.ptrs[i] == contx.data_size[i]) { + contx.i = i; + fill_buffer(contx); + } + + // make sure it is not empty + if (contx.data_size[i] > 0) { + std::sregex_token_iterator iter(contx.data[i][contx.ptrs[i]].begin(), contx.data[i][contx.ptrs[i]].end(), rgx, -1); + std::stringstream comp_tag; + for (auto k = 0; k < 3 && iter!=end; ++iter, k++) { + if (k>0) comp_tag << "\t"; + comp_tag << *iter; + } + heap.push(QUEUETUPLE(comp_tag.str(), i, contx.ptrs[i])); + contx.ptrs[i]++; + } else { // one more file is fully read + contx.num_active_files -= 1; + } + + if (num_alignments%1000000 == 0) { + std::cout << "num alns read " << num_alignments << std::endl; + } + prev_comp_tag = curr_comp_tag; + } + + // process the final line + metric_gatherer->finalize(mitochondrial_genes); + metric_gatherer->output_metrics(fmetric_out); + metric_gatherer->output_metrics_extra(fmetric_out); + delete metric_gatherer; + + // close the metric file + if (compute_metric) fmetric_out.close(); + + // write out the remaining data + if (output_sorted_info==true) { + fout.write(str.str().c_str(), str.str().length()); + str.str(""); + str.clear(); + } + + // close output files as there is no more to write + if (output_sorted_info==true) fout.close(); + + std::cout << "Written "<< num_alignments << " alignments in total" << std::endl; + contx.clear(); + return true; +} + +/* Flag set by ‘--verbose’. */ +int main (int argc, char **argv) { + + INPUT_OPTIONS_TAGSORT options; + + read_options_tagsort(argc, argv, options); + + std::cout << "bam input " << options.bam_input << std::endl; + std::cout << "temp folder " << options.temp_folder << std::endl; + std::cout << "sorted output file " << options.sorted_output_file << std::endl; + std::cout << "metric output file " << options.metric_output_file << std::endl; + std::cout << "temp folder " << options.alignments_per_thread << std::endl; + std::cout << "tags:" << std::endl; + + for(auto it = options.tag_order.begin(); it != options.tag_order.end(); it++) { + std::cout << "\t" << it->first << "\t" << it->second << std::endl; + } + + /* first create a list of sorted, and simplified sorted files */ + create_sorted_file_splits_htslib(options); + + /* now merge the sorted files to create one giant sorted file by using + a head to compare the values based on the tags used */ + std::cout << "Merging " << partial_files.size() << " sorted files!"<< std::endl; + + if(!process_partial_files(options)) { + std::cout << "Failed to complete the merging as the number of concurrently open files increased the max limit" << std::endl; + } + + // we no longer need the partial files + for (unsigned int i=0; i < partial_files.size(); i++) { + if(remove(partial_files[i].c_str()) != 0) + std::cerr << string("Error deleting file") << partial_files[i] << std::endl; + } + partial_files.clear(); + std::cout << "Aligments " << filling_counter << " loaded to buffer " << std::endl; + + + return 0; +} + diff --git a/fastqpreprocessing/src/tagsort.h b/fastqpreprocessing/src/tagsort.h new file mode 100644 index 0000000..4387456 --- /dev/null +++ b/fastqpreprocessing/src/tagsort.h @@ -0,0 +1,100 @@ +#ifndef __TAG_SORT__ +#define __TAG_SORT__ + +/** + * @file tagsort.h + * @brief Utility functions for input options processing + * @author Kishori Konwar + * @date 2021-08-11 + ***********************************************/ + +#include "utilities.h" +#include "datatypes.h" +#include "input_options.h" +#include "globals.h" +#include "metricgatherer.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +struct Context { + vector> data; + vector file_handles; + + vector file_offset; + vector data_size; + vector ptrs; + vector isempty; + int i = -1; + int num_active_files; + int BUF_SIZE; + int NUM_PARTS; + + Context(unsigned int num_parts, int data_buffer_size): + NUM_PARTS(num_parts), BUF_SIZE(data_buffer_size) { + + num_active_files = 0; + + // set file file handles to 0 + for (auto i=0; i < this->NUM_PARTS; i++) { + this->file_handles.push_back(0); + } + + // set the file offsets to 0 + for (auto i=0; i < this->NUM_PARTS; i++) { + this->file_offset.push_back(0); + } + + // set the isempty for each file to false + for (auto i=0; i < this->NUM_PARTS; i++) { + this->isempty.push_back(false); + } + + // set a vector of vectors of data for each file + for (auto i=0; i < this->NUM_PARTS; i++) { + this->data.push_back(std::vector()); + } + + // set the data_size of the buffer for each file to 0 + for (auto i=0; i < this->NUM_PARTS; i++) { + this->data_size.push_back(0); + } + + // set the pointer to f each buffer to thist.BUF_SIZE + for (auto i=0; i < this->NUM_PARTS; i++) { + this->ptrs.push_back(this->BUF_SIZE); + } + } + + void print_status() { + std::cout << "Contx status " << std::endl; + for (auto i=0; i < this->NUM_PARTS; i++) { + this->i = i; + std::cout << "\t" << this->i << "\t" << this->data[this->i].size() << "\t" << this->data_size[this->i] << "\t" << this->ptrs[this->i] << std::endl; + } + } + + + void clear() { + file_handles.clear(); + data_size.clear(); + ptrs.clear(); + isempty.clear(); + } +}; + +#endif diff --git a/fastqpreprocessing/src/utilities.cpp b/fastqpreprocessing/src/utilities.cpp index 8f7f48d..4bf3eac 100644 --- a/fastqpreprocessing/src/utilities.cpp +++ b/fastqpreprocessing/src/utilities.cpp @@ -2,13 +2,10 @@ * @file utilities.cpp * @brief Utility functions for file processing * @author Kishori Konwar - * @date 2020-08-27 + * @date 2021-08-11 ***********************************************/ #include "utilities.h" -#include -#include -#include using namespace std; namespace fs = std::experimental::filesystem; @@ -42,7 +39,7 @@ WHITE_LIST_DATA *read_white_list(const string &white_list_file) { //insert the barcode into the list white_list_data->barcodes.push_back(tp); - for (int i=0; i < tp.size(); i++) { + for (unsigned int i=0; i < tp.size(); i++) { for (int j=0; j < 5; j++) { char c = tp[i]; tp[i] = ATCG[j]; @@ -77,200 +74,12 @@ WHITE_LIST_DATA *read_white_list(const string &white_list_file) { return white_list_data; } -/** @copydoc read_options */ -void read_options(int argc, char **argv, INPUT_OPTIONS &options) { - int c; - int i; - - int verbose_flag = 0; - - static struct option long_options[] = { - /* These options set a flag. */ - {"verbose", no_argument, 0, 'v'}, - /* These options don’t set a flag. - We distinguish them by their indices. */ - {"barcode-length", required_argument, 0, 'b'}, - {"umi-length", required_argument, 0, 'u'}, - {"bam-size", required_argument, 0, 'B'}, - {"sample-id", required_argument, 0, 's'}, - {"I1", required_argument, 0, 'I'}, - {"R1", required_argument, 0, 'R'}, - {"R2", required_argument, 0, 'r'}, - {"white-list", required_argument, 0, 'w'}, - {0, 0, 0, 0} - }; - - // help messages when the user types -h - const char *help_messages[] = { - "verbose messages ", - "barcode length [required]", - "UMI length [required]", - "output BAM file in GB [optional: default 1 GB]", - "sample id [required]", - "I1 [optional]", - "R1 [required]", - "R2 [required]", - "whitelist (from cellranger) of barcodes [required]", - }; - - - /* getopt_long stores the option index here. */ - int option_index = 0; - while ((c = getopt_long(argc, argv, - "b:u:B:s:I:R:r:w:v", - long_options, - &option_index)) !=- 1 - ) - { - // process the option or arguments - switch (c) { - case 'v': - verbose_flag = 1; - break; - case 0: - /* If this option set a flag, do nothing else now. */ - if (long_options[option_index].flag != 0) - break; - printf("option %s", long_options[option_index].name); - if (optarg) - printf(" with arg %s", optarg); - printf("\n"); - break; - case 'b': - options.barcode_length = atoi(optarg); - break; - case 'u': - options.umi_length = atoi(optarg); - break; - case 'B': - options.bam_size = atof(optarg); - break; - case 's': - options.sample_id = string(optarg); - break; - case 'I': - options.I1s.push_back(string(optarg)); - break; - case 'R': - options.R1s.push_back(string(optarg)); - break; - case 'r': - options.R2s.push_back(string(optarg)); - break; - case 'w': - options.white_list_file = string(optarg); - break; - case '?': - case 'h': - i = 0; - printf("Usage: %s [options] \n", argv[0]); - while (long_options[i].name != 0) { - printf("\t--%-20s %-25s %-35s\n", long_options[i].name, - long_options[i].has_arg == no_argument? - "no argument" : "required_argument", - help_messages[i]); - i = i + 1; - } - /* getopt_long already printed an error message. */ - return; - break; - default: - abort(); - } - } - - // Check the options - // number of R1 and R2 files should be equal - bool exit_with_error = false; - if ((options.R1s.size() != options.R2s.size())) { - std::cout << "ERROR: Unequal number of R1 and R2 fastq files in input: " - << "R1 : " << options.R1s.size() - << "R2 : " << options.R2s.size() - << std::endl; - - std::cerr << "ERROR: Unequal number of R1 and R2 fastq files in input: " - << "R1 : " << options.R1s.size() - << "R2 : " << options.R2s.size() - << std::endl; - - exit_with_error = true; - } - - if (options.R1s.size() == 0) { - std::cout << "ERROR: No R1 file provided\n"; - std::cerr << "ERROR: No R1 file provided\n"; - - exit_with_error = true; - } - - - if ((options.I1s.size() != options.R1s.size()) && (options.I1s.size() != 0)) { - std::cout << "ERROR: Either the number of I1 input files are equal\n" - " to the number of R1 input files, or no I1 input files\n" - " should not be provided at all.\n"; - std::cerr << "ERROR: Either the number of I1 input files are equal\n" - " to the number of R1 input files, or no I1 input files\n" - " should not be provided at all.\n"; - - exit_with_error = true; - } - // Bam file size must be positive - if (options.bam_size <= 0) { - std::cout << "ERROR: Size of a bam file (in GB) cannot be negative\n"; - std::cerr << "ERROR: Size of a bam file (in GB) cannot be negative\n"; - exit_with_error = true; - } - - // must have a sample id - if (options.sample_id.size() == 0) { - std::cout << "ERROR: Must provide a sample id or name\n"; - std::cerr << "ERROR: Must provide a sample id or name\n"; - exit_with_error = true; - } - - // barcode length must be positive - if (options.barcode_length <= 0) { - std::cout << "ERROR: Barcode length must be a positive integer\n"; - std::cerr << "ERROR: Barcode length must be a positive integer\n"; - exit_with_error = true; - } - - // UMI length must be positive - if (options.umi_length <= 0) { - std::cout << "ERROR: UMI length must be a positive integer\n"; - std::cerr << "ERROR: UMI length must be a positive integer\n"; - exit_with_error = true; - } - - // just prints out the files - if (verbose_flag) { - if (options.I1s.size()) { - _print_file_info(options.I1s, std::string("I1")); - } - - if (options.R1s.size()) { - _print_file_info(options.R1s, std::string("R1")); - } - - if (options.R2s.size()) { - _print_file_info(options.R2s, std::string("R2")); - } - } - - if (exit_with_error) { - exit(1); - } - - -} - - /** @copydoc _print_file_info */ void _print_file_info(const std::vector &fastqs, const std::string &type) { if (fastqs.size()) { std::cout << "INFO " << type << " files:" << std::endl; - for (int i= 0; i < fastqs.size(); i++) { + for (unsigned int i= 0; i < fastqs.size(); i++) { if (fs::exists(fastqs[i].c_str())) { std::cout << "\t " << fastqs[i] << " exists, file size " << filesize(fastqs[i].c_str()) << std::endl; @@ -284,11 +93,10 @@ void _print_file_info(const std::vector &fastqs, } - /** @copydoc get_num_blocks */ -int64_t get_num_blocks(const INPUT_OPTIONS &options) { +int64_t get_num_blocks(const INPUT_OPTIONS_FASTQPROCESS &options) { double tot_size = 0; - for (int i= 0; i < options.R1s.size(); i++) { + for (unsigned int i= 0; i < options.R1s.size(); i++) { if (options.I1s.size()) { tot_size += filesize(options.I1s[i].c_str()); } @@ -306,3 +114,37 @@ void error(char *msg) { exit(1); } +/** @copydoc random_string **/ +std::string random_string(size_t length) +{ + auto randchar = []() -> char + { + const char charset[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + const size_t max_index = (sizeof(charset) - 1); + return charset[ rand() % max_index ]; + }; + std::string str(length, 0); + std::generate_n(str.begin(), length, randchar); + return str; +} + + +/** @copydoc read_lines **/ +std::vector read_lines(const std::string &file_name) { + string line; + std::vector lines; + ifstream myfile(file_name.c_str()); + if (myfile.is_open()) { + while ( getline (myfile,line) ) { + lines.push_back(line); + } + myfile.close(); + } else { + throw std::runtime_error("Could not open file"); + } + return lines; +} + diff --git a/fastqpreprocessing/src/utilities.h b/fastqpreprocessing/src/utilities.h index 168c722..89298ab 100644 --- a/fastqpreprocessing/src/utilities.h +++ b/fastqpreprocessing/src/utilities.h @@ -1,11 +1,13 @@ +#ifndef __OPTIMUS_UTILITES__ +#define __OPTIMUS_UTILITES__ + /** * @file utilities.h * @brief Utility functions for file processing * @author Kishori Konwar - * @date 2020-08-26 + * @date 2021-08-11 ***********************************************/ -#ifndef __OPTIMUS_UTILITES__ -#define __OPTIMUS_UTILITES__ + #include #include #include @@ -13,45 +15,10 @@ #include #include #include -#include - -typedef std::pair STRING_BOOL_PAIR; - -typedef std::vector STRING_VECTOR; - -typedef std::unordered_map STRING_INT32_MAP; - -// structure for correcting the barcodes -typedef struct _white_list_data { - /// an unordered map from whitelist barcodes and 1-mutations - /// to the index of the correct barcode - STRING_INT32_MAP mutations; - /// vector of whitelist barcodes - STRING_VECTOR barcodes; -} WHITE_LIST_DATA; - -/// Structure to hold input options -typedef struct _input_options { - /// Initialize some of the values - _input_options() { - barcode_length = -1; - umi_length = -1; - sample_id = ""; - bam_size = 1.0; - } - /// I1, R1 and R2 files name - std::vector I1s, R1s, R2s; - /// Barcode white list file - std::string white_list_file; - //// chemistry dependent (V2/V3) barcode and UMI length - int barcode_length, umi_length; - /// Bam file size to split by (in GB) - double bam_size; - /// sample name - std::string sample_id; -} INPUT_OPTIONS; - +#include +#include +#include "datatypes.h" /** * @brief Compute the number of bam files * @@ -61,7 +28,7 @@ typedef struct _input_options { * @param options Input options structure that contains file name */ -int64_t get_num_blocks(const INPUT_OPTIONS &options); +int64_t get_num_blocks(const INPUT_OPTIONS_FASTQPROCESS &options); /** * @brief Build barcode correction map white list barcodes & mutations @@ -80,15 +47,6 @@ int64_t get_num_blocks(const INPUT_OPTIONS &options); */ WHITE_LIST_DATA *read_white_list(const std::string &white_list_file); -/** - * @brief Reads the options to the program - * - * @param argc no of arguments to the main function - * @param argv arguments array to the main function - * @param options the structure for holding the options for getopt -*/ -void read_options(int, char **, INPUT_OPTIONS &); - /** * @brief Computes the size of a file in bytes * @@ -120,4 +78,35 @@ void error(char *msg); void _print_file_info(const std::vector &fastqs, \ const std::string &type); + +/** + * @brief this function generates a random string of a specified length + * consisting of alphanumeric characters + * + * @param length: length of the string + * @return a random alphanumeric string of specified length +*/ +std::string random_string(size_t length); + +/** + * @brief this function reads the lines in a text file into a vector + * of strings + * + * @param file_name: file name + * @return a vector of strings +*/ + +std::vector read_lines(const std::string &file_name); + +template +inline void freeStlContainer(T& p_container) +{ + return; +/* + T empty; + using std::swap; + swap(p_container, empty); +*/ +} + #endif