Single-file Python CLI to batch-test multiple LLM models with predefined queries. Supports YAML/JSON config, reasoning detection (<think> tags and reasoning_content field), per-query token/speed stats, and graceful API error handling. Install with `pip install -e .` to get the `llmqt` command. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
257 lines
8.2 KiB
Python
257 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""llmqt - LLM Query Tester"""
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
yaml = None
|
|
|
|
from openai import OpenAI
|
|
|
|
|
|
def load_config(config_path: Path) -> dict:
    """Load a test configuration from a YAML or JSON file.

    The parser is chosen from the file extension (case-insensitive):
    ``.yaml`` / ``.yml`` use ``yaml.safe_load`` and ``.json`` uses
    ``json.load``.

    Args:
        config_path: Path of the configuration file to read.

    Returns:
        The parsed configuration. An empty YAML document yields ``{}``
        instead of ``None`` so callers can always do key lookups.

    Raises:
        ValueError: If the extension is not .yaml, .yml, or .json.
        OSError: If the file cannot be opened.

    Exits the process with status 1 when a YAML config is requested but
    pyyaml could not be imported.
    """
    suffix = config_path.suffix.lower()
    # Read as UTF-8 explicitly: the platform default encoding (e.g. cp1252)
    # would garble or reject non-ASCII queries.
    with open(config_path, encoding="utf-8") as f:
        if suffix in ('.yaml', '.yml'):
            if yaml is None:
                print("Error: pyyaml is required for YAML configs. Run: pip install pyyaml")
                sys.exit(1)
            data = yaml.safe_load(f)
            # safe_load returns None for an empty document.
            return {} if data is None else data
        elif suffix == '.json':
            return json.load(f)
        else:
            raise ValueError(f"Unsupported config format: {config_path.suffix} (use .yaml, .yml, or .json)")
|
|
|
|
|
|
def _failed_query_stats(elapsed: float, error: str) -> dict:
    """Build the stats dict for a query that produced no usable answer."""
    return {
        "prompt_tokens": 0,
        "completion_tokens": 0,
        "total_tokens": 0,
        "elapsed_s": elapsed,
        "tokens_per_sec": 0.0,
        "error": error,
    }


def run_query(
    client: OpenAI, model: str, system_prompt: str, query: str
) -> tuple[str | None, str, dict]:
    """Send one chat-completion request and return (reasoning, answer, stats).

    Reasoning is taken from the message's ``reasoning_content`` attribute
    when present and non-empty; otherwise the first ``<think>...</think>``
    block in the content is used, and all such blocks are stripped from the
    answer.

    Args:
        client: Client whose ``chat.completions.create`` is called.
        model: Model identifier passed to the API.
        system_prompt: Content of the system message.
        query: Content of the user message.

    Returns:
        A ``(reasoning, answer, stats)`` tuple. ``reasoning`` is ``None``
        when no reasoning was found. On failure the tuple is
        ``(None, "", stats)`` with ``stats["error"]`` set to a non-empty
        message; this function does not raise for API errors.

    stats keys:
        prompt_tokens, completion_tokens, total_tokens,
        elapsed_s, tokens_per_sec, error
    """
    t0 = time.monotonic()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": query},
            ],
        )
    except Exception as exc:
        # Deliberately broad: a failure for one query is reported through
        # stats so the remaining queries still run. Fall back to the
        # exception class name because an empty str(exc) would be falsy and
        # make callers treat the failure as a success.
        message = str(exc) or type(exc).__name__
        return None, "", _failed_query_stats(time.monotonic() - t0, message)
    elapsed = time.monotonic() - t0

    # Indexing [0] on an empty or missing choices list would raise outside
    # the try block above and abort the whole run.
    choices = getattr(response, "choices", None)
    if not choices:
        return None, "", _failed_query_stats(elapsed, "API response contained no choices")

    message = choices[0].message
    content = message.content or ""

    # Some APIs (e.g. DeepSeek) expose reasoning_content as a separate field.
    reasoning = getattr(message, "reasoning_content", None)

    if not reasoning:
        # Fall back to extracting <think>...</think> from content.
        think_match = re.search(r"<think>(.*?)</think>", content, re.DOTALL)
        if think_match:
            reasoning = think_match.group(1).strip()
            content = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()

    # Usage may be absent, or present with None fields; treat both as zero
    # so the tokens/sec division below cannot fail.
    usage = getattr(response, "usage", None)
    prompt_tokens = (usage.prompt_tokens or 0) if usage else 0
    completion_tokens = (usage.completion_tokens or 0) if usage else 0
    total_tokens = (usage.total_tokens or 0) if usage else 0
    tps = completion_tokens / elapsed if elapsed > 0 else 0.0

    stats = {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "elapsed_s": elapsed,
        "tokens_per_sec": tps,
        "error": None,
    }

    return reasoning or None, content, stats
|
|
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Return *name* with filename-unsafe characters replaced by underscores.

    Alphanumeric characters, dots, underscores, hyphens and spaces are
    kept; everything else becomes ``_``. Leading and trailing whitespace
    is stripped from the result.
    """
    allowed_punctuation = "._- "
    cleaned = []
    for ch in name:
        keep = ch.isalnum() or ch in allowed_punctuation
        cleaned.append(ch if keep else "_")
    return "".join(cleaned).strip()
