Module 4

Working with APIs

Build and consume RESTful APIs securely with Python

Learning Objectives

Understand REST API principles

Make HTTP requests with the requests library

Handle authentication and tokens

Parse JSON responses

Build secure API endpoints

Implement error handling for APIs

REST API Fundamentals

REST API Concepts
Understanding RESTful architecture

REST (Representational State Transfer) is an architectural style for building web services. Key principles include statelessness, client-server separation, and resource-based URLs.

Resources

Data entities identified by URLs (e.g., /api/users, /api/posts)

Stateless

Each request contains all the information needed to process it; the server doesn't store client context between requests

Uniform Interface

Consistent API design with standard HTTP methods and response formats

JSON Format

Standard data format for request and response bodies

Making HTTP Requests

Making GET Requests
Retrieve data from APIs
python
import requests

# Simple GET request. Always pass a timeout: without one, requests can wait
# indefinitely on a server that never answers.
response = requests.get('https://api.example.com/users', timeout=10)
print(response.status_code)
print(response.json())

# GET request with query parameters; requests encodes them into the URL.
params = {'page': 1, 'limit': 10}
response = requests.get(
    'https://api.example.com/users',
    params=params,
    timeout=10,
)

# Only parse and walk the body when the server reported success.
if response.status_code == 200:
    data = response.json()
    for user in data:
        print(user['name'], user['email'])
else:
    print(f"Error: {response.status_code}")

Authentication & Security

Basic Authentication
Username and password authentication
python
import requests
from requests.auth import HTTPBasicAuth

# Placeholder credentials for the example. Real code should load these from
# the environment or a secrets store rather than hard-coding them.
username = 'user'
password = 'pass'

# Option 1: explicit HTTPBasicAuth object.
response = requests.get(
    'https://api.example.com/protected',
    auth=HTTPBasicAuth(username, password),
    timeout=10,
)

# Option 2: pass the credentials as a (username, password) tuple.
response = requests.get(
    'https://api.example.com/protected',
    auth=(username, password),
    timeout=10,
)

# Option 3: build the Authorization header by hand. The header is derived
# from the same username/password variables so all three options stay in sync.
import base64
credentials = base64.b64encode(f'{username}:{password}'.encode('utf-8')).decode('ascii')
headers = {'Authorization': f'Basic {credentials}'}
response = requests.get(
    'https://api.example.com/protected',
    headers=headers,
    timeout=10,
)

Building API Clients

Reusable API Client Class
Create maintainable API client implementations
python
import requests
from typing import Dict, Any, Optional

class SecurityAPIClient:
    """Small client for a bearer-token-authenticated JSON security API.

    Every call goes through one shared ``requests.Session`` so the
    ``Authorization`` and ``Content-Type`` headers are configured once.
    """

    def __init__(self, base_url: str, api_key: str):
        """Store the connection settings and prepare the shared session.

        Args:
            base_url: Root URL of the API. A trailing slash is stripped so
                that endpoints (which start with "/") join without "//".
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``'GET'`` or ``'POST'``.
            endpoint: Path appended to ``base_url``.
            **kwargs: Passed through to ``Session.request`` (``params``,
                ``json``, ...). ``timeout`` defaults to 10 seconds but may
                be overridden by the caller.

        Returns:
            The decoded JSON body, or ``None`` if the request failed, the
            status check failed, or the body was not valid JSON. Failures
            are reported with ``print``.
        """
        url = f"{self.base_url}{endpoint}"
        # Default rather than force the timeout so callers can pass their own
        # without triggering a duplicate-keyword error.
        kwargs.setdefault('timeout', 10)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose decode error is not a RequestException subclass.
            print(f"API Error: {e}")
            return None

    def get_vulnerabilities(self, target: str) -> Optional[Dict]:
        """Return the vulnerabilities reported for ``target``, or ``None`` on error."""
        # Send the target via ``params`` so it is URL-encoded instead of being
        # spliced raw into the query string.
        return self._make_request('GET', '/vulnerabilities', params={'target': target})

    def create_scan(self, target: str, scan_type: str) -> Optional[Dict]:
        """Create a scan of ``scan_type`` for ``target``; return the response or ``None``."""
        data = {'target': target, 'type': scan_type}
        return self._make_request('POST', '/scans', json=data)

    def get_scan_results(self, scan_id: str) -> Optional[Dict]:
        """Return the results of scan ``scan_id``, or ``None`` on error."""
        from urllib.parse import quote

        # Percent-encode the id (including "/") so it stays a single path
        # segment and cannot redirect the request to another endpoint.
        safe_id = quote(str(scan_id), safe='')
        return self._make_request('GET', f'/scans/{safe_id}/results')

