Source code for atramhasis.validators

# -*- coding: utf-8 -*-
"""
Module that validates incoming JSON.
"""

import copy

import bleach
import colander
from language_tags import tags
from skosprovider_sqlalchemy.models import (
    Language
)
from sqlalchemy.orm.exc import NoResultFound

from atramhasis.errors import ValidationError


class Label(colander.MappingSchema):
    """
    Schema for a single label: its text, its label type and its language.

    All three fields are string nodes.
    """

    label = colander.SchemaNode(colander.String())
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
def html_preparer(value):
    """
    Prepare the value by stripping all html except certain tags.

    Only ``strong``, ``em`` and ``a`` tags are kept; other markup is
    stripped rather than escaped.

    :param value: The value to be cleaned.
    :rtype: str
    """
    try:
        cleaned = bleach.clean(value, tags=['strong', 'em', 'a'], strip=True)
    except TypeError:
        # Trying to clean a non-string.
        # Hand it back untouched so it can be caught later on.
        return value
    return cleaned
class Note(colander.MappingSchema):
    """
    Schema for a single note: its text, its note type and its language.

    The note text is passed through :func:`html_preparer`.
    """

    note = colander.SchemaNode(colander.String(), preparer=html_preparer)
    type = colander.SchemaNode(colander.String())
    language = colander.SchemaNode(colander.String())
class Source(colander.MappingSchema):
    """
    Schema for a single source, holding only its citation.

    The citation is passed through :func:`html_preparer`.
    """

    citation = colander.SchemaNode(colander.String(), preparer=html_preparer)
