Skip to content

Combiner

Combiner

Bases: object

Combines names extracted by a Parser.

Source code in ckanext/attribution/commands/migration/combiner.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class Combiner(object):
    """
    Combines names extracted by a Parser.

    The parser's ``contributors`` mapping is read under the keys ``'person'``,
    ``'org'`` and ``'other'``; its ``affiliations`` mapping is read and rewritten per
    package ID by :meth:`update_affiliations`.
    """

    def __init__(self, parser):
        """
        :param parser: object providing ``contributors`` and ``affiliations``
            attributes
        """
        self.contributors = parser.contributors
        self.affiliations = parser.affiliations

    def separate(self, group):
        """
        Ensure that the automated grouping is correct.

        If the group contains more than one distinct name, the user is asked whether
        they all refer to the same contributor; if not, each name is interactively
        assigned to a subgroup.

        :param group: a list of ParsedSegment instances that are probably the same
            contributor
        :returns: a list of lists of ParsedSegments
        """
        # longest first; ties are broken alphabetically so the prompt order (and the
        # subgroup keys derived from it) do not depend on set iteration order
        all_names = sorted(
            set(str(x.name) for x in group), key=lambda x: (-len(x), x)
        )
        if len(all_names) <= 1:
            return [group]

        same = click.confirm(
            'Are these all the same contributor?\n\t{0}\n'.format(
                '\n\t'.join(all_names)
            ),
            default=True,
        )
        if same:
            return [group]

        subgroups = {}
        for n in all_names:
            # compare the string form, because that is what all_names was built from;
            # comparing the raw name object relies on its own equality rules and can
            # put a segment into several subgroups or none
            v = [x for x in group if str(x.name) == n]
            if not subgroups:
                subgroups[n] = v
                continue
            # NOTE(review): presumably fuzzy-matches n against the existing subgroup
            # names, with the matched string first in each result - confirm
            matches = [m[0] for m in process.extract(n, list(subgroups))]
            ix = multi_choice(
                'Is "{0}" the same as any of these contributors?'.format(n),
                matches + ['None of these'],
                default=len(matches),
            )
            # the last option ("None of these") starts a new subgroup keyed by n
            k = matches[ix] if ix < len(matches) else n
            subgroups[k] = subgroups.get(k, []) + v
        return list(subgroups.values())

    def combine(self, group, agent_type, name_func=None):
        """
        Merge a group of parsed segments into a single contributor dict.

        :param group: a list of ParsedSegment instances for one contributor
        :param agent_type: stored as ``agent_type``; ``'person'`` selects the
            "family_name, given_names" key format
        :param name_func: optional callable that takes the list of segment names and
            returns a dict of name fields; if None, the longest name is used as
            ``name``
        :returns: a dict with agent_type, all_names, affiliations, packages, the name
            field(s) and key, or None if the group is empty
        """
        if not group:
            # nothing to merge; run() already skips None results
            return None

        all_names = [x.name for x in group]
        # groupby only merges adjacent items, so sort by contribution type first
        contrib_pairs = sorted(
            [(ct, pkg) for c in group for ct, pkg in c.packages.items()],
            key=lambda x: x[0],
        )
        # dict.fromkeys de-duplicates like set() but keeps first-seen order, so the
        # output lists are reproducible between runs
        packages = {
            contrib_type: list(dict.fromkeys(pkg for _, pkg in pairs))
            for contrib_type, pairs in itertools.groupby(
                contrib_pairs, key=lambda x: x[0]
            )
        }
        contrib = {
            'agent_type': agent_type,
            'all_names': [str(n) for n in all_names],
            'affiliations': list(
                dict.fromkeys(a for x in group for a in x.affiliations)
            ),
            'packages': packages,
        }
        if name_func is None:
            # longest name wins; ties are broken on the string form for stability
            longest_name = sorted(
                set(all_names), key=lambda x: (-len(x), str(x))
            )[0].strip()
            name = {'name': longest_name}
        else:
            name = name_func(all_names)
        contrib.update(name)
        contrib['key'] = self._get_key(contrib)
        return contrib

    def combine_person_names(self, names):
        """
        Uses a list of HumanNames to determine the longest possible name for a person.

        :param names: name objects providing ``first``, ``middle`` and ``last``
            string attributes
        :returns: a dict of family_name and given_names (includes middle names)
        :raises IndexError: if names is empty
        """
        check_installed(cli_installed)

        def _filter_diacritics(name_list):
            # NOTE(review): presumably unidecode transliterates to ASCII, so a changed
            # value means the name has non-ASCII characters - confirm
            filtered = [n for n in name_list if unidecode(n) != n]
            return filtered or name_list

        def _longest(name_list):
            # prefer variants kept by _filter_diacritics, then the longest; ties are
            # broken alphabetically so the result is reproducible
            return sorted(
                _filter_diacritics(name_list), key=lambda x: (-len(x), x)
            )[0]

        given = set(' '.join([n.first, n.middle]) for n in names)
        family = list(set(n.last for n in names))

        # use longest family name
        family_name = _longest(family).strip()

        # given names are more complicated: split each variant into whitespace
        # separated parts and pick the best candidate for each position
        given_parts = {}
        for m in given:
            if not m.strip():
                # first and middle were both blank
                continue
            for i, x in enumerate(re.split(r'\s+', m)):
                given_parts.setdefault(i, []).append(x)
        given_names = ' '.join(
            _longest(p) for _, p in sorted(given_parts.items())
        ).strip()
        return {'family_name': family_name, 'given_names': given_names}

    def update_affiliations(self, contributor):
        """
        Update the self.affiliations dict to ensure the names are consistent.

        For every package the contributor is linked to, any (name, affiliation) pair
        that mentions one of the contributor's names is rewritten to use the
        contributor's key instead.

        :param contributor: contributor dict
        """
        no_affiliations = len(contributor.get('affiliations', [])) == 0
        is_not_affiliation = len(contributor['packages'].get('affiliations', [])) == 0
        if no_affiliations and is_not_affiliation:
            return
        all_packages = [
            pkg for pkgs in contributor['packages'].values() for pkg in pkgs
        ]
        for pkg in all_packages:
            # the first element of each package entry is used as the lookup key
            pkg_id = pkg[0]
            items = self.affiliations.get(pkg_id)
            if items is None:
                continue
            updated_items = []
            for name, affiliation in items:
                if name in contributor['all_names']:
                    updated_items.append((contributor['key'], affiliation))
                elif affiliation in contributor['all_names']:
                    updated_items.append((name, contributor['key']))
                else:
                    updated_items.append((name, affiliation))
            self.affiliations[pkg_id] = updated_items

    def run(self):
        """
        Run the combiner over the whole parser list, including separating groups,
        combining names, and updating the affiliations dict.

        :returns: a list of contributors
        """
        combined = []

        def _collect(groups, agent_type, name_func):
            for grp in groups:
                for members in self.separate(grp):
                    c = self.combine(members, agent_type, name_func)
                    if c is not None:
                        combined.append(c)
                        self.update_affiliations(c)

        # people are nested two levels deep; the groups are the inner values
        person_groups = [
            grp
            for initials_list in self.contributors['person'].values()
            for grp in initials_list.values()
        ]
        _collect(person_groups, 'person', self.combine_person_names)
        _collect(self.contributors['org'].values(), 'org', None)
        _collect(self.contributors['other'].values(), 'other', None)
        return combined

    def _get_key(self, contrib_dict):
        """
        Build the key for a contributor dict.

        :param contrib_dict: contributor dict with agent_type and its name field(s)
        :returns: "family_name, given_names" for a person, otherwise the name
        """
        if contrib_dict['agent_type'] == 'person':
            return contrib_dict['family_name'] + ', ' + contrib_dict['given_names']
        else:
            return contrib_dict['name']

