API Code Examples
Complete code examples for common MCP Security Score API integrations.
Basic Scan Flow
The typical workflow is:
- Create a scan (POST /api/v1/scan)
- Poll for completion (GET /api/v1/scan/:id)
- Process results
curl Examples
Create a Scan
curl -X POST https://mcpscanner.com/api/v1/scan \
-H "Authorization: Bearer $MCP_SCANNER_API_KEY" \
-H "Content-Type: application/json" \
-d '{"url": "https://github.com/owner/repo"}'Get Scan Results
curl https://mcpscanner.com/api/v1/scan/scan_abc123 \
-H "Authorization: Bearer $MCP_SCANNER_API_KEY"List Recent Scans
curl "https://mcpscanner.com/api/v1/scans?limit=10" \
-H "Authorization: Bearer $MCP_SCANNER_API_KEY"Complete Polling Script
#!/bin/bash
set -e
API_KEY="${MCP_SCANNER_API_KEY}"
REPO_URL="https://github.com/owner/repo"
# Create scan
SCAN_RESPONSE=$(curl -s -X POST "https://mcpscanner.com/api/v1/scan" \
-H "Authorization: Bearer $API_KEY" \
-H "Content-Type: application/json" \
-d "{\"url\": \"$REPO_URL\"}")
SCAN_ID=$(echo "$SCAN_RESPONSE" | jq -r '.id')
echo "Scan started: $SCAN_ID"
# Poll for completion
while true; do
RESULT=$(curl -s "https://mcpscanner.com/api/v1/scan/$SCAN_ID" \
-H "Authorization: Bearer $API_KEY")
STATUS=$(echo "$RESULT" | jq -r '.status')
if [ "$STATUS" = "complete" ]; then
SCORE=$(echo "$RESULT" | jq -r '.score')
GRADE=$(echo "$RESULT" | jq -r '.grade')
echo "Scan complete! Score: $SCORE ($GRADE)"
break
elif [ "$STATUS" = "failed" ]; then
echo "Scan failed!"
exit 1
fi
echo "Status: $STATUS, waiting..."
sleep 5
done
# Check if score meets threshold
THRESHOLD=70
if [ "$SCORE" -lt "$THRESHOLD" ]; then
echo "Score $SCORE is below threshold $THRESHOLD"
exit 1
fiJavaScript/TypeScript Examples
Using fetch
const API_KEY = process.env.MCP_SCANNER_API_KEY;
const BASE_URL = 'https://mcpscanner.com/api/v1';
async function createScan(url: string) {
const response = await fetch(`${BASE_URL}/scan`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ url }),
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.error.message);
}
return response.json();
}
async function getScan(scanId: string) {
const response = await fetch(`${BASE_URL}/scan/${scanId}`, {
headers: {
'Authorization': `Bearer ${API_KEY}`,
},
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.error.message);
}
return response.json();
}
async function waitForCompletion(scanId: string, timeoutMs = 120000) {
const startTime = Date.now();
while (Date.now() - startTime < timeoutMs) {
const scan = await getScan(scanId);
if (scan.status === 'complete') {
return scan;
}
if (scan.status === 'failed') {
throw new Error('Scan failed');
}
await new Promise(resolve => setTimeout(resolve, 5000));
}
throw new Error('Scan timed out');
}
// Usage
async function scanRepository(repoUrl: string) {
console.log(`Scanning ${repoUrl}...`);
const { id } = await createScan(repoUrl);
console.log(`Scan ID: ${id}`);
const result = await waitForCompletion(id);
console.log(`Score: ${result.score} (${result.grade})`);
console.log(`Findings: ${result.findings.length}`);
return result;
}Using axios
import axios from 'axios';
const client = axios.create({
baseURL: 'https://mcpscanner.com/api/v1',
headers: {
'Authorization': `Bearer ${process.env.MCP_SCANNER_API_KEY}`,
},
});
async function scanAndWait(repoUrl: string) {
// Create scan
const { data: scan } = await client.post('/scan', { url: repoUrl });
console.log(`Scan started: ${scan.id}`);
// Poll until complete
while (true) {
const { data: result } = await client.get(`/scan/${scan.id}`);
if (result.status === 'complete') {
return result;
}
if (result.status === 'failed') {
throw new Error('Scan failed');
}
await new Promise(r => setTimeout(r, 5000));
}
}Python Examples
Using requests
import os
import time
import requests
API_KEY = os.environ['MCP_SCANNER_API_KEY']
BASE_URL = 'https://mcpscanner.com/api/v1'
def get_headers():
return {
'Authorization': f'Bearer {API_KEY}',
'Content-Type': 'application/json',
}
def create_scan(url: str) -> dict:
response = requests.post(
f'{BASE_URL}/scan',
headers=get_headers(),
json={'url': url}
)
response.raise_for_status()
return response.json()
def get_scan(scan_id: str) -> dict:
response = requests.get(
f'{BASE_URL}/scan/{scan_id}',
headers=get_headers()
)
response.raise_for_status()
return response.json()
def wait_for_completion(scan_id: str, timeout: int = 120) -> dict:
start_time = time.time()
while time.time() - start_time < timeout:
scan = get_scan(scan_id)
if scan['status'] == 'complete':
return scan
if scan['status'] == 'failed':
raise Exception('Scan failed')
print(f"Status: {scan['status']}, waiting...")
time.sleep(5)
raise TimeoutError('Scan timed out')
def scan_repository(repo_url: str) -> dict:
print(f'Scanning {repo_url}...')
scan = create_scan(repo_url)
print(f'Scan ID: {scan["id"]}')
result = wait_for_completion(scan['id'])
print(f'Score: {result["score"]} ({result["grade"]})')
print(f'Findings: {len(result["findings"])}')
return result
# Usage
if __name__ == '__main__':
result = scan_repository('https://github.com/owner/repo')
# Fail if score below threshold
if result['score'] < 70:
print(f'Score {result["score"]} is below threshold 70')
exit(1)Using httpx (async)
import asyncio
import os
import httpx
API_KEY = os.environ['MCP_SCANNER_API_KEY']
BASE_URL = 'https://mcpscanner.com/api/v1'
async def scan_repository(repo_url: str) -> dict:
async with httpx.AsyncClient() as client:
headers = {'Authorization': f'Bearer {API_KEY}'}
# Create scan
response = await client.post(
f'{BASE_URL}/scan',
headers=headers,
json={'url': repo_url}
)
response.raise_for_status()
scan = response.json()
# Poll for completion
while True:
response = await client.get(
f'{BASE_URL}/scan/{scan["id"]}',
headers=headers
)
response.raise_for_status()
result = response.json()
if result['status'] == 'complete':
return result
if result['status'] == 'failed':
raise Exception('Scan failed')
await asyncio.sleep(5)
# Usage
async def main():
result = await scan_repository('https://github.com/owner/repo')
print(f'Score: {result["score"]}')
asyncio.run(main())Handling Rate Limits
async function fetchWithRetry(url: string, options: RequestInit, maxRetries = 3) {
for (let attempt = 0; attempt < maxRetries; attempt++) {
const response = await fetch(url, options);
if (response.status === 429) {
const resetTime = response.headers.get('X-RateLimit-Reset');
const waitMs = resetTime
? new Date(resetTime).getTime() - Date.now()
: 60000;
console.log(`Rate limited. Waiting ${waitMs}ms...`);
await new Promise(r => setTimeout(r, Math.max(waitMs, 1000)));
continue;
}
return response;
}
throw new Error('Max retries exceeded');
}Next Steps
- Rate Limits - Rate limit handling
- CI/CD Integration - Pipeline integration
- Troubleshooting - Common issues