Skip to content

Commit

Permalink
Clean up code. Add none option to replacement-policy.
Browse files Browse the repository at this point in the history
  • Loading branch information
wojciech-graj committed Mar 19, 2023
1 parent 76ad531 commit 4230f82
Showing 1 changed file with 84 additions and 82 deletions.
166 changes: 84 additions & 82 deletions tic_midi.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from typing import List, Tuple, Optional, Dict


VERSION_STRING = "TIC-MIDI v0.1.1 2023-03-18 by Wojciech Graj"
VERSION_STRING = "TIC-MIDI v0.1.2 2023-03-19 by Wojciech Graj"


class Chunk:
Expand Down Expand Up @@ -118,7 +118,7 @@ def get_ch(self, idx: int) -> int:
return self.ch1 - 1
elif idx == 2:
return self.ch2 - 1
else: # idx == 3
elif idx == 3:
return self.ch3 - 1

def set_ch(self, idx: int, ch: int) -> None:
Expand All @@ -128,7 +128,7 @@ def set_ch(self, idx: int, ch: int) -> None:
self.ch1 = ch + 1
elif idx == 2:
self.ch2 = ch + 1
else: # idx == 3
elif idx == 3:
self.ch3 = ch + 1

def serialize(self) -> Tuple[int]:
Expand Down Expand Up @@ -218,6 +218,54 @@ def __init__(self) -> None:
def index(self, note: int, sfx: int) -> None:
return next((i for i, channel in enumerate(self.channels) if channel.note == note and channel.sfx == sfx), -1)

def calc_note_on_channel_placement(self, msge: MessageExt, replacement_policy: str) -> int:
if (channel_idx := self.index(msge.msg.note, msge.sfx)) != -1: # Replace channel with same note
return channel_idx
if (channel_idx := self.index(0, 0)) != -1: # Replace empty channel
return channel_idx
# All channels in use, so use replacement policy
if replacement_policy == 'random':
return random.randrange(4)
elif replacement_policy == 'fifo':
return self.channels.index(min(self.channels, key=lambda channel: channel.time_set))
elif replacement_policy == 'lifo':
return self.channels.index(max(self.channels, key=lambda channel: channel.time_set))
elif replacement_policy == 'none':
return -1


def _preprocess_messages(
mid: mido.MidiFile,
sfx_names: Optional[Dict]) -> Optional[List[MessageExt]]:
"""Combine all messages from all tracks into a list of MessageExt. Map tracks to sfx during conversion."""
messages = []
for sfx_idx, track in enumerate(mid.tracks):
if sfx_names:
if track.name not in sfx_names:
logging.error(f"MIDI Track '{track.name}' not found in sfx-names.")
return None
sfx_idx = sfx_names[track.name]
elif sfx_idx > 63:
logging.warning("MIDI file contains over 64 tracks. Discarded excess tracks.")
break
messages.extend((MessageExt(msg, sfx_idx) for msg in _to_abstime(track)))
messages.sort(key=lambda msge: msge.msg.time)
return messages


def _calc_tempo(messages: List[MessageExt]) -> int:
return next((msge.msg.tempo for msge in reversed(messages) if msge.msg.type == "set_tempo"), 500000)


def _terminate(
pc: PatternChunk,
frame: Frame,
pattern_row_idx: int) -> None:
for channel_idx in range(4):
if (pattern_idx := frame.get_ch(channel_idx)) != -1:
pc.patterns[pattern_idx].rows[pattern_row_idx].note = 1
logging.info("Added terminator to end of track.")