|
|
|
|
|
|
def format_stats_inline(stats: dict) -> str:
    """Render elapsed time, completion tokens and speed as one short line.

    Reads the ``elapsed_s``, ``completion_tokens`` and ``tokens_per_sec``
    keys of *stats*.
    """
    elapsed_part = f"{stats['elapsed_s']:.1f}s"
    tokens_part = f"{stats['completion_tokens']} completion tokens"
    speed_part = f"{stats['tokens_per_sec']:.1f} tok/s"
    return " | ".join((elapsed_part, tokens_part, speed_part))
|
|
|
|
|
|
def format_stats_table(indexed_stats: list[tuple[int, dict]]) -> str:
    """Build a Markdown summary table from ``[(query_number, stats), ...]``.

    One row is emitted per entry, followed by a bold totals row. The
    totals row's tok/s is total completion tokens divided by total elapsed
    time (0.0 when no time elapsed).
    """
    per_query = [entry for _num, entry in indexed_stats]
    summed_keys = ("prompt_tokens", "completion_tokens", "total_tokens", "elapsed_s")
    totals = {key: sum(entry[key] for entry in per_query) for key in summed_keys}

    if totals["elapsed_s"] > 0:
        overall_tps = totals["completion_tokens"] / totals["elapsed_s"]
    else:
        overall_tps = 0.0

    header = [
        "| Query | Elapsed | Prompt tok | Completion tok | Total tok | tok/s |",
        "|-------|---------|------------|----------------|-----------|-------|",
    ]
    body = [
        f"| {num} | {entry['elapsed_s']:.1f}s | {entry['prompt_tokens']} "
        f"| {entry['completion_tokens']} | {entry['total_tokens']} "
        f"| {entry['tokens_per_sec']:.1f} |"
        for num, entry in indexed_stats
    ]
    footer = (
        f"| **Total** | **{totals['elapsed_s']:.1f}s** | **{totals['prompt_tokens']}** "
        f"| **{totals['completion_tokens']}** | **{totals['total_tokens']}** "
        f"| **{overall_tps:.1f}** |"
    )
    return "\n".join([*header, *body, footer])
|
|
|
|
|
|
def _blockquote(text: str) -> str:
    """Prefix every line of *text* with ``> `` so it stays in one Markdown blockquote."""
    lines = str(text).splitlines() or [""]
    return "\n".join(f"> {line}" for line in lines)


