22
33import argparse
44import asyncio
5+ import datetime
56import logging
7+ import os
68import sys
79import time
10+ import xml
11+ from datetime import timezone
812from typing import Final
13+ from xml .dom import minidom
914
1015try :
1116 import version
3237
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# strftime pattern applied by get_utc_timestamp_now() when stamping output.
UTC_TIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ"
# Application name; NOTE(review): its use is outside this view - presumably
# for version/CLI output, confirm against main().
APPNAME: Final[str] = "combiner"
3642
3743
38- async def process_path (path : str ):
def new_prettify(dom_str: str) -> str:
    """Re-indent an XML string and remove excess newlines from the output.

    minidom's ``toprettyxml`` writes a whitespace-only line for every
    whitespace text node already present in the input; those lines are
    filtered out so the result contains only meaningful lines, indented
    with two spaces per level.

    via: https://stackoverflow.com/a/14493981
    """
    reparsed = minidom.parseString(dom_str)
    # Split and join on a bare newline. A separator carrying any extra
    # whitespace only matches in front of indented lines, which leaves
    # unindented blank lines in the output.
    pretty_lines = reparsed.toprettyxml(indent=" " * 2).split("\n")
    return "\n".join(line for line in pretty_lines if line.strip())
57+
58+
def get_utc_timestamp_now():
    """Return the current UTC time as a string rendered with
    UTC_TIME_FORMAT, for use wherever a timestamp is needed.
    """
    current_utc = datetime.datetime.now(tz=timezone.utc)
    return current_utc.strftime(UTC_TIME_FORMAT)
64+
65+
async def create_new_sig_file(sigs: list):
    """Build one FFSignatureFile document from the processed entries and
    print it to stdout.

    Every entry of ``sigs`` is read by index: position 0 is attached to
    the InternalSignatureCollection, position 1 to the
    FileFormatCollection.
    """
    logger.info("processing: '%s' sigs", len(sigs))
    signature_nodes = [entry[0] for entry in sigs]
    format_nodes = [entry[1] for entry in sigs]

    document = minidom.Document()
    signature_file = document.createElement("FFSignatureFile")
    signature_file.setAttribute(
        "xmlns", "http://www.nationalarchives.gov.uk/pronom/SignatureFile"
    )
    signature_file.setAttribute("Version", "1")
    signature_file.setAttribute("DateCreated", get_utc_timestamp_now())

    signature_collection = document.createElement("InternalSignatureCollection")
    for node in signature_nodes:
        signature_collection.appendChild(node)

    format_collection = document.createElement("FileFormatCollection")
    for node in format_nodes:
        format_collection.appendChild(node)

    # Signatures come before formats inside the root element.
    document.appendChild(signature_file)
    signature_file.appendChild(signature_collection)
    signature_file.appendChild(format_collection)

    serialized = document.toprettyxml(indent=" ", encoding="utf-8")
    print(new_prettify(serialized.decode()))
93+
94+
async def split_xml(
    internal_sig_coll: xml.dom.minicompat.NodeList,
    file_format_coll: xml.dom.minicompat.NodeList,
    idx: int,
    prefix: str,
):
    """Return the first internal signature and first file format as a
    pair, renumbered so they can be recombined into a new document.

    The signature's ``ID``, the format's ``ID`` and the format's
    ``InternalSignatureID`` text are all set to ``idx``; the format's
    ``PUID`` becomes ``<prefix>/<idx>``.

    Raises IndexError when either collection, or the expected child
    element inside it, is missing.
    """
    ins = internal_sig_coll[0].getElementsByTagName("InternalSignature")[0]
    ff = file_format_coll[0].getElementsByTagName("FileFormat")[0]
    # One exact string is used everywhere so the signature ID and the
    # format's reference to it match character for character.
    new_id = str(idx)
    ins.setAttribute("ID", new_id)
    ff.setAttribute("ID", new_id)
    ff.setAttribute("PUID", f"{prefix}/{new_id}")
    sig_ref = ff.getElementsByTagName("InternalSignatureID")[0]
    if sig_ref.firstChild is None:
        # An empty <InternalSignatureID/> has no text node to overwrite.
        sig_ref.appendChild(sig_ref.ownerDocument.createTextNode(new_id))
    else:
        sig_ref.firstChild.nodeValue = new_id
    return (ins, ff)