class Labels(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Label` items.
    """

    label = Label()
class Notes(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Note` items.
    """

    note = Note()
class Sources(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`Source` items.
    """

    source = Source()
class RelatedConcept(colander.MappingSchema):
    """
    Schema for a reference to another concept or collection by integer id.
    """

    id = colander.SchemaNode(colander.Int())
class Concepts(colander.SequenceSchema):
    """
    Schema for a sequence of :class:`RelatedConcept` references.
    """

    concept = RelatedConcept()
class MatchList(colander.SequenceSchema):
    """
    Schema for a sequence of match entries, each a string node with
    ``missing=None``.
    """

    match = colander.SchemaNode(colander.String(), missing=None)
class Matches(colander.MappingSchema):
    """
    Schema for the matches of a concept, grouped per match type.

    Every match type is a :class:`MatchList` that defaults to an empty list.
    """

    broad = MatchList(missing=[])
    close = MatchList(missing=[])
    exact = MatchList(missing=[])
    narrow = MatchList(missing=[])
    related = MatchList(missing=[])
class Concept(colander.MappingSchema):
    """
    Schema for an incoming concept or collection.

    ``id`` defaults to ``None`` and ``type`` to ``'concept'``. All label,
    note, source and relation sequences default to an empty list and
    ``matches`` to an empty mapping. ``infer_concept_relations`` is dropped
    from the result when it is not supplied.
    """

    id = colander.SchemaNode(colander.Int(), missing=None)
    type = colander.SchemaNode(colander.String(), missing='concept')
    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
    broader = Concepts(missing=[])
    narrower = Concepts(missing=[])
    related = Concepts(missing=[])
    members = Concepts(missing=[])
    member_of = Concepts(missing=[])
    subordinate_arrays = Concepts(missing=[])
    superordinates = Concepts(missing=[])
    matches = Matches(missing={})
    infer_concept_relations = colander.SchemaNode(colander.Boolean(), missing=colander.drop)
class ConceptScheme(colander.MappingSchema):
    """
    Schema for an incoming conceptscheme: labels, notes and sources.

    Each of the three sequences defaults to an empty list.
    """

    labels = Labels(missing=[])
    notes = Notes(missing=[])
    sources = Sources(missing=[])
class LanguageTag(colander.MappingSchema):
    """
    Schema for a language: its tag (``id``) and its ``name``.
    """

    id = colander.SchemaNode(colander.String())
    name = colander.SchemaNode(colander.String())
def concept_schema_validator(node, cstruct):
    """
    This validator validates an incoming concept or collection.

    It runs a list of rules against the concept or collection and collects
    every broken rule before reporting them together.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must provide ``request`` and
        ``conceptscheme_id``.
    :param cstruct: The concept or collection being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule was
        broken.
    """
    request = node.bindings['request']
    skos_manager = request.data_managers['skos_manager']
    languages_manager = request.data_managers['languages_manager']
    conceptscheme_id = node.bindings['conceptscheme_id']
    concept_type = cstruct['type']
    concept_id = cstruct['id']

    semantic_keys = ('related', 'narrower', 'broader')
    membership_keys = ('members', 'member_of')
    # Id lists and validation outcome per relation; both stay at their
    # defaults for relations that are absent from cstruct.
    relation_ids = dict.fromkeys(semantic_keys + membership_keys)
    validated = dict.fromkeys(semantic_keys + membership_keys, False)
    errors = []

    def ids_for(key):
        # Extract the referenced ids from a copy of the relation list.
        return [entry['id'] for entry in copy.deepcopy(cstruct[key])]

    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    for key in semantic_keys:
        if key not in cstruct:
            continue
        relation_ids[key] = ids_for(key)
        validated[key] = semantic_relations_rule(
            errors, node[key], skos_manager, conceptscheme_id, relation_ids[key], concept_id
        )
        concept_relations_rule(errors, node[key], relation_ids[key], concept_type)

    for key in membership_keys:
        if key not in cstruct:
            continue
        relation_ids[key] = ids_for(key)
        validated[key] = semantic_relations_rule(
            errors, node[key], skos_manager, conceptscheme_id, relation_ids[key], concept_id
        )

    # The type and hierarchy rules look the targets up again, so they only
    # run once every relation in the group passed semantic_relations_rule.
    if all(validated[key] for key in semantic_keys):
        concept_type_rule(errors, node['narrower'], skos_manager, conceptscheme_id, relation_ids['narrower'])
        narrower_hierarchy_rule(errors, node['narrower'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['broader'], skos_manager, conceptscheme_id, relation_ids['broader'])
        broader_hierarchy_rule(errors, node['broader'], skos_manager, conceptscheme_id, cstruct)
        concept_type_rule(errors, node['related'], skos_manager, conceptscheme_id, relation_ids['related'])

    if all(validated[key] for key in membership_keys):
        members_only_in_collection_rule(errors, node['members'], concept_type, relation_ids['members'])
        collection_members_unique_rule(errors, node['members'], relation_ids['members'])
        collection_type_rule(errors, node['member_of'], skos_manager, conceptscheme_id, relation_ids['member_of'])
        memberof_hierarchy_rule(errors, node['member_of'], skos_manager, conceptscheme_id, cstruct)
        members_hierarchy_rule(errors, node['members'], skos_manager, conceptscheme_id, cstruct)

    if 'matches' in cstruct:
        matches = copy.deepcopy(cstruct['matches'])
        concept_matches_rule(errors, node['matches'], matches, concept_type)
        concept_matches_unique_rule(errors, node['matches'], matches)

    if 'subordinate_arrays' in cstruct:
        subordinate_arrays = ids_for('subordinate_arrays')
        subordinate_arrays_only_in_concept_rule(
            errors, node['subordinate_arrays'], concept_type, subordinate_arrays
        )
        subordinate_arrays_type_rule(
            errors, node['subordinate_arrays'], skos_manager, conceptscheme_id, subordinate_arrays
        )
        subordinate_arrays_hierarchy_rule(
            errors, node['subordinate_arrays'], skos_manager, conceptscheme_id, cstruct
        )

    if 'superordinates' in cstruct:
        superordinates = ids_for('superordinates')
        superordinates_only_in_concept_rule(errors, node['superordinates'], concept_type, superordinates)
        superordinates_type_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, superordinates)
        superordinates_hierarchy_rule(errors, node['superordinates'], skos_manager, conceptscheme_id, cstruct)

    if concept_type == 'concept' and 'infer_concept_relations' in cstruct:
        msg = "'infer_concept_relations' can only be set for collections."
        errors.append(colander.Invalid(node['infer_concept_relations'], msg=msg))

    if errors:
        raise ValidationError(
            'Concept could not be validated',
            [e.asdict() for e in errors]
        )
def conceptscheme_schema_validator(node, cstruct):
    """
    This validator validates the incoming conceptscheme labels.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must provide ``request``.
    :param cstruct: The conceptscheme being validated.
    :raises atramhasis.errors.ValidationError: If at least one label rule
        was broken.
    """
    data_managers = node.bindings['request'].data_managers
    skos_manager = data_managers['skos_manager']
    languages_manager = data_managers['languages_manager']

    errors = []
    min_labels_rule(errors, node, cstruct)
    if 'labels' in cstruct:
        labels = copy.deepcopy(cstruct['labels'])
        label_type_rule(errors, node, skos_manager, labels)
        label_lang_rule(errors, node, languages_manager, labels)
        max_preflabels_rule(errors, node, labels)

    if errors:
        raise ValidationError(
            'ConceptScheme could not be validated',
            [e.asdict() for e in errors]
        )
def concept_relations_rule(errors, node_location, relations, concept_type):
    """
    Checks that only concepts have narrower, broader and related relations.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param relations: Ids of the related items, or `None`.
    :param concept_type: Type of the item being validated.
    """
    if relations is None or len(relations) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have narrower/broader/related relations'
    ))
