"""Command-line interface for crispio."""
from typing import TYPE_CHECKING, Any
from collections.abc import Callable
from argparse import Namespace, FileType
from dataclasses import replace
from functools import wraps
from io import TextIOWrapper
import os
import signal
import sys
if TYPE_CHECKING:
from bioino import FastaCollection, FastaSequence, GffFile
else:
FastaCollection, FastaSequence, GffFile = Any, Any, Any
from carabiner import print_err
from carabiner.cliutils import CLIApp, CLICommand, CLIOption, clicommand
from tqdm.auto import tqdm
from . import appname, __version__
from .utils import sequences
def _allow_broken_pipe(f: Callable) -> Callable:
@wraps(f)
def _f(*args, **kwargs):
try:
return f(*args, **kwargs)
except BrokenPipeError:
sys.exit(0)
return _f
def _load_genome_and_gff(
genome: TextIOWrapper,
gff: TextIOWrapper
) -> tuple[FastaSequence, GffFile]:
from bioino import FastaCollection, GffFile
from streq import Circular
fasta_sequences = [
replace(
s,
sequence=Circular(s.sequence),
) for s in FastaCollection.from_file(genome).sequences
]
if not fasta_sequences:
raise IOError(f"No sequences found in {genome.name}.")
genome_filename = os.path.basename(genome.name)
gff_filename = os.path.basename(gff.name)
gff_data = GffFile.from_file(gff)
seq_metadata = [
('genome-sequence', 'constrained', [seq.name, 1, len(seq.sequence)])
for seq in fasta_sequences
]
for seq in fasta_sequences:
seq_metadata.append(
('genome-description', 'constrained', [seq.description])
)
new_metadata = seq_metadata + [
('genome-filename', 'constrained', [genome_filename]),
('sgRNA-map', 'free', [__package__, genome_filename, gff_filename]),
]
old_metadata = list(gff_data.metadata.data)
gff_data = replace(
gff_data,
lookup=True,
metadata=old_metadata + new_metadata,
)
return fasta_sequences, gff_data
def _prepare_to_search(args: Namespace) -> tuple[FastaSequence, GffFile, str, dict[str, str | int]]:
fasta_sequences, gff_data = _load_genome_and_gff(
args.genome,
args.annotations,
)
pam_search = sequences.pams.get(args.pam, args.pam)
sgRNA_defaults = {
"source": __package__,
"feature": 'protospacer',
"score": '.',
"phase": '.',
}
return fasta_sequences, gff_data, pam_search, sgRNA_defaults
@clicommand(message=f"Mapping sgRNAs with the following parameters ({appname} v{__version__})")
def _map(args: Namespace) -> None:
from bioino import FastaCollection
from .map import GuideLibrary
fasta_sequences, gff_data, pam_search, sgRNA_defaults = _prepare_to_search(args)
guide_sequences = list(FastaCollection.from_file(args.input).sequences)
n_guide_sequences = len(guide_sequences)
print_err(
f'\nFinding sgRNA sites matching {n_guide_sequences} sequences',
f'from {args.input.name} and matching',
f'PAM {args.pam} ({pam_search})',
f'in {args.genome.name}...',
)
gff_data.metadata.write(file=args.output)
for seq in fasta_sequences:
print_err(f'\n Chromosome: {seq.name} ({len(seq.sequence):,} bp)')
guide_library = GuideLibrary.from_mapping(
guide_seq=guide_sequences,
genome=seq.sequence,
pam_search=pam_search,
limit=args.limit,
)
for guide_match in guide_library.as_gff(
annotations_from=gff_data,
tags=args.attributes,
gff_defaults=sgRNA_defaults | {"seqid": seq.name},
):
_allow_broken_pipe(guide_match.write)(file=args.output)
return None
@clicommand(message=f"Generating sgRNAs with the following parameters (crispio v{__version__})")
def _generate(args: Namespace) -> None:
from .map import GuideLibrary
import io
fasta_sequences, gff_data, pam_search, sgRNA_defaults = _prepare_to_search(args)
gff_data.metadata.write(file=args.output)
for seq in fasta_sequences:
print_err(f'\n Chromosome: {seq.name} ({len(seq.sequence):,} bp)')
guide_library = GuideLibrary.from_generating(
max_length=args.max_length,
min_length=args.min_length,
genome=seq.sequence,
pam_search=pam_search,
limit=args.limit,
)
for guide_match in guide_library.as_gff(
max_per_collection=1,
annotations_from=gff_data,
tags=args.attributes,
gff_defaults=sgRNA_defaults | {"seqid": seq.name},
):
_allow_broken_pipe(guide_match.write)(file=args.output)
return None
@clicommand(message=f"Featurizing sgRNAs with the following parameters (crispio v{__version__})")
def _featurize(args: Namespace) -> None:
from bioino import GffFile
from .features import featurize
try:
scaffold = sequences.scaffolds[args.scaffold]
except KeyError:
scaffold = args.scaffold
print_err('> Generating features for guides...')
input_gff = GffFile.from_file(args.input)
input_gff.metadata.write(file=args.output)
with tqdm(input_gff.lines, desc="Featurizing") as t: ## run a progress bar
for gff_line in t:
t.set_postfix(current=gff_line.attributes["Name"][:40])
new_features = featurize(
gff_line,
scaffold=scaffold,
)
attr = gff_line.attributes.copy()
attr.update(new_features)
gff_line = replace(gff_line, attributes=attr)
_allow_broken_pipe(gff_line.write)(file=args.output)
return None
@clicommand(message=f"Detecting off-targets with the following parameters (crispio v{__version__})")
def _offtarget(args: Namespace) -> None:
from bioino import GffFile
from .crosstalk import _get_mismatches
gff1 = GffFile.from_file(args.input)
gff2 = GffFile.from_file(args.gff2)
gff2_lines = tuple(gff2.lines)
n_mismatches = 0
pairs_checked = set()
new_metadata = [
('crosstalk-comparator', 'free', [os.path.basename(args.gff2.name)]),
]
old_metadata = list(gff1.metadata.data)
new_metadata = replace(
gff1.metadata,
data=old_metadata + new_metadata,
)
new_metadata.write(file=args.output)
with tqdm(gff1.lines) as t:
for gff_line1 in t:
mm = {}
for gff_line2 in gff2_lines:
t.set_postfix(mismatches=n_mismatches)
pair, added_mm = _get_mismatches(
gff_line1,
gff_line2,
maximum=args.mismatches,
pairs_checked=pairs_checked,
)
pairs_checked.add(pair)
mm.update(added_mm)
n_mismatches = len(mm)
attr = gff_line1.attributes.copy()
if len(mm) > 0:
attr['crosstalk'] = ('+'.join(f'{key}~{val}'for key, val in mm.items()))
gff_line1 = replace(gff_line1, attributes=attr)
_allow_broken_pipe(gff_line1.write)(args.output)
return None
[docs]
def main():
try:
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
except AttributeError:
pass
length_max = CLIOption(
'--max_length', '-l',
type=int,
default=20,
help='Maximum length.',
)
length_min = CLIOption(
'--min_length', '-m',
type=int,
default=None,
help='Minimum length.',
)
# featurize
scaffold = CLIOption(
'--scaffold', '-s',
dest='scaffold',
type=str,
required=True,
help=(
'Name of a scaffold from "{}" or a scaffold sequence.'
).format('", "'.join(sequences.scaffolds)),
)
# offtarget
gff2 = CLIOption(
'--gff2', '-2',
type=FileType('r'),
required=True,
help='GFF file containing protospacers for comparison.',
)
mismatches = CLIOption(
'--mismatches', '-e',
type=int,
default=2,
help='Maximum number of edits between guides to mark a close match.',
)
inputs = CLIOption(
'input',
type=FileType('r'),
default=sys.stdin,
nargs='?',
help='Input file. Default: STDIN.',
)
pam = CLIOption(
'--pam', '-p',
type=str,
required=True,
help=('Either a PAM sequence or one of "{}".').format('", "'.join(sequences.pams)),
)
genome = CLIOption(
'--genome', '-g',
type=FileType('r'),
required=True,
help='Genome FASTA file',
)
annotations = CLIOption(
'--annotations', '-a',
type=FileType('r'),
required=True,
help='GFF file.',
)
attributes = CLIOption(
'--attributes', '-t',
type=str,
nargs='*',
default=['Name', 'locus_tag', 'old_locus_tag', 'gene', 'gene_biotype'],
help='Tag to use in attribute field (column 9) of GFF file.',
)
_limit = CLIOption(
'--limit', '-n',
type=int,
default=None,
help="Number to generate. Default: generate all.",
)
outputs = CLIOption(
'--output', '-o',
type=FileType('w'),
default=sys.stdout,
help='Output file. Default: STDOUT',
)
generate = CLICommand(
"generate",
description="Generate and annotate all guide RNAs for a given genome.",
main=_generate,
options=[
length_max,
length_min,
pam,
genome,
annotations,
_limit,
attributes,
outputs,
],
)
map_guides = CLICommand(
"map",
description="Map and annotate provided guide RNAs to a given genome.",
main=_map,
options=[
inputs,
pam,
genome,
annotations,
_limit,
attributes,
outputs,
],
)
featurize = CLICommand(
"featurize",
description="Annotate guide RNAs with additional calculated features.",
main=_featurize,
options=[
inputs,
scaffold,
outputs,
],
)
offtarget = CLICommand(
"offtarget",
description="Compare two sets of guide RNAs for potential cross-target activity.",
main=_offtarget,
options=[
inputs,
gff2,
mismatches,
outputs,
],
)
app = CLIApp(
appname,
description="Design and analysis CRISPR guides.",
version=__version__,
commands=[
generate,
map_guides,
featurize,
offtarget,
],
)
app.run()
return None
if __name__ == '__main__':
main()