# Example usage: build a client, look up vulnerabilities for a host, then
# start a full scan of the same host.
client = SecurityAPIClient(
    base_url='https://api.security.com',
    api_key='your-api-key',
)
vulns = client.get_vulnerabilities(target='example.com')
scan = client.create_scan(target='example.com', scan_type='full')

Hands-On Project

Build a Threat Intelligence API Client
Create a tool to fetch and analyze threat data from security APIs

Design an API client that fetches threat intelligence data, caches results, and provides analysis capabilities.

python
import requests
import json
from datetime import datetime, timedelta

class ThreatIntelClient:
    """Fetch threat-intelligence records over HTTP with a time-limited in-memory cache."""

    def __init__(self, api_url, api_key):
        """Store the API location and key and start with an empty cache.

        Args:
            api_url: Base URL of the threat-intelligence API.
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
        """
        self.api_url = api_url
        self.api_key = api_key
        # Maps indicator -> (time the entry was stored, decoded response).
        self.cache = {}
        # Number of seconds a cached entry stays valid.
        self.cache_ttl = 3600

    def _is_cached(self, key):
        """Return True if ``key`` has a cache entry younger than ``cache_ttl``.

        An expired entry is removed as a side effect, so stale data does not
        accumulate for indicators that are never requested again.
        """
        entry = self.cache.get(key)
        if entry is None:
            return False
        cached_time, _ = entry
        if datetime.now() - cached_time < timedelta(seconds=self.cache_ttl):
            return True
        # Expired: evict rather than leave the stale entry behind.
        self.cache.pop(key, None)
        return False

    def get_threat_data(self, indicator):
        """Return threat data for ``indicator``, using the cache when fresh.

        Returns:
            The decoded JSON response, or ``None`` if the request failed, the
            status check failed, or the body was not valid JSON. Failures are
            reported with ``print`` and are not cached.
        """
        from urllib.parse import quote

        if self._is_cached(indicator):
            _, data = self.cache[indicator]
            return data

        headers = {'Authorization': f'Bearer {self.api_key}'}
        # Percent-encode the indicator (including "/") so values such as URLs
        # remain a single path segment of the request URL.
        path_segment = quote(str(indicator), safe='')
        try:
            response = requests.get(
                f'{self.api_url}/threats/{path_segment}',
                headers=headers,
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose decode error is not a RequestException subclass.
            print(f"Error fetching threat data: {e}")
            return None

        self.cache[indicator] = (datetime.now(), data)
        return data

    def analyze_threats(self, indicators):
        """Summarize threat data for each indicator that returned a result.

        Args:
            indicators: Iterable of indicator values to look up.

        Returns:
            A list of dicts with ``indicator``, ``severity`` and ``last_seen``
            keys. Indicators whose lookup failed or returned an empty result
            are skipped.
        """
        results = []
        for indicator in indicators:
            threat_data = self.get_threat_data(indicator)
            if threat_data:
                results.append({
                    'indicator': indicator,
                    'severity': threat_data.get('severity'),
                    'last_seen': threat_data.get('last_seen')
                })
        return results

# Example usage: look up an IP address and a domain, collecting a summary
# for each indicator that returns data.
client = ThreatIntelClient(
    api_url='https://api.threat-intel.com',
    api_key='api-key',
)
threats = client.analyze_threats(indicators=['192.168.1.1', 'example.com'])