from __future__ import annotations
from typing import List, Dict, Optional, Union, Literal, TYPE_CHECKING, TypedDict
from uuid import uuid4
import math
if TYPE_CHECKING:
from .musical_time import MusicalTime
from ..enums import TalaName
# Type definitions for tala system
VibhagaBeat = Union[str, int] # 'X', 'O', or a number like 2, 3, 4
class TalaDefinition(TypedDict):
"""Definition of a tala preset."""
hierarchy: List[Union[int, List[int]]]
vibhaga: List[VibhagaBeat]
# Tempo validation constants
MIN_TEMPO_BPM = 20 # Very slow musical pieces (e.g., some alap sections)
MAX_TEMPO_BPM = 300 # Very fast musical pieces
def find_closest_idxs(trials: List[float], items: List[float]) -> List[int]:
"""Return indexes of items closest to each trial (greedy)."""
used: set[int] = set()
out: List[int] = []
for trial in trials:
diffs = [(abs(trial - item), idx) for idx, item in enumerate(items)
if idx not in used]
diffs.sort(key=lambda x: x[0])
if not diffs:
raise ValueError("not enough items to match trials")
used.add(diffs[0][1])
out.append(diffs[0][1])
return out
class Pulse:
def __init__(self, real_time: float = 0.0, unique_id: Optional[str] = None,
affiliations: Optional[List[Dict]] = None,
meter_id: Optional[str] = None,
corporeal: bool = True) -> None:
# Parameter validation
self._validate_parameters({'real_time': real_time, 'unique_id': unique_id,
'affiliations': affiliations, 'meter_id': meter_id, 'corporeal': corporeal})
self.real_time = real_time
self.unique_id = unique_id or str(uuid4())
self.affiliations: List[Dict] = affiliations or []
self.meter_id = meter_id
self.corporeal = corporeal
def _validate_parameters(self, opts: Dict) -> None:
"""Validate constructor parameters and provide helpful error messages."""
if not isinstance(opts.get('real_time', 0.0), (int, float)):
raise TypeError(f"Parameter 'real_time' must be a number, got {type(opts['real_time']).__name__}")
if opts.get('real_time', 0.0) < 0:
raise ValueError(f"Parameter 'real_time' must be non-negative, got {opts['real_time']}")
if 'unique_id' in opts and opts['unique_id'] is not None and not isinstance(opts['unique_id'], str):
raise TypeError(f"Parameter 'unique_id' must be a string, got {type(opts['unique_id']).__name__}")
if 'affiliations' in opts and opts['affiliations'] is not None:
if not isinstance(opts['affiliations'], list):
raise TypeError(f"Parameter 'affiliations' must be a list, got {type(opts['affiliations']).__name__}")
if not all(isinstance(item, dict) for item in opts['affiliations']):
raise TypeError("All items in 'affiliations' must be dictionaries")
if 'meter_id' in opts and opts['meter_id'] is not None and not isinstance(opts['meter_id'], str):
raise TypeError(f"Parameter 'meter_id' must be a string, got {type(opts['meter_id']).__name__}")
if not isinstance(opts.get('corporeal', True), bool):
raise TypeError(f"Parameter 'corporeal' must be a boolean, got {type(opts['corporeal']).__name__}")
@staticmethod
def from_json(obj: Dict) -> 'Pulse':
return Pulse(
real_time=obj.get('realTime', 0.0),
unique_id=obj.get('uniqueId'),
affiliations=obj.get('affiliations'),
meter_id=obj.get('meterId'),
corporeal=obj.get('corporeal', True),
)
def to_json(self) -> Dict:
return {
'realTime': self.real_time,
'uniqueId': self.unique_id,
'affiliations': self.affiliations,
'meterId': self.meter_id,
'corporeal': self.corporeal,
}
@property
def lowest_layer(self) -> int:
"""Get the lowest (finest) layer this pulse belongs to.
Returns the minimum layer value from all affiliations,
or 0 if no affiliations have layer info.
"""
layers = [a.get('layer') for a in self.affiliations if a.get('layer') is not None]
if not layers:
return 0
return min(layers)
def __eq__(self, other: object) -> bool:
return isinstance(other, Pulse) and self.to_json() == other.to_json()
class PulseStructure:
def __init__(self, tempo: float = 60.0, size: int = 4,
start_time: float = 0.0, unique_id: Optional[str] = None,
front_weighted: bool = True, layer: Optional[int] = None,
parent_pulse_id: Optional[str] = None,
primary: bool = True, segmented_meter_idx: int = 0,
meter_id: Optional[str] = None,
pulses: Optional[List[Pulse | Dict]] = None) -> None:
# Parameter validation
self._validate_parameters({
'tempo': tempo, 'size': size, 'start_time': start_time, 'unique_id': unique_id,
'front_weighted': front_weighted, 'layer': layer, 'parent_pulse_id': parent_pulse_id,
'primary': primary, 'segmented_meter_idx': segmented_meter_idx, 'meter_id': meter_id,
'pulses': pulses
})
self.tempo = tempo
self.pulse_dur = 60.0 / tempo
self.size = size
self.start_time = start_time
self.unique_id = unique_id or str(uuid4())
self.front_weighted = front_weighted
self.layer = layer
self.parent_pulse_id = parent_pulse_id
self.primary = primary
self.segmented_meter_idx = segmented_meter_idx
self.meter_id = meter_id
if pulses is not None:
self.pulses = [p if isinstance(p, Pulse) else Pulse.from_json(p)
for p in pulses]
else:
self.pulses = [
Pulse(
real_time=start_time + i * self.pulse_dur,
affiliations=[{
'psId': self.unique_id,
'idx': i,
'layer': self.layer,
'segmentedMeterIdx': self.segmented_meter_idx,
'strong': (i == 0) if front_weighted else (i == size - 1),
}],
meter_id=meter_id
) for i in range(size)
]
def _validate_parameters(self, opts: Dict) -> None:
"""Validate constructor parameters and provide helpful error messages."""
if not isinstance(opts.get('tempo', 60.0), (int, float)):
raise TypeError(f"Parameter 'tempo' must be a number, got {type(opts['tempo']).__name__}")
if opts.get('tempo', 60.0) <= 0:
raise ValueError(f"Parameter 'tempo' must be positive, got {opts['tempo']}")
if opts.get('tempo', 60.0) < MIN_TEMPO_BPM or opts.get('tempo', 60.0) > MAX_TEMPO_BPM:
import warnings
warnings.warn(f"Tempo {opts['tempo']} BPM is outside typical range ({MIN_TEMPO_BPM}-{MAX_TEMPO_BPM} BPM)", UserWarning)
if not isinstance(opts.get('size', 4), int):
raise TypeError(f"Parameter 'size' must be an integer, got {type(opts['size']).__name__}")
if opts.get('size', 4) <= 0:
raise ValueError(f"Parameter 'size' must be positive, got {opts['size']}")
if not isinstance(opts.get('start_time', 0.0), (int, float)):
raise TypeError(f"Parameter 'start_time' must be a number, got {type(opts['start_time']).__name__}")
if opts.get('start_time', 0.0) < 0:
raise ValueError(f"Parameter 'start_time' must be non-negative, got {opts['start_time']}")
if 'unique_id' in opts and opts['unique_id'] is not None and not isinstance(opts['unique_id'], str):
raise TypeError(f"Parameter 'unique_id' must be a string, got {type(opts['unique_id']).__name__}")
if not isinstance(opts.get('front_weighted', True), bool):
raise TypeError(f"Parameter 'front_weighted' must be a boolean, got {type(opts['front_weighted']).__name__}")
if 'layer' in opts and opts['layer'] is not None:
if not isinstance(opts['layer'], int):
raise TypeError(f"Parameter 'layer' must be an integer, got {type(opts['layer']).__name__}")
if opts['layer'] < 0:
raise ValueError(f"Parameter 'layer' must be non-negative, got {opts['layer']}")
if 'parent_pulse_id' in opts and opts['parent_pulse_id'] is not None and not isinstance(opts['parent_pulse_id'], str):
raise TypeError(f"Parameter 'parent_pulse_id' must be a string, got {type(opts['parent_pulse_id']).__name__}")
if not isinstance(opts.get('primary', True), bool):
raise TypeError(f"Parameter 'primary' must be a boolean, got {type(opts['primary']).__name__}")
if not isinstance(opts.get('segmented_meter_idx', 0), int):
raise TypeError(f"Parameter 'segmented_meter_idx' must be an integer, got {type(opts['segmented_meter_idx']).__name__}")
if opts.get('segmented_meter_idx', 0) < 0:
raise ValueError(f"Parameter 'segmented_meter_idx' must be non-negative, got {opts['segmented_meter_idx']}")
if 'meter_id' in opts and opts['meter_id'] is not None and not isinstance(opts['meter_id'], str):
raise TypeError(f"Parameter 'meter_id' must be a string, got {type(opts['meter_id']).__name__}")
if 'pulses' in opts and opts['pulses'] is not None:
if not isinstance(opts['pulses'], list):
raise TypeError(f"Parameter 'pulses' must be a list, got {type(opts['pulses']).__name__}")
@property
def dur_tot(self) -> float:
return self.size * self.pulse_dur
def set_tempo(self, new_tempo: float) -> None:
self.tempo = new_tempo
self.pulse_dur = 60.0 / new_tempo
for i, pulse in enumerate(self.pulses):
pulse.real_time = self.start_time + i * self.pulse_dur
def set_start_time(self, new_start: float) -> None:
diff = new_start - self.start_time
self.start_time = new_start
for pulse in self.pulses:
pulse.real_time += diff
@staticmethod
def from_pulse(pulse: Pulse, duration: float, size: int,
front_weighted: bool = True, layer: int = 0) -> 'PulseStructure':
tempo = 60 * size / duration
ps = PulseStructure(tempo=tempo, size=size, start_time=pulse.real_time,
front_weighted=front_weighted, layer=layer,
parent_pulse_id=pulse.unique_id, meter_id=pulse.meter_id)
idx = 0 if front_weighted else ps.size - 1
pulse.affiliations.append({
'psId': ps.unique_id,
'idx': idx,
'layer': layer,
'segmentedMeterIdx': 0,
'strong': True,
})
ps.pulses[idx] = pulse
return ps
def to_json(self) -> Dict:
return {
'pulses': [p.to_json() for p in self.pulses],
'tempo': self.tempo,
'pulseDur': self.pulse_dur,
'size': self.size,
'startTime': self.start_time,
'uniqueId': self.unique_id,
'frontWeighted': self.front_weighted,
'layer': self.layer,
'parentPulseID': self.parent_pulse_id,
'primary': self.primary,
'segmentedMeterIdx': self.segmented_meter_idx,
'meterId': self.meter_id,
'offsets': [0.0] * self.size,
}
@staticmethod
def from_json(obj: Dict) -> 'PulseStructure':
return PulseStructure(
tempo=obj.get('tempo', 60.0),
size=obj.get('size', 4),
start_time=obj.get('startTime', 0.0),
unique_id=obj.get('uniqueId'),
front_weighted=obj.get('frontWeighted', True),
layer=obj.get('layer'),
parent_pulse_id=obj.get('parentPulseID'),
primary=obj.get('primary', True),
segmented_meter_idx=obj.get('segmentedMeterIdx', 0),
meter_id=obj.get('meterId'),
pulses=[Pulse.from_json(p) for p in obj.get('pulses', [])]
)
def __eq__(self, other: object) -> bool:
return isinstance(other, PulseStructure) and self.to_json() == other.to_json()
[docs]
class Meter:
# Tala presets - predefined Hindustani classical music meters
tala_presets: Dict[TalaName, TalaDefinition] = {
TalaName.Tintal: {
'hierarchy': [[4, 4, 4, 4], 4],
'vibhaga': ['X', 2, 'O', 3]
},
TalaName.Tilwada: {
'hierarchy': [[4, 4, 4, 4], 4],
'vibhaga': ['X', 2, 'O', 3]
},
TalaName.Jhoomra: {
'hierarchy': [[3, 4, 3, 4], 4],
'vibhaga': ['X', 2, 'O', 3]
},
TalaName.AdaChautal: {
'hierarchy': [[2, 2, 2, 2, 2, 2, 2], 4],
'vibhaga': ['X', 'O', 2, 'O', 3, 4, 'O']
},
TalaName.Dhamar: {
'hierarchy': [[5, 2, 3, 4], 4],
'vibhaga': ['X', 2, 'O', 3]
},
TalaName.DeepchandiThumri: {
'hierarchy': [[3, 4, 3, 4], 4],
'vibhaga': ['X', 'O', 2, 3]
},
TalaName.DeepchandiDhrupad: {
'hierarchy': [[4, 2, 4, 2], 4],
'vibhaga': ['X', 2, 'O', 3]
},
TalaName.Ektal: {
'hierarchy': [[2, 2, 2, 2, 2, 2], 4],
'vibhaga': ['X', 'O', 2, 'O', 3, 4]
},
TalaName.Jhaptal: {
'hierarchy': [[2, 3, 2, 3], 4],
'vibhaga': ['X', 2, 'O', 3]
},
TalaName.SoolTaal: {
'hierarchy': [[2, 2, 2, 2, 2], 4],
'vibhaga': ['X', 2, 'O', 3, 4]
},
TalaName.Keherwa: {
'hierarchy': [4, 4],
'vibhaga': ['X', 'O']
},
TalaName.Rupak: {
'hierarchy': [3, 2, 2],
'vibhaga': ['X', 2, 3]
},
TalaName.Tivra: {
'hierarchy': [[3, 2, 2], 4],
'vibhaga': ['X', 2, 3]
},
TalaName.Dadra: {
'hierarchy': [[3, 3], 4],
'vibhaga': ['X', 'O']
},
}
[docs]
def __init__(self, hierarchy: Optional[List[int | List[int]]] = None,
start_time: float = 0.0, tempo: float = 60.0,
unique_id: Optional[str] = None, repetitions: int = 1,
tala_name: Optional[TalaName] = None,
vibhaga: Optional[List[VibhagaBeat]] = None) -> None:
# Parameter validation
self._validate_parameters({
'hierarchy': hierarchy, 'start_time': start_time, 'tempo': tempo,
'unique_id': unique_id, 'repetitions': repetitions
})
self.hierarchy = hierarchy or [4, 4]
self.start_time = start_time
self.tempo = tempo
self.unique_id = unique_id or str(uuid4())
self.repetitions = repetitions
self.tala_name = tala_name
self.vibhaga = vibhaga or []
self.pulse_structures: List[List[PulseStructure]] = []
self._generate_pulse_structures()
[docs]
@classmethod
def from_tala(cls, name: TalaName, start_time: float, tempo: float,
repetitions: int) -> 'Meter':
"""Create a Meter from a predefined tala preset.
Args:
name: The tala name (e.g., TalaName.Tintal)
start_time: Start time in seconds
tempo: Tempo in BPM (at matra level)
repetitions: Number of tala cycles
Returns:
A Meter configured with the tala preset
"""
preset = cls.tala_presets[name]
return cls(
hierarchy=preset['hierarchy'],
start_time=start_time,
tempo=tempo,
repetitions=repetitions,
tala_name=name,
vibhaga=preset['vibhaga']
)
def _validate_parameters(self, opts: Dict) -> None:
"""Validate constructor parameters and provide helpful error messages."""
if 'hierarchy' in opts and opts['hierarchy'] is not None:
if not isinstance(opts['hierarchy'], list):
raise TypeError(f"Parameter 'hierarchy' must be a list, got {type(opts['hierarchy']).__name__}")
if len(opts['hierarchy']) == 0:
raise ValueError("Parameter 'hierarchy' cannot be empty")
for i, level in enumerate(opts['hierarchy']):
if isinstance(level, list):
if not all(isinstance(item, int) for item in level):
raise TypeError(f"All items in hierarchy[{i}] must be integers")
if any(item <= 0 for item in level):
raise ValueError(f"All items in hierarchy[{i}] must be positive")
elif isinstance(level, int):
if level <= 0:
raise ValueError(f"hierarchy[{i}] must be positive, got {level}")
else:
raise TypeError(f"hierarchy[{i}] must be an integer or list of integers, got {type(level).__name__}")
if not isinstance(opts.get('start_time', 0.0), (int, float)):
raise TypeError(f"Parameter 'start_time' must be a number, got {type(opts['start_time']).__name__}")
if opts.get('start_time', 0.0) < 0:
raise ValueError(f"Parameter 'start_time' must be non-negative, got {opts['start_time']}")
if not isinstance(opts.get('tempo', 60.0), (int, float)):
raise TypeError(f"Parameter 'tempo' must be a number, got {type(opts['tempo']).__name__}")
if opts.get('tempo', 60.0) <= 0:
raise ValueError(f"Parameter 'tempo' must be positive, got {opts['tempo']}")
if opts.get('tempo', 60.0) < MIN_TEMPO_BPM or opts.get('tempo', 60.0) > MAX_TEMPO_BPM:
import warnings
warnings.warn(f"Tempo {opts['tempo']} BPM is outside typical range ({MIN_TEMPO_BPM}-{MAX_TEMPO_BPM} BPM)", UserWarning)
if 'unique_id' in opts and opts['unique_id'] is not None and not isinstance(opts['unique_id'], str):
raise TypeError(f"Parameter 'unique_id' must be a string, got {type(opts['unique_id']).__name__}")
if not isinstance(opts.get('repetitions', 1), int):
raise TypeError(f"Parameter 'repetitions' must be an integer, got {type(opts['repetitions']).__name__}")
if opts.get('repetitions', 1) <= 0:
raise ValueError(f"Parameter 'repetitions' must be positive, got {opts['repetitions']}")
# helper values
@property
def _top_size(self) -> int:
h0 = self.hierarchy[0]
return sum(h0) if isinstance(h0, list) else int(h0)
@property
def _bottom_mult(self) -> int:
mult = 1
for h in self.hierarchy[1:]:
mult *= int(h)
return mult
@property
def _pulses_per_cycle(self) -> int:
return self._top_size * self._bottom_mult
@property
def _pulse_dur(self) -> float:
return 60.0 / self.tempo / self._bottom_mult
@property
def cycle_dur(self) -> float:
return self._pulse_dur * self._pulses_per_cycle
def _get_hierarchy_mult(self, layer: int) -> int:
"""Get the multiplier for a given hierarchy layer.
Handles both simple numbers and complex arrays like [3, 2] -> 5
Args:
layer: The hierarchy layer index
Returns:
The multiplier for that layer (sum if array, value if int)
"""
if layer >= len(self.hierarchy):
return 1
h = self.hierarchy[layer]
if isinstance(h, int):
return h
else:
return sum(h)
[docs]
def get_tempo_at_layer(self, layer: int) -> float:
"""Get the tempo at a specific hierarchical layer.
Args:
layer: The hierarchy layer (0 = coarsest/vibhag, higher = finer subdivisions)
Returns:
The tempo (BPM) at that layer
Raises:
ValueError: If layer is out of bounds
"""
if layer < 0 or layer >= len(self.hierarchy):
raise ValueError(f"Layer {layer} is out of bounds for hierarchy with {len(self.hierarchy)} layers")
# Start with base tempo and multiply by each layer's subdivision
result_tempo = self.tempo
for i in range(1, layer + 1):
result_tempo *= self._get_hierarchy_mult(i)
return result_tempo
def _generate_pulse_structures(self) -> None:
self.pulse_structures = [[]]
# single layer of pulses for simplified implementation
pulses: List[Pulse] = []
for rep in range(self.repetitions):
start = self.start_time + rep * self.cycle_dur
for i in range(self._pulses_per_cycle):
pulses.append(Pulse(real_time=start + i * self._pulse_dur,
meter_id=self.unique_id))
self.pulse_structures[0] = [PulseStructure(
tempo=self.tempo,
size=self._pulses_per_cycle,
start_time=self.start_time,
meter_id=self.unique_id,
pulses=pulses,
)]
@property
def all_pulses(self) -> List[Pulse]:
"""Get all pulses from the finest layer (lowest level) of the hierarchy.
This concatenates pulses from all pulse structures in the last layer,
matching the TypeScript implementation: lastLayer.map(ps => ps.pulses).flat()
"""
if not self.pulse_structures or not self.pulse_structures[-1]:
return []
# Flatten all pulses from all structures in the finest layer
return [pulse for ps in self.pulse_structures[-1] for pulse in ps.pulses]
@property
def real_times(self) -> List[float]:
return [p.real_time for p in self.all_pulses]
[docs]
def offset_pulse(self, pulse: Pulse, offset: float) -> None:
pulse.real_time += offset
[docs]
def reset_tempo(self) -> None:
base = self.all_pulses[:self._pulses_per_cycle]
diff = base[-1].real_time - base[0].real_time
if len(base) > 1:
bit = diff / (len(base) - 1)
if bit > 0:
self.tempo = 60.0 / (bit * self._bottom_mult)
# pulse duration will be derived from tempo
[docs]
def grow_cycle(self) -> None:
self.reset_tempo()
start = self.start_time + self.repetitions * self.cycle_dur
for i in range(self._pulses_per_cycle):
new_pulse = Pulse(real_time=start + i * self._pulse_dur,
meter_id=self.unique_id)
self.pulse_structures[0][0].pulses.append(new_pulse)
self.repetitions += 1
[docs]
def add_time_points(self, time_points: List[float], layer: int = 1) -> None:
time_points = sorted(time_points)
for tp in time_points:
self.pulse_structures[0][0].pulses.append(Pulse(real_time=tp,
meter_id=self.unique_id))
self.pulse_structures[0][0].pulses.sort(key=lambda p: p.real_time)
[docs]
@staticmethod
def from_time_points(time_points: List[float], hierarchy: List[Union[int, List[int]]],
repetitions: int = 1, layer: int = 0) -> 'Meter':
"""Create a Meter from actual pulse time points, handling timing variations.
This method creates a meter that accurately represents actual pulse timing
(including rubato and tempo variations) rather than theoretical even spacing.
Uses timing regularization algorithm to handle extreme deviations.
Args:
time_points: List of actual pulse times in seconds
hierarchy: Meter hierarchy (e.g., [4, 4, 2])
repetitions: Number of cycle repetitions
layer: Which hierarchical layer the time points represent (0 or 1)
Returns:
Meter object with pulses positioned at the provided time points
"""
if not time_points or len(time_points) < 2:
raise ValueError("Must provide at least two time points")
if not hierarchy or len(hierarchy) < 1:
raise ValueError("Must provide hierarchy to create Meter")
# Work on a copy to avoid modifying the original
time_points = sorted(time_points.copy())
# Step 1: Timing regularization algorithm (from TypeScript)
# Calculate pulse duration and handle extreme deviations
diffs = [time_points[i+1] - time_points[i] for i in range(len(time_points) - 1)]
pulse_dur = sum(diffs) / len(diffs)
# Normalize deviations relative to pulse duration
zeroed_tps = [tp - time_points[0] for tp in time_points]
norms = [pulse_dur * i for i in range(len(time_points))]
tp_diffs = [(zeroed_tps[i] - norms[i]) / pulse_dur for i in range(len(time_points))]
# Insert intermediate time points when deviations exceed 40%
max_iterations = 100 # Prevent infinite loops
iteration = 0
while any(abs(d) > 0.4 for d in tp_diffs) and iteration < max_iterations:
abs_tp_diffs = [abs(d) for d in tp_diffs]
biggest_idx = abs_tp_diffs.index(max(abs_tp_diffs))
diff = tp_diffs[biggest_idx]
if diff > 0:
# Insert time point before the problematic one
if biggest_idx > 0:
new_tp = (time_points[biggest_idx-1] + time_points[biggest_idx]) / 2
time_points.insert(biggest_idx, new_tp)
else:
# Can't insert before first point, adjust differently
break
else:
# Insert time point after the problematic one
if biggest_idx < len(time_points) - 1:
new_tp = (time_points[biggest_idx] + time_points[biggest_idx+1]) / 2
time_points.insert(biggest_idx+1, new_tp)
else:
# Can't insert after last point, adjust differently
break
# Recalculate deviations
diffs = [time_points[i+1] - time_points[i] for i in range(len(time_points) - 1)]
pulse_dur = sum(diffs) / len(diffs)
zeroed_tps = [tp - time_points[0] for tp in time_points]
norms = [pulse_dur * i for i in range(len(time_points))]
tp_diffs = [(zeroed_tps[i] - norms[i]) / pulse_dur for i in range(len(time_points))]
iteration += 1
# Step 2: Calculate meter properties
tempo = 60.0 / pulse_dur
start_time = time_points[0]
# Determine how many repetitions we need based on time points
if isinstance(hierarchy[0], list):
layer0_size = sum(hierarchy[0])
else:
layer0_size = hierarchy[0]
# Calculate minimum repetitions needed
min_reps = max(repetitions, (len(time_points) + layer0_size - 1) // layer0_size)
# Step 3: Create theoretical meter
meter = Meter(
hierarchy=hierarchy,
start_time=start_time,
tempo=tempo,
repetitions=min_reps
)
# Step 4: Adjust pulses to match actual time points
finest_pulses = meter.all_pulses
# Update pulse times to match provided time points
for i, time_point in enumerate(time_points):
if i < len(finest_pulses):
finest_pulses[i].real_time = time_point
# If we have fewer time points than pulses, extrapolate the remaining
if len(time_points) < len(finest_pulses):
# Use the calculated pulse duration to extrapolate
last_provided_time = time_points[-1]
for i in range(len(time_points), len(finest_pulses)):
extrapolated_time = last_provided_time + (i - len(time_points) + 1) * pulse_dur
finest_pulses[i].real_time = extrapolated_time
return meter
[docs]
@staticmethod
def from_json(obj: Dict) -> 'Meter':
# Parse tala_name from string if present
tala_name = None
if obj.get('talaName'):
try:
tala_name = TalaName(obj['talaName'])
except ValueError:
pass # Invalid tala name, leave as None
m = Meter(hierarchy=obj.get('hierarchy'),
start_time=obj.get('startTime', 0.0),
tempo=obj.get('tempo', 60.0),
unique_id=obj.get('uniqueId'),
repetitions=obj.get('repetitions', 1),
tala_name=tala_name,
vibhaga=obj.get('vibhaga', []))
m.pulse_structures = [
[PulseStructure.from_json(ps) for ps in layer]
for layer in obj.get('pulseStructures', [])
]
return m
[docs]
def to_json(self) -> Dict:
result = {
'uniqueId': self.unique_id,
'hierarchy': self.hierarchy,
'startTime': self.start_time,
'tempo': self.tempo,
'repetitions': self.repetitions,
'pulseStructures': [[ps.to_json() for ps in layer]
for layer in self.pulse_structures]
}
# Include tala_name and vibhaga if set
if self.tala_name is not None:
result['talaName'] = self.tala_name.value
if self.vibhaga:
result['vibhaga'] = self.vibhaga
return result
def __eq__(self, other: object) -> bool:
return isinstance(other, Meter) and self.to_json() == other.to_json()
# Segment boundary methods for vibhag-aware operations
[docs]
def get_segment_boundary_indices(self) -> List[int]:
"""Get the indices of matra pulses that are at segment (vibhag) boundaries.
For Tintal [[4,4,4,4], 4]: returns [0, 4, 8, 12] per cycle
For Jhoomra [[3,4,3,4], 4]: returns [0, 3, 7, 10] per cycle
Returns empty list if hierarchy[0] is not compound.
"""
# Only applies to tala-style meters with:
# 1. A compound first layer (vibhag structure)
# 2. At least a second layer (matras within vibhags)
if not isinstance(self.hierarchy[0], list) or len(self.hierarchy) < 2:
return []
segment_sizes = self.hierarchy[0]
matras_per_cycle = sum(segment_sizes)
all_boundaries: List[int] = []
for cycle in range(self.repetitions):
cum_sum = 0
for seg in range(len(segment_sizes)):
all_boundaries.append(cycle * matras_per_cycle + cum_sum)
cum_sum += segment_sizes[seg]
return all_boundaries
[docs]
def get_matra_pulses(self) -> List[Pulse]:
"""Get only the matra-level pulses (pulses that correspond to matras/beats).
For a hierarchy like [[4,4,4,4], 4] (Tintal):
- Total pulses = 64 (16 matras × 4 subdivisions)
- Returns only the 16 matra pulses (every 4th pulse)
These are the pulses that correspond to beats in the tala structure.
"""
all_pulses = self.all_pulses
if len(self.hierarchy) < 2:
# Single-layer hierarchy - all pulses are at matra level
return all_pulses
# For multi-layer hierarchy, return every Nth pulse where N is the subdivision factor
subdivision = self._bottom_mult
if subdivision <= 1:
return all_pulses
return [all_pulses[i] for i in range(0, len(all_pulses), subdivision)]
[docs]
def is_segment_boundary(self, pulse: Pulse) -> bool:
"""Check if a pulse is at a segment (vibhag) boundary."""
matra_pulses = self.get_matra_pulses()
try:
idx = next(i for i, p in enumerate(matra_pulses) if p.unique_id == pulse.unique_id)
except StopIteration:
return False
return idx in self.get_segment_boundary_indices()
[docs]
def get_segment_for_matra_index(self, matra_idx: int) -> Optional[Dict[str, int]]:
"""Get the segment range (start and end matra indices) for a given matra index.
Returns None if the hierarchy doesn't have compound first layer.
Args:
matra_idx: The matra index to find the segment for
Returns:
Dict with 'start' and 'end' keys, or None
"""
if not isinstance(self.hierarchy[0], list):
return None
boundaries = self.get_segment_boundary_indices()
matras_per_cycle = sum(self.hierarchy[0])
total_matras = matras_per_cycle * self.repetitions
# Add the final boundary (end of last segment)
all_boundaries = boundaries + [total_matras]
for i in range(len(all_boundaries) - 1):
if all_boundaries[i] <= matra_idx < all_boundaries[i + 1]:
return {'start': all_boundaries[i], 'end': all_boundaries[i + 1]}
return None
def _offset_pulse_direct(self, pulse: Pulse, offset: float, override: bool = False) -> None:
"""Direct pulse offset without segment-aware logic.
Used internally by offset_segment_boundary to avoid recursion.
Args:
pulse: The pulse to offset
offset: Time offset in seconds
override: If True, allows larger offsets (for segment redistribution)
"""
# Simplified implementation - just adjust the pulse time
pulse.real_time += offset
[docs]
def offset_segment_boundary(self, pulse: Pulse, offset: float) -> bool:
"""Offset a segment boundary pulse and proportionally adjust all matra pulses
within that segment.
This makes nudging a vibhag boundary move all the matras within that
vibhag proportionally.
Args:
pulse: The pulse to offset (must be at a segment boundary)
offset: The time offset in seconds
Returns:
True if segment-aware offset was applied, False if regular offset should be used
"""
matra_pulses = self.get_matra_pulses()
try:
pulse_idx = next(i for i, p in enumerate(matra_pulses) if p.unique_id == pulse.unique_id)
except StopIteration:
return False # Not a matra pulse
boundaries = self.get_segment_boundary_indices()
if pulse_idx not in boundaries:
return False # Not at a segment boundary
boundary_idx = boundaries.index(pulse_idx)
# Can't adjust segment before the first boundary (index 0)
if boundary_idx == 0:
return False
# Get the PREVIOUS segment (the one that ends at this boundary)
prev_boundary_idx = boundaries[boundary_idx - 1]
current_boundary_idx = pulse_idx
# Get the pulses in the previous segment
prev_segment_pulses = matra_pulses[prev_boundary_idx:current_boundary_idx]
if len(prev_segment_pulses) < 1:
return False
# Calculate original previous segment duration
prev_segment_start_time = prev_segment_pulses[0].real_time
prev_segment_end_time = pulse.real_time
original_prev_segment_dur = prev_segment_end_time - prev_segment_start_time
if original_prev_segment_dur <= 0:
return False
# Calculate new previous segment duration
new_prev_segment_dur = original_prev_segment_dur + offset
if new_prev_segment_dur <= 0:
return False # Would create invalid timing
# Get the NEXT segment
next_boundary_idx = boundaries[boundary_idx + 1] if boundary_idx + 1 < len(boundaries) else None
has_next_boundary = next_boundary_idx is not None
next_segment_pulses: List[Pulse] = []
original_next_segment_dur = 0.0
new_next_segment_dur = 0.0
next_segment_end_time = 0.0
original_boundary_time = pulse.real_time
if has_next_boundary:
next_segment_pulses = matra_pulses[current_boundary_idx + 1:next_boundary_idx]
next_segment_end_time = matra_pulses[next_boundary_idx].real_time
original_next_segment_dur = next_segment_end_time - pulse.real_time
new_next_segment_dur = original_next_segment_dur - offset
if new_next_segment_dur <= 0:
return False # Would create invalid timing
else:
# Last segment of the meter
next_segment_pulses = matra_pulses[current_boundary_idx + 1:]
if next_segment_pulses:
last_pulse = next_segment_pulses[-1]
avg_pulse_dur = (last_pulse.real_time - pulse.real_time) / len(next_segment_pulses)
next_segment_end_time = last_pulse.real_time + avg_pulse_dur
original_next_segment_dur = next_segment_end_time - pulse.real_time
new_next_segment_dur = original_next_segment_dur - offset
if new_next_segment_dur <= 0:
return False # Would create invalid timing
has_next_segment = len(next_segment_pulses) > 0
# Offset the boundary pulse itself first
self._offset_pulse_direct(pulse, offset, True)
# Reset and evenly space all pulses in the PREVIOUS segment (except the first one)
num_prev_pulses = len(prev_segment_pulses)
for i in range(1, len(prev_segment_pulses)):
p = prev_segment_pulses[i]
default_relative_pos = i / num_prev_pulses
new_relative_time = prev_segment_start_time + default_relative_pos * new_prev_segment_dur
pulse_offset = new_relative_time - p.real_time
if abs(pulse_offset) > 0.0001:
self._offset_pulse_direct(p, pulse_offset, True)
# Reset and evenly space all pulses in the NEXT segment
if has_next_segment and next_segment_pulses:
new_boundary_time = original_boundary_time + offset
num_next_pulses = len(next_segment_pulses) + 1
for i, p in enumerate(next_segment_pulses):
default_relative_pos = (i + 1) / num_next_pulses
new_relative_time = new_boundary_time + default_relative_pos * new_next_segment_dur
pulse_offset = new_relative_time - p.real_time
if abs(pulse_offset) > 0.0001:
self._offset_pulse_direct(p, pulse_offset, True)
return True
# Musical time conversion methods
def _validate_reference_level(self, reference_level: Optional[int]) -> int:
"""Validate and normalize reference level parameter."""
if reference_level is None:
return len(self.hierarchy) - 1
if not isinstance(reference_level, int):
raise TypeError(f"reference_level must be an integer, got {type(reference_level).__name__}")
if reference_level < 0:
raise ValueError(f"reference_level must be non-negative, got {reference_level}")
if reference_level >= len(self.hierarchy):
raise ValueError(f"reference_level {reference_level} exceeds hierarchy depth {len(self.hierarchy)}")
return reference_level
def _hierarchical_position_to_pulse_index(self, positions: List[int], cycle_number: int) -> int:
"""Convert hierarchical position to pulse index."""
pulse_index = 0
multiplier = 1
# Work from finest to coarsest level
for level in range(len(positions) - 1, -1, -1):
position = positions[level]
hierarchy_size = self.hierarchy[level]
if isinstance(hierarchy_size, list):
hierarchy_size = sum(hierarchy_size)
pulse_index += position * multiplier
multiplier *= hierarchy_size
# Add offset for cycle
cycle_offset = cycle_number * self._pulses_per_cycle
return pulse_index + cycle_offset
def _pulse_index_to_hierarchical_position(self, pulse_index: int, cycle_number: int) -> List[int]:
"""Convert pulse index back to hierarchical position (reverse of _hierarchical_position_to_pulse_index)."""
# Use modulo to get within-cycle index regardless of which cycle the pulse belongs to
within_cycle_index = pulse_index % self._pulses_per_cycle
# Ensure within_cycle_index is non-negative
if within_cycle_index < 0:
within_cycle_index = 0
positions = []
remaining_index = within_cycle_index
# Work from coarsest to finest level
for level in range(len(self.hierarchy)):
hierarchy_size = self.hierarchy[level]
if isinstance(hierarchy_size, list):
hierarchy_size = sum(hierarchy_size)
# Calculate how many pulses are in each group at this level
group_size = self._pulses_per_cycle
for inner_level in range(level + 1):
inner_size = self.hierarchy[inner_level]
if isinstance(inner_size, list):
inner_size = sum(inner_size)
group_size = group_size // inner_size
position_at_level = remaining_index // group_size if group_size > 0 else 0
positions.append(position_at_level)
remaining_index = remaining_index % group_size if group_size > 0 else 0
return positions
def _calculate_level_start_time(self, positions: List[int], cycle_number: int, reference_level: int) -> float:
"""Calculate start time of hierarchical unit at reference level."""
# Create positions for start of reference-level unit
# Ensure we have positions up to reference_level
start_positions = list(positions[:reference_level + 1])
# Extend with zeros for levels below reference level
while len(start_positions) < len(self.hierarchy):
start_positions.append(0)
start_pulse_index = self._hierarchical_position_to_pulse_index(start_positions, cycle_number)
return self.all_pulses[start_pulse_index].real_time
def _calculate_level_duration(self, positions: List[int], cycle_number: int, reference_level: int) -> float:
"""Calculate actual duration of hierarchical unit based on pulse timing."""
# Get start time of current unit
start_time = self._calculate_level_start_time(positions, cycle_number, reference_level)
# Calculate start time of next unit at same level
next_positions = positions.copy()
next_positions[reference_level] += 1
# Handle overflow - if we've exceeded this level
hierarchy_size = self.hierarchy[reference_level]
if isinstance(hierarchy_size, list):
hierarchy_size = sum(hierarchy_size)
if next_positions[reference_level] >= hierarchy_size:
# Handle overflow by moving to next cycle
next_cycle_number = cycle_number + 1
if next_cycle_number >= self.repetitions:
# Use meter end time
return self.start_time + self.repetitions * self.cycle_dur - start_time
next_positions[reference_level] = 0
return self._calculate_level_start_time(next_positions, next_cycle_number, reference_level) - start_time
end_time = self._calculate_level_start_time(next_positions, cycle_number, reference_level)
return end_time - start_time
[docs]
def get_musical_time(self, real_time: float, reference_level: Optional[int] = None) -> Union['MusicalTime', Literal[False]]:
"""
Convert real time to musical time within this meter.
Args:
real_time: Time in seconds
reference_level: Hierarchical level for fractional calculation
(0=beat, 1=subdivision, etc.). Defaults to finest level.
Returns:
MusicalTime object if time falls within meter boundaries, False otherwise
"""
from .musical_time import MusicalTime
# Step 1: Boundary validation
if real_time < self.start_time:
return False
# Calculate proper end time
if self.all_pulses and len(self.all_pulses) > 0:
# For boundary validation, use theoretical end time to maintain compatibility with existing tests
# The pulse-based logic will handle actual cycle boundaries in the main calculation
actual_end_time = self.start_time + self.repetitions * self.cycle_dur
else:
# No pulses available - this should not happen as we require pulse data
raise ValueError("No pulse data available for meter. Pulse data is required for musical time calculation.")
if real_time > actual_end_time:
return False
# Validate reference level
ref_level = self._validate_reference_level(reference_level)
# Step 2: Pulse-based cycle calculation (pulse data always available)
if not self.all_pulses or len(self.all_pulses) == 0:
raise ValueError(f"No pulse data available for meter. Pulse data is required for musical time calculation.")
cycle_number = None
cycle_offset = None
for cycle in range(self.repetitions):
cycle_start_pulse_idx = cycle * self._pulses_per_cycle
# Get actual cycle start time
if cycle_start_pulse_idx < len(self.all_pulses):
cycle_start_time = self.all_pulses[cycle_start_pulse_idx].real_time
# Get actual cycle end time
next_cycle_start_pulse_idx = (cycle + 1) * self._pulses_per_cycle
if next_cycle_start_pulse_idx < len(self.all_pulses):
cycle_end_time = self.all_pulses[next_cycle_start_pulse_idx].real_time
else:
# Final cycle - use theoretical end
cycle_end_time = self.start_time + self.repetitions * self.cycle_dur
# Check if time falls within this cycle
# For the final cycle, include the exact end time (Issue #38 fix)
if cycle == self.repetitions - 1:
# Final cycle: include exact end time
if cycle_start_time <= real_time <= cycle_end_time:
cycle_number = cycle
cycle_offset = real_time - cycle_start_time
break
else:
# Intermediate cycles: exclude end time (it belongs to next cycle)
if cycle_start_time <= real_time < cycle_end_time:
cycle_number = cycle
cycle_offset = real_time - cycle_start_time
break
# Error if no pulse-based cycle found - indicates data integrity issue
if cycle_number is None:
raise ValueError(f"Unable to determine cycle for time {real_time} using pulse data. "
f"Time does not fall within any pulse-based cycle boundaries. "
f"This indicates a problem with meter pulse data integrity.")
# Step 3: Fractional beat calculation
# Find the correct pulse based on actual time, not hierarchical position
# This is necessary when pulse timing has variations (rubato)
# Find the pulse that comes at or before the query time within the current cycle
cycle_start_pulse_idx = cycle_number * self._pulses_per_cycle
cycle_end_pulse_idx = min((cycle_number + 1) * self._pulses_per_cycle, len(self.all_pulses))
current_pulse_index = None
for pulse_idx in range(cycle_start_pulse_idx, cycle_end_pulse_idx):
if self.all_pulses[pulse_idx].real_time <= real_time:
current_pulse_index = pulse_idx
else:
break
if current_pulse_index is None:
# Query time is before all pulses in this cycle (shouldn't happen but handle gracefully)
current_pulse_index = cycle_start_pulse_idx
current_pulse_time = self.all_pulses[current_pulse_index].real_time
# Update positions to reflect the actual pulse found
positions = self._pulse_index_to_hierarchical_position(current_pulse_index, cycle_number)
# Find next pulse for fractional calculation - always use pulse-based logic
if current_pulse_index + 1 < len(self.all_pulses):
next_pulse_time = self.all_pulses[current_pulse_index + 1].real_time
pulse_duration = next_pulse_time - current_pulse_time
if pulse_duration <= 0:
fractional_beat = 0.0
else:
time_from_current_pulse = real_time - current_pulse_time
fractional_beat = time_from_current_pulse / pulse_duration
else:
# This is the last pulse - fractional_beat should be 0.0 since we can't calculate duration
fractional_beat = 0.0
# Note: fractional_beat calculation may need refinement when hierarchical position
# calculation finds the wrong pulse due to timing variations, but clamping ensures valid range
# Clamp to [0, 1) range (exclusive upper bound for MusicalTime)
fractional_beat = max(0.0, min(0.9999999999999999, fractional_beat))
# Step 4: Handle reference level truncation (if specified)
if ref_level is not None and ref_level < len(self.hierarchy) - 1:
# Truncate positions to reference level for final result
positions = positions[:ref_level + 1]
# Step 5: Result construction
return MusicalTime(
cycle_number=cycle_number,
hierarchical_position=positions,
fractional_beat=fractional_beat
)