def max_preflabels_rule(errors, node, labels):
    """
    Checks that there's only one prefLabel for a certain language.

    One error is added for every prefLabel whose language was already used
    by an earlier prefLabel.

    :param list errors: Collects the validation errors; appended to in place.
    :param node: Schema node; errors are reported on its ``labels`` child.
    :param labels: The labels to check.
    """
    seen_languages = []
    for label in labels:
        if label['type'] != 'prefLabel':
            continue
        language = label['language']
        if language not in seen_languages:
            seen_languages.append(language)
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Only one prefLabel per language allowed.'
        ))
def min_labels_rule(errors, node, cstruct):
    """
    Checks that a concept or collection always has at least one label.

    Nothing is checked when ``cstruct`` has no ``labels`` key at all.

    :param list errors: Collects the validation errors; appended to in place.
    :param node: Schema node; errors are reported on its ``labels`` child.
    :param cstruct: The structure being validated.
    """
    if 'labels' not in cstruct:
        return
    labels = copy.deepcopy(cstruct['labels'])
    if len(labels) != 0:
        return
    errors.append(colander.Invalid(
        node['labels'],
        'At least one label is necessary'
    ))
def label_type_rule(errors, node, skos_manager, labels):
    """
    Checks that a label has the correct type.

    A type is correct when it equals the ``name`` of one of the label types
    returned by ``skos_manager.get_all_label_types()``.

    :param list errors: Collects the validation errors; appended to in place.
    :param node: Schema node; errors are reported on its ``labels`` child.
    :param skos_manager: Manager that supplies the known label types.
    :param labels: The labels to check.
    """
    known_types = [known.name for known in skos_manager.get_all_label_types()]
    for label in labels:
        if label['type'] in known_types:
            continue
        errors.append(colander.Invalid(
            node['labels'],
            'Invalid labeltype.'
        ))
def label_lang_rule(errors, node, languages_manager, labels):
    """
    Checks that languages of a label are valid.

    Checks that they are valid IANA language tags. A valid tag for which
    ``languages_manager.count_languages`` reports nothing is saved as a new
    :class:`Language`, named after the tag's descriptions.

    :param list errors: Collects the validation errors; appended to in place.
    :param node: Schema node; errors are reported on its ``labels`` child.
    :param languages_manager: Manager used to count and save languages.
    :param labels: The labels to check.
    """
    for label in labels:
        language_tag = label['language']
        if not tags.check(language_tag):
            tag_errors = [err.message for err in tags.tag(language_tag).errors]
            errors.append(colander.Invalid(
                node['labels'],
                'Invalid language tag: %s' % ", ".join(tag_errors)
            ))
            continue
        if languages_manager.count_languages(language_tag):
            continue
        # Valid but unknown tag: register it.
        descriptions = ', '.join(tags.description(language_tag))
        languages_manager.save(Language(id=language_tag, name=descriptions))
def concept_type_rule(errors, node_location, skos_manager, conceptscheme_id, items):
    """
    Checks that the targets of narrower, broader and related are concepts
    and not collections.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param skos_manager: Manager used to look up each target.
    :param conceptscheme_id: Id of the conceptscheme the targets live in.
    :param items: Ids of the targets to check.
    """
    for target_id in items:
        target = skos_manager.get_thing(target_id, conceptscheme_id)
        if target.type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A narrower, broader or related concept should always be a concept, not a collection'
        ))
def collection_type_rule(errors, node_location, skos_manager, conceptscheme_id, members):
    """
    Checks that the targets of member_of are collections and not concepts.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param skos_manager: Manager used to look up each target.
    :param conceptscheme_id: Id of the conceptscheme the targets live in.
    :param members: Ids of the member_of targets to check.
    """
    for parent_id in members:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A member_of parent should always be a collection'
        ))
