Skip to content

API reference

agp

Functions for reading and writing AGP files.

AgpFormatError

Bases: Exception

Invalidly formatted AGP

Source code in agp/__init__.py
 5
 6
 7
 8
 9
10
11
12
class AgpFormatError(Exception):
    """Invalidly formatted AGP"""

    def __init__(self, line):
        self.line = line

    def __str__(self) -> str:
        return 'Invalid AGP: "{}"'.format(self.line)

AgpRow

A single non-comment row of an AGP

A single non-comment row of an AGP file. Because AGP is a weird kind of TSV where fields can have different meanings depending on the value of the component_type field, I'm representing this as a class with instance variables rather than an OrderedDict or something along those lines. See the NCBI documentation for more information:

https://www.ncbi.nlm.nih.gov/assembly/agp/AGP_Specification/

Source code in agp/__init__.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class AgpRow:
    """A single non-comment row of an AGP

    A single non-comment row of an AGP file. Because AGP is a weird
    kind of TSV where fields can have different meanings depending on
    the value of the component_type field, I'm representing this as a
    class with instance variables rather than an OrderedDict or
    something along those lines. See the NCBI documentation for more
    information:

    <https://www.ncbi.nlm.nih.gov/assembly/agp/AGP_Specification/>
    """

    def __init__(self, line: str):
        """Creates a new instance of AgpRow

        Creates a new instance of AgpRow by parsing the given line of
        text.

        Args:
            line: a line of text in AGP format
        """
        try:
            splits = line.strip().split("\t")
            self.object = splits[0]
            self.object_beg = int(splits[1])
            self.object_end = int(splits[2])
            self.part_number = int(splits[3])
            self.component_type = splits[4]

            if self.component_type in ["N", "U"]:
                self.is_gap = True
                self.gap_length = int(splits[5])
                self.gap_type = splits[6]
                self.linkage = splits[7]
                self.linkage_evidence = splits[8]
            else:
                self.is_gap = False
                self.component_id = splits[5]
                self.component_beg = int(splits[6])
                self.component_end = int(splits[7])
                self.orientation = splits[8]
        except (ValueError, IndexError) as e:
            raise AgpFormatError(line) from e

    def __str__(self) -> str:
        """
        Returns the string representation of the AGP row as a line of
        text containing all the fields separated by tabs.
        """
        if self.is_gap:
            return "\t".join(
                map(
                    str,
                    [
                        self.object,
                        self.object_beg,
                        self.object_end,
                        self.part_number,
                        self.component_type,
                        self.gap_length,
                        self.gap_type,
                        self.linkage,
                        self.linkage_evidence,
                    ],
                )
            )
        else:
            return "\t".join(
                map(
                    str,
                    [
                        self.object,
                        self.object_beg,
                        self.object_end,
                        self.part_number,
                        self.component_type,
                        self.component_id,
                        self.component_beg,
                        self.component_end,
                        self.orientation,
                    ],
                )
            )

    def __eq__(self, other):
        if self.is_gap != other.is_gap:
            return False

        common_field_equality = (
            self.object == other.object
            and self.object_beg == other.object_beg
            and self.object_end == other.object_end
            and self.part_number == other.part_number
            and self.component_type == other.component_type
        )

        if self.is_gap:
            return (
                common_field_equality
                and self.gap_length == other.gap_length
                and self.gap_type == other.gap_type
                and self.linkage == other.linkage
                and self.linkage_evidence == other.linkage_evidence
            )
        else:
            return (
                common_field_equality
                and self.component_id == other.component_id
                and self.component_beg == other.component_beg
                and self.component_end == other.component_end
                and self.orientation == other.orientation
            )

    def contains(self, position: int) -> bool:
        """Check whether a row contains a position

        Determine whether this row contains a given coordinate.

        Args:
            position: a genomic position in base pairs

        Returns:
            true if position is within the bounds of this entry, false
                otherwise
        """
        return self.object_beg <= position and self.object_end >= position

__init__(line)

Creates a new instance of AgpRow

Creates a new instance of AgpRow by parsing the given line of text.

Parameters:

Name Type Description Default
line str

a line of text in AGP format

required
Source code in agp/__init__.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(self, line: str):
    """Creates a new instance of AgpRow

    Creates a new instance of AgpRow by parsing the given line of
    text.

    Args:
        line: a line of text in AGP format
    """
    try:
        splits = line.strip().split("\t")
        self.object = splits[0]
        self.object_beg = int(splits[1])
        self.object_end = int(splits[2])
        self.part_number = int(splits[3])
        self.component_type = splits[4]

        if self.component_type in ["N", "U"]:
            self.is_gap = True
            self.gap_length = int(splits[5])
            self.gap_type = splits[6]
            self.linkage = splits[7]
            self.linkage_evidence = splits[8]
        else:
            self.is_gap = False
            self.component_id = splits[5]
            self.component_beg = int(splits[6])
            self.component_end = int(splits[7])
            self.orientation = splits[8]
    except (ValueError, IndexError) as e:
        raise AgpFormatError(line) from e

__str__()

Returns the string representation of the AGP row as a line of text containing all the fields separated by tabs.

Source code in agp/__init__.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __str__(self) -> str:
    """
    Returns the string representation of the AGP row as a line of
    text containing all the fields separated by tabs.
    """
    if self.is_gap:
        return "\t".join(
            map(
                str,
                [
                    self.object,
                    self.object_beg,
                    self.object_end,
                    self.part_number,
                    self.component_type,
                    self.gap_length,
                    self.gap_type,
                    self.linkage,
                    self.linkage_evidence,
                ],
            )
        )
    else:
        return "\t".join(
            map(
                str,
                [
                    self.object,
                    self.object_beg,
                    self.object_end,
                    self.part_number,
                    self.component_type,
                    self.component_id,
                    self.component_beg,
                    self.component_end,
                    self.orientation,
                ],
            )
        )

contains(position)

Check whether a row contains a position

Determine whether this row contains a given coordinate.

Parameters:

Name Type Description Default
position int

a genomic position in base pairs

required

Returns:

Type Description
bool

true if position is within the bounds of this entry, false otherwise

Source code in agp/__init__.py
129
130
131
132
133
134
135
136
137
138
139
140
141
def contains(self, position: int) -> bool:
    """Check whether a row contains a position

    Determine whether this row contains a given coordinate.

    Args:
        position: a genomic position in base pairs

    Returns:
        true if position is within the bounds of this entry, false
            otherwise
    """
    return self.object_beg <= position and self.object_end >= position

is_string(var)

Determine whether var is a string

Determine whether var is a string.

Parameters:

Name Type Description Default
var

variable to check for stringiness

required

Returns:

Type Description
bool

true if var is a string, false otherwise

Source code in agp/__init__.py
164
165
166
167
168
169
170
171
172
173
174
175
def is_string(var) -> bool:
    """Determine whether var is a string

    Determine whether `var` is a string.

    Args:
        var: variable to check for stringiness

    Returns:
        true if `var` is a string, false otherwise
    """
    return isinstance(var, str)

open_agp(filename)

Open and read an AGP file

Open an AGP file from a path, and then parse it

Parameters:

Name Type Description Default
filename str

path to an AGP file to read

required

Returns:

Type Description
Iterator[Union[str, AgpRow]]

an Iterator over the rows of the file

Source code in agp/__init__.py
198
199
200
201
202
203
204
205
206
207
208
209
def open_agp(filename: str) -> Iterator[Union[str, AgpRow]]:
    """Open and read an AGP file

    Open an AGP file from a path, and then parse it

    Args:
        filename: path to an AGP file to read

    Returns:
        an Iterator over the rows of the file
    """
    return read(open(filename))

