Skip to content

Parser

Parser

Bases: object

Extracts names and affiliations from text.

Source code in ckanext/attribution/commands/migration/parser.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
class Parser(object):
    """
    Extracts names and affiliations from text.

    Parsing is interactive: ambiguous input is resolved by asking the user
    through ``click`` prompts and ``multi_choice``. Results accumulate on the
    instance:

    - ``contributors`` maps an agent type ('person', 'org' or 'other') to the
      records sorted into it. People are stored as
      ``{last name: {first character of first name: [ParsedSegment, ...]}}``;
      the other types as ``{initials: [ParsedSegment, ...]}``, where the
      initials are the joined matches of ``rgx.abbr``.
    - ``affiliations`` maps a package id to a list of
      ``(contributor name, affiliation)`` tuples.
    """

    def __init__(self):
        # NOTE(review): presumably verifies that the optional CLI dependencies
        # are available before spacy is used - confirm against check_installed.
        check_installed(cli_installed)

        self.contributors = {'person': {}, 'org': {}, 'other': {}}
        self.affiliations = {}
        spacy_model = 'en_core_web_trf'
        try:
            self.nlp = spacy.load(spacy_model)
        except OSError:
            # loading failed (presumably the model is not installed yet), so
            # download it and retry once
            spacy.cli.download(spacy_model)
            self.nlp = spacy.load(spacy_model)

    def run(self, txt, pkg_id, contrib_type):
        """
        Run the whole process over a line of text.

        :param txt: the chunk of text to process
        :param pkg_id: associated package
        :param contrib_type: author, contributor, or affiliation
        :returns: list of ParsedSegment instances extracted from the text, or
                  None if the text did not pass :meth:`validate`
        """
        if not self.validate(txt):
            return None
        segments = []
        for line in txt.split('\n'):
            # removes the literal two-character sequence backslash + "r", not
            # a real carriage return
            line = line.replace('\\r', '')
            sublines = self.split(line)
            # bulk sort for large lists
            if len(sublines) > 20 and click.confirm(
                f'Do you want to set the same type for all {len(sublines)} contributors found in this block?'
            ):
                i = multi_choice(
                    f'What type of contributors are in "{shorten(line, width=50, placeholder="...")}"?',
                    self.contributors.keys(),
                )
                _type = list(self.contributors)[i]
            else:
                _type = None
            for i, subline in enumerate(sublines):
                if subline == '':
                    continue
                if contrib_type != 'affiliation':
                    name, affiliations = self.extract_affiliations(subline)
                    for a in affiliations:
                        self.affiliations.setdefault(pkg_id, []).append(
                            (name.strip(), a)
                        )
                        # every affiliation is also recorded as a contributor
                        # in its own right
                        parsed_affiliation = ParsedSegment(
                            a.strip(),
                            text=a,
                            affiliations=[],
                            packages={'affiliation': (pkg_id, None)},
                        )
                        self.sort_contributor(parsed_affiliation)
                else:
                    name = subline
                    affiliations = []
                # only authors are ordered; the order is the 1-based position
                # of the subline (skipped empty sublines still count)
                order = (i + 1) if contrib_type == 'author' else None
                parsed_segment = ParsedSegment(
                    name=name.strip(),
                    text=subline,
                    affiliations=affiliations,
                    packages={contrib_type: (pkg_id, order)},
                )
                self.sort_contributor(parsed_segment, _type)
                segments.append(parsed_segment)
        return segments

    def validate(self, txt):
        """
        Check the text can/should actually be parsed.

        Text that is None or yields no tokens is rejected outright. Text where
        fewer than half of the tokens are proper nouns is shown to the user,
        who decides whether to skip it.

        :param txt: the text to check
        :returns: True/False
        """
        if txt is None:
            return False
        tokens = list(self.nlp(txt))
        if not tokens:
            # nothing to parse; return a real boolean, as documented
            return False
        # check that it has some proper nouns and doesn't just look like a sentence
        pos = Counter(t.pos_ for t in tokens)
        pc_proper_nouns = pos.get('PROPN', 0) / len(tokens)
        if pc_proper_nouns < 0.5:
            click.echo("\nThis text doesn't look right:")
            click.echo(txt)
            return not click.confirm('Skip it?')
        return True

    def split(self, txt):
        """
        Uses multiple sub-methods to attempt to split the text into individual
        contributors.

        The text is first cut on newlines and semicolons. If that yields more
        than one segment the user may accept the segments as they are;
        otherwise each segment is split several ways and the user picks the
        result that looks right (or edits the segment and tries again).

        :param txt: a string containing multiple contributor names
        :returns: a list of names
        """
        lines = [ln.strip() for ln in txt.split('\n')]
        segments = [
            rgx.initials.sub('\\1 ', s).strip() for ln in lines for s in ln.split(';')
        ]
        names = []

        def _process_segment(segment):
            # Offer every distinct candidate split of the segment to the user
            # and return the chosen one (None for "None of these").
            if ';' in segment:
                semicolon_splits = [s.strip() for s in segment.split(';')]
            else:
                semicolon_splits = []
            rgx_splits = rgx.name.findall(segment)
            rv_names = self._split_by_reversed_name(segment)
            nlp_ent_splits = self._split_by_nlp_ents(segment)
            options = []
            printable_options = []
            all_options = [
                nlp_ent_splits,
                rv_names,
                rgx_splits,
                semicolon_splits,
                [segment],
            ]
            # most fragments first; ties broken by most text retained, then by
            # the original order above (the sort is stable)
            all_options.sort(key=lambda x: (-len(x), -len(''.join(x))))
            for o in all_options:
                if len(o) == 0:
                    continue
                printable = '; '.join(o) + ' ({0} fragments)'.format(len(o))
                if printable in printable_options:
                    # identical to an option already listed
                    continue
                options.append(o)
                printable_options.append(printable)
            options.append(None)
            printable_options.append('None of these')
            if len(options) > 1:
                click.echo(segment)
                ix = multi_choice('Which one looks right?', printable_options)
                return options[ix]
            return None

        if len(segments) > 1:
            click.echo(
                f'{len(segments)} found in "{shorten(txt, width=50, placeholder="...")}". e.g.:'
            )
            for s in segments[:5]:
                click.echo('\t' + s)
            if click.confirm('Skip individual processing of these segments?'):
                return segments

        for t in segments:
            splits = None
            # keep asking until the user accepts a non-empty split
            while not splits:
                splits = _process_segment(t)
                if splits:
                    names += splits
                else:
                    t = prompt(
                        'Edit the line (try adding ";" between contributors): ',
                        default=t,
                    )
        return names

    def _split_by_nlp_sep(self, txt):
        """
        Finds entities using spacy, then attempts to identify the character(s)
        separating them and splits by that.

        The separator is taken from the text between the first two entities,
        and is only looked for when more than two entities are found. Not
        called by any other method of this class.

        :param txt: the text to split
        :returns: a list of stripped fragments, or ``[txt]`` if no separator
                  was looked for
        """
        parsed = self.nlp(txt)
        if len(parsed.ents) > 2:
            start_char = parsed.ents[0].end_char
            end_char = parsed.ents[1].start_char
            sep = txt[start_char:end_char].strip()
            if sep == '':
                # str.split rejects an empty separator, so fall back to newlines
                sep = '\n'
            return [s.strip() for s in txt.split(sep)]
        return [txt]

    def _split_by_nlp_ents(self, txt):
        """
        Extract all entities from the text using spacy.

        :param txt: the text to process
        :returns: a list containing the text of each entity found
        """
        parsed = self.nlp(txt)
        return [ent.text for ent in parsed.ents]

    def _split_by_reversed_name(self, txt):
        """
        Find every match of ``rgx.reversed_name`` in the text.

        :param txt: the text to process
        :returns: a list of names, each built by joining the parts of one match
                  with ', '
        """
        names = rgx.reversed_name.findall(txt)
        return [', '.join(n) for n in names]

    def extract_affiliations(self, txt):
        """
        Uses regexes to find probable affiliations in parentheses.

        :param txt: the text for a single contributor
        :returns: contributor name, list of affiliations
        """
        has_affiliation = rgx.has_affiliation.match(txt)
        if has_affiliation is not None:
            name, affiliation_txt = has_affiliation.groups()[:2]
            # the affiliation text may itself name several organisations
            return name, self.split(affiliation_txt)
        no_affiliation = rgx.no_affiliation.match(txt)
        if no_affiliation is not None:
            return no_affiliation.groups()[0], []
        return txt, []

    def sort_contributor(self, c: ParsedSegment, default_type=None):
        """
        Sort a contributor into lists based on agent type.

        A type already recorded for the contributor takes precedence over
        ``default_type``; if neither is available the user is asked.

        :param c: the contributor to sort; if sorted as a person its ``name``
                  is replaced by a HumanName instance
        :param default_type: type to use when the contributor has not been
                             seen before
        """
        name = HumanName(c.name)
        initials = ''.join(rgx.abbr.findall(c.name))
        _type = default_type
        if name.last in self.contributors['person']:
            _type = 'person'
        else:
            for k, v in self.contributors.items():
                if k == 'person':
                    continue
                if initials in v and any(x.name == c.name for x in v[initials]):
                    _type = k
        if _type is None:
            i = multi_choice(
                'What type of contributor is "{0}"?'.format(c.name),
                self.contributors.keys(),
            )
            _type = list(self.contributors)[i]
        if _type == 'person':
            # use '?' as a placeholder so records can always be indexed by the
            # first character of the first name
            if name.last == '':
                name.last = name.first
                name.first = '?'
            if name.first == '':
                name.first = '?'
            c.name = name
            family_name_records = self.contributors[_type].get(name.last, {})
            initial_records = family_name_records.get(name.first[0], []) + [c]
            family_name_records[name.first[0]] = initial_records
            self.contributors[_type][name.last] = family_name_records
        else:
            self.contributors[_type][initials] = self.contributors[_type].get(
                initials, []
            ) + [c]