def semantic_relations_rule(errors, node_location, skos_manager, conceptscheme_id, members, collection_id):
    """
    Checks that the elements in a group of concepts or collections are not
    the group itself, that they actually exist and are within the same
    conceptscheme.

    Checking stops at the first element that fails.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param skos_manager: Manager used to look up each element.
    :param conceptscheme_id: Id of the conceptscheme to look in.
    :param members: Ids of the elements to check.
    :param collection_id: Id of the concept or collection being validated.
    :returns: `True` when every element passed, `False` otherwise.
    :rtype: bool
    """
    for member_id in members:
        if member_id == collection_id:
            problem = 'A concept or collection cannot be related to itself'
        else:
            try:
                skos_manager.get_thing(member_id, conceptscheme_id)
            except NoResultFound:
                problem = 'Concept not found, check concept_id. Please be aware members should be within one scheme'
            else:
                continue
        errors.append(colander.Invalid(node_location, problem))
        return False
    return True
def hierarchy_build(skos_manager, conceptscheme_id, property_list, property_hierarchy, property_concept_type,
                    property_list_name):
    """
    Collect the ids of everything reachable from ``property_list`` by
    repeatedly following the ``property_list_name`` relation.

    Every id found is added to ``property_hierarchy`` in place. Each item is
    looked up and expanded at most once, so cyclic relations in the stored
    data cannot cause unbounded recursion and shared descendants are not
    queried repeatedly. Ids already present in ``property_hierarchy`` are
    not appended a second time.

    :param skos_manager: Manager used to look up each item.
    :param conceptscheme_id: Id of the conceptscheme to look in.
    :param property_list: Ids to start from. May be the same list object as
        ``property_hierarchy``; it is copied before traversal.
    :param list property_hierarchy: Accumulator that receives the ids found.
    :param property_concept_type: Only items of this type are expanded;
        `None` expands items of every type.
    :param property_list_name: Name of the attribute that holds the related
        items; each related item must expose a ``concept_id``.
    """
    known_ids = set(property_hierarchy)
    expanded_ids = set()
    # Snapshot the start ids: the accumulator is appended to below and the
    # caller may have passed the very same list for both arguments.
    pending = list(property_list)
    while pending:
        current_id = pending.pop()
        if current_id in expanded_ids:
            continue
        expanded_ids.add(current_id)
        try:
            current = skos_manager.get_thing(current_id, conceptscheme_id)
        except NoResultFound:
            # Unknown ids contribute nothing to the hierarchy.
            continue
        if current is None:
            continue
        if property_concept_type is not None and current.type != property_concept_type:
            continue
        for related in getattr(current, property_list_name):
            related_id = related.concept_id
            if related_id not in known_ids:
                known_ids.add(related_id)
                property_hierarchy.append(related_id)
            pending.append(related_id)
def hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct, property1, property2,
                   property2_list_name, concept_type, error_message):
    """
    Checks that the items in ``property1`` are not already part of the
    ``property2`` hierarchy.

    The hierarchy consists of the ids listed under ``property2`` plus
    everything :func:`hierarchy_build` reaches from them. One error is added
    for every ``property1`` id found in that hierarchy.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param skos_manager: Manager used to look up items.
    :param conceptscheme_id: Id of the conceptscheme to look in.
    :param cstruct: The structure being validated; it is only read.
    :param property1: Key in ``cstruct`` holding the items to check.
    :param property2: Key in ``cstruct`` holding the hierarchy's start items.
    :param property2_list_name: Attribute followed while building the
        hierarchy.
    :param concept_type: Only items of this type are expanded.
    :param error_message: Message used for every error.
    """
    candidate_ids = []
    if property1 in cstruct:
        candidate_ids = [item['id'] for item in cstruct[property1]]

    hierarchy_ids = []
    if property2 in cstruct:
        start_ids = [item['id'] for item in cstruct[property2]]
        # Give the accumulator its own list: hierarchy_build appends to it
        # while walking the start ids, so the two must not be one object.
        hierarchy_ids = list(start_ids)
        hierarchy_build(skos_manager, conceptscheme_id, start_ids, hierarchy_ids, concept_type,
                        property2_list_name)

    for candidate_id in candidate_ids:
        if candidate_id in hierarchy_ids:
            errors.append(colander.Invalid(
                node_location,
                error_message
            ))