read(infile)

Read an AGP file

Read an AGP file, yielding rows as AgpRow instances and comment lines as plain strings.

Parameters:

Name Type Description Default
infile TextIO

input file in AGP format

required

Yields:

Type Description
Union[str, AgpRow]

each row as either a string containing the entire line for

Union[str, AgpRow]

comments, or an AgpRow for non-comment lines

Source code in agp/__init__.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def read(infile: TextIO) -> Iterator[Union[str, AgpRow]]:
    """Read an AGP file

    Read an AGP file, yielding rows as AgpRow instances and comment
    lines as plain strings.

    Args:
        infile: input file in AGP format

    Yields:
        each row as either a string containing the entire line for
        comments, or an AgpRow for non-comment lines
    """
    for line in infile:
        if line.startswith("#"):
            yield line.strip()
        else:
            yield AgpRow(line)

agp.assemble

Functions for assembling scaffolds from contigs based on an agp file

print_fasta(name, sequence, outfile, wrap=60)

Given a sequence name and the sequence itself, print a fasta- formatted string of it.

Parameters:

Name Type Description Default
name str

the sequence id

required
sequence str

the sequence itself

required
outfile IO

where to write the sequence to

required
wrap int

the number of bases per line of sequence

60

Examples:

>>> import sys
>>> print_fasta(
...    'chr1', 'ATCGACTGATCGACTGACTGACTACTG', outfile=sys.stdout, wrap=10
... )
>chr1
ATCGACTGAT
CGACTGACTG
ACTACTG
Source code in agp/assemble.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def print_fasta(name: str, sequence: str, outfile: IO, wrap: int = 60):
    """
    Given a sequence name and the sequence itself, print a fasta-
    formatted string of it.

    Arguments:
        name: the sequence id
        sequence: the sequence itself
        outfile: where to write the sequence to
        wrap: the number of bases per line of sequence

    Examples:
        >>> import sys
        >>> print_fasta(
        ...    'chr1', 'ATCGACTGATCGACTGACTGACTACTG', outfile=sys.stdout, wrap=10
        ... )
        >chr1
        ATCGACTGAT
        CGACTGACTG
        ACTACTG
    """
    print(f">{name}", file=outfile)
    for start_pos in range(0, len(sequence), wrap):
        print(sequence[start_pos : start_pos + wrap])

run(contigs_fasta, outfile, agp_rows)

Given contigs in fasta format and their order and orientation into scaffolds in AGP format, outputs the assembled scaffolds in fasta format.

Parameters:

Name Type Description Default
contigs_fasta screed.openscreed.Open

fasta iterator in screed format containing contigs

required
outfile IO

file where scaffolds fasta should be written

required
agp_rows Iterable[agp.AgpRow]

iterable returning agp.AgpRow objects, each containing a single row of the agp file

required
Source code in agp/assemble.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def run(
    contigs_fasta: screed.openscreed.Open, outfile: IO, agp_rows: Iterable[agp.AgpRow]
):
    """
    Given contigs in fasta format and their order and orientation into
    scaffolds in AGP format, outputs the assembled scaffolds in fasta
    format.

    Args:
        contigs_fasta: fasta iterator in screed format containing contigs
        outfile: file where scaffolds fasta should be written
        agp_rows: iterable returning agp.AgpRow objects, each
            containing a single row of the agp file
    """
    # unfortunately, the contigs fasta file I'm writing this for has
    # variable line-length and is thus not faidx-able, so we have to
    # load it into memory :(
    contigs = {record.name: record.sequence for record in contigs_fasta}

    current_sequence = None
    current_chrom = None
    # loop through AGP, skipping comment lines, which my agp library
    # yields as strings
    for row in filterfalse(agp.is_string, agp_rows):
        # check if starting a new chromosome
        if row.object != current_chrom:
            # if this is not the first chromosome, output the previous
            # chromosome
            if current_chrom is not None:
                print_fasta(current_chrom, current_sequence, outfile=outfile)
            # start the new chromosome as an empty sequence
            current_chrom = row.object
            current_sequence = ""

        if row.is_gap:
            current_sequence += "N" * row.gap_length
        else:
            start, end = row.component_beg - 1, row.component_end
            if row.component_id not in contigs.keys():
                raise NoSuchContigError(row.component_id)
            component = contigs[row.component_id][start:end]
            if row.orientation == "-":
                component = reverse_complement(component)
            current_sequence += component

    if current_sequence is not None and current_chrom is not None:
        print_fasta(current_chrom, current_sequence, outfile=outfile)

agp.bed

Functions for parsing bed files

BadRangeError

Bases: Exception

Error raised when range starts in middle of component

Source code in agp/bed.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class BadRangeError(Exception):
    """Error raised when range starts in middle of component"""

    def __init__(self, bed_range: "BedRange"):
        self.bed_range = bed_range

    def __str__(self):
        return (
            "The bed range {}:{}-{} starts and/or ends in the middle of a"
            "component. This is not supported in this module."
        ).format(
            self.bed_range.chrom,
            self.bed_range.start,
            self.bed_range.end,
        )

BedRange dataclass

Single entry (i.e., line) of a bed file.

Source code in agp/bed.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@dataclass
class BedRange:
    """Single entry (i.e., line) of a bed file."""

    chrom: str
    """Name of the chromosome or sequence of the bed range"""
    start: Optional[int] = None
    """Start position of the bed range"""
    end: Optional[int] = None
    """End position of the bed range"""
    strand: Optional[str] = None
    """Strand of the bed range (either '+' or '-')"""
    extra_fields: Optional[List[str]] = None
    """Additional columns containing miscellaneous data"""

    def __str__(self) -> str:
        fields: List[Union[str, int]] = [self.chrom]
        if self.start and self.end:
            fields += [self.start, self.end]
            if self.strand:
                fields += [self.strand]
            if self.extra_fields:
                fields += self.extra_fields
        return "\t".join(map(str, fields))

chrom: str class-attribute

Name of the chromosome or sequence of the bed range

end: Optional[int] = None class-attribute

End position of the bed range

extra_fields: Optional[List[str]] = None class-attribute

Additional columns containing miscellaneous data

start: Optional[int] = None class-attribute

Start position of the bed range

strand: Optional[str] = None class-attribute

Strand of the bed range (either '+' or '-')

EmptyRangeError

Bases: Exception

Error raised when bed range contains no contigs

Source code in agp/bed.py
 9
10
11
12
13
14
15
16
17
18
class EmptyRangeError(Exception):
    """Error raised when bed range contains no contigs"""

    def __init__(self, bed_range: "BedRange"):
        self.bed_range = bed_range

    def __str__(self):
        return "The range {}:{}-{} does not contain any contigs.".format(
            self.bed_range.chrom, self.bed_range.start, self.bed_range.end
        )

ParsingError

Bases: Exception

Error parsing bed file

Source code in agp/bed.py
38
39
40
41
class ParsingError(Exception):
    """Error parsing bed file"""

    pass

open_bed(filename)

Open and parse a bed file

Parameters:

Name Type Description Default
filename str

path to a bed file

required
Source code in agp/bed.py
73
74
75
76
77
78
79
def open_bed(filename: str) -> Iterator[BedRange]:
    """Open and parse a bed file

    Args:
        filename: path to a bed file
    """
    return read(open(filename))

read(bedfile)

Read and parse a bed file