def convert(
mid: mido.MidiFile,
Expand All @@ -231,126 +279,80 @@ def convert(
terminate: bool = True) -> bool:
start_time = time.time()

# Combine all messages from all tracks into a list of MessageExt
next_unused_sfx = 0
messages = []
for track in mid.tracks:
if sfx_names:
if track.name not in sfx_names:
logging.error(f"MIDI Track '{track.name}' not found in sfx-names.")
return False
next_unused_sfx = sfx_names[track.name]
elif next_unused_sfx > 63:
logging.warning("MIDI file contains over 64 tracks. Discarded excess tracks.")
break
messages.extend((MessageExt(msg, next_unused_sfx) for msg in _to_abstime(track)))
next_unused_sfx += 1
messages.sort(key=lambda msge: msge.msg.time)
messages = _preprocess_messages(mid, sfx_names)
if messages is None:
return False
tempo = _calc_tempo(messages)
messages = filter(lambda msge: not msge.msg.is_meta, messages) # Remove MetaMessages

# Get track and set values
track = tc.tracks[track_idx]
track.speed = resolution + 2

# Deduce tempo from MetaMessages
tempo = 500000
for msge in messages:
if msge.msg.type == "set_tempo":
tempo = msge.msg.tempo
track.tempo = int(mido.tempo2bpm(tempo))

# Remove MetaMessages
messages = filter(lambda msge: not msge.msg.is_meta, messages)

# Convert
frame_idx = -1
next_unused_pattern = 0
scale = [
4,
2,
1.5,
1.2,
1,
.85,
.75,
.675,
][resolution]
scale = [4, 2, 1.5, 1.2, 1, .85, .75, .675][resolution]
channel_state = ChannelState()

try:
for msge in messages:
msg = msge.msg
if msg.type not in {"note_on", "note_off"}:
continue

# Advance frame if neccessary, and calculate row index in pattern
while True:
pattern_row_idx = int(msg.time * 4 * scale / mid.ticks_per_beat) - 64 * frame_idx
if pattern_row_idx < 64:
break
frame_idx += 1
if frame_idx > 15:
logging.warning("Used all available frames in track.")
raise StopIteration()
frame = track.frames[frame_idx]

if msg.type == "note_on": # Play a note
# Find appropriate channel index
channel_idx = channel_state.index(msg.note, msge.sfx) # Replace channel with same note
if channel_idx == -1: # Replace empty channel
channel_idx = channel_state.index(0, 0)
if channel_idx == -1: # Replace channel in accordance with replacement policy
if replacement_policy == 'random':
channel_idx = random.randrange(4)
elif replacement_policy == 'fifo':
channel_idx = channel_state.channels.index(min(channel_state.channels, key=lambda channel: channel.time_set))
elif replacement_policy == 'lifo':
channel_idx = channel_state.channels.index(max(channel_state.channels, key=lambda channel: channel.time_set))
pattern_row_idx = int(msg.time * 4 * scale / mid.ticks_per_beat)
frame_idx = pattern_row_idx // 64
if frame_idx > 15:
logging.warning("Used all available frames in track.")
raise StopIteration()
pattern_row_idx %= 64
frame = track.frames[frame_idx]

if msg.type == "note_on":
channel_idx = channel_state.calc_note_on_channel_placement(msge, replacement_policy)
if channel_idx == -1:
continue

# Assign pattern to channel if unassigned
if frame.get_ch(channel_idx) == -1:
if next_unused_pattern == 60:
if next_unused_pattern >= 60:
logging.warning("Used all available patterns.")
raise StopIteration()
frame.set_ch(channel_idx, next_unused_pattern)
next_unused_pattern += 1

# Assign values to pattern row
r = pc.patterns[frame.get_ch(channel_idx)].rows[pattern_row_idx]
row = pc.patterns[frame.get_ch(channel_idx)].rows[pattern_row_idx]
note_scaled = max(0, msg.note - 24)
volume = msg.velocity // 8
octave = note_scaled // 12 + octave_shift
if octave < 0 or octave > 7:
if not 0 <= octave <= 7:
logging.warning(f"track {track_idx}:frame {frame_idx}:channel {channel_idx + 1}:row {pattern_row_idx}:Note with octave {octave} is beyond playable range. Consider using '--octave-shift'.")
octave = max(0, min(7, octave))
r.set(note_scaled % 12 + 4, volume, volume, 1, msge.sfx, octave)

row.set(note_scaled % 12 + 4, volume, volume, 1, msge.sfx, octave)
channel_state.channels[channel_idx].set(msg.note, msge.sfx, msg.time)
else: # Stop a note
# Find appropriate channel index
else:
channel_idx = channel_state.index(msg.note, msge.sfx)
if channel_idx == -1:
continue

# Assign values to row
r = pc.patterns[frame.get_ch(channel_idx)].rows[pattern_row_idx]
r.note = 1
pc.patterns[frame.get_ch(channel_idx)].rows[pattern_row_idx].note = 1
channel_state.channels[channel_idx].set(0, 0, msg.time)
except StopIteration:
logging.warning("Terminated early.")

frame_idx = min(15, frame_idx)
pattern_row_idx = min(63, pattern_row_idx)

# Add terminator
if terminate:
for channel_idx in range(4):
if (pattern_idx := track.frames[frame_idx].get_ch(channel_idx)) != -1:
pc.patterns[pattern_idx].rows[pattern_row_idx].note = 1
logging.info(f"Added terminator on row {pattern_row_idx} on frame {min(15, frame_idx)}")
_terminate(pc, track.frames[frame_idx], pattern_row_idx)

# Print summary
logging.info(f"Converted {mido.tick2second(msg.time, mid.ticks_per_beat, tempo):.2f} seconds of music in {1000 * (time.time() - start_time):.2f} millis.")
logging.info(f"Used {frame_idx + 1}/16 frames on track {track_idx}.")
logging.info(f"Used {min(60, next_unused_pattern)}/60 patterns.")
logging.info(f"Ended on row {pattern_row_idx} on frame {min(15, frame_idx)}.")
logging.info(("===== Summary ====="
f"\nConverted {mido.tick2second(msg.time, mid.ticks_per_beat, tempo):.2f} seconds of music in {1000 * (time.time() - start_time):.2f} millis."
f"\nUsed {frame_idx + 1}/16 frames on track {track_idx}."
f"\nUsed {min(60, next_unused_pattern)}/60 patterns."
f"\nEnded on row {pattern_row_idx} on frame {min(15, frame_idx)}."))

return True

Expand Down Expand Up @@ -401,7 +403,7 @@ def tic_save(
parser.add_argument('--track', default=0, type=int, help="Accepted values: [0,7].")
parser.add_argument('--bank', default=0, type=int, help="Memory bank.")
parser.add_argument('--octave-shift', default=0, type=int, help="Shift all notes by some number of octaves.")
parser.add_argument('--replacement-policy', choices={'random', 'fifo', 'lifo'}, default='fifo', help="Determines note placement in channels when all channels are in use.")
parser.add_argument('--replacement-policy', choices={'random', 'fifo', 'lifo', 'none'}, default='fifo', help="Determines note placement in channels when all channels are in use.")
parser.add_argument('--no-terminator', action='store_true', help="Do not end playback on all channels at the end of tracks.")
ins_or_ovr = parser.add_mutually_exclusive_group()
ins_or_ovr.add_argument('--insert', action='store_true', help="Insert Track and Pattern Chunks into an existing cartridge while leaving remaining chunks intact.")
Expand Down

0 comments on commit 4230f82

Please sign in to comment.