def broader_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the broader concepts of a concept are not already narrower
    concepts of that concept.

    Delegates to :func:`hierarchy_rule`, following ``narrower_concepts``
    from the concept's ``narrower`` list.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='broader',
        property2='narrower',
        property2_list_name='narrower_concepts',
        concept_type='concept',
        error_message='The broader concept of a concept must not itself be a narrower concept of the concept being edited.',
    )
def narrower_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the narrower concepts of a concept are not already broader
    concepts of that concept.

    Delegates to :func:`hierarchy_rule`, following ``broader_concepts``
    from the concept's ``broader`` list.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='narrower',
        property2='broader',
        property2_list_name='broader_concepts',
        concept_type='concept',
        error_message='The narrower concept of a concept must not itself be a broader concept of the concept being edited.',
    )
def collection_members_unique_rule(errors, node_location, members):
    """
    Checks that a collection has no duplicate members.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param members: Ids of the members to check.
    """
    distinct_members = set(members)
    if len(distinct_members) < len(members):
        errors.append(colander.Invalid(
            node_location,
            'All members of a collection should be unique.'
        ))
def members_only_in_collection_rule(errors, node, concept_type, members):
    """
    Checks that only collections have members.

    :param list errors: Collects the validation errors; appended to in place.
    :param node: Schema node the error is reported on.
    :param concept_type: Type of the item being validated.
    :param members: Ids of the members of that item.
    """
    if concept_type == 'collection':
        return
    if len(members) > 0:
        errors.append(colander.Invalid(
            node,
            'Only collections can have members.'
        ))
def memberof_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the ``member_of`` parents of an item are not already among
    its members.

    Delegates to :func:`hierarchy_rule`, following ``members`` from the
    item's ``members`` list and expanding collections only.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='member_of',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message='The parent member_of collection of a concept must not itself be a member of the concept being edited.',
    )
def members_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that a collection does not have members that are in themselves
    already "parents" of that collection.

    Delegates to :func:`hierarchy_rule`, following ``member_of`` from the
    item's ``member_of`` list and expanding collections only.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='members',
        property2='member_of',
        property2_list_name='member_of',
        concept_type='collection',
        error_message='The item of a members collection must not itself be a parent of the concept/collection being edited.',
    )
def concept_matches_rule(errors, node_location, matches, concept_type):
    """
    Checks that only concepts have matches.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param matches: The matches of the item, or `None`.
    :param concept_type: Type of the item being validated.
    """
    if matches is None or len(matches) == 0:
        return
    if concept_type == 'concept':
        return
    errors.append(colander.Invalid(
        node_location,
        'Only concepts can have matches'
    ))
def concept_matches_unique_rule(errors, node_location, matches):
    """
    Checks that a concept has no duplicate matches.

    This means that a concept can only have one match (no matter what the
    type) with another concept. We don't allow eg. a concept that has both a
    broadMatch and a relatedMatch with the same concept.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param matches: Mapping of match type to a list of uris, or `None`.
    """
    if matches is None:
        return
    all_uris = [uri for matchtype in matches for uri in matches[matchtype]]
    if len(set(all_uris)) < len(all_uris):
        errors.append(colander.Invalid(
            node_location,
            'All matches of a concept should be unique.'
        ))
def languagetag_validator(node, cstruct):
    """
    This validator validates a languagetag.

    The validator will check if a tag is a valid IANA language tag. If the
    validator is informed that this should be a new language tag, it will
    also check that the tag doesn't already exist.

    :param colander.SchemaNode node: The schema that's being used while
        validating. Its bindings must provide ``request`` and ``new``.
    :param cstruct: The value being validated.
    :raises atramhasis.errors.ValidationError: If at least one rule was
        broken.
    """
    bindings = node.bindings
    languages_manager = bindings['request'].data_managers['languages_manager']
    is_new = bindings['new']
    language_tag = cstruct['id']

    errors = []
    if is_new:
        languagetag_checkduplicate(node['id'], language_tag, languages_manager, errors)
    languagetag_isvalid_rule(node['id'], language_tag, errors)

    if errors:
        raise ValidationError(
            'Language could not be validated',
            [e.asdict() for e in errors]
        )