combine_person_names(names)

Uses a list of HumanNames to determine the longest possible name for a person.

Returns:

Type Description

a dict of family_name and given_names (includes middle names)

Source code in ckanext/attribution/commands/migration/combiner.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def combine_person_names(self, names):
    """
    Uses a list of HumanNames to determine the longest possible name for a person.

    :param names: name objects providing ``first``, ``middle`` and ``last`` string
        attributes
    :returns: a dict of family_name and given_names (includes middle names)
    :raises IndexError: if names is empty
    """
    check_installed(cli_installed)

    def _filter_diacritics(name_list):
        # NOTE(review): presumably unidecode transliterates to ASCII, so a changed
        # value means the name has non-ASCII characters - confirm
        filtered = [n for n in name_list if unidecode(n) != n]
        return filtered or name_list

    def _longest(name_list):
        # prefer variants kept by _filter_diacritics, then the longest; ties are
        # broken alphabetically so the result does not depend on set iteration order
        return sorted(_filter_diacritics(name_list), key=lambda x: (-len(x), x))[0]

    given = set(' '.join([n.first, n.middle]) for n in names)
    family = list(set(n.last for n in names))

    # use longest family name
    family_name = _longest(family).strip()

    # given names are more complicated: split each variant into whitespace separated
    # parts and pick the best candidate for each position
    given_parts = {}
    for m in given:
        if not m.strip():
            # first and middle were both blank; the joined string is just a space
            continue
        for i, x in enumerate(re.split(r'\s+', m)):
            given_parts.setdefault(i, []).append(x)
    given_names = ' '.join(
        _longest(p) for _, p in sorted(given_parts.items())
    ).strip()
    return {'family_name': family_name, 'given_names': given_names}