extract_affiliations(txt)

Uses regexes to find probable affiliations in parentheses.

Returns:

Type Description

contributor name, list of affiliations

Source code in ckanext/attribution/commands/migration/parser.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def extract_affiliations(self, txt):
    """
    Uses regexes to find probable affiliations in parentheses.

    ``rgx.has_affiliation`` is tried first and ``rgx.no_affiliation`` second;
    if neither matches, the text is returned unchanged as the name.

    :param txt: the text for a single contributor
    :returns: contributor name, list of affiliations
    """
    with_affiliation = rgx.has_affiliation.match(txt)
    without_affiliation = rgx.no_affiliation.match(txt)

    if with_affiliation is not None:
        contributor = with_affiliation.group(1)
        # the second group may hold several affiliations, so split it as well
        return contributor, self.split(with_affiliation.group(2))

    if without_affiliation is None:
        return txt, []
    return without_affiliation.group(1), []

run(txt, pkg_id, contrib_type)

Run the whole process over a line of text.

Parameters:

Name Type Description Default
txt

the chunk of text to process

required
pkg_id

associated package

required
contrib_type

author, contributor, or affiliation

required

Returns:

Type Description

list of ParsedSegment instances extracted from the text

Source code in ckanext/attribution/commands/migration/parser.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def run(self, txt, pkg_id, contrib_type):
    """
    Run the whole process over a line of text.

    :param txt: the chunk of text to process
    :param pkg_id: associated package
    :param contrib_type: author, contributor, or affiliation
    :returns: list of ParsedSegment instances extracted from the text (None if
              the text does not pass validation)
    """
    if not self.validate(txt):
        return

    is_affiliation = contrib_type == 'affiliation'
    is_author = contrib_type == 'author'
    segments = []

    for line in txt.split('\n'):
        line = line.replace('\\r', '')
        sublines = self.split(line)

        # bulk sort for large lists: optionally ask once for the whole block
        _type = None
        if len(sublines) > 20 and click.confirm(
            f'Do you want to set the same type for all {len(sublines)} contributors found in this block?'
        ):
            chosen = multi_choice(
                f'What type of contributors are in "{shorten(line, width=50, placeholder="...")}"?',
                self.contributors.keys(),
            )
            _type = list(self.contributors.keys())[chosen]

        for position, subline in enumerate(sublines, start=1):
            if subline == '':
                continue

            if is_affiliation:
                name, affiliations = subline, []
            else:
                name, affiliations = self.extract_affiliations(subline)
                for affiliation in affiliations:
                    recorded = self.affiliations.get(pkg_id, [])
                    self.affiliations[pkg_id] = recorded + [
                        (name.strip(), affiliation)
                    ]
                    # affiliations are also sorted as contributors themselves
                    self.sort_contributor(
                        ParsedSegment(
                            affiliation.strip(),
                            text=affiliation,
                            affiliations=[],
                            packages={'affiliation': (pkg_id, None)},
                        )
                    )

            # only authors carry an order (1-based position in the line)
            order = position if is_author else None
            parsed_segment = ParsedSegment(
                name=name.strip(),
                text=subline,
                affiliations=affiliations,
                packages={contrib_type: (pkg_id, order)},
            )
            self.sort_contributor(parsed_segment, _type)
            segments.append(parsed_segment)

    return segments

