# Source file: Mileage-Logger/mileage_logger/logic/detect_itinerary.py
# (Repository viewer metadata: 177 lines, 6.4 KiB, Python)

"""Detect ordered hops between whitelisted sites in a day's timeline.
We process visits per calendar day (Europe/London), resetting state each
day. We also support injecting a synthetic Home→FirstSite hop when the
first recognised site of the day isn't Home (assume_home_start).
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Dict, Iterable, List, Optional, Tuple
from collections import defaultdict
import math
import yaml
from ..ingest.semantic_reader import Location, PlaceVisit
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance, in miles, between two points.

    The four arguments are latitudes/longitudes in decimal degrees; they are
    converted to radians and fed through the haversine formula using an
    Earth radius of 3958.8 miles.
    """
    earth_radius_miles = 3958.8

    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    half_dlat = math.radians(lat2 - lat1) / 2.0
    half_dlon = math.radians(lon2 - lon1) / 2.0

    lat_term = math.sin(half_dlat) ** 2
    lon_term = math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(half_dlon) ** 2
    hav = lat_term + lon_term

    # Clamp 1 - hav at zero so rounding error can never hand sqrt a negative.
    central_angle = 2 * math.atan2(math.sqrt(hav), math.sqrt(max(0.0, 1 - hav)))
    return earth_radius_miles * central_angle
@dataclass
class SiteEntry:
    """Represents a single recognised site from the configuration.

    Instances are created by ``SiteConfig.from_yaml`` and consulted by
    ``SiteConfig.recognise`` for name/alias and geofence matching.
    """
    # Canonical site name; used as the key of SiteConfig.by_canonical.
    canonical: str
    # Display label; from_yaml falls back to the canonical name when absent.
    label: str
    # Site centre in decimal degrees (passed to haversine_distance, which
    # converts with math.radians). from_yaml defaults both to 0.0.
    lat: float
    lon: float
    # Geofence radius in metres (recognise divides by 1609.34 to get miles).
    # A value <= 0 means the site is never matched by geofence.
    radius_m: float
    # Alternative names; SiteConfig lower-cases them and matches them as
    # substrings of a location's name.
    aliases: List[str]
class SiteConfig:
    """Registry of recognised sites with name/alias and geofence lookup.

    Attributes:
        by_canonical: Maps each canonical site name to its ``SiteEntry``.
        alias_map: Maps every lower-cased canonical name and alias to the
            canonical name it resolves to.
    """

    def __init__(self, sites: Iterable[SiteEntry]):
        """Index *sites* by canonical name and by lower-cased alias.

        Args:
            sites: Any iterable of site entries. It is materialised once, so
                single-pass iterators (e.g. generators) are indexed fully.
        """
        # The entries are walked twice below; without this copy a generator
        # would be exhausted by the first pass and alias_map would stay empty.
        entries = list(sites)
        self.by_canonical: Dict[str, SiteEntry] = {s.canonical: s for s in entries}
        self.alias_map: Dict[str, str] = {}
        for site in entries:
            for alias in [site.canonical, *(site.aliases or ())]:
                key = str(alias).lower()
                # A blank alias is a substring of (almost) every place name
                # and would make this site swallow all locations; skip it.
                if key.strip():
                    self.alias_map[key] = site.canonical

    @classmethod
    def from_yaml(cls, path: str) -> "SiteConfig":
        """Load a site configuration from a YAML file.

        Three layouts are accepted:

        * a top-level list of site mappings;
        * a mapping with a ``sites`` key holding such a list;
        * a mapping of canonical name -> site mapping (an empty value is
          treated as an empty mapping).

        Each site mapping may provide ``canonical`` (or ``name``), ``label``
        (defaults to the canonical name), ``lat``/``lon``/``radius_m``
        (default ``0.0``; an explicit null is treated as ``0.0``) and
        ``aliases`` (a list, or a single string).

        Args:
            path: Path of the YAML file, read as UTF-8.

        Returns:
            A ``SiteConfig`` built from the parsed entries.

        Raises:
            ValueError: If the document is neither a list nor a mapping, if
                a site entry is not a mapping, or if an entry has no
                canonical name.
        """
        with open(path, "r", encoding="utf-8") as f:
            raw = yaml.safe_load(f)

        sites_data: List[Dict[str, object]] = []
        if isinstance(raw, list):
            sites_data = raw
        elif isinstance(raw, dict):
            if isinstance(raw.get("sites"), list):
                sites_data = raw["sites"]
            else:
                for canon, entry in raw.items():
                    entry = entry or {}
                    if not isinstance(entry, dict):
                        raise ValueError("Site entry for %s must be a mapping" % canon)
                    # Copy so the parsed document is not mutated by setdefault.
                    entry = dict(entry)
                    entry.setdefault("canonical", canon)
                    sites_data.append(entry)
        else:
            raise ValueError("Invalid site configuration format")

        sites: List[SiteEntry] = []
        for entry in sites_data:
            # List-form entries get the same validation as the mapping form
            # instead of failing later with an AttributeError on .get().
            if not isinstance(entry, dict):
                raise ValueError("Site entry must be a mapping")
            canonical = entry.get("canonical") or entry.get("name")
            if not canonical:
                raise ValueError("Site entry missing canonical name")

            aliases = entry.get("aliases") or []
            # A scalar `aliases: Office` must stay one alias; list("Office")
            # would split it into single characters that match everything.
            if isinstance(aliases, str):
                aliases = [aliases]

            sites.append(SiteEntry(
                canonical=canonical,
                label=entry.get("label", canonical),
                # `or 0.0` also covers an explicit YAML null.
                lat=float(entry.get("lat") or 0.0),
                lon=float(entry.get("lon") or 0.0),
                radius_m=float(entry.get("radius_m") or 0.0),
                aliases=[str(a) for a in aliases],
            ))
        return cls(sites)

    def recognise(self, location: Location) -> Optional[str]:
        """Return the canonical site name matching *location*, if any.

        Matching is attempted in two passes:

        1. Name pass: the first alias (in ``alias_map`` insertion order) that
           is a substring of the lower-cased location name wins.
        2. Geofence pass: the first site (in ``by_canonical`` order) with a
           positive ``radius_m`` whose centre lies within that radius wins.

        Args:
            location: Object exposing ``name``, ``lat`` and ``lon``.

        Returns:
            The canonical site name, or ``None`` when nothing matches.
        """
        name_lower = (location.name or "").lower()

        # Pass 1: name/alias substring match. Blank aliases are ignored here
        # too, in case alias_map was edited after construction.
        if name_lower:
            for alias, canonical in self.alias_map.items():
                if alias and alias in name_lower:
                    return canonical

        # Pass 2: geofence match (radius is in metres, distance in miles).
        for canonical, site in self.by_canonical.items():
            if site.radius_m > 0:
                max_dist_miles = site.radius_m / 1609.34
                d = haversine_distance(location.lat, location.lon, site.lat, site.lon)
                if d <= max_dist_miles:
                    return canonical
        return None
