Skip to content

crosswalk_analysis

crosswalk_analysis

Crosswalk deployment analysis for the bikeped collision-warning system.

Evaluates decision pipeline performance as a function of road width for one-camera and two-camera deployments with size-aware detection.

Geometry (matching index.html): - The road runs along the y-axis. Road width is measured along y. - The crosswalk crosses the road at x = 5m (along the y-axis). - Camera 1 is on a pole at (x=5, y=-road_half, h=5.5), heading 90 deg (facing +y, looking across the road). - Camera 2 is on a pole at (x=5, y=+road_half, h=5.5), heading 270 deg (facing -y, looking back across the road). - Cyclists approach along the x-axis toward the crosswalk.

The distance from each camera to a cyclist at (bike_x, bike_y) is: dist = sqrt((bike_x - 5)^2 + (bike_y - cam_y)^2 + h^2)

Two-camera fusion: P(detect) = 1 - (1-AR_cam1) * (1-AR_cam2).

Usage

python crosswalk_analysis.py

make_cam_at

make_cam_at(cam_y, height=CAM_HEIGHT_DEFAULT, pitch_deg=None)

Create a FisheyeCamera positioned on a pole at the crosswalk edge.

Source code in crosswalk_analysis.py
def make_cam_at(cam_y, height=CAM_HEIGHT_DEFAULT, pitch_deg=None):
    """Create a FisheyeCamera positioned on a pole at the crosswalk edge."""
    return FisheyeCamera(
        height=height, fov_deg=220.0, cam_x=CAM_X, cam_y=cam_y,
        pitch_deg=pitch_deg)

size_aware_ar

size_aware_ar(cam, obj_name, obj_x, obj_y, unc)

Get size-aware AR using full 8-corner fisheye projection.

Source code in crosswalk_analysis.py
def size_aware_ar(cam, obj_name, obj_x, obj_y, unc):
    """Get size-aware AR using full 8-corner fisheye projection."""
    obj = OBJECT_TYPES.get(obj_name)
    if obj is None:
        return 0.0
    area = cam.projected_bbox_area(obj_x, obj_y, obj)
    if area <= 0:
        return 0.0
    sized = unc.for_class(obj_name)
    return sized.for_area(area).ar

make_scenario

make_scenario(road_width_m)

Create a bike-approaches-ped scenario for a given road width.

Pedestrian walks along the crosswalk (y-axis) at x=5. Cyclist approaches along x toward the crosswalk center (y~0).

Source code in crosswalk_analysis.py
def make_scenario(road_width_m):
    """Create a bike-approaches-ped scenario for a given road width.

    Pedestrian walks along the crosswalk (y-axis) at x=5.
    Cyclist approaches along x toward the crosswalk center (y~0).
    """
    half = road_width_m / 2.0
    ped_time = road_width_m / PED_SPEED_MS

    bike_start_x = CROSSWALK_X + 20.0
    bike_end_x = CROSSWALK_X - 5.0
    bike_travel = bike_start_x - bike_end_x
    bike_time = bike_travel / BIKE_SPEED_MS
    bike_start_t = 1.0

    return {
        'name': f'Road {road_width_m:.1f}m',
        'expected_dominant': 'alert',
        'agents': [
            {'type': 'person', 'id': 1, 'path': [
                {'x': CROSSWALK_X, 'y': -half, 't': 0},
                {'x': CROSSWALK_X, 'y': 0, 't': ped_time / 2},
                {'x': CROSSWALK_X, 'y': half, 't': ped_time},
            ]},
            {'type': 'bike', 'id': 2, 'path': [
                {'x': bike_start_x, 'y': 0.5, 't': bike_start_t},
                {'x': CROSSWALK_X, 'y': 0.1, 't': bike_start_t + bike_time * 0.8},
                {'x': bike_end_x, 'y': -0.1, 't': bike_start_t + bike_time},
            ]},
        ],
    }

run_mc

run_mc(scenario, n_cameras, road_width_m, unc=None, cam_height=CAM_HEIGHT_DEFAULT, cam_pitch=None, n_trials=30, fps=30)

Run Monte Carlo for a scenario with 1 or 2 cameras.

Source code in crosswalk_analysis.py
def run_mc(scenario, n_cameras, road_width_m, unc=None,
           cam_height=CAM_HEIGHT_DEFAULT, cam_pitch=None,
           n_trials=30, fps=30):
    """Run Monte Carlo for a scenario with 1 or 2 cameras."""
    if unc is None:
        unc = UncertaintyParams()
    params = PIPELINE_CONFIGS['optimized']
    duration = get_scenario_duration(scenario)
    dt = 1.0 / fps

    half = road_width_m / 2.0
    cam1 = make_cam_at(-half, height=cam_height, pitch_deg=cam_pitch)
    cam2 = make_cam_at(half, height=cam_height, pitch_deg=cam_pitch) if n_cameras == 2 else None

    all_sens = []
    all_sev_fn = []

    for trial in range(n_trials):
        rng = np.random.default_rng(seed=trial)
        pipeline = make_pipeline(params, fps)

        states_list = []
        dangers_list = []
        severities_list = []

        t = 0.0
        while t <= duration:
            agents = []
            for ag_def in scenario['agents']:
                pos = interpolate_agent(ag_def, t)
                if pos is not None:
                    obj_name = 'pedestrian' if pos['type'] == 'person' else 'cyclist'

                    ar1 = size_aware_ar(cam1, obj_name,
                                        pos['x'], pos['y'], unc)

                    if cam2 is not None:
                        ar2 = size_aware_ar(cam2, obj_name,
                                            pos['x'], pos['y'], unc)
                        ar = 1.0 - (1.0 - ar1) * (1.0 - ar2)
                    else:
                        ar = ar1

                    if rng.random() > ar:
                        continue
                    agents.append(pos)

            state = run_pipeline_on_agents(pipeline, agents, t,
                                           cam_x=cam1.cam_x, cam_y=cam1.cam_y)
            assessment = compute_ground_truth_danger(scenario, t)

            states_list.append(state)
            dangers_list.append(assessment.is_danger)
            severities_list.append(assessment.severity)
            t += dt

        dangers = np.array(dangers_list)
        states = np.array(states_list)
        severities = np.array(severities_list)
        is_alert = states == 'alert'

        n_danger = int(np.sum(dangers))
        tp = int(np.sum(is_alert & dangers))
        sens = tp / max(n_danger, 1)
        all_sens.append(sens)

        sev_total = float(np.sum(severities[dangers])) if n_danger > 0 else 1.0
        sev_missed = float(np.sum(severities[~is_alert & dangers])) if n_danger > 0 else 0.0
        all_sev_fn.append(sev_missed / max(sev_total, 1e-6))

    return {
        'mean_sens': float(np.mean(all_sens)),
        'std_sens': float(np.std(all_sens)),
        'mean_sev_fn': float(np.mean(all_sev_fn)),
        'std_sev_fn': float(np.std(all_sev_fn)),
    }

run_height_sweep

run_height_sweep(road_width_ft, unc, heights, n_cameras=1, n_trials=20)

Sweep camera height and return sensitivity for each.

Source code in crosswalk_analysis.py
def run_height_sweep(road_width_ft, unc, heights, n_cameras=1, n_trials=20):
    """Sweep camera height and return sensitivity for each."""
    width_m = road_width_ft * FEET_TO_METERS
    scenario = make_scenario(width_m)
    results = []
    for h in heights:
        r = run_mc(scenario, n_cameras, width_m, unc=unc,
                   cam_height=h, n_trials=n_trials)
        results.append((h, r))
    return results