diff --git a/pymisp/__init__.py b/pymisp/__init__.py index 73f1be84..ca0868d3 100644 --- a/pymisp/__init__.py +++ b/pymisp/__init__.py @@ -39,7 +39,7 @@ def warning_2024() -> None: MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster, - MISPGalaxyClusterElement, MISPGalaxyClusterRelation) + MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship) from .api import PyMISP, register_user # noqa # NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future from .tools import AbstractMISPObjectGenerator # noqa @@ -76,7 +76,8 @@ def __init__(self, *args, **kwargs): 'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist', 'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion', 'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement', - 'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError', + 'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship', + 'PyMISPError', 'NewEventError', 'NewAttributeError', 'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat', 'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP' ] diff --git a/pymisp/abstract.py b/pymisp/abstract.py index a7ccb8ed..c6d5a38b 100644 --- a/pymisp/abstract.py +++ b/pymisp/abstract.py @@ -8,7 +8,7 @@ from json import JSONEncoder from uuid import UUID from abc import ABCMeta -from enum import Enum, IntEnum +from enum import Enum from typing import Any, Mapping from collections.abc import MutableMapping from functools import lru_cache diff --git a/pymisp/api.py b/pymisp/api.py index f6dd1927..5d7d00a4 100644 --- a/pymisp/api.py +++ b/pymisp/api.py @@ -31,7 +31,8 @@ MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \ MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \ MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \ - MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel + MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \ + MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types @@ -584,7 +585,6 @@ def delete_event_report(self, event_report: MISPEventReport | int | str | UUID, data['hard'] = 1 r = self._prepare_request('POST', request_url, data=data) return self._check_json_response(r) - # ## END Event Report ### # ## BEGIN Galaxy Cluster ### @@ -611,15 +611,199 @@ def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_c elif isinstance(galaxy_cluster, (int, str)): cluster_id = galaxy_cluster else: - raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)') + raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)') - to_post = { 'Galaxy': { 'target_id': cluster_id } } + to_post = {'Galaxy': {'target_id': cluster_id}} url = f'galaxies/attachCluster/{attach_target_id}/{attach_target_type}/local:{local}' r = self._prepare_request('POST', url, data=to_post) return self._check_json_response(r) # ## END Galaxy Cluster ### + # ## BEGIN Analyst Data ###a + def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID, + pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship: + """Get an analyst data from a MISP instance + + :param analyst_data: analyst data to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + if isinstance(analyst_data, AnalystDataBehaviorMixin): + analyst_data_type = analyst_data.analyst_data_object_type + else: + analyst_data_type = 'all' + analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) + r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}') + analyst_data_r = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all': + return analyst_data_r + er = type(analyst_data)() + er.from_dict(**analyst_data_r) + return er + + def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, + pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship: + """Add an analyst data to an existing MISP element + + :param analyst_data: analyst_data to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + object_uuid = analyst_data.object_uuid + object_type = analyst_data.object_type + r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data) + new_analyst_data = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data: + return new_analyst_data + er = type(analyst_data)() + er.from_dict(**new_analyst_data) + return er + + def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, analyst_data_id: int | None = None, + pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship: + """Update an analyst data on a MISP instance + + :param analyst_data: analyst data to update + :param analyst_data_id: analyst data ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + if isinstance(analyst_data, AnalystDataBehaviorMixin): + analyst_data_type = analyst_data.analyst_data_object_type + else: + analyst_data_type = 'all' + if analyst_data_id is None: + analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) + r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data) + updated_analyst_data = self._check_json_response(r) + if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all': + return updated_analyst_data + er = type(analyst_data)() + er.from_dict(**updated_analyst_data) + return er + + def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete an analyst data from a MISP instance + + :param analyst_data: analyst data to delete + """ + if isinstance(analyst_data, AnalystDataBehaviorMixin): + analyst_data_type = analyst_data.analyst_data_object_type + else: + analyst_data_type = 'all' + analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data) + request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}' + r = self._prepare_request('POST', request_url) + return self._check_json_response(r) + + # ## END Analyst Data ### + + # ## BEGIN Analyst Note ### + + def get_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote: + """Get a note from a MISP instance + + :param note: note to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + return self.get_analyst_data(note, pythonify) + + def add_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote: + """Add a note to an existing MISP element + + :param note: note to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.add_analyst_data(note, pythonify) + + def update_note(self, note: MISPNote, note_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPNote: + """Update a note on a MISP instance + + :param note: note to update + :param note_id: note ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.update_analyst_data(note, note_id, pythonify) + + def delete_note(self, note: MISPNote | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete a note from a MISP instance + + :param note: note delete + """ + return self.delete_analyst_data(note) + + # ## END Analyst Note ### + + # ## BEGIN Analyst Opinion ### + + def get_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion: + """Get an opinion from a MISP instance + + :param opinion: opinion to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + return self.get_analyst_data(opinion, pythonify) + + def add_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion: + """Add an opinion to an existing MISP element + + :param opinion: opinion to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.add_analyst_data(opinion, pythonify) + + def update_opinion(self, opinion: MISPOpinion, opinion_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPOpinion: + """Update an opinion on a MISP instance + + :param opinion: opinion to update + :param opinion_id: opinion ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.update_analyst_data(opinion, opinion_id, pythonify) + + def delete_opinion(self, opinion: MISPOpinion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete an opinion from a MISP instance + + :param opinion: opinion to delete + """ + return self.delete_analyst_data(opinion) + + # ## END Analyst Opinion ### + + # ## BEGIN Analyst Relationship ### + + def get_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship: + """Get a relationship from a MISP instance + + :param relationship: relationship to get + :param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM + """ + return self.get_analyst_data(relationship, pythonify) + + def add_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship: + """Add a relationship to an existing MISP element + + :param relationship: relationship to add + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.add_analyst_data(relationship, pythonify) + + def update_relationship(self, relationship: MISPRelationship, relationship_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPRelationship: + """Update a relationship on a MISP instance + + :param relationship: relationship to update + :param relationship_id: relationship ID to update + :param pythonify: Returns a PyMISP Object instead of the plain json output + """ + return self.update_analyst_data(relationship, relationship_id, pythonify) + + def delete_relationship(self, relationship: MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]: + """Delete a relationship from a MISP instance + + :param relationship: relationship to delete + """ + return self.delete_analyst_data(relationship) + + # ## END Analyst Relationship ### + # ## BEGIN Object ### def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject: diff --git a/pymisp/exceptions.py b/pymisp/exceptions.py index a0dd7366..071d350d 100644 --- a/pymisp/exceptions.py +++ b/pymisp/exceptions.py @@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError): pass +class NewAnalystDataError(PyMISPError): + pass + + +class NewNoteError(PyMISPError): + pass + + +class NewOpinionError(PyMISPError): + pass + +class NewRelationshipError(PyMISPError): + pass + + class UpdateAttributeError(PyMISPError): pass diff --git a/pymisp/mispevent.py b/pymisp/mispevent.py index 9bab3165..8b108e8b 100644 --- a/pymisp/mispevent.py +++ b/pymisp/mispevent.py @@ -23,13 +23,89 @@ import json from .abstract import AbstractMISP, MISPTag -from .exceptions import (UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, +from .exceptions import (NewNoteError, NewOpinionError, NewRelationshipError, UnknownMISPObjectTemplate, InvalidMISPGalaxy, InvalidMISPObject, PyMISPError, NewEventError, NewAttributeError, NewEventReportError, - NewGalaxyClusterError, NewGalaxyClusterRelationError) + NewGalaxyClusterError, NewGalaxyClusterRelationError, NewAnalystDataError) logger = logging.getLogger('pymisp') +class AnalystDataBehaviorMixin(AbstractMISP): + + # NOTE: edited here must be the property of Abstract MISP + + def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def] + super().__init__(**kwargs) + self.uuid: str # Created in the child class + self._analyst_data_object_type: str # Must be defined in the child class + self.Note: list[MISPNote] = [] + self.Opinion: list[MISPOpinion] = [] + self.Relationship: list[MISPRelationship] = [] + + @property + def analyst_data_object_type(self) -> str: + return self._analyst_data_object_type + + @property + def notes(self) -> list[MISPNote]: + return self.Note + + @property + def opinions(self) -> list[MISPOpinion]: + return self.Opinion + + @property + def relationships(self) -> list[MISPRelationship]: + return self.Relationship + + def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_note = MISPNote() + the_note.from_dict(note=note, language=language, + object_uuid=self.uuid, object_type=self.analyst_data_object_type, + **kwargs) + self.notes.append(the_note) + self.edited = True + return the_note + + def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_opinion = MISPOpinion() + the_opinion.from_dict(opinion=opinion, comment=comment, + object_uuid=self.uuid, object_type=self.analyst_data_object_type, + **kwargs) + self.opinions.append(the_opinion) + self.edited = True + return the_opinion + + def add_relationship(self, related_object_type: AbstractMISP | str, related_object_uuid: str | None, relationship_type: str, **kwargs) -> MISPNote: # type: ignore[no-untyped-def] + the_relationship = MISPRelationship() + the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid, + relationship_type=relationship_type, + object_uuid=self.uuid, object_type=self.analyst_data_object_type, + **kwargs) + self.relationships.append(the_relationship) + self.edited = True + return the_relationship + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + # These members need a fully initialized class to be loaded properly + notes = kwargs.pop('Note', []) + opinions = kwargs.pop('Opinion', []) + relationships = kwargs.pop('Relationship', []) + super().from_dict(**kwargs) + for note in notes: + note.pop('object_uuid', None) + note.pop('object_type', None) + self.add_note(**note) + for opinion in opinions: + opinion.pop('object_uuid', None) + opinion.pop('object_type', None) + self.add_opinion(**opinion) + for relationship in relationships: + relationship.pop('object_uuid', None) + relationship.pop('object_type', None) + self.add_relationship(**relationship) + + try: from dateutil.parser import parse except ImportError: @@ -226,11 +302,13 @@ def __repr__(self) -> str: return f'<{self.__class__.__name__}(NotInitialized)' -class MISPAttribute(AbstractMISP): +class MISPAttribute(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'value', 'category', 'type', 'comment', 'data', 'deleted', 'timestamp', 'to_ids', 'disable_correlation', 'first_seen', 'last_seen'} + _analyst_data_object_type = 'Attribute' + def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False): """Represents an Attribute @@ -666,12 +744,14 @@ def __repr__(self) -> str: return f'<{self.__class__.__name__}(NotInitialized)' -class MISPObject(AbstractMISP): +class MISPObject(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'name', 'meta-category', 'description', 'template_uuid', 'template_version', 'uuid', 'timestamp', 'comment', 'first_seen', 'last_seen', 'deleted'} + _analyst_data_object_type = 'Object' + def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def] default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None: ''' Master class representing a generic MISP object @@ -1063,12 +1143,17 @@ def __repr__(self) -> str: return f'<{self.__class__.__name__}(NotInitialized)' -class MISPEventReport(AbstractMISP): +class MISPEventReport(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'} + _analyst_data_object_type = 'EventReport' timestamp: float | int | datetime + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.uuid: str = str(uuid.uuid4()) + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] if 'EventReport' in kwargs: kwargs = kwargs['EventReport'] @@ -1451,11 +1536,13 @@ def __repr__(self) -> str: return f'<{self.__class__.__name__}(NotInitialized)' -class MISPEvent(AbstractMISP): +class MISPEvent(AnalystDataBehaviorMixin): _fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp', 'publish_timestamp', 'published', 'date', 'extends_uuid'} + _analyst_data_object_type = 'Event' + def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(**kwargs) self.__schema_file = 'schema.json' if strict_validation else 'schema-lax.json' @@ -2318,3 +2405,200 @@ def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] def __repr__(self) -> str: return f'<{self.__class__.__name__}(uuid={self.uuid})>' + + +class MISPAnalystData(AbstractMISP): + + _fields_for_feed: set[str] = {'uuid', 'object_uuid', 'object_type', 'authors', + 'created', 'distribution', 'sharing_group_id', 'note_type_name'} + + valid_object_type = {'Attribute', 'Event', 'EventReport', 'GalaxyCluster', 'Galaxy', + 'Object', 'Note', 'Opinion', 'Relationship', 'Organisation', + 'SharingGroup'} + + @property + def org(self) -> MISPOrganisation: + return self.Org + + @property + def orgc(self) -> MISPOrganisation: + return self.Orgc + + @orgc.setter + def orgc(self, orgc: MISPOrganisation) -> None: + if isinstance(orgc, MISPOrganisation): + self.Orgc = orgc + else: + raise PyMISPError('Orgc must be of type MISPOrganisation.') + + def __new__(cls, *args, **kwargs): + if cls is MISPAnalystData: + raise TypeError(f"only children of '{cls.__name__}' may be instantiated") + return object.__new__(cls) + + def __init__(self, **kwargs: dict[str, Any]) -> None: + super().__init__(**kwargs) + self.uuid = str(uuid.uuid4()) + self.object_uuid: str + self.object_type: str + self.authors: str + self.created: float | int | datetime + self.modified: float | int | datetime + self.SharingGroup: MISPSharingGroup + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + self.distribution = kwargs.pop('distribution', None) + if self.distribution is not None: + self.distribution = int(self.distribution) + if self.distribution not in [0, 1, 2, 3, 4, 5]: + raise NewAnalystDataError(f'{self.distribution} is invalid, the distribution has to be in 0, 1, 2, 3, 4, 5') + + if kwargs.get('sharing_group_id'): + self.sharing_group_id = int(kwargs.pop('sharing_group_id')) + + if self.distribution == 4: + # The distribution is set to sharing group, a sharing_group_id is required. + if not hasattr(self, 'sharing_group_id'): + raise NewAnalystDataError('If the distribution is set to sharing group, a sharing group ID is required.') + elif not self.sharing_group_id: + # Cannot be None or 0 either. + raise NewAnalystDataError(f'If the distribution is set to sharing group, a sharing group ID is required (cannot be {self.sharing_group_id}).') + + self.object_uuid = kwargs.pop('object_uuid', None) + if self.object_uuid is None: + raise NewAnalystDataError('The UUID for which this element is attached is required.') + self.object_type = kwargs.pop('object_type', None) + if self.object_type is None: + raise NewAnalystDataError('The element type for which this element is attached is required.') + if self.object_type not in self.valid_object_type: + raise NewAnalystDataError('The element type is not a valid type. Actual: {self.object_type}.') + + if kwargs.get('id'): + self.id = int(kwargs.pop('id')) + if kwargs.get('created'): + ts = kwargs.pop('created') + if isinstance(ts, datetime): + self.created = ts + else: + self.created = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ + if kwargs.get('modified'): + ts = kwargs.pop('modified') + if isinstance(ts, datetime): + self.modified = ts + else: + self.modified = datetime.fromisoformat(ts + '+00:00') # Force UTC TZ + + if kwargs.get('Org'): + self.Org = MISPOrganisation() + self.Org.from_dict(**kwargs.pop('Org')) + if kwargs.get('Orgc'): + self.Orgc = MISPOrganisation() + self.Orgc.from_dict(**kwargs.pop('Orgc')) + if kwargs.get('SharingGroup'): + self.SharingGroup = MISPSharingGroup() + self.SharingGroup.from_dict(**kwargs.pop('SharingGroup')) + + super().from_dict(**kwargs) + + def _set_default(self) -> None: + if not hasattr(self, 'created'): + self.created = datetime.timestamp(datetime.now()) + if not hasattr(self, 'modified'): + self.modified = self.created + + +class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'}) + + _analyst_data_object_type = 'Note' + + def __init__(self, **kwargs: dict[str, Any]) -> None: + self.note: str + self.language: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'Note' in kwargs: + kwargs = kwargs['Note'] + self.note = kwargs.pop('note', None) + if self.note is None: + raise NewNoteError('The text note of the note is required.') + super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'note'): + return '<{self.__class__.__name__}(note={self.note})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' + + +class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'}) + + _analyst_data_object_type = 'Opinion' + + def __init__(self, **kwargs: dict[str, Any]) -> None: + self.opinion: int + self.comment: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'Opinion' in kwargs: + kwargs = kwargs['Opinion'] + self.opinion = kwargs.pop('opinion', None) + if self.opinion is not None: + self.opinion = int(self.opinion) + if not (0 <= self.opinion <= 100): + raise NewOpinionError('The opinion value must be between 0 and 100 included.') + else: + raise NewOpinionError('The opinion value is required.') + + self.comment = kwargs.pop('comment', None) + if self.comment is None: + raise NewOpinionError('The text comment is required.') + + return super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'opinion'): + return '<{self.__class__.__name__}([opinion={self.opinion}] comment={self.comment})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' + + +class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData): + + _fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'}) + + _analyst_data_object_type = 'Relationship' + + def __init__(self, **kwargs: dict[str, Any]) -> None: + self.related_object_uuid: str + self.related_object_type: str + self.relationship_type: str + super().__init__(**kwargs) + + def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def] + if 'Relationship' in kwargs: + kwargs = kwargs['Relationship'] + self.related_object_type = kwargs.pop('related_object_type', None) + if self.related_object_type is None: + raise NewRelationshipError('The target object type for this relationship is required.') + + self.related_object_uuid = kwargs.pop('related_object_uuid', None) + if self.related_object_uuid is None: + if not isinstance(self.related_object_type, AbstractMISP): + raise NewRelationshipError('The target UUID for this relationship is required.') + else: + self.related_object_uuid = self.related_object_type.uuid + self.related_object_type = self.related_object_type._analyst_data_object_type + + if self.related_object_type not in self.valid_object_type: + raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.') + + return super().from_dict(**kwargs) + + def __repr__(self) -> str: + if hasattr(self, 'related_object_uuid') and hasattr(self, 'object_uuid'): + return '<{self.__class__.__name__}(object_uuid={self.object_uuid}, related_object_type={self.related_object_type}, related_object_uuid={self.related_object_uuid}, relationship_type={self.relationship_type})'.format(self=self) + return f'<{self.__class__.__name__}(NotInitialized)' diff --git a/tests/testlive_comprehensive.py b/tests/testlive_comprehensive.py index d6323042..a79afcff 100644 --- a/tests/testlive_comprehensive.py +++ b/tests/testlive_comprehensive.py @@ -25,7 +25,8 @@ MISPAttribute, MISPSighting, MISPShadowAttribute, MISPTag, MISPSharingGroup, MISPFeed, MISPServer, MISPUserSetting, MISPEventReport, MISPCorrelationExclusion, MISPGalaxyCluster, - MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist) + MISPGalaxy, MISPOrganisationBlocklist, MISPEventBlocklist, + MISPNote) from pymisp.tools import CSVLoader, DomainIPObject, ASNObject, GenericObjectGenerator except ImportError: raise @@ -3198,7 +3199,7 @@ def test_event_galaxy(self) -> None: self.admin_misp_connector.toggle_global_pythonify() def test_attach_galaxy_cluster(self) -> None: - event = self.create_simple_event() + event = self.create_simple_event() event = self.admin_misp_connector.add_event(event, pythonify=True) try: galaxies: list[MISPGalaxy] = self.admin_misp_connector.galaxies(pythonify=True) @@ -3234,6 +3235,142 @@ def test_attach_galaxy_cluster(self) -> None: self.admin_misp_connector.delete_event(event) self.admin_misp_connector.toggle_global_pythonify() + def test_analyst_data_CRUD(self) -> None: + event = self.create_simple_event() + self.admin_misp_connector.toggle_global_pythonify() + try: + fake_uuid = str(uuid4()) + new_note1 = MISPNote() + new_note1.object_type = 'Event' + new_note1.object_uuid = fake_uuid + new_note1.note = 'Fake note' + new_note1 = self.user_misp_connector.add_note(new_note1) + # The Note should be linked even for non-existing data + self.assertTrue(new_note1.object_uuid == fake_uuid) + + new_note1.note = "Updated Note" + new_note1 = self.user_misp_connector.update_note(new_note1) + # The Note should be updatable + self.assertTrue(new_note1.note == "Updated Note") + + # The Note should be able to get an Opinion + new_opinion = new_note1.add_opinion(42, 'Test Opinion') + new_note1 = self.user_misp_connector.update_note(new_note1) + # Fetch newly added node + new_note1 = self.user_misp_connector.get_note(new_note1) + # The Opinion shoud be able to be created via the Note + self.assertTrue(new_note1.opinions[0].opinion == new_opinion.opinion) + + response = self.user_misp_connector.delete_note(new_note1) + # The Note should be deletable + self.assertTrue(response['success']) + self.assertEqual(response['message'], 'Note deleted.') + # The Opinion should not be deleted + opinion_resp = self.user_misp_connector.get_opinion(new_opinion) + self.assertTrue(opinion_resp.opinion == new_opinion.opinion) + + new_note: MISPNote = event.add_note(note='Test Note', language='en') + new_note.distribution = 1 # Community + event = self.user_misp_connector.add_event(event) + # The note should be linked by Event UUID + self.assertEqual(new_note.object_type, 'Event') + self.assertTrue(new_note.object_uuid == event.uuid) + + event = self.user_misp_connector.get_event(event) + # The Note should be present on the event + self.assertTrue(event.notes[0].object_uuid == event.uuid) + + finally: + self.admin_misp_connector.delete_event(event) + try: + self.admin_misp_connector.delete_opinion(new_opinion) + self.admin_misp_connector.delete_note(new_note) + self.admin_misp_connector.delete_note(new_note1) # Should already be deleted + except Exception: + pass + + def test_analyst_data_ACL(self) -> None: + event = self.create_simple_event() + event.distribution = 2 + sg = MISPSharingGroup() + sg.name = 'Testcases SG' + sg.releasability = 'Testing' + sharing_group = self.admin_misp_connector.add_sharing_group(sg, pythonify=True) + # Chec that sharing group was created + self.assertEqual(sharing_group.name, 'Testcases SG') + self.admin_misp_connector.toggle_global_pythonify() + + try: + new_note: MISPNote = event.add_note(note='Test Note', language='en') + new_note.distribution = 0 # Org only + event = self.admin_misp_connector.add_event(event) + + # The note should be linked by Event UUID + self.assertEqual(new_note.object_type, 'Event') + self.assertEqual(event.uuid, new_note.object_uuid) + + event = self.admin_misp_connector.get_event(event) + # The note should be visible for the creator + self.assertEqual(len(event.notes), 1) + self.assertTrue(new_note.note == "Test Note") + + resp = self.user_misp_connector.get_note(new_note) + # The note should not be visible to another org + self.assertTrue(len(resp), 0) + + event = self.user_misp_connector.get_event(event) + # The Note attached to the event should not be visible for another org than the creator + self.assertEqual(len(event.Note), 0) + + new_note = self.admin_misp_connector.get_note(new_note) + new_note.distribution = 4 + new_note.sharing_group_id = sharing_group.id + new_note = self.admin_misp_connector.update_note(new_note) + self.assertEqual(int(new_note.sharing_group_id), int(sharing_group.id)) + + event = self.user_misp_connector.get_event(event) + # The Note attached to the event should not be visible for another org not part of the sharing group + self.assertEqual(len(event.Note), 0) + + # Add org to the sharing group + r = self.admin_misp_connector.add_org_to_sharing_group(sharing_group, + self.test_org, extend=True) + self.assertEqual(r['name'], 'Organisation added to the sharing group.') + + event = self.user_misp_connector.get_event(event) + # The Note attached to the event should now be visible + self.assertEqual(len(event.Note), 1) + + new_note.note = "Updated Note" + resp = self.user_misp_connector.update_note(new_note) + # The Note should not be updatable by another organisation + self.assertTrue(resp['errors']) + + resp = self.user_misp_connector.delete_note(new_note) + # The Note should not be deletable by another organisation + self.assertTrue(resp['errors']) + + organisation = MISPOrganisation() + organisation.name = 'Fake Org' + fake_org = self.admin_misp_connector.add_organisation(organisation, pythonify=True) + new_note_2 = new_note.add_note('Test Note 2') + new_note_2.orgc_uuid = fake_org.uuid + new_note_2 = self.user_misp_connector.add_note(new_note_2) + # Regular user should not be able to create a note on behalf of another organisation + self.assertFalse(new_note_2.orgc_uuid == fake_org.uuid) + # Note should have the orgc set to the use's organisation for non-privileged users + self.assertTrue(new_note_2.orgc_uuid == self.test_org.uuid) + + finally: + self.admin_misp_connector.delete_event(event) + try: + pass + self.admin_misp_connector.delete_sharing_group(sharing_group.id) + self.admin_misp_connector.delete_organisation(fake_org) + self.admin_misp_connector.delete_note(new_note) + except Exception: + pass + @unittest.skip("Internal use only") def missing_methods(self) -> None: skip = [