def process_config(config_path: Path, system_prompt: str, client: OpenAI) -> None:
    """Run every query in a config against every model and write Markdown reports.

    The config must be a mapping with ``models`` and ``queries`` entries.
    For each model, all queries are sent through ``run_query`` and one
    ``<model>.md`` report is written into a directory named after the
    config file's stem, created under the current working directory.
    Progress is printed to stdout.

    Args:
        config_path: Path of the YAML/JSON config file.
        system_prompt: System prompt sent with every query.
        client: API client forwarded to ``run_query``.

    Exits the process with status 1 if the config is not a mapping or
    lacks ``models`` or ``queries``.
    """
    config = load_config(config_path)

    # An empty or list-shaped config would otherwise fail with an opaque
    # TypeError or a misleading "missing key" message.
    if not isinstance(config, dict):
        print(f"Error: {config_path} must contain a mapping with 'models' and 'queries'")
        sys.exit(1)

    for key in ("models", "queries"):
        if key not in config:
            print(f"Error: Missing '{key}' in {config_path}")
            sys.exit(1)

    models = config["models"]
    queries = config["queries"]
    # A bare string would be iterated character by character; treat it as
    # a single-item list instead.
    if isinstance(models, str):
        models = [models]
    if isinstance(queries, str):
        queries = [queries]

    output_dir = Path.cwd() / config_path.stem
    output_dir.mkdir(parents=True, exist_ok=True)

    for model in models:
        print(f"\n Model: {model}")
        results: list[tuple[str, str | None, str, dict]] = []

        for i, query in enumerate(queries, 1):
            preview = query[:70] + ("..." if len(query) > 70 else "")
            print(f" Query {i}/{len(queries)}: {preview}")
            reasoning, answer, stats = run_query(client, model, system_prompt, query)
            results.append((query, reasoning, answer, stats))
            if stats["error"]:
                print(f" -> ERROR: {stats['error']}")
            else:
                tag = " [reasoning]" if reasoning else ""
                print(f" -> {format_stats_inline(stats)}{tag}")

        model_filename = sanitize_filename(model) + ".md"
        output_path = output_dir / model_filename

        # Summary stats table (only successful queries, preserving query number)
        successful_stats = [
            (i, s) for i, (_, _, _, s) in enumerate(results, 1) if not s["error"]
        ]

        # Write as UTF-8 explicitly: model output routinely contains
        # characters the platform default encoding cannot represent.
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(f"# {model}\n\n")
            f.write(f"**Config:** `{config_path.name}`\n\n")

            if successful_stats:
                f.write("## Statistics\n\n")
                f.write(format_stats_table(successful_stats))
                f.write("\n\n---\n\n")

            for i, (query, reasoning, answer, stats) in enumerate(results, 1):
                f.write(f"## Query {i}\n\n")
                # Quote every line so multi-line queries stay inside the blockquote.
                f.write(_blockquote(query) + "\n\n")

                if stats["error"]:
                    f.write(_blockquote(f"[!WARNING]\n**Error:** {stats['error']}") + "\n\n")
                    f.write("---\n\n")
                    continue

                f.write(
                    f"*{stats['elapsed_s']:.1f}s · "
                    f"{stats['completion_tokens']} completion tokens · "
                    f"{stats['tokens_per_sec']:.1f} tok/s*\n\n"
                )

                if reasoning:
                    f.write("### Reasoning\n\n")
                    f.write(f"{reasoning}\n\n")

                f.write("### Response\n\n")
                f.write(f"{answer}\n\n")
                f.write("---\n\n")

        print(f" Saved: {output_path}")
|
|
|
|
|
|
def main():
    """Command-line entry point: ``llmqt <system_prompt.md> <config> [config ...]``.

    Reads the API key from ``OPENAI_API_KEY`` and an optional base URL
    from ``OPENAI_API_BASE`` (or ``OPENAI_BASE_URL``), loads the system
    prompt, checks that every config file exists, then runs
    ``process_config`` on each config in order.

    Exits with status 1 on missing arguments, a missing API key, or a
    missing prompt/config file.
    """
    if len(sys.argv) < 3:
        print("Usage: llmqt <system_prompt.md> <config1.yaml> [config2.yaml ...]")
        print()
        print("Environment variables:")
        print(" OPENAI_API_KEY (required)")
        print(" OPENAI_API_BASE (optional, for custom endpoints)")
        sys.exit(1)

    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        print("Error: OPENAI_API_KEY environment variable not set.")
        sys.exit(1)

    api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get("OPENAI_BASE_URL")
    client_kwargs = {"api_key": api_key}
    if api_base:
        client_kwargs["base_url"] = api_base

    client = OpenAI(**client_kwargs)

    prompt_path = Path(sys.argv[1])
    if not prompt_path.exists():
        print(f"Error: System prompt file not found: {prompt_path}")
        sys.exit(1)

    # Read as UTF-8 explicitly so the prompt is decoded the same way
    # regardless of the platform's default encoding.
    system_prompt = prompt_path.read_text(encoding="utf-8")

    # Validate every config path up front so no API calls are made when a
    # later argument is wrong.
    config_paths = []
    for arg in sys.argv[2:]:
        p = Path(arg)
        if not p.exists():
            print(f"Error: Config file not found: {p}")
            sys.exit(1)
        config_paths.append(p)

    for config_path in config_paths:
        print(f"\nProcessing: {config_path}")
        process_config(config_path, system_prompt, client)

    print("\nDone.")
|
|
|
|
|
|
# Script entry point: dispatch to main() only when this file is executed
# directly, never as a side effect of importing it.
if __name__ == "__main__":
    main()
|