Read a bed file line by line. If a line contains start and end positions, yield the sequence name and start and end positions as a BedRange; if the line contains only a sequence name, yield a BedRange with {start,end}=None.

Parameters:

Name Type Description Default
bedfile TextIO

a bed-formatted file

required

Returns:

Type Description
Iterator[BedRange]

an iterator that yields single lines of the file at a time

Source code in agp/bed.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def read(bedfile: TextIO) -> Iterator[BedRange]:
    """Read and parse a bed file

    Read a bed file line by line. If a line contains start and end
    positions, yield the sequence name and start and end positions as
    a BedRange; if the line contains only a sequence name, yield a
    BedRange with {start,end}=None.

    Args:
        bedfile: a bed-formatted file

    Returns:
        an iterator that yields single lines of the file at a time
    """
    for i, line in enumerate(bedfile):
        if not empty_line_regex.match(line):
            splits = line.strip().split("\t")
            try:
                if len(splits) == 1:
                    yield BedRange(splits[0])
                elif len(splits) == 2:
                    raise ParsingError(f"Line {i+1} of bed misformatted.")
                elif len(splits) == 3:
                    yield BedRange(
                        splits[0],
                        start=int(splits[1]),
                        end=int(splits[2]),
                    )
                else:
                    yield BedRange(
                        splits[0],
                        start=int(splits[1]),
                        end=int(splits[2]),
                        strand=splits[3],
                        extra_fields=splits[4:],
                    )
            except (ValueError, IndexError):
                raise ParsingError(f"Line {i+1} of bed misformatted.")

agp.compose

Functions for composing two AGPs together.

contains(outer_row, inner_row)

Determines whether outer_row fully contains inner_row. For example, if outer_row specifies that the current object contains the range 500-1000 of a component1, and inner_row specifies how to assemble the range 600-800 of this component, then outer_row contains inner_row. Because breaking components is not supported, right now, this function raises a BrokenComponentError if the outer row partially contains the inner row.

Source code in agp/compose.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def contains(outer_row, inner_row):
    """
    Determines whether outer_row fully contains inner_row. For example,
    if outer_row specifies that the current object contains the range
    500-1000 of a component1, and inner_row specifies how to assemble
    the range 600-800 of this component, then outer_row contains
    inner_row. Because breaking components is not supported, right now,
    this function raises a BrokenComponentError if the outer row
    partially contains the inner row.
    """
    if (
        outer_row.component_beg <= inner_row.object_beg
        and outer_row.component_end >= inner_row.object_end
    ):
        return True
    elif (
        outer_row.component_beg <= inner_row.object_beg
        or outer_row.component_end >= inner_row.object_end
    ):
        raise BrokenComponentError(outer_row, inner_row)
    else:
        return False

run(outer_agp, inner_agp, outfile, print_unused=False)

runs the compose module of agptools

Source code in agp/compose.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def run(outer_agp, inner_agp, outfile, print_unused=False):
    """runs the compose module of agptools"""
    # read the inner agp into a dictionary so that we can look up the
    # definition of an outer component later. inner_dict maps object
    # names to a list of all AgpRow instances with that object name
    inner_dict = collections.defaultdict(list)
    for inner_row in filterfalse(agp.is_string, inner_agp):
        inner_dict[inner_row.object].append(inner_row)

    previous_scaffold = None
    component_counter = 1
    for outer_row in outer_agp:
        # print the row as is if it's a comment
        if isinstance(outer_row, str):
            print(outer_row, file=outfile)
            continue

        # reset the component counter if we're on a new scaffold
        if previous_scaffold and previous_scaffold != outer_row.object:
            component_counter = 1

        previous_scaffold = outer_row.object

        # for gaps, we only need to change the part number
        if outer_row.is_gap:
            outer_row.part_number = component_counter
            print(outer_row, file=outfile)
            component_counter += 1
        # for non-gaps, we need to replace this single row with all of
        # the component rows, modified as necessary
        else:
            # check to make sure this component is actually defined
            if outer_row.component_id not in inner_dict:
                raise ComponentNotDefinedError(outer_row.component_id)

            # make a list of all rows in this component and make sure
            # the object is using the whole component
            inner_rows = inner_dict[outer_row.component_id]
            del inner_dict[outer_row.component_id]
            if (
                outer_row.component_beg != 1
                or outer_row.component_end != inner_rows[-1].object_end
            ):
                raise BrokenComponentError(outer_row, inner_rows[-1])

            # reverse complement everything if necessary
            if outer_row.orientation == "-":
                inner_rows = flip.reverse_rows(inner_rows)

            # fix offsets and print rows
            for inner_row in inner_rows:
                inner_row.object = outer_row.object
                inner_row.object_beg += outer_row.object_beg - 1
                inner_row.object_end += outer_row.object_beg - 1
                inner_row.part_number = component_counter
                component_counter += 1
                print(inner_row, file=outfile)

    # at the end, print all the objects in the inner AGP that were not
    # assembled into anything in the outer AGP
    if print_unused:
        for row in chain.from_iterable(inner_dict.values()):
            print(row, file=outfile)

agp.flip

Given an AGP file describing an assembly and a list of chromosomes to flip, output a new AGP file with those chromosomes reoriented in reverse-complement.

flip(agp_rows, ranges_to_flip)

Reverse-complements all rows of an AGP file that fall within a list of ranges.

Parameters:

Name Type Description Default
agp_rows Sequence[AgpRow]

all rows in an AGP file

required
ranges_to_flip Iterable[BedRange]

a list of ranges in the AGP file to reverse-complement. Takes genomic (bed) coordinates, not array coordinates.

required

Returns:

Name Type Description
agp_rows List[AgpRow]

the same rows that were input, except the rows in the specified range are now reverse-complemented

Source code in agp/flip.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def flip(
    agp_rows: Sequence[AgpRow], ranges_to_flip: Iterable[BedRange]
) -> List[AgpRow]:
    """
    Reverse-complements all rows of an AGP file that fall within a list
    of ranges.

    Args:
        agp_rows: all rows in an AGP file
        ranges_to_flip: a list of ranges in the AGP file to
            reverse-complement. Takes genomic (bed) coordinates, not
            array coordinates.

    Returns:
        agp_rows: the same rows that were input, except the rows in the
            specified range are now reverse-complemented
    """
    # agp_rows is a generator, but we're going to have to store the
    # whole agp in memory
    agp_rows = list(agp_rows)

    for bed_range in ranges_to_flip:
        # list of tuples (i, agp_row) of rows in bed_range
        rows_to_reverse: List[Tuple[int, AgpRow]] = []
        for i, row in enumerate(agp_rows):
            if not isinstance(row, str) and row.object == bed_range.chrom:
                # if no range in seq specified, flip the whole sequence
                if bed_range.start is None:
                    rows_to_reverse.append((i, row))
                # otherwise, flip only the part within range
                elif (
                    row.object_beg >= bed_range.start
                    and row.object_end <= bed_range.end
                ):
                    rows_to_reverse.append((i, row))
                # check for bad ranges (i.e., ones that only partially
                # contain a component)
                elif (bed_range.start and bed_range.end) and (
                    row.contains(bed_range.start) or row.contains(bed_range.end)
                ):
                    raise bed.BadRangeError(bed_range)

        # unzip and flip those rows we've collected
        if len(rows_to_reverse) == 0:
            raise bed.EmptyRangeError(bed_range)
        unzipped = list(zip(*rows_to_reverse))
        indices: List[int] = list(unzipped[0])
        rows: List[AgpRow] = list(unzipped[1])
        reversed_rows = reverse_rows(rows)

        # replace the original rows with the reversed rows
        for i, row in zip(indices, reversed_rows):
            agp_rows[i] = row

    return agp_rows