def languagetag_isvalid_rule(node, language_tag, errors):
    """
    Check that a languagetag is a valid IANA language tag.

    :param node: Schema node the error is reported on.
    :param language_tag: The tag to check.
    :param list errors: Collects the validation errors; appended to in place.
    """
    if tags.check(language_tag):
        return
    tag_errors = [err.message for err in tags.tag(language_tag).errors]
    errors.append(colander.Invalid(
        node,
        'Invalid language tag: %s' % ", ".join(tag_errors)
    ))
def languagetag_checkduplicate(node, language_tag, languages_manager, errors):
    """
    Check that a languagetag isn't duplicated.

    :param node: Schema node the error is reported on.
    :param language_tag: The tag to check.
    :param languages_manager: Manager whose ``count_languages`` reports
        whether the tag is already present.
    :param list errors: Collects the validation errors; appended to in place.
    """
    if not languages_manager.count_languages(language_tag):
        return
    errors.append(colander.Invalid(
        node,
        'Duplicate language tag: %s' % language_tag
    ))
def subordinate_arrays_only_in_concept_rule(errors, node, concept_type, subordinate_arrays):
    """
    Checks that only a concept has subordinate arrays.

    :param list errors: Collects the validation errors; appended to in place.
    :param node: Schema node the error is reported on.
    :param concept_type: Type of the item being validated.
    :param subordinate_arrays: Ids of the subordinate arrays of that item.
    """
    if concept_type == 'concept':
        return
    if len(subordinate_arrays) > 0:
        errors.append(colander.Invalid(
            node,
            'Only concept can have subordinate arrays.'
        ))
def subordinate_arrays_type_rule(errors, node_location, skos_manager, conceptscheme_id, subordinate_arrays):
    """
    Checks that subordinate arrays are always collections.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param skos_manager: Manager used to look up each subordinate array.
    :param conceptscheme_id: Id of the conceptscheme to look in.
    :param subordinate_arrays: Ids of the subordinate arrays to check.
    """
    for array_id in subordinate_arrays:
        array = skos_manager.get_thing(array_id, conceptscheme_id)
        if array.type == 'collection':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A subordinate array should always be a collection'
        ))
def subordinate_arrays_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the subordinate arrays of a concept are not themselves
    parents of that concept.

    Delegates to :func:`hierarchy_rule`, starting from the concept's
    ``member_of`` list and expanding collections only.
    """
    # NOTE(review): the hierarchy is built by following ``members`` from
    # the member_of collections, not ``member_of`` - confirm this is the
    # intended direction.
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='subordinate_arrays',
        property2='member_of',
        property2_list_name='members',
        concept_type='collection',
        error_message='The subordinate_array collection of a concept must not itself be a parent of the concept being edited.',
    )
def superordinates_only_in_concept_rule(errors, node, concept_type, superordinates):
    """
    Checks that only collections have superordinates.

    Despite the function's name, the error is raised for any item that is
    not a collection.

    :param list errors: Collects the validation errors; appended to in place.
    :param node: Schema node the error is reported on.
    :param concept_type: Type of the item being validated.
    :param superordinates: Ids of the superordinates of that item.
    """
    if concept_type == 'collection':
        return
    if len(superordinates) > 0:
        errors.append(colander.Invalid(
            node,
            'Only collection can have superordinates.'
        ))
def superordinates_type_rule(errors, node_location, skos_manager, conceptscheme_id, superordinates):
    """
    Checks that superordinates are always concepts.

    :param list errors: Collects the validation errors; appended to in place.
    :param node_location: Schema node the error is reported on.
    :param skos_manager: Manager used to look up each superordinate.
    :param conceptscheme_id: Id of the conceptscheme to look in.
    :param superordinates: Ids of the superordinates to check.
    """
    for parent_id in superordinates:
        parent = skos_manager.get_thing(parent_id, conceptscheme_id)
        if parent.type == 'concept':
            continue
        errors.append(colander.Invalid(
            node_location,
            'A superordinate should always be a concept'
        ))
def superordinates_hierarchy_rule(errors, node_location, skos_manager, conceptscheme_id, cstruct):
    """
    Checks that the superordinate concepts of a collection are not
    themselves members of that collection.

    Delegates to :func:`hierarchy_rule`, following ``members`` from the
    collection's ``members`` list and expanding collections only.
    """
    hierarchy_rule(
        errors, node_location, skos_manager, conceptscheme_id, cstruct,
        property1='superordinates',
        property2='members',
        property2_list_name='members',
        concept_type='collection',
        error_message='The superordinates of a collection must not itself be a member of the collection being edited.',
    )