Scheduler API¶
::: src.scheduler
Module: src/scheduler.py
The scheduler runs as an asyncio background task, polling data/scheduled_tasks.json for due tasks.
Scheduler¶
class Scheduler:
    def __init__(self, agent, config: AppConfig, channels: dict = None): ...
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
__init__(agent, config, channels=None)¶
| Parameter | Type | Description |
|---|---|---|
| agent | LangGraph agent | The agent instance to invoke for task execution |
| config | AppConfig | Application config (reads scheduler.poll_interval, scheduler.data_file) |
| channels | dict[str, AbstractChannel] | Map of channel name to channel instance for sending results |
start()¶
Creates an asyncio.Task running the polling loop. The loop calls _check_and_run() every poll_interval seconds.
stop()¶
Cancels the polling task and drains any in-flight task executions via asyncio.gather().
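The lifecycle described for start() and stop() can be sketched as follows. This is a minimal illustration and not the real Scheduler: MiniScheduler, _spawn, _job and the counters are hypothetical names, and the way in-flight executions are tracked (a set with done callbacks) is an assumption.

```python
import asyncio
from typing import Optional


class MiniScheduler:
    """Illustrative stand-in for Scheduler; not the real class."""

    def __init__(self, poll_interval: float = 0.01):
        self.poll_interval = poll_interval
        self.ticks = 0       # polling iterations so far
        self.completed = 0   # task executions that finished
        self._poll_task: Optional[asyncio.Task] = None
        self._in_flight: set = set()

    async def _job(self) -> None:
        await asyncio.sleep(0.02)  # placeholder for a real task execution
        self.completed += 1

    def _spawn(self, coro) -> None:
        # Keep a reference so the task is not garbage-collected and can be drained.
        task = asyncio.create_task(coro)
        self._in_flight.add(task)
        task.add_done_callback(self._in_flight.discard)

    async def _check_and_run(self) -> None:
        self.ticks += 1
        self._spawn(self._job())

    async def _loop(self) -> None:
        while True:
            await self._check_and_run()
            await asyncio.sleep(self.poll_interval)

    async def start(self) -> None:
        self._poll_task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        if self._poll_task is not None:
            self._poll_task.cancel()
            # Wait for the cancellation; the CancelledError is returned, not raised.
            await asyncio.gather(self._poll_task, return_exceptions=True)
            self._poll_task = None
        if self._in_flight:
            # Drain: let executions that already started run to completion.
            await asyncio.gather(*self._in_flight, return_exceptions=True)


async def demo() -> tuple:
    scheduler = MiniScheduler()
    await scheduler.start()
    await asyncio.sleep(0.05)
    await scheduler.stop()
    return scheduler.ticks, scheduler.completed
```

In this sketch, stop() cancels only the polling task. Executions that were already spawned are awaited, so every started job finishes before stop() returns.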
Polling Loop¶
The scheduler checks for due tasks while holding a shared asyncio.Lock (the same lock the cron tools use) to prevent race conditions:
async with get_tasks_lock():
    # Read tasks, mark due ones, write back
    ...
# Execute due tasks concurrently, outside the lock
for task in due_tasks:
    asyncio.create_task(self._execute_task(task))
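To make the read, mark, write-back step concrete, here is a self-contained sketch. It rests on several assumptions: the task file is a JSON array of task objects, "marking" a due task means stamping last_run, and claim_due_tasks and its parameters are hypothetical names. The asyncio.sleep(0) stands in for any await inside the critical section.

```python
import asyncio
import json
import tempfile
from pathlib import Path


async def claim_due_tasks(path: Path, lock: asyncio.Lock, is_due, now_iso: str) -> list:
    """Read the task file, stamp due tasks with last_run, write it back, all under the lock."""
    async with lock:
        tasks = json.loads(path.read_text())
        due = [t for t in tasks if t.get("active", True) and is_due(t)]
        # Yield point: without the lock, a second caller could read stale data here.
        await asyncio.sleep(0)
        for task in due:
            task["last_run"] = now_iso
        path.write_text(json.dumps(tasks, indent=2))
    # The caller executes the returned tasks after the lock has been released.
    return due


async def demo() -> tuple:
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "scheduled_tasks.json"
        path.write_text(json.dumps([{"id": "abc12345", "last_run": None, "active": True}]))
        lock = asyncio.Lock()

        def never_ran(task: dict) -> bool:
            return task["last_run"] is None

        first, second = await asyncio.gather(
            claim_due_tasks(path, lock, never_ran, "2025-01-15T10:00:00+00:00"),
            claim_due_tasks(path, lock, never_ran, "2025-01-15T10:00:01+00:00"),
        )
        return len(first), len(second)
```

With the lock in place, only the first caller claims the task. The second caller sees the updated last_run and finds nothing due.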
Task Types¶
Cron¶
The value is a cron expression parsed by croniter (for example, 0 9 * * *).
Due when: current time >= next cron occurrence after last_run.
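The cron rule above can be sketched as a small predicate. The function names are hypothetical, and the sketch assumes last_run is already a datetime (what happens when last_run is null is not covered here). The default helper relies on the third-party croniter package and is imported lazily. The next_occurrence parameter exists only so the logic can be exercised without croniter.

```python
from datetime import datetime
from typing import Callable


def _croniter_next(expr: str, after: datetime) -> datetime:
    # Lazy import: croniter is a third-party package and may not be installed.
    from croniter import croniter

    return croniter(expr, after).get_next(datetime)


def cron_is_due(
    expr: str,
    last_run: datetime,
    now: datetime,
    next_occurrence: Callable[[str, datetime], datetime] = _croniter_next,
) -> bool:
    """Due once `now` has reached the first cron occurrence after `last_run`."""
    return now >= next_occurrence(expr, last_run)
```

With croniter installed, cron_is_due("0 9 * * *", last_run, now) compares now against the first 09:00 after last_run.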
Interval¶
The value is the number of seconds between runs.
Due when: (now - last_run).total_seconds() >= interval. If last_run is null, the first run happens immediately.
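A minimal sketch of the interval rule, assuming the task is a dict shaped like the JSON format in this document, that value holds the seconds as a string, and that last_run is an ISO timestamp with a UTC offset. interval_is_due is a hypothetical name.

```python
from datetime import datetime


def interval_is_due(task: dict, now: datetime) -> bool:
    """Due when at least `value` seconds have passed since last_run."""
    interval = float(task["value"])  # assumption: seconds stored as a string
    if task.get("last_run") is None:
        return True  # never ran, so the first run is immediate
    last_run = datetime.fromisoformat(task["last_run"])
    return (now - last_run).total_seconds() >= interval
```

Both datetimes need to be timezone-aware (or both naive), otherwise the subtraction raises TypeError.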
Once¶
The value is an ISO 8601 timestamp giving the target time.
Due when: current time >= target timestamp. The task is automatically deactivated ("active": false) after execution.
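The one-shot rule and its deactivation step might look like the sketch below. once_is_due and finish_once are hypothetical names, the task is assumed to be a dict in the JSON format shown in this document, and the timestamp is assumed to carry a UTC offset.

```python
from datetime import datetime


def once_is_due(task: dict, now: datetime) -> bool:
    """Due when `now` has reached the target timestamp stored in value."""
    return now >= datetime.fromisoformat(task["value"])


def finish_once(task: dict, now: datetime) -> None:
    """Record the run and deactivate the task so it is skipped from now on."""
    task["last_run"] = now.isoformat()
    task["active"] = False
```

Whether the real scheduler flips active before or after the agent call is not stated here. The sketch only shows the resulting task state.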
Task JSON Format¶
{
"id": "abc12345",
"prompt": "Check the weather forecast for tomorrow",
"type": "cron",
"value": "0 9 * * *",
"channel": "telegram",
"chat_id": "123456789",
"created_at": "2025-01-15T10:00:00+00:00",
"last_run": null,
"active": true
}
| Field | Type | Description |
|---|---|---|
| id | str | Short UUID (first 8 chars) |
| prompt | str | Message sent to the agent when task fires |
| type | str | "cron", "interval", or "once" |
| value | str | Cron expression, seconds, or ISO timestamp |
| channel | str | Originating channel name |
| chat_id | str | Originating chat ID |
| created_at | str | ISO timestamp of creation |
| last_run | str? | ISO timestamp of last execution |
| active | bool | false = skipped by scheduler |
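For readers who want a typed view of this format, the fields can be mirrored in a dataclass. This is an illustrative sketch only: ScheduledTask and parse_task are hypothetical names, the defaults for last_run and active are assumptions, and the validation of type is an addition the source does not describe.

```python
import json
from dataclasses import dataclass
from typing import Optional

VALID_TYPES = {"cron", "interval", "once"}


@dataclass
class ScheduledTask:
    id: str
    prompt: str
    type: str
    value: str
    channel: str
    chat_id: str
    created_at: str
    last_run: Optional[str] = None    # null until the task has run once
    active: bool = True
    model_tier: Optional[str] = None  # optional, see the model_tier section

    def __post_init__(self) -> None:
        if self.type not in VALID_TYPES:
            raise ValueError(f"unknown task type: {self.type!r}")


def parse_task(raw: str) -> ScheduledTask:
    """Build a ScheduledTask from one JSON object in the documented format."""
    return ScheduledTask(**json.loads(raw))
```

Passing the example object from this section to parse_task yields a task with type "cron", last_run None and active True.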
Task Execution¶
Each due task runs as an independent asyncio.Task:
- If the task has model_tier, calls set_active_tier(tier) so the RoutingChatModel uses that tier's LLM
- Agent invoked with thread_id = "scheduler_{task_id}"; the agent runs with full tools, memory, and context on the specified tier
- reset_active_tier() in a finally block ensures cleanup even on errors
- Response extracted via extract_agent_response()
- Result sent to the originating channel/chat via channel.send() with disable_notification=True
- If channel/chat not available, result is logged and discarded
Info
Scheduler tasks use their own thread IDs (scheduler_*), so they don't interfere with user conversation history.
model_tier Field¶
Tasks can specify a model_tier to run on a specific LLM tier:
{
"id": "abc12345",
"prompt": "Analyze my portfolio in depth",
"type": "cron",
"value": "0 9 * * *",
"model_tier": "advanced"
}
When model_tier is set, the scheduler sets _active_tier before invoking the agent. Unlike the previous raw LLM approach, the agent runs with all tools and memory on the specified tier; it is the full agent, just on a different model.
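Because several scheduled tasks can run at once, the tier setting has to stay separate per task. The source does not say how _active_tier is stored. The sketch below shows one possible approach, assuming a contextvars.ContextVar, which asyncio copies for each task. pick_model, run_on_tier, the "default" tier and the model names are hypothetical.

```python
import asyncio
import contextvars

# Assumption: the active tier lives in a ContextVar, so each asyncio task sees its own value.
_active_tier = contextvars.ContextVar("_active_tier", default=None)


def set_active_tier(tier: str) -> None:
    _active_tier.set(tier)


def reset_active_tier() -> None:
    _active_tier.set(None)


def pick_model(models: dict, default_tier: str = "default") -> str:
    """Stand-in for the routing step: choose the model for the current tier."""
    return models[_active_tier.get() or default_tier]


async def run_on_tier(tier, models: dict) -> str:
    try:
        if tier:
            set_active_tier(tier)
        await asyncio.sleep(0)  # yield so the other task runs in between
        return pick_model(models)
    finally:
        reset_active_tier()


async def demo() -> list:
    models = {"default": "small-model", "advanced": "large-model"}
    # gather() wraps each coroutine in its own task, each with a copied context.
    return await asyncio.gather(
        run_on_tier("advanced", models),
        run_on_tier(None, models),
    )
```

Under this assumption, a task that sets "advanced" does not leak its tier into a task running concurrently without model_tier.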