@dataclass
class Hop:
    """A hop from one recognised site to another, dated by the origin's start date."""
    # Calendar date taken from the origin visit's start_time.
    date: date
    # Canonical name of the site the hop departs from.
    origin: str
    # Canonical name of the site the hop arrives at.
    destination: str
def _build_day_hops(day_visits: List[PlaceVisit], site_config: SiteConfig, assume_home_start: bool) -> List[Hop]:
    """Turn one day's visits into an ordered list of site-to-site hops.

    Visits are ordered by start time and mapped to canonical site names via
    ``site_config.recognise``; unrecognised visits are dropped and
    back-to-back repeats of the same site collapse to the first visit. When
    ``assume_home_start`` is set and the first recognised site is not
    "Home", a synthetic Home visit is placed in front. Hops are then emitted
    between consecutive sites until Home has been seen for the second time.
    """
    timeline: List[Tuple[str, PlaceVisit]] = []
    previous_site: Optional[str] = None
    for visit in sorted(day_visits, key=lambda item: item.start_time):
        site = site_config.recognise(visit.location)
        # Drop unrecognised places and immediate repeats of the same site.
        if not site or site == previous_site:
            continue
        timeline.append((site, visit))
        previous_site = site

    if not timeline:
        return []

    first_site, first_visit = timeline[0]
    if assume_home_start and first_site != "Home":
        # The synthetic Home visit only exists to carry a start time for
        # dating the first hop; its coordinates are placeholders.
        day_start = first_visit.start_time
        placeholder = PlaceVisit(
            location=Location(lat=0.0, lon=0.0, name="Home"),
            start_time=day_start,
            end_time=day_start,
        )
        timeline.insert(0, ("Home", placeholder))

    # Starting at Home counts as the first Home sighting.
    homes_seen = 1 if timeline[0][0] == "Home" else 0

    hops: List[Hop] = []
    for (src, src_visit), (dst, _) in zip(timeline, timeline[1:]):
        when = src_visit.start_time.date()
        if src != dst:
            hops.append(Hop(date=when, origin=src, destination=dst))
        if dst == "Home":
            homes_seen += 1
            if homes_seen >= 2:
                # Second Home of the day: the itinerary is closed.
                break
    return hops
def detect_itinerary(visits: List[PlaceVisit], site_config: SiteConfig, *, assume_home_start: bool = True) -> List[Hop]:
    """Produce the hops for every day in *visits*, in ascending date order.

    Visits are bucketed by the date of their ``start_time``; each bucket is
    handed to ``_build_day_hops`` and the per-day results are concatenated.
    An empty input yields an empty list.
    """
    if not visits:
        return []

    # Bucket visits by the calendar date of their start_time.
    grouped: Dict[date, List[PlaceVisit]] = {}
    for visit in visits:
        grouped.setdefault(visit.start_time.date(), []).append(visit)

    itinerary: List[Hop] = []
    for _day, todays_visits in sorted(grouped.items(), key=lambda pair: pair[0]):
        itinerary += _build_day_hops(todays_visits, site_config, assume_home_start=assume_home_start)
    return itinerary