reverse_rows(rows)

Given a list of AGP rows, return a new set of rows assembling the reverse complement of the input rows.

Parameters:

Name Type Description Default
rows List[AgpRow]

an ordered list of AgpRow objects to reverse

required

Returns:

Type Description
List[AgpRow]

the input, except in reverse order, with the rows modified to be

List[AgpRow]

reverse-complements

Source code in agp/flip.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def reverse_rows(rows: List[AgpRow]) -> List[AgpRow]:
    """
    Given a list of AGP rows, return a new set of rows assembling
    the reverse complement of the input rows.

    Args:
        rows: an ordered list of AgpRow objects to reverse

    Returns:
        the input, except in reverse order, with the rows modified to be
        reverse-complements
    """
    # first, get the first and last part numbers and the start and end
    # position of the entire segment being flipped so we can count
    # backwards from there.
    first_part_number = rows[0].part_number
    last_part_number = rows[-1].part_number
    first_part_start = rows[0].object_beg
    last_part_end = rows[-1].object_end

    reversed_rows = []
    for row in reversed(rows):
        new_start = first_part_start + last_part_end - row.object_end
        new_end = first_part_start + last_part_end - row.object_beg
        row.object_beg, row.object_end = new_start, new_end

        row.part_number = first_part_number + last_part_number - row.part_number

        if not row.is_gap:
            if row.orientation == "+":
                row.orientation = "-"
            elif row.orientation == "-":
                row.orientation = "+"
            # if the orientation is something else, leave it be

        reversed_rows.append(row)

    return reversed_rows

agp.join

Functions for joining scaffolds

BadSequenceNameError

Bases: Exception

Sequence name is not allowed

Source code in agp/join.py
32
33
34
35
36
37
38
39
class BadSequenceNameError(Exception):
    """Sequence name is not allowed"""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        return f"Bad scaffold name: '{self.name}'. Only [a-zA-Z0-9._] allowed."

JoinGroup dataclass

Bases: Sequence

A group of scaffolds to be joined together

This class represents a group of scaffolds that should be joined together as specified in a joins file. Optionally, a group can be given a name to use for the new superscaffold instead of using make_superscaffold_name() to make one up based on the names of the components. If accessed as a Sequence, it provides direct access to the underlying list of scaffolds, e.g.,

from agp.join import JoinGroup join_group = JoinGroup(["scaffold_1", "-scaffold_2", "+scaffold_3"]) len(join_group) 3 join_group[1] '-scaffold_2'

Attributes:

Name Type Description
scaffolds List[str]

a list of scaffolds to join together, in the correct order. A scaffold name can be preceded by "+" or "-" to indicate orientation; if none is specified, "+" is assumed.

name Optional[str]

a name to use for the new superscaffold in the output agp

Source code in agp/join.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@dataclass
class JoinGroup(Sequence):
    """A group of scaffolds to be joined together

    This class represents a group of scaffolds that should be joined
    together as specified in a joins file. Optionally, a group can be given
    a name to use for the new superscaffold instead of using
    `make_superscaffold_name()` to make one up based on the names of the
    components. If accessed as a `Sequence`, it provides direct access to
    the underlying list of scaffolds, e.g.,

    >>> from agp.join import JoinGroup
    >>> join_group = JoinGroup(["scaffold_1", "-scaffold_2", "+scaffold_3"])
    >>> len(join_group)
    3
    >>> join_group[1]
    '-scaffold_2'

    Attributes:
        scaffolds: a list of scaffolds to join together, in the correct
            order. A scaffold name can be preceded by "+" or "-" to
            indicate orientation; if none is specified, "+" is assumed.
        name: a name to use for the new superscaffold in the output agp
    """

    scaffolds: List[str]
    name: Optional[str] = None

    def __getitem__(self, index) -> str:
        return self.scaffolds[index]

    def __len__(self) -> int:
        return len(self.scaffolds)

ScaffoldNotFoundError

Bases: Exception

Scaffold in joins list not found in agp

Source code in agp/join.py
20
21
22
23
class ScaffoldNotFoundError(Exception):
    """Scaffold in joins list not found in agp"""

    pass

ScaffoldUsedTwiceError

Bases: Exception

Scaffold(s) used more than once in joins file

Source code in agp/join.py
26
27
28
29
class ScaffoldUsedTwiceError(Exception):
    """Scaffold(s) used more than once in joins file"""

    pass

join_scaffolds(superscaffold_rows, gap_size=500, gap_type='scaffold', gap_evidence='na', name=None)

Transforms agp rows from multiple scaffolds into agp rows for a single superscaffold containing all the given scaffolds in the specified order. Scaffolds should be oriented properly before using this function.

Parameters:

Name Type Description Default
superscaffold_rows List[List[AgpRow]]

a list of lists of agp rows. Each sub-list is a list of all rows in a single scaffold. These scaffolds will be joined in the order given

required
gap_size int

length of the new gaps created by joining scaffolds together

500
gap_type str

what to call the new gaps in the agp file

'scaffold'
gap_evidence str

evidence type for linkage across gap

'na'
name str

name of new superscaffold. If None, this function will come up with a name based on the names of subscaffolds contained by this superscaffold

None

Returns:

Type Description
List[AgpRow]

a list of AgpRow instances containing the new scaffold specification

Source code in agp/join.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def join_scaffolds(
    superscaffold_rows: List[List[AgpRow]],
    gap_size: int = 500,
    gap_type: str = "scaffold",
    gap_evidence: str = "na",
    name: str = None,
) -> List[AgpRow]:
    """
    Transforms agp rows from multiple scaffolds into agp rows for a
    single superscaffold containing all the given scaffolds in the
    specified order. Scaffolds should be oriented properly before
    using this function.

    Args:
        superscaffold_rows: a list of lists of agp rows. Each sub-list is a
            list of all rows in a single scaffold. These scaffolds will be
            joined in the order given
        gap_size: length of the new gaps created by joining scaffolds
            together
        gap_type: what to call the new gaps in the agp file
        gap_evidence: evidence type for linkage across gap
        name: name of new superscaffold. If None, this function will come
            up with a name based on the names of subscaffolds contained by
            this superscaffold

    Returns:
        a list of AgpRow instances containing the new scaffold specification
    """
    # make a nice name for the new superscaffold we are creating
    subscaffold_names = (s[0].object for s in superscaffold_rows)
    if name is None:
        superscaffold_name = make_superscaffold_name(subscaffold_names)
    else:
        superscaffold_name = name

    # keeps track of what component number of the superscaffold we are
    # currently on
    component_number_counter = 1
    # keeps track of the ending position of the previous subscaffold,
    # so that we can add it as an offset to components of the current
    # subscaffold
    end_of_previous_scaffold = 0
    # list(AgpRow) containing the rows to be returned
    outrows = []
    # loop over the subscaffolds
    for i, this_scaffold_rows in enumerate(superscaffold_rows):
        # loop over the agp rows in this subscaffold
        for row in this_scaffold_rows:
            # update the current row and put it in the output list
            row.object = superscaffold_name
            row.part_number = component_number_counter
            component_number_counter += 1
            row.object_beg += end_of_previous_scaffold
            row.object_end += end_of_previous_scaffold
            outrows.append(row)
        end_of_previous_scaffold = outrows[-1].object_end

        # add a gap if we're not on the last subscaffold
        if i < len(superscaffold_rows) - 1:
            outrows.append(
                GapRow(
                    superscaffold_name,
                    end_of_previous_scaffold + 1,
                    end_of_previous_scaffold + gap_size,
                    component_number_counter,
                    length=gap_size,
                    gap_type=gap_type,
                    evidence=gap_evidence,
                )
            )
            component_number_counter += 1
            end_of_previous_scaffold = outrows[-1].object_end

    return outrows

