Skip to content

Commit

Permalink
Merge branch 'wip_analystdata'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed May 22, 2024
2 parents 5a5d580 + 2cf5d99 commit feadede
Show file tree
Hide file tree
Showing 6 changed files with 636 additions and 15 deletions.
5 changes: 3 additions & 2 deletions pymisp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def warning_2024() -> None:
MISPNoticelist, MISPObjectTemplate, MISPSharingGroup, MISPRole, MISPServer, MISPFeed,
MISPEventDelegation, MISPUserSetting, MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist,
MISPEventReport, MISPCorrelationExclusion, MISPDecayingModel, MISPGalaxy, MISPGalaxyCluster,
MISPGalaxyClusterElement, MISPGalaxyClusterRelation)
MISPGalaxyClusterElement, MISPGalaxyClusterRelation, MISPNote, MISPOpinion, MISPRelationship)
from .api import PyMISP, register_user # noqa
# NOTE: the direct imports to .tools are kept for backward compatibility but should be removed in the future
from .tools import AbstractMISPObjectGenerator # noqa
Expand Down Expand Up @@ -76,7 +76,8 @@ def __init__(self, *args, **kwargs):
'MISPEventDelegation', 'MISPUserSetting', 'MISPInbox', 'MISPEventBlocklist',
'MISPOrganisationBlocklist', 'MISPEventReport', 'MISPCorrelationExclusion',
'MISPDecayingModel', 'MISPGalaxy', 'MISPGalaxyCluster', 'MISPGalaxyClusterElement',
'MISPGalaxyClusterRelation', 'PyMISPError', 'NewEventError', 'NewAttributeError',
'MISPGalaxyClusterRelation', 'MISPNote', 'MISPOpinion', 'MISPRelationship',
'PyMISPError', 'NewEventError', 'NewAttributeError',
'NoURL', 'NoKey', 'InvalidMISPObject', 'UnknownMISPObjectTemplate', 'PyMISPInvalidFormat',
'Distribution', 'ThreatLevel', 'Analysis', 'ExpandedPyMISP'
]
2 changes: 1 addition & 1 deletion pymisp/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from json import JSONEncoder
from uuid import UUID
from abc import ABCMeta
from enum import Enum, IntEnum
from enum import Enum
from typing import Any, Mapping
from collections.abc import MutableMapping
from functools import lru_cache
Expand Down
192 changes: 188 additions & 4 deletions pymisp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
MISPGalaxy, MISPNoticelist, MISPObjectReference, MISPObjectTemplate, MISPSharingGroup, \
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types


Expand Down Expand Up @@ -584,7 +585,6 @@ def delete_event_report(self, event_report: MISPEventReport | int | str | UUID,
data['hard'] = 1
r = self._prepare_request('POST', request_url, data=data)
return self._check_json_response(r)

# ## END Event Report ###

# ## BEGIN Galaxy Cluster ###
Expand All @@ -611,15 +611,199 @@ def attach_galaxy_cluster(self, misp_entity: MISPEvent | MISPAttribute, galaxy_c
elif isinstance(galaxy_cluster, (int, str)):
cluster_id = galaxy_cluster
else:
raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)')
raise PyMISPError('The galaxy_cluster must be MISPGalaxyCluster or the id associated with the cluster (int or str)')

to_post = { 'Galaxy': { 'target_id': cluster_id } }
to_post = {'Galaxy': {'target_id': cluster_id}}
url = f'galaxies/attachCluster/{attach_target_id}/{attach_target_type}/local:{local}'

r = self._prepare_request('POST', url, data=to_post)
return self._check_json_response(r)
# ## END Galaxy Cluster ###

# ## BEGIN Analyst Data ###a
def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Get an analyst data from a MISP instance
:param analyst_data: analyst data to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}')
analyst_data_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all':
return analyst_data_r
er = type(analyst_data)()
er.from_dict(**analyst_data_r)
return er

def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Add an analyst data to an existing MISP element
:param analyst_data: analyst_data to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
object_uuid = analyst_data.object_uuid
object_type = analyst_data.object_type
r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data)
new_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
return new_analyst_data
er = type(analyst_data)()
er.from_dict(**new_analyst_data)
return er

def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship, analyst_data_id: int | None = None,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Update an analyst data on a MISP instance
:param analyst_data: analyst data to update
:param analyst_data_id: analyst data ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
if analyst_data_id is None:
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data)
updated_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all':
return updated_analyst_data
er = type(analyst_data)()
er.from_dict(**updated_analyst_data)
return er

def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete an analyst data from a MISP instance
:param analyst_data: analyst data to delete
"""
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}'
r = self._prepare_request('POST', request_url)
return self._check_json_response(r)

# ## END Analyst Data ###

# ## BEGIN Analyst Note ###

def get_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Get a note from a MISP instance
:param note: note to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(note, pythonify)

def add_note(self, note: MISPNote, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Add a note to an existing MISP element
:param note: note to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(note, pythonify)

def update_note(self, note: MISPNote, note_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPNote:
"""Update a note on a MISP instance
:param note: note to update
:param note_id: note ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(note, note_id, pythonify)

def delete_note(self, note: MISPNote | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete a note from a MISP instance
:param note: note delete
"""
return self.delete_analyst_data(note)

# ## END Analyst Note ###

# ## BEGIN Analyst Opinion ###

def get_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Get an opinion from a MISP instance
:param opinion: opinion to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(opinion, pythonify)

def add_opinion(self, opinion: MISPOpinion, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Add an opinion to an existing MISP element
:param opinion: opinion to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(opinion, pythonify)

def update_opinion(self, opinion: MISPOpinion, opinion_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPOpinion:
"""Update an opinion on a MISP instance
:param opinion: opinion to update
:param opinion_id: opinion ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(opinion, opinion_id, pythonify)

def delete_opinion(self, opinion: MISPOpinion | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete an opinion from a MISP instance
:param opinion: opinion to delete
"""
return self.delete_analyst_data(opinion)

# ## END Analyst Opinion ###

# ## BEGIN Analyst Relationship ###

def get_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Get a relationship from a MISP instance
:param relationship: relationship to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
return self.get_analyst_data(relationship, pythonify)

def add_relationship(self, relationship: MISPRelationship, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Add a relationship to an existing MISP element
:param relationship: relationship to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.add_analyst_data(relationship, pythonify)

def update_relationship(self, relationship: MISPRelationship, relationship_id: int | None = None, pythonify: bool = False) -> dict[str, Any] | MISPRelationship:
"""Update a relationship on a MISP instance
:param relationship: relationship to update
:param relationship_id: relationship ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
return self.update_analyst_data(relationship, relationship_id, pythonify)

def delete_relationship(self, relationship: MISPRelationship | int | str | UUID) -> dict[str, Any] | list[dict[str, Any]]:
"""Delete a relationship from a MISP instance
:param relationship: relationship to delete
"""
return self.delete_analyst_data(relationship)

# ## END Analyst Relationship ###

# ## BEGIN Object ###

def get_object(self, misp_object: MISPObject | int | str | UUID, pythonify: bool = False) -> dict[str, Any] | MISPObject:
Expand Down
15 changes: 15 additions & 0 deletions pymisp/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ class NewEventReportError(PyMISPError):
pass


class NewAnalystDataError(PyMISPError):
pass


class NewNoteError(PyMISPError):
pass


class NewOpinionError(PyMISPError):
pass

class NewRelationshipError(PyMISPError):
pass


class UpdateAttributeError(PyMISPError):
pass

Expand Down
Loading

0 comments on commit feadede

Please sign in to comment.