Skip to content

Commit

Permalink
Merge pull request #43 from FOI-Bioinformatics/version_update_v0.4.2
Browse files Browse the repository at this point in the history
Version update v0.4.2
  • Loading branch information
davve2 authored Jul 9, 2021
2 parents cf99c8d + de1f6d2 commit cf05579
Show file tree
Hide file tree
Showing 16 changed files with 224 additions and 91 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ build*
test*
*.dmp
.ctbd
*.db
*.txt
#*.db
#*.txt
*egg*
dist*
logs*
9 changes: 7 additions & 2 deletions flextaxd/create_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(self, message,expression=""):
parser = argparse.ArgumentParser()
basic = parser.add_argument_group('basic', 'Basic commands')
basic.add_argument('-o', '--outdir',metavar="", default=".", help="Output directory (same directory as custom_taxonomy_databases dump)")
basic.add_argument('-db', '--database',metavar="", type=str, default=".ctdb" , help="Custom taxonomy sqlite3 database file")
basic.add_argument('-db', '--database', '--db' ,metavar="", type=str, default=".ctdb" , help="Custom taxonomy sqlite3 database file")

### Download options, process local directory and potentially download files
download_opts = parser.add_argument_group('download_opts', "Download and file handling")
Expand Down Expand Up @@ -116,6 +116,10 @@ def __init__(self, message,expression=""):

parser.add_argument("--version", action='store_true', help=argparse.SUPPRESS)

if len(sys.argv)==1:
parser.print_help(sys.stderr)
sys.exit(1)

args = parser.parse_args()

if not os.path.exists(args.database):
Expand Down Expand Up @@ -185,7 +189,7 @@ def __init__(self, message,expression=""):
if args.download or args.representative:
download = dynamic_import("modules", "DownloadGenomes")
download_obj = download(args.processes,outdir=args.outdir,force=args.force_download,download_path=args.genomes_path)
new_genome_path, missing = download_obj.run(missing,args.rep_path)
new_genome_path, missing = download_obj.run(missing,representative=args.representative,url=args.rep_path)
if not new_genome_path:
still_missing = missing
if len(still_missing) > 0: print("Not able to download: {nr}".format(nr=len(still_missing)))
Expand All @@ -209,6 +213,7 @@ def __init__(self, message,expression=""):
limit = 10
'''Use the genome -> path dictionary to build database'''
if not skip:
logger.info("Get genomes from input directory!")
genomes = process_directory_obj.get_genome_path_dict()
else: genomes=False
if args.skip:
Expand Down
7 changes: 6 additions & 1 deletion flextaxd/custom_taxonomy_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
each column (not <tab>|<tab>).
'''

__version__ = "0.4.1"
__version__ = "0.4.2"
__author__ = "David Sundell"
__credits__ = ["David Sundell"]
__license__ = "GPLv3"
Expand Down Expand Up @@ -147,6 +147,10 @@ def __init__(self, message,expression=""):

parser.add_argument("--version", action='store_true', help=argparse.SUPPRESS)

if len(sys.argv)==1:
parser.print_help(sys.stderr)
sys.exit(1)

args = parser.parse_args()

if args.version:
Expand Down Expand Up @@ -187,6 +191,7 @@ def __init__(self, message,expression=""):
if args.force:
force = True


### Run pipeline

'''If database is not given, and input data is not given raise error'''
Expand Down
2 changes: 2 additions & 0 deletions flextaxd/modules/CreateKrakenDatabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def __init__(self, database, kraken_database, genome_names, outdir,verbose=False
if genome_names:
self.genome_names = list(genome_names.keys()) ## List for multiprocessing
self.genome_path = genome_names ## genome_id to path dictionary
else:
logger.error("Genome names are missing")
self.accession_to_taxid = self.database.get_genomes(self.database)
self.files = []
self.params = params
Expand Down
109 changes: 62 additions & 47 deletions flextaxd/modules/DownloadGenomes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from modules.functions import download_genomes
from multiprocessing import Process,Manager,Pool
from subprocess import Popen,PIPE
import os
import os,time

class DownloadGenomes(object):
"""DownloadGenomes takes a list of genome names (GCF or GCA) and download (if possible) files from NCBI refseq and or genbank"""
Expand Down Expand Up @@ -85,7 +85,7 @@ def download_represenatives(self,genome_path,url="https://data.ace.uq.edu.au/pub
logger.debug(" ".join(args))
logger.info("Waiting for download process to finish (this may take a while)!")
p = Popen(args, stdout=PIPE)
(output, err) = p.communicate()
(output, err) = p.communicate()
p_status = p.wait()
return input_file_name

Expand Down Expand Up @@ -114,52 +114,65 @@ def download_files(self,files):
Returns
list - list of files not downloaded
'''
logger.info("Downloading {files} files".format(files=len(files)))
if len(files) < self.processes:
self.processes = len(files)
self.download_map = self._split(files,self.processes)
logger.info("Using {np} parallel processes to download files".format(np=self.processes))

	'''function to run download of genomes in parallel'''