run()

Run the combiner over the whole parser list, including separating groups, combining names, and updating the affiliations dict.

Returns:

Type Description

a list of contributors

Source code in ckanext/attribution/commands/migration/combiner.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def run(self):
    """
    Process every contributor group held by the parser.

    People are handled first, then organisations, then everything else. Each group
    is passed through ``separate``, each resulting subgroup through ``combine``, and
    every non-None result is collected and passed to ``update_affiliations``.

    :returns: a list of contributors
    """
    results = []

    def _collect(groups, agent_type, name_func):
        for grp in groups:
            for members in self.separate(grp):
                merged = self.combine(members, agent_type, name_func)
                if merged is None:
                    continue
                results.append(merged)
                self.update_affiliations(merged)

    # people are nested two levels deep; the groups are the inner values
    person_groups = [
        grp
        for by_initial in self.contributors['person'].values()
        for grp in by_initial.values()
    ]
    _collect(person_groups, 'person', self.combine_person_names)
    _collect(self.contributors['org'].values(), 'org', None)
    _collect(self.contributors['other'].values(), 'other', None)
    return results

separate(group)

Ensure that the automated grouping is correct.

Parameters:

Name Type Description Default
group

a list of ParsedSegment instances that are probably the same contributor

required

Returns:

Type Description

a list of lists of ParsedSegments

Source code in ckanext/attribution/commands/migration/combiner.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def separate(self, group):
    """
    Ensure that the automated grouping is correct.

    If the group contains more than one distinct name, the user is asked whether they
    all refer to the same contributor; if not, each name is interactively assigned to
    a subgroup.

    :param group: a list of ParsedSegment instances that are probably the same
        contributor
    :returns: a list of lists of ParsedSegments
    """
    # longest first; ties are broken alphabetically so the prompt order (and the
    # subgroup keys derived from it) do not depend on set iteration order
    all_names = sorted(set(str(x.name) for x in group), key=lambda x: (-len(x), x))
    if len(all_names) <= 1:
        return [group]

    same = click.confirm(
        'Are these all the same contributor?\n\t{0}\n'.format(
            '\n\t'.join(all_names)
        ),
        default=True,
    )
    if same:
        return [group]

    subgroups = {}
    for n in all_names:
        # compare the string form, because that is what all_names was built from;
        # comparing the raw name object relies on its own equality rules and can put
        # a segment into several subgroups or none
        v = [x for x in group if str(x.name) == n]
        if not subgroups:
            subgroups[n] = v
            continue
        # NOTE(review): presumably fuzzy-matches n against the existing subgroup
        # names, with the matched string first in each result - confirm
        matches = [m[0] for m in process.extract(n, list(subgroups))]
        ix = multi_choice(
            'Is "{0}" the same as any of these contributors?'.format(n),
            matches + ['None of these'],
            default=len(matches),
        )
        # the last option ("None of these") starts a new subgroup keyed by n
        k = matches[ix] if ix < len(matches) else n
        subgroups[k] = subgroups.get(k, []) + v
    return list(subgroups.values())

update_affiliations(contributor)

Update the self.affiliations dict to ensure the names are consistent.

Parameters:

Name Type Description Default
contributor

contributor dict

required
Source code in ckanext/attribution/commands/migration/combiner.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def update_affiliations(self, contributor):
    """
    Rewrite entries in self.affiliations so they refer to the contributor's key.

    Nothing happens unless the contributor has affiliations of its own or has
    packages listed under the 'affiliations' contribution type. Otherwise, for each
    of the contributor's packages found in self.affiliations, every
    (name, affiliation) pair that mentions one of the contributor's names has that
    name replaced with the contributor's key.

    :param contributor: contributor dict
    """
    has_affiliations = len(contributor.get('affiliations', [])) > 0
    is_affiliation = len(contributor['packages'].get('affiliations', [])) > 0
    if not (has_affiliations or is_affiliation):
        return

    def _renamed(pair):
        # the name side is checked first; the affiliation side is only considered
        # when the name does not belong to this contributor
        name, affiliation = pair
        if name in contributor['all_names']:
            return (contributor['key'], affiliation)
        if affiliation in contributor['all_names']:
            return (name, contributor['key'])
        return (name, affiliation)

    package_entries = [
        entry for entries in contributor['packages'].values() for entry in entries
    ]
    for entry in package_entries:
        # the first element of each package entry is used as the lookup key
        current = self.affiliations.get(entry[0])
        if current is not None:
            self.affiliations[entry[0]] = [_renamed(pair) for pair in current]