261 lines
10 KiB
Python
261 lines
10 KiB
Python
"""Unit tests for EmployeesAgent — plan, gather, reason, report, peer_bus, sweep."""
|
|
import datetime
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
from agent_service.agents.employees_agent import EmployeesAgent, EMPLOYEES_TOOLS
|
|
from agent_service.agents.base_agent import AgentDirective, AgentReport, SweepReport
|
|
|
|
|
|
def _directive(intent='', context=None):
    """Create an ``AgentDirective`` addressed to ``employees_agent``.

    ``intent`` is passed through unchanged; a falsy ``context`` is replaced
    by a fresh empty dict.
    """
    ctx = context or {}
    return AgentDirective(
        directive_id='emp-d1',
        user_id='1',
        intent=intent,
        context=ctx,
        agent_name='employees_agent',
    )
|
|
|
|
|
|
def _make_agent():
    """Build an ``EmployeesAgent`` wired entirely to mocks.

    ``odoo`` and ``llm`` are plain ``MagicMock`` objects.  ``_ht`` is replaced
    by a ``MagicMock`` whose tool methods are ``AsyncMock`` stubs returning
    canned values, so the tests can await them directly.
    """
    agent = EmployeesAgent(odoo=MagicMock(), llm=MagicMock())
    agent._ht = MagicMock()
    tools = agent._ht

    # Method name -> value the awaited stub resolves to.
    canned_results = {
        'get_employees': [
            {'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}
        ],
        'get_leaves': [],
        'get_contracts': [],
        'get_employee_profile': {'id': 1, 'name': 'Alice'},
        'get_department_summary': {'headcount': 5, 'avg_wage': 50000.0},
        'get_attendance_summary': {'total_hours': 160.0},
        'flag_for_review': True,
        'post_chatter_note': True,
    }
    for method_name, result in canned_results.items():
        setattr(tools, method_name, AsyncMock(return_value=result))

    return agent
|
|
|
|
|
|
# ── Meta ────────────────────────────────────────────────────────────────────
|
|
|
|
def test_tool_count():
    """The employees tool list must contain at most eight entries."""
    tool_total = len(EMPLOYEES_TOOLS)
    assert tool_total <= 8
|
|
|
|
def test_agent_name():
    """The class-level ``name`` attribute is ``'employees_agent'``."""
    expected_name = 'employees_agent'
    assert EmployeesAgent.name == expected_name
|
|
|
|
def test_hipaa_locked():
    """``employees_agent`` is listed in the router's ``HIPAA_LOCKED_AGENTS``."""
    # Imported inside the test, as in the original, so the router module is
    # only loaded when this test runs.
    from agent_service.llm.llm_router import HIPAA_LOCKED_AGENTS as locked_agents

    assert 'employees_agent' in locked_agents
|
|
|
|
|
|
# ── _plan ───────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_plan_employee_intent():
    """An intent mentioning employees sets ``fetch_employees`` in the plan."""
    directive = _directive(intent='show all employees')
    plan = await _make_agent()._plan(directive)
    assert plan['fetch_employees'] is True
|
|
|
|
@pytest.mark.asyncio
async def test_plan_headcount_intent():
    """A headcount question sets ``fetch_employees`` in the plan."""
    directive = _directive(intent='what is the headcount')
    plan = await _make_agent()._plan(directive)
    assert plan['fetch_employees'] is True
|
|
|
|
@pytest.mark.asyncio
async def test_plan_leave_intent():
    """An intent mentioning leave requests sets ``fetch_leaves`` in the plan."""
    directive = _directive(intent='show pending leave requests')
    plan = await _make_agent()._plan(directive)
    assert plan['fetch_leaves'] is True
|
|
|
|
@pytest.mark.asyncio
async def test_plan_vacation_intent():
    """An intent mentioning vacation sets ``fetch_leaves`` in the plan."""
    directive = _directive(intent='list vacation requests')
    plan = await _make_agent()._plan(directive)
    assert plan['fetch_leaves'] is True
|
|
|
|
@pytest.mark.asyncio
async def test_plan_contract_intent():
    """An intent mentioning contracts sets ``fetch_contracts`` in the plan."""
    directive = _directive(intent='check contracts')
    plan = await _make_agent()._plan(directive)
    assert plan['fetch_contracts'] is True
|
|
|
|
@pytest.mark.asyncio
async def test_plan_propagates_context():
    """``department_id`` and ``employee_id`` from the context reach the plan."""
    hints = {'department_id': 3, 'employee_id': 7}
    directive = _directive(context=hints)

    plan = await _make_agent()._plan(directive)

    assert plan['department_id'] == 3
    assert plan['employee_id'] == 7
|
|
|
|
|
|
# ── _gather ─────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_gather_employees_by_default():
    """With only ``fetch_employees`` enabled, the result has an ``employees`` key."""
    plan = {
        'fetch_employees': True,
        'fetch_leaves': False,
        'fetch_contracts': False,
        'department_id': None,
        'employee_id': None,
    }
    gathered = await _make_agent()._gather({'plan': plan})
    assert 'employees' in gathered
|
|
|
|
@pytest.mark.asyncio
async def test_gather_department_summary_when_dept_id():
    """A plan with ``department_id`` yields ``dept_summary`` and one summary call."""
    agent = _make_agent()
    plan = {
        'fetch_employees': True,
        'fetch_leaves': False,
        'fetch_contracts': False,
        'department_id': 5,
        'employee_id': None,
    }

    gathered = await agent._gather({'plan': plan})

    assert 'dept_summary' in gathered
    # The department id must be forwarded positionally, exactly once.
    summary_stub = agent._ht.get_department_summary
    summary_stub.assert_awaited_once_with(5)
|
|
|
|
@pytest.mark.asyncio
async def test_gather_leaves():
    """With only ``fetch_leaves`` enabled, the result has a ``leaves`` key."""
    agent = _make_agent()
    leave_rows = [{'id': 1, 'name': 'Alice Leave'}]
    agent._ht.get_leaves = AsyncMock(return_value=leave_rows)
    plan = {
        'fetch_employees': False,
        'fetch_leaves': True,
        'fetch_contracts': False,
        'department_id': None,
        'employee_id': None,
    }

    gathered = await agent._gather({'plan': plan})

    assert 'leaves' in gathered
|
|
|
|
@pytest.mark.asyncio
async def test_gather_contracts():
    """With only ``fetch_contracts`` enabled, the result has a ``contracts`` key."""
    agent = _make_agent()
    contract_rows = [{'id': 10, 'date_end': '2025-01-01'}]
    agent._ht.get_contracts = AsyncMock(return_value=contract_rows)
    plan = {
        'fetch_employees': False,
        'fetch_leaves': False,
        'fetch_contracts': True,
        'department_id': None,
        'employee_id': None,
    }

    gathered = await agent._gather({'plan': plan})

    assert 'contracts' in gathered
|
|
|
|
|
|
# ── _reason ─────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_reason_flags_expired_contracts():
    """A contract that ended yesterday yields one escalation mentioning 'expired'."""
    agent = _make_agent()
    one_day = datetime.timedelta(days=1)
    yesterday = (datetime.date.today() - one_day).isoformat()
    expired_contract = {
        'id': 1, 'date_end': yesterday, 'employee_id': [1, 'Alice'],
    }
    agent._gathered_data = {'contracts': [expired_contract]}

    analysis = await agent._reason({})

    escalations = analysis['escalations']
    assert len(escalations) == 1
    assert 'expired' in escalations[0].lower()
|
|
|
|
@pytest.mark.asyncio
async def test_reason_active_contracts_no_escalation():
    """A contract ending 30 days from today produces no escalations."""
    agent = _make_agent()
    end_date = datetime.date.today() + datetime.timedelta(days=30)
    active_contract = {'id': 1, 'date_end': end_date.isoformat()}
    agent._gathered_data = {'contracts': [active_contract]}

    analysis = await agent._reason({})

    assert analysis['escalations'] == []
|
|
|
|
@pytest.mark.asyncio
async def test_reason_no_contracts_no_escalation():
    """Gathered data without a ``contracts`` entry produces no escalations."""
    agent = _make_agent()
    employees_only = {'employees': [{'id': 1}]}
    agent._gathered_data = employees_only

    analysis = await agent._reason({})

    assert analysis['escalations'] == []
|
|
|
|
|
|
# ── _report ─────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_report_employee_count():
    """With three gathered employees, the report summary contains '3'."""
    agent = _make_agent()
    agent._escalations_list = []
    agent._gathered_data = {'employees': [{'id': 1}, {'id': 2}, {'id': 3}]}

    report = await agent._report({})

    assert isinstance(report, AgentReport)
    assert '3' in report.summary
|
|
|
|
@pytest.mark.asyncio
async def test_report_dept_summary():
    """With a department headcount of 8, the report summary contains '8'."""
    agent = _make_agent()
    agent._escalations_list = []
    department = {'headcount': 8, 'avg_wage': 60000.0}
    agent._gathered_data = {'dept_summary': department}

    report = await agent._report({})

    assert '8' in report.summary
|
|
|
|
@pytest.mark.asyncio
async def test_report_fallback_message():
    """With nothing gathered, the summary mentions 'complete' or 'hr' (any case)."""
    agent = _make_agent()
    agent._escalations_list = []
    agent._gathered_data = {}

    report = await agent._report({})

    summary_text = report.summary.lower()
    assert any(keyword in summary_text for keyword in ('complete', 'hr'))
|
|
|
|
|
|
# ── _dispatch_tool ───────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_dispatch_get_employees():
    """Dispatching ``get_employees`` awaits the matching tool stub exactly once."""
    agent = _make_agent()
    no_arguments = {}

    await agent._dispatch_tool('get_employees', no_arguments)

    employees_stub = agent._ht.get_employees
    employees_stub.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
async def test_dispatch_get_employee_profile():
    """Dispatching ``get_employee_profile`` forwards ``employee_id`` as a keyword."""
    agent = _make_agent()
    arguments = {'employee_id': 42}

    await agent._dispatch_tool('get_employee_profile', arguments)

    profile_stub = agent._ht.get_employee_profile
    profile_stub.assert_awaited_once_with(employee_id=42)
|
|
|
|
@pytest.mark.asyncio
async def test_dispatch_unknown_raises():
    """An unrecognised tool name raises ``ValueError`` matching 'Unknown tool'."""
    agent = _make_agent()
    expectation = pytest.raises(ValueError, match='Unknown tool')
    with expectation:
        await agent._dispatch_tool('nonexistent', {})
|
|
|
|
|
|
# ── handle_peer_request ──────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_peer_employee_list():
    """An ``employee_list`` peer request returns two entries under ``employees``."""
    agent = _make_agent()

    reply = await agent.handle_peer_request('employee_list', {}, 'dir-1')

    assert 'employees' in reply
    roster = reply['employees']
    assert len(roster) == 2
|
|
|
|
@pytest.mark.asyncio
async def test_peer_employee_profile():
    """An ``employee_profile`` peer request returns a result containing ``name``."""
    agent = _make_agent()
    payload = {'employee_id': 1}

    reply = await agent.handle_peer_request('employee_profile', payload, 'dir-1')

    assert 'name' in reply
|
|
|
|
@pytest.mark.asyncio
async def test_peer_headcount():
    """A ``headcount`` peer request reports 2 with the default ``_make_agent`` stubs."""
    agent = _make_agent()

    reply = await agent.handle_peer_request('headcount', {}, 'dir-1')

    expected_headcount = 2
    assert reply['headcount'] == expected_headcount
|
|
|
|
@pytest.mark.asyncio
async def test_peer_unknown_returns_error():
    """An unrecognised peer request type returns a result with an ``error`` key."""
    agent = _make_agent()

    reply = await agent.handle_peer_request('bad_type', {}, 'dir-1')

    assert 'error' in reply
|
|
|
|
|
|
# ── sweep ────────────────────────────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_sweep_finds_expired_contracts():
    """A contract that ended yesterday yields one ``expired_contract`` finding."""
    agent = _make_agent()
    one_day = datetime.timedelta(days=1)
    yesterday = (datetime.date.today() - one_day).isoformat()
    expired_contract = {
        'id': 1, 'date_end': yesterday, 'employee_id': [1, 'Alice'],
    }
    agent._ht.get_contracts = AsyncMock(return_value=[expired_contract])

    outcome = await agent.sweep()

    assert isinstance(outcome, SweepReport)
    assert len(outcome.findings) == 1
    first_finding = outcome.findings[0]
    assert first_finding['type'] == 'expired_contract'
|
|
|
|
@pytest.mark.asyncio
async def test_sweep_active_contracts_no_findings():
    """A contract ending 60 days from today produces an empty findings list."""
    agent = _make_agent()
    end_date = datetime.date.today() + datetime.timedelta(days=60)
    active_contract = {'id': 1, 'date_end': end_date.isoformat()}
    agent._ht.get_contracts = AsyncMock(return_value=[active_contract])

    outcome = await agent.sweep()

    assert outcome.findings == []
|
|
|
|
@pytest.mark.asyncio
async def test_sweep_handles_exception():
    """If fetching contracts raises, ``sweep`` returns a report with ``error`` set."""
    agent = _make_agent()
    failure = Exception('DB down')
    agent._ht.get_contracts = AsyncMock(side_effect=failure)

    outcome = await agent.sweep()

    assert outcome.error is not None
|