jobs = []
manager = Manager()
added = manager.Queue()
fpath = manager.Queue()
missing = manager.Queue()
for i in range(self.processes):
p = Process(target=download_genomes, args=(self.download_map[i],added,fpath,missing,self.force))
p.daemon=True
p.start()
jobs.append(p)
for job in jobs:
job.join()
self.added = added.qsize()
if not any(proc.is_alive() for proc in jobs):
logger.info('All download processes completed')
elif len(added.qsize()) % 100 == 0:
print("{n} downloaded".format(added.qsize()),end="\r")

count = 0
while True:
if added.qsize() == 0: ## Check if no genome was succesfully downlaoded if no break
logger.info("None of listed genomes could be downloaded! Files not downloaded will be printed to {outdir}/FlexTaxD.missing".format(outdir=self.outdir.rstrip("/")))
self.write_missing(files)
break
l = len(self.genome_names)
gen = added.get()
if gen:
self.genome_names += gen
self.genome_path[gen] = fpath.get()
if l == len(self.genome_names):
try:
logger.info("Downloading {files} files".format(files=len(files)))
if len(files) < self.processes:
self.processes = len(files)
self.download_map = self._split(files,self.processes)
logger.info("Using {np} parallel processes to download files".format(np=self.processes))

		'''function to run download of genomes in parallel'''
jobs = []
manager = Manager()
added = manager.Queue()
fpath = manager.Queue()
missing = manager.Queue()
for i in range(self.processes):
p = Process(target=download_genomes, args=(self.download_map[i],added,fpath,missing,self.force))
p.daemon=True
p.start()
jobs.append(p)
for job in jobs:
job.join()
self.added = added.qsize()
if not any(proc.is_alive() for proc in jobs):
logger.info('All download processes completed')
elif len(added.qsize()) % 100 == 0:
print("{n} downloaded".format(added.qsize()),end="\r")

count = 0
while True:
if self.added == 0: ## Check if no genome was succesfully downlaoded if no break
logger.info("None of listed genomes could be downloaded! Files not downloaded will be printed to {outdir}/FlexTaxD.missing".format(outdir=self.outdir.rstrip("/")))
self.write_missing(files)
break
else:
self.not_downloaded += missing.get()
count+=1
if len(self.not_downloaded) > 0:
self.write_missing(self.not_downloaded)
l = len(self.genome_names)
gen = added.get()
if gen:
self.genome_names += gen
self.genome_path[gen] = fpath.get()
if l == len(self.genome_names):
break
else:
self.not_downloaded += missing.get()
count+=1
if added.qsize():
break
if len(self.not_downloaded) > 0:
self.write_missing(self.not_downloaded)
except KeyboardInterrupt:
logger.info("Program was interrupted by user: cleaning up subprocesses!")
finally: ## Make sure all sub-processes are ended even if program is forced to quit
if any(proc.is_alive() for proc in jobs):
for p in jobs:
print(p)
p.kill()
time.sleep(1)
if any(proc.is_alive() for proc in jobs):
logger.error("Could not stop all subprocesses check your process manager and end them manually!")
return self.not_downloaded