sort_contributor(c, default_type=None)

Sort a contributor into lists based on agent type.

Source code in ckanext/attribution/commands/migration/parser.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def sort_contributor(self, c: ParsedSegment, default_type=None):
    """
    Sort a contributor into lists based on agent type.

    A type already recorded for the contributor overrides ``default_type``;
    when no type can be determined the user is asked to choose one.

    :param c: the contributor to sort; if sorted as a person its ``name`` is
              replaced by a HumanName instance
    :param default_type: type to fall back on for unseen contributors
    """
    name = HumanName(c.name)
    initials = ''.join(rgx.abbr.findall(c.name))

    _type = default_type
    if name.last in self.contributors['person']:
        _type = 'person'
    else:
        for agent_type, records in self.contributors.items():
            if agent_type == 'person' or initials not in records:
                continue
            if any([known.name == c.name for known in records[initials]]):
                _type = agent_type

    if _type is None:
        picked = multi_choice(
            'What type of contributor is "{0}"?'.format(c.name),
            self.contributors.keys(),
        )
        _type = list(self.contributors.keys())[picked]

    bucket = self.contributors[_type]
    if _type != 'person':
        # non-person agents are keyed by their initials only
        bucket[initials] = bucket.get(initials, []) + [c]
        return

    # people are keyed by last name, then by first character of the first name;
    # '?' stands in for a missing first name
    if name.last == '':
        name.last = name.first
        name.first = '?'
    if name.first == '':
        name.first = '?'
    c.name = name
    by_family_name = bucket.get(name.last, {})
    first_initial = name.first[0]
    by_family_name[first_initial] = by_family_name.get(first_initial, []) + [c]
    bucket[name.last] = by_family_name

