Source code for crispio.cli

"""Command-line interface for crispio."""
from typing import TYPE_CHECKING, Any
from collections.abc import Callable

from argparse import Namespace, FileType
from dataclasses import replace
from functools import wraps
from io import TextIOWrapper
import os
import signal
import sys

if TYPE_CHECKING:
    from bioino import FastaCollection, FastaSequence, GffFile
else:
    FastaCollection, FastaSequence, GffFile = Any, Any, Any
from carabiner import print_err
from carabiner.cliutils import CLIApp, CLICommand, CLIOption, clicommand
from tqdm.auto import tqdm

from . import appname, __version__

from .utils import sequences

def _allow_broken_pipe(f: Callable) -> Callable:

    @wraps(f)
    def _f(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except BrokenPipeError:

            sys.exit(0)

    return _f


def _load_genome_and_gff(
    genome: TextIOWrapper, 
    gff: TextIOWrapper
) -> tuple[FastaSequence, GffFile]:
    from bioino import FastaCollection, GffFile
    from streq import Circular

    fasta_sequences = [
        replace(
            s,
            sequence=Circular(s.sequence),
        ) for s in FastaCollection.from_file(genome).sequences
    ]
    if not fasta_sequences:
        raise IOError(f"No sequences found in {genome.name}.")

    genome_filename = os.path.basename(genome.name)
    gff_filename = os.path.basename(gff.name)

    gff_data = GffFile.from_file(gff)

    seq_metadata = [
        ('genome-sequence', 'constrained', [seq.name, 1, len(seq.sequence)])
        for seq in fasta_sequences
    ]
    for seq in fasta_sequences:
        seq_metadata.append(
            ('genome-description', 'constrained', [seq.description])
        )

    new_metadata = seq_metadata + [
        ('genome-filename', 'constrained', [genome_filename]),
        ('sgRNA-map', 'free', [__package__, genome_filename, gff_filename]),
    ]
    old_metadata = list(gff_data.metadata.data)
    gff_data = replace(
        gff_data, 
        lookup=True,
        metadata=old_metadata + new_metadata,
    )
    return fasta_sequences, gff_data


def _prepare_to_search(args: Namespace) -> tuple[FastaSequence, GffFile, str, dict[str, str | int]]:
    fasta_sequences, gff_data = _load_genome_and_gff(
        args.genome,
        args.annotations,
    )
    pam_search = sequences.pams.get(args.pam, args.pam)
    sgRNA_defaults = {
        "source": __package__,
        "feature": 'protospacer',
        "score": '.',
        "phase": '.',
    }
    return fasta_sequences, gff_data, pam_search, sgRNA_defaults


@clicommand(message=f"Mapping sgRNAs with the following parameters ({appname} v{__version__})")
def _map(args: Namespace) -> None:

    from bioino import FastaCollection
    from .map import GuideLibrary

    fasta_sequences, gff_data, pam_search, sgRNA_defaults = _prepare_to_search(args)

    guide_sequences = list(FastaCollection.from_file(args.input).sequences)
    n_guide_sequences = len(guide_sequences)

    print_err(
        f'\nFinding sgRNA sites matching {n_guide_sequences} sequences',
        f'from {args.input.name} and matching',
        f'PAM {args.pam} ({pam_search})',
        f'in {args.genome.name}...',
    )
    gff_data.metadata.write(file=args.output)
    for seq in fasta_sequences:
        print_err(f'\n  Chromosome: {seq.name} ({len(seq.sequence):,} bp)')
        guide_library = GuideLibrary.from_mapping(
            guide_seq=guide_sequences,
            genome=seq.sequence,
            pam_search=pam_search,
            limit=args.limit,
        )
        for guide_match in guide_library.as_gff(
            annotations_from=gff_data,
            tags=args.attributes,
            gff_defaults=sgRNA_defaults | {"seqid": seq.name},
        ):
            _allow_broken_pipe(guide_match.write)(file=args.output)
    return None


@clicommand(message=f"Generating sgRNAs with the following parameters (crispio v{__version__})")
def _generate(args: Namespace) -> None:
    from .map import GuideLibrary
    import io

    fasta_sequences, gff_data, pam_search, sgRNA_defaults = _prepare_to_search(args)
    gff_data.metadata.write(file=args.output)

    for seq in fasta_sequences:
        print_err(f'\n  Chromosome: {seq.name} ({len(seq.sequence):,} bp)')
        guide_library = GuideLibrary.from_generating(
            max_length=args.max_length,
            min_length=args.min_length,
            genome=seq.sequence,
            pam_search=pam_search,
            limit=args.limit,
        )
    
        for guide_match in guide_library.as_gff(
            max_per_collection=1,
            annotations_from=gff_data,
            tags=args.attributes,
            gff_defaults=sgRNA_defaults | {"seqid": seq.name},
        ):
            _allow_broken_pipe(guide_match.write)(file=args.output)
    return None


@clicommand(message=f"Featurizing sgRNAs with the following parameters (crispio v{__version__})")
def _featurize(args: Namespace) -> None:
    from bioino import GffFile
    from .features import featurize
    try:
        scaffold = sequences.scaffolds[args.scaffold]
    except KeyError:
        scaffold = args.scaffold

    print_err('> Generating features for guides...')

    input_gff = GffFile.from_file(args.input)
    input_gff.metadata.write(file=args.output)

    with tqdm(input_gff.lines, desc="Featurizing") as t:  ## run a progress bar
        for gff_line in t:
            t.set_postfix(current=gff_line.attributes["Name"][:40])
            new_features = featurize(
                gff_line, 
                scaffold=scaffold,
            )
            attr = gff_line.attributes.copy()
            attr.update(new_features)
            gff_line = replace(gff_line, attributes=attr)
            _allow_broken_pipe(gff_line.write)(file=args.output)
    
    return None


@clicommand(message=f"Detecting off-targets with the following parameters (crispio v{__version__})")
def _offtarget(args: Namespace) -> None:
    from bioino import GffFile
    from .crosstalk import _get_mismatches

    gff1 = GffFile.from_file(args.input)
    gff2 = GffFile.from_file(args.gff2)

    gff2_lines = tuple(gff2.lines)
    
    n_mismatches = 0 
    pairs_checked = set()

    new_metadata = [
        ('crosstalk-comparator', 'free', [os.path.basename(args.gff2.name)]),
    ]
    old_metadata = list(gff1.metadata.data)
    new_metadata = replace(
        gff1.metadata, 
        data=old_metadata + new_metadata,
    )
    new_metadata.write(file=args.output)

    with tqdm(gff1.lines) as t:
        for gff_line1 in t:
            mm = {}
            for gff_line2 in gff2_lines:
                t.set_postfix(mismatches=n_mismatches)
                pair, added_mm = _get_mismatches(
                    gff_line1, 
                    gff_line2, 
                    maximum=args.mismatches,
                    pairs_checked=pairs_checked,
                )
                pairs_checked.add(pair)
                mm.update(added_mm)
                n_mismatches = len(mm)

            attr = gff_line1.attributes.copy()
            if len(mm) > 0:
                attr['crosstalk'] = ('+'.join(f'{key}~{val}'for key, val in mm.items()))
            gff_line1 = replace(gff_line1, attributes=attr)
            _allow_broken_pipe(gff_line1.write)(args.output)
            
    return None


[docs] def main(): try: signal.signal(signal.SIGPIPE, signal.SIG_DFL) except AttributeError: pass length_max = CLIOption( '--max_length', '-l', type=int, default=20, help='Maximum length.', ) length_min = CLIOption( '--min_length', '-m', type=int, default=None, help='Minimum length.', ) # featurize scaffold = CLIOption( '--scaffold', '-s', dest='scaffold', type=str, required=True, help=( 'Name of a scaffold from "{}" or a scaffold sequence.' ).format('", "'.join(sequences.scaffolds)), ) # offtarget gff2 = CLIOption( '--gff2', '-2', type=FileType('r'), required=True, help='GFF file containing protospacers for comparison.', ) mismatches = CLIOption( '--mismatches', '-e', type=int, default=2, help='Maximum number of edits between guides to mark a close match.', ) inputs = CLIOption( 'input', type=FileType('r'), default=sys.stdin, nargs='?', help='Input file. Default: STDIN.', ) pam = CLIOption( '--pam', '-p', type=str, required=True, help=('Either a PAM sequence or one of "{}".').format('", "'.join(sequences.pams)), ) genome = CLIOption( '--genome', '-g', type=FileType('r'), required=True, help='Genome FASTA file', ) annotations = CLIOption( '--annotations', '-a', type=FileType('r'), required=True, help='GFF file.', ) attributes = CLIOption( '--attributes', '-t', type=str, nargs='*', default=['Name', 'locus_tag', 'old_locus_tag', 'gene', 'gene_biotype'], help='Tag to use in attribute field (column 9) of GFF file.', ) _limit = CLIOption( '--limit', '-n', type=int, default=None, help="Number to generate. Default: generate all.", ) outputs = CLIOption( '--output', '-o', type=FileType('w'), default=sys.stdout, help='Output file. Default: STDOUT', ) generate = CLICommand( "generate", description="Generate and annotate all guide RNAs for a given genome.", main=_generate, options=[ length_max, length_min, pam, genome, annotations, _limit, attributes, outputs, ], ) map_guides = CLICommand( "map", description="Map and annotate provided guide RNAs to a given genome.", main=_map, options=[ inputs, pam, genome, annotations, _limit, attributes, outputs, ], ) featurize = CLICommand( "featurize", description="Annotate guide RNAs with additional calculated features.", main=_featurize, options=[ inputs, scaffold, outputs, ], ) offtarget = CLICommand( "offtarget", description="Compare two sets of guide RNAs for potential cross-target activity.", main=_offtarget, options=[ inputs, gff2, mismatches, outputs, ], ) app = CLIApp( appname, description="Design and analysis CRISPR guides.", version=__version__, commands=[ generate, map_guides, featurize, offtarget, ], ) app.run() return None
if __name__ == '__main__': main()