From bbf1d4da2b0f29cf45f9af425cc21b11ae407f85 Mon Sep 17 00:00:00 2001 From: Johann POLEWCZYK Date: Thu, 9 Jan 2025 19:43:28 +0100 Subject: [PATCH] Update does_column_exist_in_db function --- scripts/artifacts/burner.py | 10 ++-- scripts/artifacts/calendarAll.py | 2 +- scripts/artifacts/chrome.py | 2 +- scripts/artifacts/knowledgeC.py | 4 +- scripts/artifacts/notes.py | 6 +-- scripts/artifacts/reminders.py | 4 +- scripts/artifacts/tcc.py | 6 +-- scripts/artifacts/vipps.py | 23 ++------- scripts/artifacts/vippsContacts.py | 23 ++------- scripts/artifacts/wire.py | 79 +++++++++++++++++++----------- scripts/ilapfuncs.py | 3 +- 11 files changed, 76 insertions(+), 86 deletions(-) diff --git a/scripts/artifacts/burner.py b/scripts/artifacts/burner.py index cad4a0da..ff1ea09d 100644 --- a/scripts/artifacts/burner.py +++ b/scripts/artifacts/burner.py @@ -129,7 +129,7 @@ def get_burner_accounts(files_found, report_folder, seeker, wrap_text, timezone_ db = open_sqlite_db_readonly(file_found) cursor = db.cursor() - if does_column_exist_in_db(db, 'ZBURNER', 'ZUSER'): + if does_column_exist_in_db(file_found, 'ZBURNER', 'ZUSER'): cursor.execute(''' SELECT U.Z_PK, @@ -302,7 +302,7 @@ def get_burner_numbers(files_found, report_folder, seeker, wrap_text, timezone_o report_file = file_found db = open_sqlite_db_readonly(file_found) cursor = db.cursor() - if does_column_exist_in_db(db, 'ZBURNER', 'ZUSER'): + if does_column_exist_in_db(file_found, 'ZBURNER', 'ZUSER'): cursor.execute(''' SELECT B.Z_PK, @@ -428,17 +428,17 @@ def get_burner_messages(files_found, report_folder, seeker, wrap_text, timezone_ report_file = file_found if (file_found == 'Unknown') else report_file + ', ' + file_found db = open_sqlite_db_readonly(file_found) # 5.4.11 - if does_column_exist_in_db(db, 'ZMESSAGE', 'ZCONVERSATIONID'): + if does_column_exist_in_db(file_found, 'ZMESSAGE', 'ZCONVERSATIONID'): extra_join_thread = ' OR (M.ZMESSAGETHREAD IS NULL AND M.ZBURNERID = MT.ZBURNERID AND M.ZCONVERSATIONID = MT.ZCONVERSATIONID)' # 5.3.8 - elif does_column_exist_in_db(db, 'ZMESSAGE', 'M.ZCONTACTID'): + elif does_column_exist_in_db(file_found, 'ZMESSAGE', 'M.ZCONTACTID'): extra_join_thread = ' OR (M.ZMESSAGETHREAD IS NULL AND M.ZBURNERID = MT.ZBURNERID AND coalesce(M.ZCONTACTID, M.ZCONTACTPHONENUMBER) = MT.ZCONTACTPHONENUMBER)' # 4.0.18, 4.3.3 else: extra_join_thread = '' # OR (M.ZMESSAGETHREAD IS NULL AND M.ZBURNERID = MT.ZBURNERID AND M.ZCONTACTPHONENUMBER = MT.ZCONTACTPHONENUMBER)' # 5.3.8, 5.4.11 - if does_column_exist_in_db(db, 'ZMESSAGE', 'ZCONTACTID'): + if does_column_exist_in_db(file_found, 'ZMESSAGE', 'ZCONTACTID'): extra_join_contact = ' OR (MT.ZCONTACT IS NULL AND coalesce(M.ZCONTACTID, M.ZCONTACTPHONENUMBER) = C.ZPHONENUMBER)' # 4.0.18 e 4.3.3 else: diff --git a/scripts/artifacts/calendarAll.py b/scripts/artifacts/calendarAll.py index edd5d8d6..c5a8e128 100644 --- a/scripts/artifacts/calendarAll.py +++ b/scripts/artifacts/calendarAll.py @@ -173,7 +173,7 @@ def calendarEvents(files_found, report_folder, seeker, wrap_text, timezone_offse GROUP BY Attachment.owner_id) AS 'Attachments', ''' - conference_url_detected_exists = does_column_exist_in_db(db, 'CalendarItem', 'conference_url_detected') + conference_url_detected_exists = does_column_exist_in_db(file_found, 'CalendarItem', 'conference_url_detected') conference_url = f"CalendarItem.conference_url{'_detected' if conference_url_detected_exists else ''} AS 'Conference URL'," diff --git a/scripts/artifacts/chrome.py b/scripts/artifacts/chrome.py index 07de8642..24e57fe3 100644 --- a/scripts/artifacts/chrome.py +++ b/scripts/artifacts/chrome.py @@ -584,7 +584,7 @@ def chromeDownloads(files_found, report_folder, seeker, wrap_text, timezone_offs #Downloads # check for last_access_time column, an older version of chrome db (32) does not have it - if does_column_exist_in_db(db, 'downloads', 'last_access_time') == True: + if does_column_exist_in_db(file_found, 'downloads', 'last_access_time') == True: last_access_time_query = ''' CASE last_access_time WHEN "0" diff --git a/scripts/artifacts/knowledgeC.py b/scripts/artifacts/knowledgeC.py index e961a4d7..f6cd4db8 100644 --- a/scripts/artifacts/knowledgeC.py +++ b/scripts/artifacts/knowledgeC.py @@ -111,7 +111,7 @@ def knowledgeC_DevicePluginStatus(files_found, report_folder, seeker, wrap_text, cursor = db.cursor() does_adapteriswireless_exist = does_column_exist_in_db( - db, 'ZSTRUCTUREDMETADATA', 'Z_DKDEVICEISPLUGGEDINMETADATAKEY__ADAPTERISWIRELESS') + db_file, 'ZSTRUCTUREDMETADATA', 'Z_DKDEVICEISPLUGGEDINMETADATAKEY__ADAPTERISWIRELESS') if does_adapteriswireless_exist: adapter_is_wireless = ''' CASE ZSTRUCTUREDMETADATA.Z_DKDEVICEISPLUGGEDINMETADATAKEY__ADAPTERISWIRELESS @@ -173,7 +173,7 @@ def knowledgeC_MediaPlaying(files_found, report_folder, seeker, wrap_text, timez cursor = db.cursor() does_airplayvideo_exist = does_column_exist_in_db( - db, 'ZSTRUCTUREDMETADATA', 'Z_DKNOWPLAYINGMETADATAKEY__ISAIRPLAYVIDEO') + db_file, 'ZSTRUCTUREDMETADATA', 'Z_DKNOWPLAYINGMETADATAKEY__ISAIRPLAYVIDEO') if does_airplayvideo_exist: is_airplay_video = ''' CASE ZSTRUCTUREDMETADATA.Z_DKNOWPLAYINGMETADATAKEY__ISAIRPLAYVIDEO diff --git a/scripts/artifacts/notes.py b/scripts/artifacts/notes.py index c0d99c42..7473603c 100644 --- a/scripts/artifacts/notes.py +++ b/scripts/artifacts/notes.py @@ -21,7 +21,7 @@ def get_notes(files_found, report_folder, seeker, wrap_text, timezone_offset): db = open_sqlite_db_readonly(file_found) cursor = db.cursor() - if does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == True: + if does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == True: cursor.execute(''' SELECT @@ -59,7 +59,7 @@ def get_notes(files_found, report_folder, seeker, wrap_text, timezone_offset): LEFT JOIN ZICNOTEDATA TabF on TabF.ZNOTE = TabA.Z_PK ''') - elif does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == False: + elif does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == False: cursor.execute(''' SELECT DATETIME(TabA.ZCREATIONDATE1+978307200,'UNIXEPOCH'), @@ -246,7 +246,7 @@ def ReadLengthField(blob): data_length = int(blob[skip]) length = ((data_length & 0x7F) << (skip * 7)) + length except (IndexError, ValueError): - log.exception('Error trying to read length field in note data blob') + logfunc('Error trying to read length field in note data blob') skip += 1 return length, skip diff --git a/scripts/artifacts/reminders.py b/scripts/artifacts/reminders.py index 051d387f..3393ef6d 100644 --- a/scripts/artifacts/reminders.py +++ b/scripts/artifacts/reminders.py @@ -15,7 +15,7 @@ def get_reminders(files_found, report_folder, seeker, wrap_text, timezone_offset if file_found.endswith('.sqlite'): db = open_sqlite_db_readonly(file_found) - answer = does_column_exist_in_db(db, 'ZREMCDOBJECT', 'ZLASTMODIFIEDDATE') + answer = does_column_exist_in_db(file_found, 'ZREMCDOBJECT', 'ZLASTMODIFIEDDATE') if answer: #db = open_sqlite_db_readonly(file_found) @@ -40,7 +40,7 @@ def get_reminders(files_found, report_folder, seeker, wrap_text, timezone_offset createdate = convert_utc_human_to_timezone(createdate,timezone_offset) moddate = convert_ts_human_to_utc(row[1]) - moddate = convert_utc_human_to_timezone(moddate,timezone) + moddate = convert_utc_human_to_timezone(moddate,timezone_offset) data_list.append((createdate, row[3], row[2], moddate, location_file_found)) dir_file_found = dirname(sqlite_file).split('Stores', 1)[0] + 'Stores' diff --git a/scripts/artifacts/tcc.py b/scripts/artifacts/tcc.py index b3a8071f..09027c91 100644 --- a/scripts/artifacts/tcc.py +++ b/scripts/artifacts/tcc.py @@ -30,12 +30,12 @@ def get_tcc(files_found, report_folder, seeker, wrap_text, timezone_offset): db = open_sqlite_db_readonly(file_found) cursor = db.cursor() - if does_column_exist_in_db(db, 'access', 'last_modified'): + if does_column_exist_in_db(file_found, 'access', 'last_modified'): last_modified_timestamp = "datetime(last_modified,'unixepoch') as 'Last Modified Timestamp'" else: last_modified_timestamp = "" - if does_column_exist_in_db(db, 'access', 'auth_value'): + if does_column_exist_in_db(file_found, 'access', 'auth_value'): access = ''' case auth_value when 0 then 'Not allowed' @@ -53,7 +53,7 @@ def get_tcc(files_found, report_folder, seeker, wrap_text, timezone_offset): end as "Access" ''' - prompt_count = does_column_exist_in_db(db, 'access', 'prompt_count') + prompt_count = does_column_exist_in_db(file_found, 'access', 'prompt_count') cursor.execute(f''' select {last_modified_timestamp if last_modified_timestamp else "''"}, diff --git a/scripts/artifacts/vipps.py b/scripts/artifacts/vipps.py index fc65718a..2f77cf6a 100644 --- a/scripts/artifacts/vipps.py +++ b/scripts/artifacts/vipps.py @@ -1,28 +1,11 @@ import io import nska_deserialize as nd -import sqlite3 +import plistlib import json import sys from scripts.artifact_report import ArtifactHtmlReport -from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly - -def does_column_exist_in_db(db, table_name, col_name): - '''Checks if a specific column exists in a table''' - col_name = col_name.lower() - try: - db.row_factory = sqlite3.Row # For fetching columns by name - query = f"pragma table_info('{table_name}');" - cursor = db.cursor() - cursor.execute(query) - all_rows = cursor.fetchall() - for row in all_rows: - if row['name'].lower() == col_name: - return True - except sqlite3.Error as ex: - logfunc(f"Query error, query={query} Error={str(ex)}") - pass - return False +from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, does_column_exist_in_db def get_vipps(files_found, report_folder, seeker, wrap_text, time_offset): for file_found in files_found: @@ -90,7 +73,7 @@ def get_vipps(files_found, report_folder, seeker, wrap_text, time_offset): name = row1[0] else: # Check if ZPHONENUMBERS exists, if so, use it instead - if does_column_exist_in_db(db, 'ZCONTACTMODEL', 'ZPHONENUMBERS'): + if does_column_exist_in_db(file_found, 'ZCONTACTMODEL', 'ZPHONENUMBERS'): cursor1.execute(f''' SELECT ZNAME, diff --git a/scripts/artifacts/vippsContacts.py b/scripts/artifacts/vippsContacts.py index 65920e53..c16d1d66 100644 --- a/scripts/artifacts/vippsContacts.py +++ b/scripts/artifacts/vippsContacts.py @@ -5,7 +5,7 @@ import os from scripts.artifact_report import ArtifactHtmlReport -from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, media_to_html +from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, does_column_exist_in_db def relative_paths(source, splitter): splitted_a = source.split(splitter) @@ -16,23 +16,6 @@ def relative_paths(source, splitter): splitted_b = source.split(report_folder) return '.'+ splitted_b[1] -def does_column_exist_in_db(db, table_name, col_name): - '''Checks if a specific column exists in a table''' - col_name = col_name.lower() - try: - db.row_factory = sqlite3.Row # For fetching columns by name - query = f"pragma table_info('{table_name}');" - cursor = db.cursor() - cursor.execute(query) - all_rows = cursor.fetchall() - for row in all_rows: - if row['name'].lower() == col_name: - return True - except sqlite3.Error as ex: - logfunc(f"Query error, query={query} Error={str(ex)}") - pass - return False - def get_vippsContacts(files_found, report_folder, seeker, wrap_text, timezone_offset): for file_found in files_found: file_found = str(file_found) @@ -46,9 +29,9 @@ def get_vippsContacts(files_found, report_folder, seeker, wrap_text, timezone_of cursor = db.cursor() # Check if ZRAWPHONENUMBERS exists, if not, check ZPHONENUMBERS - if does_column_exist_in_db(db, 'ZCONTACTMODEL', 'ZRAWPHONENUMBERS'): + if does_column_exist_in_db(file_found, 'ZCONTACTMODEL', 'ZRAWPHONENUMBERS'): phone_column = 'ZRAWPHONENUMBERS' - elif does_column_exist_in_db(db, 'ZCONTACTMODEL', 'ZPHONENUMBERS'): + elif does_column_exist_in_db(file_found, 'ZCONTACTMODEL', 'ZPHONENUMBERS'): phone_column = 'ZPHONENUMBERS' else: logfunc('Neither ZRAWPHONENUMBERS nor ZPHONENUMBERS exist in ZCONTACTMODEL table.') diff --git a/scripts/artifacts/wire.py b/scripts/artifacts/wire.py index 80a922d0..4d923f2e 100644 --- a/scripts/artifacts/wire.py +++ b/scripts/artifacts/wire.py @@ -1,25 +1,25 @@ __artifacts_v2__ = { "wireAccount": { - "name": "Wire Account", + "name": "Wire Secure Messenger Account", "description": "Wire account details", "author": "Elliot Glendye", "creation_date": "2024-01-21", "last_update_date": "2025-01-03", "requirements": "", - "category": "Wire", + "category": "Business", "notes": "", "paths": ('*/mobile/Containers/Shared/AppGroup/*/AccountData/*/store/store.wiredatabase*'), "output_types": "all", "artifact_icon": "user" }, "wireMessages": { - "name": "Wire Messages", + "name": "Wire Secure Messenger Messages", "description": "Wire messages, including message sender, associated user identifiers and message type", "author": "Elliot Glendye", "creation_date": "2024-01-21", "last_update_date": "2025-01-03", "requirements": "", - "category": "Wire", + "category": "Business", "notes": "", "paths": ('*/mobile/Containers/Shared/AppGroup/*/AccountData/*/store/store.wiredatabase*'), "output_types": "standard", @@ -27,41 +27,64 @@ } } -from scripts.ilapfuncs import artifact_processor, get_file_path, get_sqlite_db_records, convert_cocoa_core_data_ts_to_utc +from scripts.ilapfuncs import artifact_processor, get_file_path, get_sqlite_db_records, does_column_exist_in_db, convert_cocoa_core_data_ts_to_utc @artifact_processor def wireAccount(files_found, report_folder, seeker, wrap_text, timezone_offset): source_path = get_file_path(files_found, "store.wiredatabase") data_list = [] - query = ''' - SELECT - DISTINCT ZUSER.ZHANDLE AS 'User ID', - ZUSER.ZNAME AS 'Display Name', - ZUSERCLIENT.ZACTIVATIONDATE AS 'Activation Date', - ZUSER.ZPHONENUMBER AS 'Phone Number', - ZUSER.ZEMAILADDRESS AS 'Email Address', - ZUSERCLIENT.ZACTIVATIONLOCATIONLATITUDE AS 'Activation Latitude', - ZUSERCLIENT.ZACTIVATIONLOCATIONLONGITUDE AS 'Activation Longitude' - FROM ZUSER - LEFT JOIN ZUSERCLIENT ON ZUSER.Z_PK = ZUSERCLIENT.ZUSER; - ''' - - data_headers = ( - 'User ID', - 'Display Name', - ('Activation Date', 'datetime'), - ('Phone Number', 'phonenumber'), - 'Email Address', - 'Latitude', - 'Longitude' - ) + has_location_data = does_column_exist_in_db(source_path, 'ZUSER', 'ZUSERCLIENT.ZACTIVATIONLOCATIONLATITUDE') + + if has_location_data: + query = ''' + SELECT + DISTINCT ZUSER.ZHANDLE AS 'User ID', + ZUSER.ZNAME AS 'Display Name', + ZUSERCLIENT.ZACTIVATIONDATE AS 'Activation Date', + ZUSER.ZPHONENUMBER AS 'Phone Number', + ZUSER.ZEMAILADDRESS AS 'Email Address', + ZUSERCLIENT.ZACTIVATIONLOCATIONLATITUDE AS 'Activation Latitude', + ZUSERCLIENT.ZACTIVATIONLOCATIONLONGITUDE AS 'Activation Longitude' + FROM ZUSER + LEFT JOIN ZUSERCLIENT ON ZUSER.Z_PK = ZUSERCLIENT.ZUSER; + ''' + data_headers = ( + 'User ID', + 'Display Name', + ('Activation Date', 'datetime'), + ('Phone Number', 'phonenumber'), + 'Email Address', + 'Latitude', + 'Longitude' + ) + else: + query = ''' + SELECT + DISTINCT ZUSER.ZHANDLE AS 'User ID', + ZUSER.ZNAME AS 'Display Name', + ZUSERCLIENT.ZACTIVATIONDATE AS 'Activation Date', + ZUSER.ZPHONENUMBER AS 'Phone Number', + ZUSER.ZEMAILADDRESS AS 'Email Address' + FROM ZUSER + LEFT JOIN ZUSERCLIENT ON ZUSER.Z_PK = ZUSERCLIENT.ZUSER; + ''' + data_headers = ( + 'User ID', + 'Display Name', + ('Activation Date', 'datetime'), + ('Phone Number', 'phonenumber'), + 'Email Address' + ) db_records = get_sqlite_db_records(source_path, query) for record in db_records: activation_date = convert_cocoa_core_data_ts_to_utc(record[2]) - data_list.append((record[0], record[1], activation_date, record[3], record[4], record[5], record[6])) + if has_location_data: + data_list.append((record[0], record[1], activation_date, record[3], record[4], record[5], record[6])) + else: + data_list.append((record[0], record[1], activation_date, record[3], record[4])) return data_headers, data_list, source_path diff --git a/scripts/ilapfuncs.py b/scripts/ilapfuncs.py index 4dd7679d..6e8276be 100644 --- a/scripts/ilapfuncs.py +++ b/scripts/ilapfuncs.py @@ -460,8 +460,9 @@ def get_sqlite_db_records(path, query, attach_query=None): logfunc(f" - {str(e)}") return [] -def does_column_exist_in_db(db, table_name, col_name): +def does_column_exist_in_db(path, table_name, col_name): '''Checks if a specific col exists''' + db = open_sqlite_db_readonly(path) col_name = col_name.lower() try: db.row_factory = sqlite3.Row # For fetching columns by name