111+
112+
async def check_items(
    internal_sig_coll: xml.dom.minicompat.NodeList,
    file_format_coll: xml.dom.minicompat.NodeList,
):
    """Make sure we're working specifically with an ffdev.info output,
    i.e. the first internal signature collection holds exactly one
    internal signature and the first file format collection holds
    exactly one file format.

    Raises AssertionError when a collection is missing, cannot be
    indexed, or does not contain exactly one entry. The error is raised
    explicitly rather than via ``assert`` so the check still runs when
    Python is started with ``-O``.
    """
    expectations = (
        (internal_sig_coll, "InternalSignature", "internal signatures"),
        (file_format_coll, "FileFormat", "file formats"),
    )
    for collection, tag_name, label in expectations:
        try:
            count = len(collection[0].getElementsByTagName(tag_name))
        except (TypeError, IndexError) as err:
            # IndexError: the document has no such collection element.
            raise AssertionError("cannot process collections") from err
        if count != 1:
            raise AssertionError(f"{label} should be one, got: {count}")
128+
129+
async def process_paths(manifest: list, prefix: str):
    """Process the paths given as XML and prepare them to be combined
    into one xml.

    Each path in ``manifest`` is parsed; files that cannot be read or
    parsed, whose root element is not ``FFSignatureFile``, or that fail
    ``check_items`` are logged (where applicable) and skipped. The
    remaining files are renumbered sequentially from 1 via ``split_xml``
    using ``prefix``, and the result is handed to
    ``create_new_sig_file``. Nothing is written when no file qualifies.
    """
    # Imported here so the except clause does not rely on minidom having
    # loaded the expat bindings as a side effect of parsing.
    from xml.parsers.expat import ExpatError

    sig_list = []
    for item in manifest:
        try:
            # Read bytes and let the XML parser work out the encoding; the
            # manifest may contain arbitrary non-UTF-8 files.
            with open(item, "rb") as xml_file:
                doc = minidom.parseString(xml_file.read())
        except (ExpatError, OSError):
            logger.error("cannot process: %s", item)
            continue
        # documentElement is the root element even when comments or a
        # doctype precede it, unlike firstChild.
        if doc.documentElement.tagName != "FFSignatureFile":
            continue
        internal_sig_coll = doc.getElementsByTagName("InternalSignatureCollection")
        file_format_coll = doc.getElementsByTagName("FileFormatCollection")
        try:
            await check_items(internal_sig_coll, file_format_coll)
        except AssertionError as err:
            logger.error("cannot process: %s ('%s')", item, err)
            continue
        # Identifiers are sequential over the accepted files only.
        idx = len(sig_list) + 1
        res = await split_xml(internal_sig_coll, file_format_coll, idx, prefix)
        sig_list.append(res)
    if not sig_list:
        logger.info("no signature files were processed")
        return
    await create_new_sig_file(sig_list)
161+
162+
async def create_manifest(path: str) -> list[str]:
    """Walk ``path`` recursively and return the path of every file found,
    logging each one at debug level.
    """
    manifest = []
    for directory, _subdirs, filenames in os.walk(path):
        for name in filenames:
            found = os.path.join(directory, name)
            logger.debug(found)
            manifest.append(found)
    return manifest
172+
173+
async def process_path(path: str, prefix: str):
    """Collect every file under ``path`` and hand the list, together with
    ``prefix``, to process_paths.
    """
    logger.debug("processing files at: %s", path)
    manifest = await create_manifest(path)
    await process_paths(manifest, prefix)
41180
42181
43182def main () -> None :
@@ -58,6 +197,9 @@ def main() -> None:
58197 help = "directory where the signature files are" ,
59198 required = False ,
60199 )
200+ parser .add_argument (
201+ "--prefix" , help = "prefix for custom puids" , required = False , default = "ffdev"
202+ )
61203 parser .add_argument (
62204 "--version" ,
63205 help = "print version information" ,
@@ -76,6 +218,7 @@ def main() -> None:
76218 asyncio .run (
77219 process_path (
78220 path = args .path ,
221+ prefix = args .prefix ,
79222 )
80223 )
81224
0 commit comments