joins_type(filename)

argparse type function for file listing scaffold joins

Parameters:

Name Type Description Default
filename str

filename giving path to a file listing scaffolds to join. Each line should contain a comma- separated list of scaffolds to be joined into a single scaffold. Optionally, the file can contain a second column after a tab that gives each group a name.

required

Returns:

Type Description
List[JoinGroup]

list of join groups. Scaffold names may have "+" or "-" before their name; that is the job of downstream code to handle.

Raises:

Type Description
ScaffoldUsedTwiceError

if a scaffold is used multiple times

Source code in agp/join.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def joins_type(filename: str) -> List[JoinGroup]:
    """
    argparse type function for file listing scaffold joins

    Args:
        filename: filename giving path to a file listing
            scaffolds to join. Each line should contain a comma-
            separated list of scaffolds to be joined into a single
            scaffold. Optionally, the file can contain a second column
            after a tab that gives each group a name.

    Returns:
        list of join groups. Scaffold names may have "+" or "-"
            before their name; that is the job of downstream code to handle.

    Raises:
        ScaffoldUsedTwiceError: if a scaffold is used multiple times
    """
    joins: List[JoinGroup] = []
    with open(filename) as joins_file:
        for line in joins_file:
            if not empty_line_regex.match(line):
                columns = line.strip().split("\t")
                join_group = JoinGroup(columns[0].split(","))
                if len(columns) > 1:
                    if not sequence_name_regex.match(columns[1]):
                        raise BadSequenceNameError(columns[1])
                    join_group.name = columns[1]
                joins.append(join_group)

    # look for scaffolds used more than once
    scaffold_counts = Counter(s.lstrip("+-") for s in chain.from_iterable(joins))
    reused_scaffolds = [scaf for scaf, count in scaffold_counts.items() if count > 1]
    if reused_scaffolds:
        raise ScaffoldUsedTwiceError(f"Scaffolds used 2+ times: {reused_scaffolds}")

    return joins

make_superscaffold_name(subscaffold_names)

Comes up with a nice superscaffold name based on the given subscaffold names. If the subscaffold names are all in the format '[prefix][suffix]', where prefix is the same for all subscaffolds, then the superscaffold name is '[prefix][suffix1]p[suffix2]p[etc.]' Otherwise, it's the names of all scaffolds concatenated with 'p'.

Parameters:

