fix: infer travel_mode from timelinePath speed when activity type is UNKNOWN
Some checks failed
pre-commit / pre-commit (push) Has been cancelled
tests / Detect unreleased dependencies (push) Has been cancelled
tests / test with OCB (push) Has been cancelled
tests / test with Odoo (push) Has been cancelled

This commit is contained in:
2026-03-14 03:14:13 +00:00
parent 7c65a34fce
commit 988d26df4e

View File

@@ -38,18 +38,32 @@ def _distance_meters(lat1, lon1, lat2, lon2):
def _activity_type_to_travel_mode(activity_type):
"""Map Google activity type string to Odoo travel_mode selection value."""
if not activity_type:
return 'unknown'
return None
t = activity_type.upper()
if t in VEHICLE_ACTIVITIES:
return 'driving'
if 'UNKNOWN' in t:
return None
if t == 'IN_RAIL_VEHICLE':
return 'transit'
if t in VEHICLE_ACTIVITIES:
return 'driving'
if t in WALKING_ACTIVITIES:
return 'walking'
if t in CYCLING_ACTIVITIES:
return 'cycling'
return None
def _speed_to_travel_mode(max_speed_mph):
"""Infer travel mode from maximum speed observed along a path."""
if max_speed_mph > 150:
return 'transit' # flight
if max_speed_mph > 15:
return 'driving'
if max_speed_mph > 3:
return 'cycling'
if max_speed_mph > 0:
return 'walking'
return 'unknown'
@@ -69,6 +83,36 @@ def _parse_ts(ts_str):
return None
def _max_speed_from_path(timeline_path):
"""Calculate the maximum speed (mph) from a timelinePath segment's points."""
points = []
for pt in timeline_path:
latlng = pt.get('point', '')
ts_str = pt.get('time', '')
lat, lng = _parse_latlng(latlng)
ts = _parse_ts(ts_str)
if lat is not None and ts is not None:
points.append((ts, lat, lng))
if len(points) < 2:
return 0.0
points.sort(key=lambda x: x[0])
max_speed = 0.0
for i in range(1, len(points)):
t0, lat0, lng0 = points[i - 1]
t1, lat1, lng1 = points[i]
dt_hours = (t1 - t0).total_seconds() / 3600.0
if dt_hours <= 0:
continue
dist_miles = _haversine_miles(lat0, lng0, lat1, lng1)
speed = dist_miles / dt_hours
if speed > max_speed:
max_speed = speed
return max_speed
class WtImportTimelineWizard(models.TransientModel):
_name = 'wt.import.timeline.wizard'
_description = 'Import Google Timeline'
@@ -102,7 +146,6 @@ class WtImportTimelineWizard(models.TransientModel):
def action_import(self):
self.ensure_one()
# Load data from file upload or server path
if self.import_mode == 'server_path':
if not self.server_file_path:
raise UserError(_('Please enter the server file path.'))
@@ -123,7 +166,6 @@ class WtImportTimelineWizard(models.TransientModel):
except Exception as e:
raise UserError(_('Invalid JSON file: %s') % str(e))
# Auto-detect format
if 'semanticSegments' in data:
stops = self._parse_semantic_timeline(data)
elif 'timelineEdits' in data:
@@ -134,7 +176,6 @@ class WtImportTimelineWizard(models.TransientModel):
if not stops:
raise UserError(_('No location stops found in the file.'))
# Filter by minimum stop duration
min_secs = self.min_stop_minutes * 60
stops = [s for s in stops
if s.get('arrived_at') and s.get('departed_at')
@@ -145,7 +186,6 @@ class WtImportTimelineWizard(models.TransientModel):
stops.sort(key=lambda s: s['arrived_at'])
# Compute distances and travel times
for i, stop in enumerate(stops):
if i > 0:
prev = stops[i - 1]
@@ -159,7 +199,6 @@ class WtImportTimelineWizard(models.TransientModel):
stop['distance_from_previous'] = 0.0
stop['travel_time_from_previous'] = 0.0
# Skip existing records (deduplicate by arrived_at)
LocationLog = self.env['wt.location.log']
existing = set(
r.arrived_at.strftime('%Y-%m-%d %H:%M:%S')
@@ -207,59 +246,77 @@ class WtImportTimelineWizard(models.TransientModel):
}
def _parse_semantic_timeline(self, data):
"""Parse Timeline.json (semanticSegments format from Android export).
"""Parse Timeline.json (semanticSegments format).
Iterates segments in order. Activity segments (travel between stops)
are captured to provide travel_mode for the following visit segment.
Travel mode is determined in priority order:
1. activity segment topCandidate.type (if not UNKNOWN)
2. max speed calculated from preceding timelinePath points
3. 'unknown' as fallback
"""
stops = []
segments = data.get('semanticSegments', [])
pending_travel_mode = 'unknown'
# We scan segments in order, accumulating travel signals between visits
pending_mode = 'unknown'
pending_path_points = [] # all timelinePath points seen since last visit
for seg in segments:
# --- Activity / travel segment ---
activity = seg.get('activity')
if activity:
# --- timelinePath: collect GPS path points for speed inference ---
if 'timelinePath' in seg:
pending_path_points.extend(seg['timelinePath'])
continue
# --- activity segment: try to get explicit travel mode ---
if 'activity' in seg:
activity = seg['activity']
top = activity.get('topCandidate', {})
activity_type = top.get('type', '')
mode = _activity_type_to_travel_mode(activity_type)
if mode != 'unknown':
pending_travel_mode = mode
mode = _activity_type_to_travel_mode(top.get('type', ''))
if mode:
pending_mode = mode
continue
# --- Visit / stop segment ---
visit = seg.get('visit')
if not visit:
# timelinePath or unknown — ignore but keep pending travel mode
# --- visit segment: the stop we care about ---
if 'visit' in seg:
start_ts = _parse_ts(seg.get('startTime'))
end_ts = _parse_ts(seg.get('endTime'))
if not start_ts or not end_ts:
pending_mode = 'unknown'
pending_path_points = []
continue
candidate = seg['visit'].get('topCandidate', {})
semantic_type = candidate.get('semanticType', '')
latlng_str = candidate.get('placeLocation', {}).get('latLng', '')
lat, lng = _parse_latlng(latlng_str) if latlng_str else (None, None)
category = SEMANTIC_TYPE_CATEGORY.get(semantic_type, '')
if not category and semantic_type:
category = semantic_type.replace('_', ' ').title()
# Determine travel mode: explicit activity type wins,
# otherwise infer from max speed along the preceding path
travel_mode = pending_mode
if travel_mode == 'unknown' and pending_path_points:
max_speed = _max_speed_from_path(pending_path_points)
travel_mode = _speed_to_travel_mode(max_speed)
stops.append({
'arrived_at': start_ts,
'departed_at': end_ts,
'lat': lat,
'lng': lng,
'place_name': '',
'category': category,
'travel_mode': travel_mode,
})
# Reset for next inter-visit gap
pending_mode = 'unknown'
pending_path_points = []
continue
start_ts = _parse_ts(seg.get('startTime'))
end_ts = _parse_ts(seg.get('endTime'))
if not start_ts or not end_ts:
pending_travel_mode = 'unknown'
continue
candidate = visit.get('topCandidate', {})
semantic_type = candidate.get('semanticType', '')
latlng_str = candidate.get('placeLocation', {}).get('latLng', '')
lat, lng = _parse_latlng(latlng_str) if latlng_str else (None, None)
category = SEMANTIC_TYPE_CATEGORY.get(semantic_type, '')
if not category and semantic_type:
category = semantic_type.replace('_', ' ').title()
stops.append({
'arrived_at': start_ts,
'departed_at': end_ts,
'lat': lat,
'lng': lng,
'place_name': '',
'category': category,
'travel_mode': pending_travel_mode,
})
# Reset after consuming for the next visit
pending_travel_mode = 'unknown'
# timelineMemory or other unknown segment types — ignore
pass
return stops
@@ -298,7 +355,8 @@ class WtImportTimelineWizard(models.TransientModel):
if not window:
return 'unknown'
counts = Counter(a['type'] for a in window)
return _activity_type_to_travel_mode(counts.most_common(1)[0][0])
top_type = counts.most_common(1)[0][0]
return _activity_type_to_travel_mode(top_type) or 'unknown'
stops = []
current_cluster = [positions[0]]