177 lines
6.4 KiB
Python
177 lines
6.4 KiB
Python
"""Detect ordered hops between whitelisted sites in a day's timeline.
|
|
|
|
We process visits per calendar day (Europe/London), resetting state each
|
|
day. We also support injecting a synthetic Home→FirstSite hop when the
|
|
first recognised site of the day isn't Home (assume_home_start).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import date
|
|
from typing import Dict, Iterable, List, Optional, Tuple
|
|
from collections import defaultdict
|
|
import math
|
|
import yaml
|
|
|
|
from ..ingest.semantic_reader import Location, PlaceVisit
|
|
|
|
|
|
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in miles between two points.

    All four arguments are passed through ``math.radians``, i.e. they are
    treated as decimal degrees. The haversine formula is evaluated on a
    sphere of radius 3958.8 miles.
    """
    earth_radius_miles = 3958.8

    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    delta_lat = math.radians(lat2 - lat1)
    delta_lon = math.radians(lon2 - lon1)

    sin_sq_lat = math.sin(delta_lat / 2.0) ** 2
    sin_sq_lon = math.sin(delta_lon / 2.0) ** 2
    hav = sin_sq_lat + math.cos(lat1_rad) * math.cos(lat2_rad) * sin_sq_lon

    # Clamp ``1 - hav`` at zero so rounding error can never hand sqrt a
    # negative number for (near-)antipodal points.
    central_angle = 2 * math.atan2(math.sqrt(hav), math.sqrt(max(0.0, 1 - hav)))
    return earth_radius_miles * central_angle
|
|
|
|
|
|
@dataclass
class SiteEntry:
    """One recognised site as described by the site configuration."""

    # Unique key for the site; SiteConfig indexes entries by this value.
    canonical: str
    # Display label; SiteConfig.from_yaml falls back to ``canonical``.
    label: str
    # Site coordinates, compared against visit coordinates via haversine.
    lat: float
    lon: float
    # Geofence radius; SiteConfig.recognise skips the geofence when <= 0.
    radius_m: float
    # Alternative names, matched case-insensitively alongside ``canonical``.
    aliases: List[str]
|
|
|
|
|
|
class SiteConfig:
    """Holds all recognised site definitions keyed by canonical name.

    Attributes:
        by_canonical: Maps each canonical name to its ``SiteEntry``.
        alias_map: Maps every lower-cased canonical name and alias to the
            canonical name of the site it belongs to.
    """

    # Divisor used to turn a radius in metres into miles for comparison
    # with ``haversine_distance`` results.
    _METRES_PER_MILE = 1609.34

    def __init__(self, sites: Iterable[SiteEntry]):
        """Index *sites* by canonical name and by lower-cased alias.

        When two sites share a canonical name or an alias, the later one in
        *sites* wins.
        """
        # Materialise once: the entries are walked twice below, and a
        # one-shot iterator (e.g. a generator) would otherwise be exhausted
        # after the first pass, leaving the alias map empty.
        site_list = list(sites)
        self.by_canonical: Dict[str, SiteEntry] = {s.canonical: s for s in site_list}
        self.alias_map: Dict[str, str] = {}
        for site in site_list:
            for alias in [site.canonical, *site.aliases]:
                if not alias:
                    # An empty alias is a substring of every name and would
                    # make recognise() match every location.
                    continue
                self.alias_map[alias.lower()] = site.canonical

    @staticmethod
    def _coerce_aliases(raw_aliases: object) -> List[str]:
        """Normalise the ``aliases`` config value to a list.

        A missing/empty value becomes ``[]``; a single scalar string becomes
        a one-element list rather than being split into characters.
        """
        if not raw_aliases:
            return []
        if isinstance(raw_aliases, str):
            return [raw_aliases]
        return list(raw_aliases)

    @classmethod
    def from_yaml(cls, path: str) -> "SiteConfig":
        """Load a site configuration from a YAML file.

        Three document layouts are accepted:

        * a top-level list of site mappings;
        * a mapping with a ``sites`` key holding such a list;
        * a mapping of ``canonical name -> site mapping`` (the key is used
          as the canonical name unless the entry sets its own).

        Each site mapping may provide ``canonical`` (or ``name``), ``label``,
        ``lat``, ``lon``, ``radius_m`` and ``aliases``. Numeric fields
        default to ``0.0`` and ``label`` defaults to the canonical name.

        Raises:
            ValueError: If the document is neither a list nor a mapping, if
                a site entry is not a mapping, or if an entry has no
                canonical name.
        """
        with open(path, "r", encoding="utf-8") as f:
            raw = yaml.safe_load(f)

        sites_data: List[Dict[str, object]] = []
        if isinstance(raw, list):
            sites_data = raw
        elif isinstance(raw, dict):
            if isinstance(raw.get("sites"), list):
                sites_data = raw["sites"]
            else:
                for canon, entry in raw.items():
                    entry = entry or {}
                    if not isinstance(entry, dict):
                        raise ValueError("Site entry for %s must be a mapping" % canon)
                    # Copy so the loaded document is not mutated.
                    entry = dict(entry)
                    entry.setdefault("canonical", canon)
                    sites_data.append(entry)
        else:
            raise ValueError("Invalid site configuration format")

        sites: List[SiteEntry] = []
        for entry in sites_data:
            if not isinstance(entry, dict):
                # List-form entries get the same validation as the
                # mapping form instead of failing with AttributeError.
                raise ValueError("Site entry %r must be a mapping" % (entry,))
            canonical = entry.get("canonical") or entry.get("name")
            if not canonical:
                raise ValueError("Site entry missing canonical name")
            sites.append(SiteEntry(
                canonical=canonical,
                label=entry.get("label", canonical),
                lat=float(entry.get("lat", 0.0)),
                lon=float(entry.get("lon", 0.0)),
                radius_m=float(entry.get("radius_m", 0.0)),
                aliases=cls._coerce_aliases(entry.get("aliases")),
            ))
        return cls(sites)

    def recognise(self, location: Location) -> Optional[str]:
        """Return the canonical site name matching *location*, or ``None``.

        Matching happens in two passes. First, the lower-cased
        ``location.name`` is searched for any known alias as a substring;
        the first alias found (in ``alias_map`` order) wins. If no alias
        matches, every site with a positive ``radius_m`` is tested as a
        geofence and the first site (in ``by_canonical`` order) whose centre
        lies within the radius wins.
        """
        name_lower = (location.name or "").lower()

        # Pass 1: name/alias substring match (nothing can match no name).
        if name_lower:
            for alias, canonical in self.alias_map.items():
                if alias in name_lower:
                    return canonical

        # Pass 2: geofence match.
        for canonical, site in self.by_canonical.items():
            if site.radius_m > 0:
                max_dist_miles = site.radius_m / self._METRES_PER_MILE
                d = haversine_distance(location.lat, location.lon, site.lat, site.lon)
                if d <= max_dist_miles:
                    return canonical
        return None