Name Type Description Default
subscaffold_names list(str

list of names of subscaffolds being combined into a superscaffold

required

Returns:

Name Type Description
superscaffold_name str

a name for the new scaffolds based on the subscaffold names

Source code in agp/join.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def make_superscaffold_name(subscaffold_names: Iterable[str]) -> str:
    """
    Comes up with a nice superscaffold name based on the given
    subscaffold names. If the subscaffold names are all in the format
    '[prefix]_[suffix]', where prefix is the same for all subscaffolds,
    then the superscaffold name is '[prefix]_[suffix1]p[suffix2]p[etc.]'
    Otherwise, it's the names of all scaffolds concatenated with 'p'.

    Args:
        subscaffold_names (list(str)): list of names of subscaffolds
            being combined into a superscaffold

    Returns:
        superscaffold_name (str): a name for the new scaffolds based
            on the subscaffold names
    """
    # TODO this casting is necessary but ugly as sin. Find a nicer way
    matches = list(map(scaffold_regex.search, subscaffold_names))
    if all(matches):
        if all(
            cast(Match, m).group(1) == cast(Match, matches[0]).group(1) for m in matches
        ):
            prefix = cast(Match, matches[0]).group(1)
            suffix = "p".join([cast(Match, m).group(2) for m in matches])
            return "{}_{}".format(prefix, suffix)
    return "p".join(subscaffold_names)

run(joins_list, outfile, agp_infile, gap_size=500, gap_type='scaffold', gap_evidence='na')

Runs the 'join' module of agptools.

Source code in agp/join.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def run(
    joins_list: List[JoinGroup],
    outfile: TextIO,
    agp_infile: Iterator[Union[str, AgpRow]],
    gap_size: int = 500,
    gap_type: str = "scaffold",
    gap_evidence: str = "na",
):
    """
    Runs the 'join' module of agptools.
    """
    # first, make a dict mapping the names of all scaffolds that will
    # be modified to an empty list which will later contain all agp rows
    # of that scaffold.
    scaffolds_to_join: Dict[str, List[AgpRow]] = {}
    for name in (s.lstrip("+-") for s in chain.from_iterable(joins_list)):
        scaffolds_to_join[name] = []

    # print all the rows to be output as-is and put the rows that need
    # to be modified first into the correct slot of the
    # scaffolds_to_join dict
    for row in agp_infile:
        if isinstance(row, AgpRow) and row.object in scaffolds_to_join:
            scaffolds_to_join[row.object].append(row)
        else:
            print(row, file=outfile)

    # make sure we found agp entries for all the scaffolds
    for scaffold_name, rows in scaffolds_to_join.items():
        if len(rows) == 0:
            raise ScaffoldNotFoundError(f"Scaffold {scaffold_name} not found in agp.")

    # loop through each join group
    for i, join_group in enumerate(joins_list):
        scaffold_rows = []
        # loop through the scaffolds in this join group
        for scaffold_name in join_group:
            if scaffold_name.startswith("-"):
                # reverse-complement if necessary
                scaffold_rows.append(
                    reverse_rows(scaffolds_to_join[scaffold_name.lstrip("-")])
                )
            else:
                scaffold_rows.append(scaffolds_to_join[scaffold_name.lstrip("+")])

        # print out all the rows for this join group
        for row in join_scaffolds(
            scaffold_rows, gap_size, gap_type, name=join_group.name
        ):
            print(row, file=outfile)

agp.remove

scaffolds_list_type(filename)

Parse a list of scaffolds

Given a filename pointing to a list of scaffolds, one per line, parse it and return a list of scaffold strings.

Parameters:

Name Type Description Default
filename str

path to a file containing a list of scaffolds

required

Returns:

Type Description
Set[str]

the set of scaffold names in the file

Source code in agp/remove.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def scaffolds_list_type(filename: str) -> Set[str]:
    """Parse a list of scaffolds

    Given a filename pointing to a list of scaffolds, one per line,
    parse it and return a list of scaffold strings.

    Args:
        filename: path to a file containing a list of scaffolds

    Returns:
        the set of scaffold names in the file
    """
    scaffolds: Set[str] = set()
    with open(filename) as scaffolds_list_file:
        for line in scaffolds_list_file:
            if not empty_line_regex.match(line):
                scaffolds.add(line.strip())
    return scaffolds

agp.rename

rename_rows(rows_to_rename, new_name, orientation)

Rename a bunch of agp rows

Rename a bunch of agp rows, and possibly reverse them too.

Parameters:

Name Type Description Default
rows_to_rename Iterable[AgpRow]

all of the rows of a scaffold that is being renamed

required
new_name str

the new name for this scaffold

required
orientation str

either "+" or "-", the latter if reverse orienting the scaffold is desired

required

Returns:

Type Description
list[AgpRow]

the renamed and perhaps reverse oriented rows

Source code in agp/rename.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def rename_rows(
    rows_to_rename: Iterable[AgpRow], new_name: str, orientation: str
) -> list[AgpRow]:
    """Rename a bunch of agp rows

    Rename a bunch of agp rows, and possibly reverse them too.

    Args:
        rows_to_rename:
            all of the rows of a scaffold that is being renamed
        new_name:
            the new name for this scaffold
        orientation:
            either "+" or "-", the latter if reverse orienting the
            scaffold is desired

    Returns:
        the renamed and perhaps reverse oriented rows
    """
    new_rows = []
    for row in rows_to_rename:
        row.object = new_name
        new_rows.append(row)
    if orientation == "-":
        return reverse_rows(new_rows)
    return new_rows

renaming_file_type(filename)

Parse a renaming file

A renaming file has 2-3 columns: 1. current scaffold name 2. desired new scaffold name 3. orientation (+/-)

Parse one of these files and return it in dictionary format

Parameters:

Name Type Description Default
filename str

path to renaming file

required

Returns:

Type Description
dict[str, tuple[str, str]]

the file in dict format: d[old_name] = (new_name, orientation)

Source code in agp/rename.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def renaming_file_type(filename: str) -> dict[str, tuple[str, str]]:
    """Parse a renaming file

    A renaming file has 2-3 columns:
    1. current scaffold name
    2. desired new scaffold name
    3. orientation (+/-)

    Parse one of these files and return it in dictionary format

    Args:
        filename: path to renaming file

    Returns:
        the file in dict format: d[old_name] = (new_name, orientation)
    """
    renaming_dict = {}
    with open(filename, "r") as renaming_file:
        for i, line in enumerate(renaming_file):
            if not empty_line_regex.match(line):
                fields = line.strip().split("\t")
                if len(fields) < 2:
                    raise FormatError(
                        f"Line {i+1} of renaming file doesn't have enough columns."
                    )
                if len(fields) == 2:  # default to + orientation
                    renaming_dict[fields[0]] = (fields[1], "+")
                elif len(fields) >= 3:
                    if fields[2] not in ["+", "-"]:
                        raise FormatError(
                            f"Line {i+1}: orientation column can only contain + or -"
                        )
                    renaming_dict[fields[0]] = (fields[1], fields[2])

    return renaming_dict

run(renaming_map, outfile, agp_rows)

Run the rename module

Parameters:

Name Type Description Default
renaming_map Mapping[str, tuple[str, str]]

maps the name of a contig to be renamed to a tuple containing the new name and orientation

required
outfile TextIO

file where agp output should be sent

required
agp_rows Sequence[AgpRow]

all input AGP rows

required
Source code in agp/rename.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def run(
    renaming_map: Mapping[str, tuple[str, str]],
    outfile: TextIO,
    agp_rows: Sequence[AgpRow],
):
    """Run the rename module

    Args:
        renaming_map:
            maps the name of a contig to be renamed to a tuple
            containing the new name and orientation
        outfile: file where agp output should be sent
        agp_rows: all input AGP rows
    """
    rows_to_rename: list[AgpRow] = []
    current_object = "none"  # can't use None because dict needs a string
    renamed_objects: set[str] = set()
    for row in agp_rows:
        if isinstance(row, str):
            print(row, file=outfile)
        elif row.object in renaming_map:
            if rows_to_rename and current_object != row.object:
                # we're on a new scaffold that does need to be renamed,
                # but we haven't renamed and printed the last scaffold
                # yet, so let's do that first
                for row_to_print in rename_rows(
                    rows_to_rename, *renaming_map[current_object]
                ):
                    print(row_to_print, file=outfile)
                rows_to_rename = []
                renamed_objects.add(current_object)
            current_object = row.object
            rows_to_rename.append(row)
        else:
            if rows_to_rename and current_object != row.object:
                # we're on a new scaffold that does not need to be
                # renamed, but the previous scaffold does need to
                # be renamed, so let's rename and print it out first
                for row_to_print in rename_rows(
                    rows_to_rename, *renaming_map[current_object]
                ):
                    print(row_to_print, file=outfile)
                rows_to_rename = []
                renamed_objects.add(current_object)
                current_object = "none"
            # now we can print the new row
            print(row, file=outfile)

    if rows_to_rename:
        for row_to_print in rename_rows(rows_to_rename, *renaming_map[row.object]):
            print(row_to_print, file=outfile)
        renamed_objects.add(current_object)

    remaining_objects = renaming_map.keys() - renamed_objects
    if remaining_objects:
        raise ScaffoldNotFoundError(remaining_objects)

agp.split

Functions for splitting a scaffold into subscaffolds at gaps.

ParsingError

Bases: Exception

Raised when breakpoints file is misformatted.

Source code in agp/split.py
12
13
14
15
class ParsingError(Exception):
    """Raised when breakpoints file is misformatted."""

    pass

breakpoints_type(filename)

Argparse type function for breakpoints file: first column is the scaffold name; second column is a comma-separated list of locations within gaps where scaffold should be broken.

Parameters:

Name Type Description Default
filename str

path to the breakpoints file

required

Returns:

Name Type Description
breakpoints Dict[str, List[int]]

a dict mapping scaffold name to a list of breakpoints (int) on that scaffold

Raises:

Type Description
FileNotFoundError

if filename does not point to readable file

ParsingError

if there is an error in the input file's format or content

Source code in agp/split.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def breakpoints_type(filename: str) -> Dict[str, List[int]]:
    """
    Argparse type function for breakpoints file: first column is the
    scaffold name; second column is a comma-separated list of
    locations within gaps where scaffold should be broken.

    Args:
        filename: path to the breakpoints file

    Returns:
        breakpoints: a dict mapping scaffold name to a list
            of breakpoints (int) on that scaffold

    Raises:
        FileNotFoundError: if `filename` does not point to readable file
        ParsingError: if there is an error in the input file's format or
            content
    """
    breakpoints = {}
    with open(filename) as breakpoints_file:
        for i, line in enumerate(breakpoints_file):
            if not empty_line_regex.match(line):
                splits = line.strip().split("\t")
                try:
                    if splits[0] in breakpoints:
                        raise ParsingError(
                            f"{splits[0]} specified multiple times in breakpoints file"
                        )
                    breakpoints[splits[0]] = list(map(int, splits[1].split(",")))
                except (ValueError, IndexError):
                    raise ParsingError(f"Cannot parse line {i} of breakpoints: {line}")
    return breakpoints

convert_rows(rows, subscaffold_counter)

Converts rows that are part of a scaffold into their own standalone scaffold. Changes the positions and part numbers so that the new scaffold starts at 1, and names the new scaffold after the old scaffold, but with '.{subscaffold_counter}' at the end.

Parameters:

Name Type Description Default
rows List[AgpRow]

rows to turn into their own scaffold.

required
subscaffold_counter int

suffix to add to the old scaffold name in order to turn it into the new scaffold name.

required

Returns:

Type Description
List[AgpRow]

the input rows, but with object names, positions, and part

List[AgpRow]

numbers changed so that they now function as a standalone

List[AgpRow]

scaffold

Source code in agp/split.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def convert_rows(rows: List[AgpRow], subscaffold_counter: int) -> List[AgpRow]:
    """
    Converts rows that are part of a scaffold into their own standalone
    scaffold. Changes the positions and part numbers so that the new
    scaffold starts at 1, and names the new scaffold after the old
    scaffold, but with '.{subscaffold_counter}' at the end.

    Args:
        rows: rows to turn into their own scaffold.
        subscaffold_counter: suffix to add to the old scaffold
            name in order to turn it into the new scaffold name.

    Returns:
        the input rows, but with object names, positions, and part
        numbers changed so that they now function as a standalone
        scaffold
    """
    new_scaffold_name = "{}.{}".format(rows[0].object, subscaffold_counter)
    return unoffset_rows(new_scaffold_name, rows)

split_contig(contig_row, breakpoints)

Splits a single row containing a contig into multiple rows, each containing a piece of the contig.

Parameters:

Name Type Description Default
contig_row AgpRow

a single row to be split

required
breakpoints List[int]

positions where contig should be split, in object coordinates, not component coordinates. The left part of the split includes the breakpoint: e.g., splitting a contig of length 100 at 43 will make two new contigs: one from 1-43 and the other from 44-100.

required

Returns:

Type Description
List[AgpRow]

a list of rows of length len(breakpoints) + 1, where each row is

List[AgpRow]

a piece of the contig after splitting. Note that while the new

List[AgpRow]

coordinates are correct in the coordinates of the original

List[AgpRow]

scaffold they came from, the object coordinates still need to be

List[AgpRow]

offset to match the new scaffold they will end up in.

Examples:

>>> import agp
>>> r = agp.AgpRow('\t'.join(map(str, ['scaf', 501, 1000, 5, 'W',
...                                    'ctg', 1, 500, '+'])))
>>> [str(x).split('\t') for x in split_contig(r, [750, 867])]
[['scaf', '501', '750', '5', 'W', 'ctg', '1', '250', '+'],
 ['scaf', '751', '867', '6', 'W', 'ctg', '251', '367', '+'],
 ['scaf', '868', '1000', '7', 'W', 'ctg', '368', '500', '+']]
Source code in agp/split.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def split_contig(contig_row: AgpRow, breakpoints: List[int]) -> List[AgpRow]:
    """
    Splits a single row containing a contig into multiple rows,
    each containing a piece of the contig.

    Args:
        contig_row: a single row to be split
        breakpoints: positions where contig should be split,
            in object coordinates, *not* component coordinates. The left
            part of the split includes the breakpoint: e.g., splitting a
            contig of length 100 at 43 will make two new contigs: one
            from 1-43 and the other from 44-100.

    Returns:
        a list of rows of length len(breakpoints) + 1, where each row is
        a piece of the contig after splitting. Note that while the new
        coordinates are correct in the coordinates of the original
        scaffold they came from, the object coordinates still need to be
        offset to match the new scaffold they will end up in.

    Examples:
        >>> import agp
        >>> r = agp.AgpRow('\\t'.join(map(str, ['scaf', 501, 1000, 5, 'W',
        ...                                    'ctg', 1, 500, '+'])))
        >>> [str(x).split('\\t') for x in split_contig(r, [750, 867])]
        [['scaf', '501', '750', '5', 'W', 'ctg', '1', '250', '+'],
         ['scaf', '751', '867', '6', 'W', 'ctg', '251', '367', '+'],
         ['scaf', '868', '1000', '7', 'W', 'ctg', '368', '500', '+']]
    """
    rows = [contig_row]
    for this_breakpoint in sorted(breakpoints):
        left_part = deepcopy(rows.pop())
        right_part = deepcopy(left_part)

        left_part.object_end = this_breakpoint
        right_part.object_beg = this_breakpoint + 1
        right_part.part_number += 1

        if contig_row.orientation == "+":
            left_part.component_end = left_part.component_beg + (
                this_breakpoint - left_part.object_beg
            )
            right_part.component_beg = left_part.component_end + 1
        else:
            left_part.component_beg = (
                left_part.object_beg + left_part.component_end - this_breakpoint
            )
            right_part.component_end = left_part.component_beg - 1

        rows += [left_part, right_part]
    return rows

split_scaffold(scaffold_rows, breakpoints)

Splits a scaffold at specified breakpoints.

Parameters:

Name Type Description Default
scaffold_rows Iterable[AgpRow]

all the rows for a given scaffold in an AGP file

required
breakpoints List[int]

a list of locations where scaffold should be broken. All locations are specified in genomic coordinates and must be within the boundaries of a gap.

required

Returns:

Name Type Description
broken_rows List[AgpRow]

rows of the input scaffold broken into len(breakpoints)+1 sub-scaffolds, with the gaps containing the breakpoints removed

Source code in agp/split.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def split_scaffold(
    scaffold_rows: Iterable[AgpRow], breakpoints: List[int]
) -> List[AgpRow]:
    """
    Splits a scaffold at specified breakpoints.

    Args:
        scaffold_rows: all the rows for a given scaffold in an AGP file
        breakpoints: a list of locations where scaffold should be
            broken. All locations are specified in genomic coordinates
            and must be within the boundaries of a gap.

    Returns:
        broken_rows: rows of the input scaffold broken
            into len(breakpoints)+1 sub-scaffolds, with the gaps
            containing the breakpoints removed
    """
    out_rows = []
    rows_this_subscaffold: List[AgpRow] = []
    subscaffold_counter = 1
    for row in scaffold_rows:
        if any(map(row.contains, breakpoints)):
            # if the breakpoint is within a gap, our job is simple:
            # just forget about the gap row, output the previous
            # subscaffold, and start a new subscaffold
            if row.is_gap:
                out_rows += convert_rows(rows_this_subscaffold, subscaffold_counter)

                rows_this_subscaffold = []
                subscaffold_counter += 1
            # if the breakpoint is not within a gap, we need to actually
            # break a contig into pieces
            else:
                # split the contig into two or more rows
                contig_rows = split_contig(row, list(filter(row.contains, breakpoints)))

                # the first row goes at the end of the current scaffold
                rows_this_subscaffold.append(contig_rows[0])
                del contig_rows[0]
                out_rows += convert_rows(rows_this_subscaffold, subscaffold_counter)
                subscaffold_counter += 1

                # the last row goes at the beginning of the next
                # scaffold
                rows_this_subscaffold = [contig_rows.pop()]

                # if there are any rows in between, they each get their
                # own subscaffold
                for contig_part in contig_rows:
                    out_rows += convert_rows([contig_part], subscaffold_counter)
                    subscaffold_counter += 1

        else:  # only add this row if there are no breakpoints in it
            rows_this_subscaffold.append(row)

    out_rows += convert_rows(rows_this_subscaffold, subscaffold_counter)

    return out_rows

unoffset_rows(new_scaffold_name, rows)

Modifies some AGP rows so that they can be their own standalone scaffold. This requires changing their object names to a new scaffold name, and changing the part numbers and coordinates such that the first row starts with 1 and the rest follow.

Parameters:

Name Type Description Default
new_scaffold_name str

name for the new scaffold which will replace all 'object' fields

required
rows List[AgpRow]

rows to modify so that they function as a standalone scaffold together. The first row will be used to calculate offsets.

required

Returns:

Name Type Description
out_rows List[AgpRow]

input rows, but with all 'object' fields replaced with new_scaffold_name, and all positions and part numbers modified so that the first row is the beginning of a new scaffold.

Source code in agp/split.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def unoffset_rows(new_scaffold_name: str, rows: List[AgpRow]) -> List[AgpRow]:
    """
    Modifies some AGP rows so that they can be their own standalone
    scaffold. This requires changing their object names to a new
    scaffold name, and changing the part numbers and coordinates such
    that the first row starts with 1 and the rest follow.

    Args:
        new_scaffold_name: name for the new scaffold which will
            replace all 'object' fields
        rows: rows to modify so that they function as a
            standalone scaffold together. The first row will be used to
            calculate offsets.

    Returns:
        out_rows: input rows, but with all 'object'
            fields replaced with new_scaffold_name, and all positions
            and part numbers modified so that the first row is the
            beginning of a new scaffold.
    """
    position_offset = rows[0].object_beg - 1
    out_rows = []
    for i, row in enumerate(rows):
        row.object = new_scaffold_name
        row.object_beg -= position_offset
        row.object_end -= position_offset
        row.part_number = i + 1
        out_rows.append(row)
    return out_rows

agp.transform

Functions for transforming genomic coordinates from contig-based to scaffold-based.

BadOrientationError

Bases: Exception

Error for bad orientation (not +/-)

Source code in agp/transform.py
34
35
36
37
class BadOrientationError(Exception):
    """Error for bad orientation (not +/-)"""

    pass

CoordinateNotFoundError

Bases: Exception

Error for bad coordinates specified

Source code in agp/transform.py
28
29
30
31
class CoordinateNotFoundError(Exception):
    """Error for bad coordinates specified"""

    pass

UnsupportedOperationError

Bases: Exception

Error for trying to transform an unsupported style of bed row

Source code in agp/transform.py
22
23
24
25
class UnsupportedOperationError(Exception):
    """Error for trying to transform an unsupported style of bed row"""

    pass

create_contig_dict(agp_in)

Load the agp file into a dictionary mapping contig name to a list of rows in which that contig is the component_id.

Parameters:

Name Type Description Default
agp_in Iterator[Union[str, AgpRow]]

an agp file to read into a dictionary

required

Returns:

Type Description
ContigDict

a dict mapping contig name to a list of rows in which that

ContigDict

contig is the component_id

Source code in agp/transform.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def create_contig_dict(agp_in: Iterator[Union[str, AgpRow]]) -> ContigDict:
    """
    Load the agp file into a dictionary mapping contig name to a list
    of rows in which that contig is the component_id.

    Args:
        agp_in: an agp file to read into a dictionary

    Returns:
        a dict mapping contig name to a list of rows in which that
        contig is the component_id
    """
    contig_dict: ContigDict = defaultdict(list)
    for row in (r for r in agp_in if isinstance(r, AgpRow) and not r.is_gap):
        contig_dict[row.component_id].append(row)
    return contig_dict

find_agp_row(contig_name, coordinate_on_contig, contig_dict)

Find the agp row containing a coordinate.

Given a dictionary mapping contig name to a list of agp rows containing that contig, and a single position in contig coordinates, find and return the AGP row containing that position.

Parameters:

Name Type Description Default
contig_name str

the contig containing the coordinate

required
coordinate_on_contig int

the position to look for, in contig coordinates

required
contig_dict ContigDict

a dictionary mapping contig name to a list of agp rows containing that contig

required

Returns:

Type Description
AgpRow

a single AGP row containing the requested position

Source code in agp/transform.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def find_agp_row(
    contig_name: str, coordinate_on_contig: int, contig_dict: ContigDict
) -> AgpRow:
    """Find the agp row containing a coordinate.

    Given a dictionary mapping contig name to a list of agp rows
    containing that contig, and a single position in contig coordinates,
    find and return the AGP row containing that position.

    Args:
        contig_name:
            the contig containing the coordinate
        coordinate_on_contig:
            the position to look for, in contig coordinates
        contig_dict:
            a dictionary mapping contig name to a list of agp rows
            containing that contig

    Returns:
        a single AGP row containing the requested position
    """
    contig_list = contig_dict[contig_name]
    if not contig_list:
        raise NoSuchContigError(contig_name)
    for agp_row in contig_list:
        if (
            agp_row.component_beg <= coordinate_on_contig
            and agp_row.component_end >= coordinate_on_contig
        ):
            return agp_row

    raise CoordinateNotFoundError(f"{contig_name}:{coordinate_on_contig}")

transform_bed_row(bed_row, contig_dict)

Transform a bed row to scaffold coordinates

Transform a single row of a bed file from contig coordinates to scaffold coordinates.

Parameters:

Name Type Description Default
bed_row BedRange

the row to transform

required
contig_dict ContigDict

a dictionary mapping contig name to the agp row(s) containing that contig

required

Returns:

Type Description
BedRange

the same bed row, but in scaffold coordinates now

Source code in agp/transform.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def transform_bed_row(bed_row: BedRange, contig_dict: ContigDict) -> BedRange:
    """Transform a bed row to scaffold coordinates

    Transform a single row of a bed file from contig coordinates to
    scaffold coordinates.

    Args:
        bed_row: the row to transform
        contig_dict:
            a dictionary mapping contig name to the agp row(s)
            containing that contig

    Returns:
        the same bed row, but in scaffold coordinates now
    """
    if bed_row.start is None or bed_row.end is None:
        raise UnsupportedOperationError(
            f"Transforming coordinateless bed row: {bed_row}"
        )

    # retrieve the AGP rows containing the bed range's start and end points
    agp_row_start = find_agp_row(bed_row.chrom, bed_row.start, contig_dict)
    agp_row_end = find_agp_row(bed_row.chrom, bed_row.end, contig_dict)

    # only transform bed ranges that are entirely within one scaffold
    if agp_row_start.object == agp_row_end.object:
        bed_row.chrom = agp_row_start.object
        # transform the start and end positions of the bed range to the
        # new coordinate system
        bed_row.start, bed_row.end = sorted(
            [
                transform_single_position(bed_row.start, agp_row_start),
                transform_single_position(bed_row.end, agp_row_end),
            ]
        )
        # flip the orientation of the bed row, if necessary
        if (
            agp_row_start == agp_row_end
            and bed_row.strand
            and agp_row_start.orientation == "-"
        ):
            if bed_row.strand == "+":
                bed_row.strand = "-"
            else:
                bed_row.strand = "+"

    return bed_row

transform_single_position(position, agp_row)

Transform a position from component to object coordinates

Given a position in component coordinates (e.g., a position on a contig), and the agp row containing that position, transform the position to object coordinates (e.g., the same position, but on a scaffold of which that contig is a part).

Parameters:

Name Type Description Default
position int

a position in component/contig coordinates

required
agp_row AgpRow

the row containing that position

required

Returns:

Type Description
int

the same position, but in object/scaffold coordinates

Source code in agp/transform.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def transform_single_position(position: int, agp_row: AgpRow) -> int:
    """Transform a position from component to object coordinates

    Given a position in component coordinates (e.g., a position on a
    contig), and the agp row containing that position, transform the
    position to object coordinates (e.g., the same position, but on
    a scaffold of which that contig is a part).

    Args:
        position: a position in component/contig coordinates
        agp_row: the row containing that position

    Returns:
        the same position, but in object/scaffold coordinates
    """

    # first, offset the position by where it is on this _row_. This
    # is necessary because not all rows contain an entire component/
    # contig. E.g., if this row contains positions 101-200 on the
    # contig, then position 150 on the contig is the 50th base in
    # the piece of the contig enclosed by this row
    position = position - agp_row.component_beg + 1
    if agp_row.orientation == "+":
        # then, offset the position by where the row is on the object/
        # scaffold
        position += agp_row.object_beg - 1
    elif agp_row.orientation == "-":
        position = agp_row.object_end - position + 1
    else:
        raise BadOrientationError(f"Orientation must be +/-: {agp_row}")
    return position