#!/usr/bin/env python3 r""" fsevents_gap_finder.py V 20250909 1739 Author: Stephen Bunting steve@buntingdigitalforensics.us ---------------------- Detect significant gaps in FSEvents-style timelines (e.g., iOS/macOS fseventsd outputs or parsed exports). Inputs: --csv | --excel | --sqlite Key flags: --csv-time-col / --csv-path-col (for CSV/Excel) --sheet (Excel; otherwise auto-detects) --table --time-col [--path-col] (SQLite) --threshold-min (gap threshold, minutes; default 30) --min-gap (minimum gap to keep; default threshold) --start --end (UTC window) --exclude-path "*GlobA*;*GlobB*" (path noise filter) Outputs in --out: fsevents_gap_report.txt | .csv | .json fsevents_gap_report_daily_summary.csv (if --daily-summary) fsevents_gap_report_case_exhibit.csv (if --case-exhibit N) Examples (PowerShell): python fsevents_gap_finder.py ` --excel "D:\Cases\Case\XRY_Archives.xlsx" ` --csv-time-col "Created" ` --csv-path-col "Path" ` --threshold-min 30 --min-gap 30 ` --start "2021-06-06" --end "2021-06-08 23:59:59" ` --exclude-path "*MobileAsset*;*Caches*;*temporary*" ` --daily-summary --case-exhibit 25 --print-top 10 --debug """ import argparse import csv import datetime as dt import fnmatch import json import os import re import sqlite3 import sys from typing import Dict, List, Optional, Tuple APPLE_COCOA_EPOCH = dt.datetime(2001, 1, 1, tzinfo=dt.timezone.utc) UNIX_EPOCH = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc) DEFAULT_TIME_COLS = [ "timestamp","time","event_time","date","event_date","event_timestamp", "Timestamp","Date/Time (UTC)","Timestamp (UTC)","Date/Time", "CreatedUTC","Time","Created","Modified","Accessed" ] DEFAULT_PATH_COLS = ["path","file_path","item_path","full_path","Path","Full Path","Item Path","File Path"] UTC_TOK_RE = re.compile(r"\s+UTC([+-]\d{2}:\d{2})?$", re.IGNORECASE) # ---------- time parsing ---------- def to_utc(d: dt.datetime) -> dt.datetime: if d.tzinfo is None: return d.replace(tzinfo=dt.timezone.utc) return d.astimezone(dt.timezone.utc) def try_parse_formats(s: str, formats: List[str]) -> Optional[dt.datetime]: for fmt in formats: try: d = dt.datetime.strptime(s, fmt) if d.tzinfo is None: d = d.replace(tzinfo=dt.timezone.utc) return to_utc(d) except Exception: pass return None def parse_xamn_utc(raw: str) -> Optional[dt.datetime]: """ Examples handled: '2021/05/15 20:43:20 UTC' '2021/05/15 20:43:20 UTC-04:00' '5/15/2021 8:03:55 PM UTC-04:00' '5/15/2021 8:03:55 PM UTC' """ if raw is None: return None s = str(raw).strip() m = UTC_TOK_RE.search(s) if not m: return None dt_txt = s[:m.start()].strip() off_txt = m.group(1) # e.g. '-04:00' or None candidates = [ "%Y/%m/%d %H:%M:%S", "%Y-%m-%d %H:%M:%S", "%m/%d/%Y %I:%M:%S %p", "%m-%d-%Y %I:%M:%S %p", ] for fmt in candidates: try: base = dt.datetime.strptime(dt_txt, fmt).replace(tzinfo=dt.timezone.utc) if off_txt: sign = 1 if off_txt.startswith("+") else -1 hh = int(off_txt[1:3]); mm = int(off_txt[4:6]) delta = dt.timedelta(hours=hh, minutes=mm) * sign base = base - delta return base.astimezone(dt.timezone.utc) except Exception: continue return None def parse_timestamp_auto(raw: str) -> Optional[dt.datetime]: x = parse_xamn_utc(raw) if x is not None: return x if raw is None: return None s = str(raw).strip() if not s: return None fmts = [ "%Y-%m-%d %H:%M:%S%z","%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S%z","%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S.%f","%Y-%m-%d %H:%M:%S.%f%z","%Y-%m-%d %H:%M:%S.%f", "%Y/%m/%d %H:%M:%S%z","%Y/%m/%d %H:%M:%S", "%Y/%m/%d %H:%M:%S.%f%z","%Y/%m/%d %H:%M:%S.%f", "%m/%d/%Y %I:%M:%S %p%z","%m/%d/%Y %I:%M:%S %p", "%m-%d-%Y %I:%M:%S %p%z","%m-%d-%Y %I:%M:%S %p", ] d = try_parse_formats(s, fmts) if d: return d # numeric epochs for base in (UNIX_EPOCH, APPLE_COCOA_EPOCH): try: f = float(s) if f > 1e10: f /= 1000.0 return to_utc(base + dt.timedelta(seconds=f)) except Exception: pass return None def force_epoch(raw: str, epoch: Optional[str]) -> Optional[dt.datetime]: if epoch is None: return parse_timestamp_auto(raw) try: f = float(str(raw).strip()) if f > 1e10: f /= 1000.0 base = UNIX_EPOCH if epoch == "unix" else APPLE_COCOA_EPOCH return to_utc(base + dt.timedelta(seconds=f)) except Exception: return None def parse_date_only(s: Optional[str]) -> Optional[dt.datetime]: if not s: return None s = s.strip() fmts = [ "%Y-%m-%d","%Y/%m/%d", "%Y-%m-%d %H:%M:%S","%Y/%m/%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S","%Y-%m-%dT%H:%M:%S%z", "%Y/%m/%dT%H:%M:%S","%Y/%m/%dT%H:%M:%S%z", ] d = try_parse_formats(s, fmts) return to_utc(d) if d else None # ---------- loaders ---------- def load_from_csv(csv_path: str, epoch_hint: Optional[str], csv_time_col: Optional[str]=None, csv_path_col: Optional[str]=None, debug: bool=False) -> Tuple[List[Dict], str]: with open(csv_path, "r", encoding="utf-8", errors="replace") as f: sample = f.read(4096) delim = "\t" if (sample.count("\t") > sample.count(",")) else "," if debug: print(f"[DEBUG] Delimiter detected: {'TAB' if delim=='\\t' else 'COMMA'}") rows: List[Dict] = [] with open(csv_path, "r", encoding="utf-8", errors="replace", newline="") as f: rdr = csv.DictReader(f, delimiter=delim) headers = [h.strip() for h in (rdr.fieldnames or [])] if debug: print(f"[DEBUG] Headers ({len(headers)}): {headers}") time_col = csv_time_col or next((c for c in DEFAULT_TIME_COLS if c in headers), None) path_col = csv_path_col or next((c for c in DEFAULT_PATH_COLS if c in headers), None) if debug: print(f"[DEBUG] time_col chosen: {time_col}") print(f"[DEBUG] path_col chosen: {path_col}") if time_col is None: raise ValueError(f"Could not detect a timestamp column in {csv_path}. Found headers: {headers}") for r in rdr: raw_t = r.get(time_col) if not raw_t: continue t = force_epoch(raw_t, epoch_hint) if t is None: continue rows.append({"_ts": t, "path": r.get(path_col, "") if path_col else "", "raw": r}) return rows, ("TAB" if delim == "\t" else "COMMA") def load_from_excel(xlsx_path: str, sheet_name: Optional[str], epoch_hint: Optional[str], time_col: Optional[str], path_col: Optional[str], debug: bool=False) -> List[Dict]: try: import openpyxl except ImportError: print("openpyxl is required for --excel. Install with: pip install openpyxl", file=sys.stderr) sys.exit(2) wb = openpyxl.load_workbook(xlsx_path, data_only=True, read_only=True) # --- choose sheet --- ws = None if sheet_name and sheet_name in wb.sheetnames: ws = wb[sheet_name] chosen = f"explicit ('{sheet_name}')" else: if sheet_name: print(f"[WARN] Sheet '{sheet_name}' not found. Falling back to auto-detect.") # prefer sheets with both likely headers present for nm in wb.sheetnames: hdrs = [str(h).strip() if h is not None else "" for h in next(wb[nm].iter_rows(values_only=True))] if any(c in hdrs for c in ["Created","Modified","Accessed","Date/Time (UTC)","Time"]) and any(p in hdrs for p in ["Path","File Path"]): ws = wb[nm]; chosen = f"auto ('{nm}')"; break if ws is None: ws = wb.active; chosen = f"default active ('{ws.title}')" if debug: print(f"[DEBUG] Using sheet: {ws.title} (selection: {chosen})") # --- headers --- rows_iter = ws.iter_rows(values_only=True) header = next(rows_iter) headers = [str(h).strip() if h is not None else "" for h in header] if debug: print(f"[DEBUG] Excel headers ({len(headers)}): {headers}") def _col_index(name: str) -> Optional[int]: return headers.index(name) if name in headers else None def _column_has_values(idx: Optional[int], checks: int = 80) -> bool: if idx is None: return False n = 0 for r in ws.iter_rows(min_row=2, values_only=True): if r and idx < len(r) and r[idx] not in (None, ""): return True n += 1 if n >= checks: break return False # --- choose columns (ignore empty time columns) --- preferred_time = ["Created","Modified","Accessed","Date/Time (UTC)","Time"] + DEFAULT_TIME_COLS tcol = time_col if (time_col in headers) else None pcol = path_col if (path_col in headers) else None # user-specified but empty -> fallback if tcol is not None and not _column_has_values(_col_index(tcol)): if debug: print(f"[WARN] Requested time column '{tcol}' is empty; falling back.") tcol = None if tcol is None: for cand in preferred_time: idx = _col_index(cand) if _column_has_values(idx): tcol = cand break if pcol is None: for cand in ["Path","File Path"] + DEFAULT_PATH_COLS: if cand in headers: pcol = cand; break if tcol is None: raise ValueError(f"Could not find a non-empty timestamp column. Headers: {headers}") t_idx = headers.index(tcol) p_idx = headers.index(pcol) if pcol and pcol in headers else None if debug: print(f"[DEBUG] Excel time_col: {tcol}") print(f"[DEBUG] Excel path_col: {pcol}") # --- read rows --- out: List[Dict] = [] for row in rows_iter: if not row: continue raw_t = row[t_idx] if t_idx < len(row) else None if raw_t is None or raw_t == "": continue if isinstance(raw_t, dt.datetime): t = to_utc(raw_t) else: t = force_epoch(str(raw_t), epoch_hint) if t is None: t = parse_xamn_utc(str(raw_t)) if t is None: # unparseable continue path = "" if p_idx is not None and p_idx < len(row) and row[p_idx] is not None: path = str(row[p_idx]) out.append({"_ts": t, "path": path, "raw": {tcol: raw_t, (pcol or 'path'): path}}) return out def load_from_sqlite(sqlite_path: str, table: str, time_col: str, path_col: Optional[str], epoch_hint: Optional[str]) -> List[Dict]: rows: List[Dict] = [] con = sqlite3.connect(f"file:{sqlite_path}?mode=ro", uri=True) try: cur = con.cursor() cols = [time_col] + ([path_col] if path_col else []) q = f"SELECT {', '.join(cols)} FROM {table}" for row in cur.execute(q): raw_t = row[0] path = row[1] if (path_col and len(row) > 1) else "" t = force_epoch(str(raw_t), epoch_hint) if t is None: continue rows.append({"_ts": t, "path": path, "raw": {time_col: raw_t, (path_col or 'path'): path}}) finally: con.close() return rows # ---------- filtering & gaps ---------- def within_range(t: dt.datetime, start: Optional[dt.datetime], end: Optional[dt.datetime]) -> bool: if start and t < start: return False if end and t > end: return False return True def parse_excludes(spec: Optional[str]) -> List[str]: if not spec: return [] return [p.strip().lower() for p in spec.split(";") if p.strip()] def excluded_by_patterns(before_path: str, after_path: str, patterns: List[str]) -> bool: if not patterns: return False bp, ap = (before_path or "").lower(), (after_path or "").lower() for pat in patterns: if fnmatch.fnmatch(bp, pat) or fnmatch.fnmatch(ap, pat): return True return False def find_gaps(sorted_events: List[Dict], threshold: dt.timedelta, min_gap: dt.timedelta, exclude_patterns: List[str]) -> List[Dict]: gaps: List[Dict] = [] for i in range(1, len(sorted_events)): prev_e, cur_e = sorted_events[i-1], sorted_events[i] delta = cur_e["_ts"] - prev_e["_ts"] if delta >= threshold: g = { "gap_index": len(gaps)+1, "start": prev_e["_ts"].isoformat(), "end": cur_e["_ts"].isoformat(), "duration_seconds": int(delta.total_seconds()), "duration_hms": str(delta), "before_path": prev_e.get("path",""), "after_path": cur_e.get("path",""), } if delta >= min_gap and not excluded_by_patterns(g["before_path"], g["after_path"], exclude_patterns): gaps.append(g) return gaps # ---------- reports ---------- def write_txt(out_path: str, gaps: List[Dict], meta: Dict) -> None: with open(out_path, "w", encoding="utf-8") as f: f.write("FSEvents Gap Analysis Report\n") f.write("="*32 + "\n\n") for k,v in meta.items(): f.write(f"{k}: {v}\n") f.write("\n") if not gaps: f.write("No gaps found meeting threshold.\n"); return for g in gaps: f.write(f"[Gap {g['gap_index']}] {g['start']} → {g['end']} | {g['duration_hms']} ({g['duration_seconds']}s)\n") if g.get("before_path") or g.get("after_path"): f.write(f" before: {g.get('before_path','')}\n") f.write(f" after : {g.get('after_path','')}\n") f.write("\n") def write_csv(out_path: str, gaps: List[Dict]) -> None: if not gaps: with open(out_path,"w",newline="",encoding="utf-8") as f: w=csv.writer(f); w.writerow(["message"]); w.writerow(["No gaps found meeting threshold."]); return keys=["gap_index","start","end","duration_seconds","duration_hms","before_path","after_path"] with open(out_path,"w",newline="",encoding="utf-8") as f: w=csv.DictWriter(f,fieldnames=keys); w.writeheader() for g in gaps: w.writerow({k:g.get(k,"") for k in keys}) def write_json(out_path: str, gaps: List[Dict], meta: Dict) -> None: with open(out_path,"w",encoding="utf-8") as f: json.dump({"meta":meta,"gaps":gaps}, f, indent=2) def write_daily_summary(out_path: str, gaps: List[Dict]) -> None: if not gaps: with open(out_path,"w",newline="",encoding="utf-8") as f: w=csv.writer(f); w.writerow(["message"]); w.writerow(["No gaps found."]); return from collections import defaultdict buckets=defaultdict(int) # (date, hour)->secs for g in gaps: s = dt.datetime.fromisoformat(g["start"].replace("Z","+00:00")).astimezone(dt.timezone.utc) e = dt.datetime.fromisoformat(g["end"].replace("Z","+00:00")).astimezone(dt.timezone.utc) cur=s while cur None: sel=sorted(gaps,key=lambda g:g["duration_seconds"],reverse=True)[:max(top_n,0) or len(gaps)] headers=["rank","start","end","duration_hms","duration_seconds","before_path","after_path","notes"] with open(out_path,"w",newline="",encoding="utf-8") as f: w=csv.DictWriter(f,fieldnames=headers); w.writeheader() for i,g in enumerate(sel,1): w.writerow({"rank":i,"start":g["start"],"end":g["end"], "duration_hms":g["duration_hms"],"duration_seconds":g["duration_seconds"], "before_path":g.get("before_path",""),"after_path":g.get("after_path",""), "notes":""}) def print_top_gaps(gaps: List[Dict], top_n: int) -> None: if not gaps: print("No gaps to summarize."); return for g in sorted(gaps,key=lambda g:g["duration_seconds"],reverse=True)[:top_n]: print(f" #{g['gap_index']:>3} {g['duration_hms']:>12} {g['start']} → {g['end']}") # ---------- main ---------- def main(): ap=argparse.ArgumentParser(description="Detect time gaps in FSEvents-style timelines.") src=ap.add_mutually_exclusive_group(required=True) src.add_argument("--csv") src.add_argument("--excel") src.add_argument("--sqlite") ap.add_argument("--sheet") ap.add_argument("--table") ap.add_argument("--time-col") ap.add_argument("--path-col") ap.add_argument("--epoch",choices=["unix","cocoa"]) ap.add_argument("--threshold-min",type=int,default=30) ap.add_argument("--min-gap",type=int,default=None) ap.add_argument("--start"); ap.add_argument("--end") ap.add_argument("--out",default=".") ap.add_argument("--csv-time-col"); ap.add_argument("--csv-path-col") ap.add_argument("--exclude-path") ap.add_argument("--print-top",type=int,default=0) ap.add_argument("--daily-summary",action="store_true") ap.add_argument("--case-exhibit",type=int,default=0) ap.add_argument("--debug",action="store_true") args=ap.parse_args() meta={"threshold_minutes":args.threshold_min, "min_gap_minutes":args.min_gap if args.min_gap is not None else args.threshold_min, "source":"","start":args.start or "","end":args.end or "", "exclude_path":args.exclude_path or ""} if args.sqlite: if not args.table or not args.time_col: print("--sqlite requires --table and --time-col",file=sys.stderr); sys.exit(2) meta["source"]=f"sqlite:{args.sqlite} table={args.table} time_col={args.time_col}" rows=load_from_sqlite(args.sqlite,args.table,args.time_col,args.path_col,args.epoch) elif args.excel: meta["source"]=f"excel:{args.excel}" rows=load_from_excel(args.excel,args.sheet,args.epoch,args.csv_time_col,args.csv_path_col,debug=args.debug) else: meta["source"]=f"csv:{args.csv}" rows,_=load_from_csv(args.csv,args.epoch,args.csv_time_col,args.csv_path_col,debug=args.debug) if not rows: print("No events loaded. Check input paths and column names."); sys.exit(1) start_dt=parse_date_only(args.start); end_dt=parse_date_only(args.end) if start_dt or end_dt: rows=[r for r in rows if within_range(r["_ts"],start_dt,end_dt)] if args.debug: print(f"[DEBUG] Rows after time slice: {len(rows)}") if not rows: print("No events remain after applying --start/--end window."); sys.exit(1) rows.sort(key=lambda r:r["_ts"]) threshold=dt.timedelta(minutes=args.threshold_min) min_gap=dt.timedelta(minutes=(args.min_gap if args.min_gap is not None else args.threshold_min)) exclude_patterns=parse_excludes(args.exclude_path) gaps=find_gaps(rows,threshold,min_gap,exclude_patterns) os.makedirs(args.out,exist_ok=True) base=os.path.join(args.out,"fsevents_gap_report") write_txt(base+".txt",gaps,meta) write_csv(base+".csv",gaps) write_json(base+".json",gaps,meta) if args.daily_summary: write_daily_summary(base+"_daily_summary.csv",gaps) if args.case_exhibit: write_case_exhibit(base+"_case_exhibit.csv",gaps,args.case_exhibit) print(f"[OK] Events loaded: {len(rows)}") print(f"[OK] Gaps ≥ {args.threshold_min} min (post-filters): {len(gaps)}") print(f"[OK] Reports written: {base}.txt, .csv, .json") if args.daily_summary: print(f"[OK] Daily summary: {base}_daily_summary.csv") if args.case_exhibit: print(f"[OK] Case exhibit (top {args.case_exhibit}): {base}_case_exhibit.csv") if args.print_top>0: print("Top gaps:"); print_top_gaps(gaps,args.print_top) if __name__=="__main__": main()