|
|
|
|
|
|
@dataclass
class Hop:
    """A move from one recognised site to a different one.

    The hop is dated by the start date of the visit at its origin.
    """

    # Calendar date taken from the origin visit's start time.
    date: date
    # Canonical name of the site the hop leaves from.
    origin: str
    # Canonical name of the site the hop arrives at.
    destination: str
|
|
|
|
|
|
def _build_day_hops(day_visits: List[PlaceVisit], site_config: SiteConfig, assume_home_start: bool) -> List[Hop]:
    """Turn one day's visits into an ordered list of site-to-site hops.

    Visits are sorted by ``start_time``, mapped to canonical site names via
    ``site_config.recognise`` and stripped of unrecognised places and of
    consecutive repeats of the same site. When ``assume_home_start`` is true
    and the first recognised site is not ``"Home"``, a synthetic Home stop
    is placed in front so the day opens with a Home -> first-site hop.
    Hops are then emitted pairwise until the day's second Home stop is
    reached (the synthetic one counts as the first). Returns ``[]`` when no
    visit is recognised.
    """
    ordered_visits = sorted(day_visits, key=lambda visit: visit.start_time)

    # Recognised stops in time order, with back-to-back repeats collapsed.
    stops: List[Tuple[str, PlaceVisit]] = []
    for visit in ordered_visits:
        site = site_config.recognise(visit.location)
        if not site:
            continue
        if stops and stops[-1][0] == site:
            continue
        stops.append((site, visit))

    if not stops:
        return []

    if assume_home_start and stops[0][0] != "Home":
        # The placeholder only carries a timestamp for dating the first
        # hop; its coordinates are dummies and are never geofenced.
        day_start = stops[0][1].start_time
        placeholder = PlaceVisit(
            location=Location(lat=0.0, lon=0.0, name="Home"),
            start_time=day_start,
            end_time=day_start,
        )
        stops.insert(0, ("Home", placeholder))

    # A day that opens at Home has already seen its first Home stop.
    home_seen = int(stops[0][0] == "Home")
    hops: List[Hop] = []
    for (origin_site, origin_visit), (dest_site, _) in zip(stops, stops[1:]):
        hop_date = origin_visit.start_time.date()
        if origin_site != dest_site:
            hops.append(Hop(date=hop_date, origin=origin_site, destination=dest_site))
        if dest_site != "Home":
            continue
        home_seen += 1
        if home_seen >= 2:
            # Second Home of the day: the round trip is complete.
            break
    return hops
|
|
|
|
|
|
def detect_itinerary(visits: List[PlaceVisit], site_config: SiteConfig, *, assume_home_start: bool = True) -> List[Hop]:
    """Reduce all visits to per-day hops, concatenated in date order.

    Visits are bucketed by ``start_time.date()``; each bucket is handed to
    ``_build_day_hops`` independently, so no state carries over between
    days. ``assume_home_start`` is forwarded unchanged. Returns ``[]`` for
    empty input.

    NOTE(review): the date is taken from ``start_time`` as-is, with no
    timezone conversion here — confirm the caller supplies local times.
    """
    if not visits:
        return []

    # Bucket visits by the calendar date of their start time.
    visits_by_date: Dict[date, List[PlaceVisit]] = {}
    for visit in visits:
        visits_by_date.setdefault(visit.start_time.date(), []).append(visit)

    return [
        hop
        for day in sorted(visits_by_date)
        for hop in _build_day_hops(visits_by_date[day], site_config, assume_home_start=assume_home_start)
    ]
|
|
|