split(txt)

Uses multiple sub-methods to attempt to split the text into individual contributors.

Parameters:

Name Type Description Default
txt

a string containing multiple contributor names

required

Returns:

Type Description

a list of names

Source code in ckanext/attribution/commands/migration/parser.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def split(self, txt):
    """
    Uses multiple sub-methods to attempt to split the text into individual
    contributors.

    The text is cut on newlines and semicolons first. When that produces more
    than one segment the user may accept them untouched; otherwise every
    segment is split in several ways and the user picks the best result, or
    edits the segment and tries again.

    :param txt: a string containing multiple contributor names
    :returns: a list of names
    """
    segments = []
    for raw_line in txt.split('\n'):
        for piece in raw_line.strip().split(';'):
            segments.append(rgx.initials.sub('\\1 ', piece).strip())

    def _choose_split(candidate):
        # Build each candidate split, list the distinct ones for the user and
        # return the one chosen (None when the user rejects them all).
        semicolon_parts = (
            [part.strip() for part in candidate.split(';')]
            if ';' in candidate
            else []
        )
        pattern_parts = rgx.name.findall(candidate)
        reversed_parts = self._split_by_reversed_name(candidate)
        entity_parts = self._split_by_nlp_ents(candidate)

        # most fragments first, then most text kept; the stable sort keeps the
        # listed order for complete ties
        ranked = sorted(
            [
                entity_parts,
                reversed_parts,
                pattern_parts,
                semicolon_parts,
                [candidate],
            ],
            key=lambda parts: (-len(parts), -len(''.join(parts))),
        )

        options = []
        printable_options = []
        for parts in ranked:
            if len(parts) == 0:
                continue
            label = '; '.join(parts) + ' ({0} fragments)'.format(len(parts))
            if label not in printable_options:
                options.append(parts)
                printable_options.append(label)
        options.append(None)
        printable_options.append('None of these')

        if len(options) <= 1:
            return None
        click.echo(candidate)
        return options[multi_choice('Which one looks right?', printable_options)]

    if len(segments) > 1:
        click.echo(
            f'{len(segments)} found in "{shorten(txt, width=50, placeholder="...")}". e.g.:'
        )
        for sample in segments[:5]:
            click.echo('\t' + sample)
        if click.confirm('Skip individual processing of these segments?'):
            return segments

    names = []
    for current in segments:
        # repeat until the user accepts a non-empty split for this segment
        while True:
            chosen = _choose_split(current)
            if chosen:
                names.extend(chosen)
                break
            current = prompt(
                'Edit the line (try adding ";" between contributors): ',
                default=current,
            )
    return names

validate(txt)

Check the text can/should actually be parsed.

Returns:

Type Description

True/False

Source code in ckanext/attribution/commands/migration/parser.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def validate(self, txt):
    """
    Check the text can/should actually be parsed.

    Text that is None or yields no tokens is rejected outright. Text where
    fewer than half of the tokens are proper nouns is shown to the user, who
    decides whether to skip it.

    :param txt: the text to check
    :returns: True/False
    """
    if txt is None:
        return False
    tokens = list(self.nlp(txt))
    if not tokens:
        # nothing to parse; return a real boolean, as documented
        return False
    # check that it has some proper nouns and doesn't just look like a sentence
    pos = Counter(t.pos_ for t in tokens)
    pc_proper_nouns = pos.get('PROPN', 0) / len(tokens)
    if pc_proper_nouns < 0.5:
        click.echo("\nThis text doesn't look right:")
        click.echo(txt)
        return not click.confirm('Skip it?')
    return True