def run(self, files, representatives=False):
def run(self, files, representative=False,url=""):
		'''Download list of GCF and/or GCA files from NCBI or download representative genomes
Parameters
Expand All @@ -169,12 +182,14 @@ def run(self, files, representatives=False):
Returns
list - list of files not downloaded
'''
if representatives:
tarfile = self.download_represenatives(genome_path=self.download_path, url=representatives)
if representative:
logger.info("Download GTDB Representative genomes")
tarfile = self.download_represenatives(genome_path=self.download_path, url=url)
folder_path = self.parse_representatives(tarfile)
'''Process the downloaded folder'''
return folder_path,[]
else:
if len(files) > 0:
logger.info("Download missing genomes")
not_downloaded = self.download_files(files)
return False,not_downloaded
11 changes: 9 additions & 2 deletions flextaxd/modules/ModifyTree.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def __init__(self, database=".taxonomydb", mod_database=False, mod_file=False, c
self.ncbi_order = True
self.mod_genomes =False

self.fast_clean = True

### Connect to or create database
self.taxonomydb = ModifyFunctions(database,verbose=verbose)
self.rank= self.taxonomydb.get_rank(col=2)
Expand Down Expand Up @@ -398,7 +400,7 @@ def clean_database(self, ncbi=False):
self.all_nodes = set(self.taxonomydb.get_nodes(col=1).keys())
'''Add parents to all nodes that may not have annotations'''
logger.info("Retrieve all parents of annotated nodes")
self.annotated_nodes |= self.taxonomydb.get_parents(self.annotated_nodes,find_all=True)
self.annotated_nodes |= self.taxonomydb.get_parents(self.annotated_nodes,find_all=True,ncbi=ncbi)
logger.info("Parents added: {an}".format(an=len(self.annotated_nodes)-an))
if ncbi:
logger.info("Keep main nodes of the NCBI taxonomy (parents on level 3 and above)")
Expand All @@ -415,7 +417,12 @@ def clean_database(self, ncbi=False):
logger.debug("Nodes remaining {nnodes}".format(nnodes=len(self.annotated_nodes)))
logger.info("Clean annotations related to removed nodes")
if len(self.clean_links) < len(self.all_links) and len(self.clean_links) > 0:
self.taxonomydb.fast_delete_links(self.clean_links)
logger.info("Cleaning {nlinks} links".format(nlinks=len(self.clean_links)))
if self.fast_clean:
self.taxonomydb.fast_delete_links(self.clean_links)
else:
logger.info("Cleaning tree (this will take a long time for large trees)")
self.taxonomydb.delete_links(self.clean_links)
if len(self.clean_nodes) < len(self.all_nodes) and len(self.clean_nodes) > 0:
self.taxonomydb.delete_nodes(self.clean_nodes)
if not ncbi:
Expand Down
4 changes: 3 additions & 1 deletion flextaxd/modules/NewickTree.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,13 @@ def print(self,type="newick"):
exists = importlib.find_loader('Bio')
if not exists:
raise VisualisationError("Visualisations other than newick requires biopython package (conda install biopython)!")

from Bio import Phylo
self.phylo = Phylo.read(StringIO(self.newickTree), "newick")
if type == "newick_vis":
Phylo.draw_ascii(self.phylo)
exists = importlib.find_loader('matplotlib')
if not exists:
raise VisualisationError("Visualisations using newick tree requires the matplotlib package (conda install matplotlib)!")
import matplotlib.pylab as pylab
if type == "tree":
Phylo.draw(self.phylo)
Expand Down
1 change: 1 addition & 0 deletions flextaxd/modules/ReadTaxonomy.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def read_nodes(self, treefile=False):
with self.zopen(treefile, "r") as _treefile:
headers = _treefile.readline().strip().split(self.sep)
if "parent" not in headers or "child" not in headers:
logger.debug("Headers: {h} separator [{sep}]".format(h=headers,sep=self.sep))
raise InputError("Your input tree file does not contain the headers to specify child and parent!")
if headers[0] == "parent":
swap = True
Expand Down
37 changes: 25 additions & 12 deletions flextaxd/modules/ReadTaxonomyNCBI.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from .ReadTaxonomy import ReadTaxonomy
from gzip import open as zopen
import zlib
import os
import logging
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -98,18 +99,30 @@ def parse_genomeid2taxid(self, genomes_path,annotation_file):
if not annotation_file.endswith("accession2taxid.gz"):
raise TypeError("The supplied annotation file does not seem to be the ncbi nucl_gb.accession2taxid.gz")
annotated_genome = set()
with zopen(annotation_file,"r") as f:
headers = f.readline().split(b"\t")
for row in f:
if row.strip() != "": ## If there are trailing empty lines in the file
refseqid,taxid = row.split(b"\t")[1:3]
try:
genebankid = self.refseqid_to_GCF[refseqid]
self.database.add_genome(genome=genebankid,_id=taxid.decode("utf-8"))
annotated_genome.add(refseqid)
except:
pass
self.database.commit()
try:
with zopen(annotation_file,"r") as f:
headers = f.readline().split(b"\t")
for row in f:
if row.strip() != "": ## If there are trailing empty lines in the file
if len(row.split(b"\t")) > 2:
try:
refseqid,taxid = row.split(b"\t")[1:3]
except:
logger.info(row)
logger.info(row.split(b"\t"))
if len(annotated_genome) > 0:
logger.info("Potential error in last row?")
else:
logger.info("Error on first line in annotation file, check format!")
try:
genebankid = self.refseqid_to_GCF[refseqid]
self.database.add_genome(genome=genebankid,_id=taxid.decode("utf-8"))
annotated_genome.add(refseqid)
except KeyError:
pass
self.database.commit()
except zlib.error as e:
logger.info("Error in annotation file {e}".format(e=e))
missing = set(self.refseqid_to_GCF.keys()) - annotated_genome
missing = [self.refseqid_to_GCF[m] for m in missing] ## Translate to GCF ids
if logging.root.level <=20: ## Equal to --verbose
Expand Down
Loading

0 comments on commit cf05579

Please sign in to comment.