Skip to content

Commit

Permalink
chg: A bit more refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Rafiot committed May 6, 2024
1 parent 94a48a6 commit 2cf5d99
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
41 changes: 24 additions & 17 deletions pymisp/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
MISPRole, MISPServer, MISPFeed, MISPEventDelegation, MISPCommunity, MISPUserSetting, \
MISPInbox, MISPEventBlocklist, MISPOrganisationBlocklist, MISPEventReport, \
MISPGalaxyCluster, MISPGalaxyClusterRelation, MISPCorrelationExclusion, MISPDecayingModel, \
MISPNote, MISPOpinion, MISPRelationship
MISPNote, MISPOpinion, MISPRelationship, AnalystDataBehaviorMixin
from .abstract import pymisp_json_default, MISPTag, AbstractMISP, describe_types


Expand Down Expand Up @@ -585,24 +585,26 @@ def delete_event_report(self, event_report: MISPEventReport | int | str | UUID,
data['hard'] = 1
r = self._prepare_request('POST', request_url, data=data)
return self._check_json_response(r)

# ## END Event Report ###

# ## BEGIN Analyst Data ###
def get_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationship | int | str | UUID,
# ## BEGIN Analyst Data ###a
def get_analyst_data(self, analyst_data: AnalystDataBehaviorMixin | int | str | UUID,
pythonify: bool = False) -> dict[str, Any] | MISPNote | MISPOpinion | MISPRelationship:
"""Get an analyst data from a MISP instance
:param analyst_data: analyst data to get
:param pythonify: Returns a list of PyMISP Objects instead of the plain json output. Warning: it might use a lot of RAM
"""
type = analyst_data.classObjectType
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('GET', f'analyst_data/view/{type}/{analyst_data_id}')
r = self._prepare_request('GET', f'analyst_data/view/{analyst_data_type}/{analyst_data_id}')
analyst_data_r = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r:
if not (self.global_pythonify or pythonify) or 'errors' in analyst_data_r or analyst_data_type == 'all':
return analyst_data_r
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
er = type(analyst_data)()
er.from_dict(**analyst_data_r)
return er

Expand All @@ -613,14 +615,13 @@ def add_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelationsh
:param analyst_data: analyst_data to add
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
type = analyst_data.classObjectType
object_uuid = analyst_data.object_uuid
object_type = analyst_data.object_type
r = self._prepare_request('POST', f'analyst_data/add/{type}/{object_uuid}/{object_type}', data=analyst_data)
r = self._prepare_request('POST', f'analyst_data/add/{analyst_data.analyst_data_object_type}/{object_uuid}/{object_type}', data=analyst_data)
new_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in new_analyst_data:
return new_analyst_data
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
er = type(analyst_data)()
er.from_dict(**new_analyst_data)
return er

Expand All @@ -632,14 +633,17 @@ def update_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelatio
:param analyst_data_id: analyst data ID to update
:param pythonify: Returns a PyMISP Object instead of the plain json output
"""
type = analyst_data.classObjectType
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
if analyst_data_id is None:
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
r = self._prepare_request('POST', f'analyst_data/edit/{type}/{analyst_data_id}', data=analyst_data)
r = self._prepare_request('POST', f'analyst_data/edit/{analyst_data_type}/{analyst_data_id}', data=analyst_data)
updated_analyst_data = self._check_json_response(r)
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data:
if not (self.global_pythonify or pythonify) or 'errors' in updated_analyst_data or analyst_data_type == 'all':
return updated_analyst_data
er = {'Note': MISPNote, 'Opinion': MISPOpinion, 'Relationship': MISPRelationship}.get(type, MISPNote)()
er = type(analyst_data)()
er.from_dict(**updated_analyst_data)
return er

Expand All @@ -648,9 +652,12 @@ def delete_analyst_data(self, analyst_data: MISPNote | MISPOpinion | MISPRelatio
:param analyst_data: analyst data to delete
"""
type = analyst_data.classObjectType
if isinstance(analyst_data, AnalystDataBehaviorMixin):
analyst_data_type = analyst_data.analyst_data_object_type
else:
analyst_data_type = 'all'
analyst_data_id = get_uuid_or_id_from_abstract_misp(analyst_data)
request_url = f'analyst_data/delete/{type}/{analyst_data_id}'
request_url = f'analyst_data/delete/{analyst_data_type}/{analyst_data_id}'
r = self._prepare_request('POST', request_url)
return self._check_json_response(r)

Expand Down
28 changes: 16 additions & 12 deletions pymisp/mispevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ class AnalystDataBehaviorMixin(AbstractMISP):
def __init__(self, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(**kwargs)
self.uuid: str # Created in the child class
self.classObjectType: str # Must be defined in the child class
self._analyst_data_object_type: str # Must be defined in the child class
self.Note: list[MISPNote] = []
self.Opinion: list[MISPOpinion] = []
self.Relationship: list[MISPRelationship] = []

@property
def analyst_data_object_type(self) -> str:
return self._analyst_data_object_type

@property
def notes(self) -> list[MISPNote]:
return self.Note
Expand All @@ -57,7 +61,7 @@ def relationships(self) -> list[MISPRelationship]:
def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_note = MISPNote()
the_note.from_dict(note=note, language=language,
object_uuid=self.uuid, object_type=self.classObjectType,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
self.notes.append(the_note)
self.edited = True
Expand All @@ -66,7 +70,7 @@ def add_note(self, note: str, language: str | None = None, **kwargs) -> MISPNote
def add_opinion(self, opinion: int, comment: str | None = None, **kwargs) -> MISPNote: # type: ignore[no-untyped-def]
the_opinion = MISPOpinion()
the_opinion.from_dict(opinion=opinion, comment=comment,
object_uuid=self.uuid, object_type=self.classObjectType,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
self.opinions.append(the_opinion)
self.edited = True
Expand All @@ -76,7 +80,7 @@ def add_relationship(self, related_object_type: AbstractMISP | str, related_obje
the_relationship = MISPRelationship()
the_relationship.from_dict(related_object_type=related_object_type, related_object_uuid=related_object_uuid,
relationship_type=relationship_type,
object_uuid=self.uuid, object_type=self.classObjectType,
object_uuid=self.uuid, object_type=self.analyst_data_object_type,
**kwargs)
self.relationships.append(the_relationship)
self.edited = True
Expand Down Expand Up @@ -303,7 +307,7 @@ class MISPAttribute(AnalystDataBehaviorMixin):
'deleted', 'timestamp', 'to_ids', 'disable_correlation',
'first_seen', 'last_seen'}

classObjectType = 'Attribute'
_analyst_data_object_type = 'Attribute'

def __init__(self, describe_types: dict[str, Any] | None = None, strict: bool = False):
"""Represents an Attribute
Expand Down Expand Up @@ -746,7 +750,7 @@ class MISPObject(AnalystDataBehaviorMixin):
'template_version', 'uuid', 'timestamp', 'comment',
'first_seen', 'last_seen', 'deleted'}

classObjectType = 'Object'
_analyst_data_object_type = 'Object'

def __init__(self, name: str, strict: bool = False, standalone: bool = True, # type: ignore[no-untyped-def]
default_attributes_parameters: dict[str, Any] = {}, **kwargs) -> None:
Expand Down Expand Up @@ -1142,7 +1146,7 @@ def __repr__(self) -> str:
class MISPEventReport(AnalystDataBehaviorMixin):

_fields_for_feed: set[str] = {'uuid', 'name', 'content', 'timestamp', 'deleted'}
classObjectType = 'EventReport'
_analyst_data_object_type = 'EventReport'

timestamp: float | int | datetime

Expand Down Expand Up @@ -1537,7 +1541,7 @@ class MISPEvent(AnalystDataBehaviorMixin):
_fields_for_feed: set[str] = {'uuid', 'info', 'threat_level_id', 'analysis', 'timestamp',
'publish_timestamp', 'published', 'date', 'extends_uuid'}

classObjectType = 'Event'
_analyst_data_object_type = 'Event'

def __init__(self, describe_types: dict[str, Any] | None = None, strict_validation: bool = False, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(**kwargs)
Expand Down Expand Up @@ -2507,7 +2511,7 @@ class MISPNote(AnalystDataBehaviorMixin, MISPAnalystData):

_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'note', 'language'})

classObjectType = 'Note'
_analyst_data_object_type = 'Note'

def __init__(self, **kwargs: dict[str, Any]) -> None:
self.note: str
Expand All @@ -2532,7 +2536,7 @@ class MISPOpinion(AnalystDataBehaviorMixin, MISPAnalystData):

_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'opinion', 'comment'})

classObjectType = 'Opinion'
_analyst_data_object_type = 'Opinion'

def __init__(self, **kwargs: dict[str, Any]) -> None:
self.opinion: int
Expand Down Expand Up @@ -2566,7 +2570,7 @@ class MISPRelationship(AnalystDataBehaviorMixin, MISPAnalystData):

_fields_for_feed: set[str] = MISPAnalystData._fields_for_feed.union({'related_object_uuid', 'related_object_type', 'relationship_type'})

classObjectType = 'Relationship'
_analyst_data_object_type = 'Relationship'

def __init__(self, **kwargs: dict[str, Any]) -> None:
self.related_object_uuid: str
Expand All @@ -2587,7 +2591,7 @@ def from_dict(self, **kwargs) -> None: # type: ignore[no-untyped-def]
raise NewRelationshipError('The target UUID for this relationship is required.')
else:
self.related_object_uuid = self.related_object_type.uuid
self.related_object_type = self.related_object_type.classObjectType
self.related_object_type = self.related_object_type._analyst_data_object_type

if self.related_object_type not in self.valid_object_type:
raise NewAnalystDataError(f'The target object type is not a valid type. Actual: {self.related_object_type}.')
Expand Down

0 comments on commit 2cf5d99

Please sign in to comment.