DEV Community

Ahmed Moussa


How to Automate SOC2 and GDPR Compliance Scans with ComplianceWeave


# Stop Dreading Audits: Automate Your Compliance Evidence with ComplianceWeave

Picture this: your auditor emails on a Tuesday. The audit is in three weeks. Somewhere in your organization, a spreadsheet begins to scream.

If you've lived through manual compliance evidence collection, you know the particular exhaustion of hunting down access logs, cross-referencing policy documents, and praying your screenshots are timestamped correctly. ComplianceWeave exists to end that ritual. This tutorial walks you through integrating it into your workflow using Python — so the next time that auditor emails, you reply with a PDF instead of a panic attack.

---

## What We're Building

By the end of this tutorial, you'll have a Python script that:

1. Triggers an infrastructure compliance scan
2. Polls until the scan completes
3. Fetches a formatted audit report
4. Automatically remediates flagged issues where the API supports it

We'll cover SOC2 as our primary framework, but the same pattern works for GDPR, HIPAA, and ISO 27001.
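Because the framework is just a string in the request body, switching frameworks really is a one-argument change. Here is a minimal sketch of building that body locally with some validation. It assumes the identifiers `SOC2`, `GDPR`, `HIPAA`, and `ISO27001` (the ones listed in the `trigger_scan` docstring in Step 1) are the accepted set; check the ComplianceWeave API docs for the authoritative list. `build_scan_payload` is a hypothetical helper, not part of any SDK.

```python
from typing import Any, Dict

# Assumed set of framework identifiers, taken from the trigger_scan docstring.
SUPPORTED_FRAMEWORKS = {"SOC2", "GDPR", "HIPAA", "ISO27001"}


def build_scan_payload(framework: str, scope: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: validate inputs and build the POST /compliance/scan body."""
    # Normalize "gdpr" -> "GDPR" and "ISO 27001" -> "ISO27001".
    normalized = framework.strip().upper().replace(" ", "")
    if normalized not in SUPPORTED_FRAMEWORKS:
        raise ValueError(
            f"Unsupported framework {framework!r}; expected one of {sorted(SUPPORTED_FRAMEWORKS)}"
        )
    if not scope:
        raise ValueError("scope must describe at least one set of resources")
    return {"framework": normalized, "scope": scope}


scope = {"cloud_provider": "aws", "regions": ["us-east-1"], "services": ["s3"]}
for fw in ("SOC2", "gdpr", "ISO 27001"):
    # prints SOC2, GDPR, ISO27001 on separate lines
    print(build_scan_payload(fw, scope)["framework"])
```

Failing locally on a typo like `"SOC 2"` is cheaper than discovering it through an HTTP 4xx in the middle of a CI run.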

---

## Prerequisites

- Python 3.9+
- A ComplianceWeave account and API key (grab one from the ComplianceWeave dashboard)
- `requests` and `python-dotenv` installed

```bash
pip install requests python-dotenv
```


Store your credentials in a `.env` file instead of hardcoding them, and keep that file out of version control:

```bash
# .env
COMPLIANCEWEAVE_API_KEY=your_api_key_here
COMPLIANCEWEAVE_BASE_URL=https://api.complianceweave.io/v1
```
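If either variable is missing, `os.getenv` returns `None`, and the client ends up sending `Bearer None` or requesting URLs like `None/compliance/scan`. A small sketch that fails fast instead: `load_config` and `ComplianceWeaveConfig` are hypothetical names, and the function takes any mapping (pass `os.environ` after calling `load_dotenv()`), which also makes it easy to test.

```python
import os
from typing import Mapping, NamedTuple


class ComplianceWeaveConfig(NamedTuple):
    api_key: str
    base_url: str


REQUIRED_VARS = ("COMPLIANCEWEAVE_API_KEY", "COMPLIANCEWEAVE_BASE_URL")


def load_config(env: Mapping[str, str]) -> ComplianceWeaveConfig:
    """Hypothetical helper: read required settings, failing fast if any are missing or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return ComplianceWeaveConfig(
        api_key=env["COMPLIANCEWEAVE_API_KEY"],
        # Strip a trailing slash so f"{base_url}/compliance/scan" never contains "//".
        base_url=env["COMPLIANCEWEAVE_BASE_URL"].rstrip("/"),
    )


if __name__ == "__main__":
    # In the real script, call load_dotenv() first so .env values land in os.environ.
    try:
        config = load_config(os.environ)
        print(f"Using {config.base_url}")
    except RuntimeError as err:
        print(err)
```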


---

## Step 1: Trigger a Compliance Scan

The first thing we need is a scan. ComplianceWeave's `POST /compliance/scan` endpoint accepts a framework identifier and the scope of infrastructure you want evaluated.

```python
# compliance_client.py
import os

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("COMPLIANCEWEAVE_API_KEY")
BASE_URL = os.getenv("COMPLIANCEWEAVE_BASE_URL")

HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json",
}


def trigger_scan(framework: str, scope: dict) -> str:
    """
    Initiates a compliance scan and returns the scan ID.
    framework: e.g., "SOC2", "GDPR", "HIPAA", "ISO27001"
    scope: dict describing which resources to scan
    """
    payload = {
        "framework": framework,
        "scope": scope,
    }

    try:
        response = requests.post(
            f"{BASE_URL}/compliance/scan",
            json=payload,
            headers=HEADERS,
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
        scan_id = data["scan_id"]
        print(f"✅ Scan initiated. ID: {scan_id}")
        return scan_id

    except requests.exceptions.HTTPError as e:
        print(f"❌ HTTP error during scan trigger: {e.response.status_code} - {e.response.text}")
        raise
    except requests.exceptions.Timeout:
        print("❌ Request timed out. Check your network or ComplianceWeave status.")
        raise
    except KeyError:
        print("❌ Unexpected response format: 'scan_id' not found.")
        raise


if __name__ == "__main__":
    scope = {
        "cloud_provider": "aws",
        "regions": ["us-east-1", "us-west-2"],
        "services": ["ec2", "s3", "rds", "iam"],
    }
    scan_id = trigger_scan("SOC2", scope)
```


**Expected output:**

```plaintext
✅ Scan initiated. ID: scan_a3f9c21b
```


> **Best practice:** Store `scan_id` persistently (a database, a file, a CI artifact) so you can retrieve reports even if your process crashes mid-run.
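One way to follow that advice with nothing but the standard library is a small JSON state file, written to a temporary file first and then swapped into place so a crash mid-write can't leave it corrupted. The file name `.complianceweave_state.json` and both helper names are illustrative choices, not something ComplianceWeave requires.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional

# Illustrative location; any path your CI job can persist (e.g. as an artifact) works.
STATE_FILE = Path(".complianceweave_state.json")


def save_scan_id(scan_id: str, path: Path = STATE_FILE) -> None:
    """Write the scan ID to a temp file in the same directory, then swap it into place."""
    path = Path(path)
    fd, tmp_name = tempfile.mkstemp(dir=str(path.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump({"scan_id": scan_id}, tmp)
        # os.replace is atomic on POSIX filesystems, so readers never see a half-written file.
        os.replace(tmp_name, path)
    except BaseException:
        os.unlink(tmp_name)
        raise


def load_scan_id(path: Path = STATE_FILE) -> Optional[str]:
    """Return the last saved scan ID, or None if nothing has been saved yet."""
    try:
        return json.loads(Path(path).read_text())["scan_id"]
    except FileNotFoundError:
        return None
```

Call `save_scan_id(scan_id)` right after `trigger_scan` returns; on a rerun, `load_scan_id()` lets you resume polling an existing scan instead of starting a fresh one.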

---

## Step 2: Poll for Scan Completion

Scans don't complete instantly — ComplianceWeave is actually interrogating your infrastructure. We'll poll the report endpoint with exponential backoff rather than hammering it every second.
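Concretely, `wait_for_scan` starts at 5 seconds, multiplies the delay by 1.5 after each poll, and caps it at 30 seconds. This sketch isolates that schedule so you can see how quickly the cap is reached; `backoff_intervals` is a hypothetical helper, not part of the script.

```python
from itertools import islice
from typing import Iterator


def backoff_intervals(start: float = 5.0, factor: float = 1.5, cap: float = 30.0) -> Iterator[float]:
    """Yield an endless exponential backoff schedule, clamped at `cap` seconds."""
    interval = start
    while True:
        yield interval
        interval = min(interval * factor, cap)


schedule = list(islice(backoff_intervals(), 7))
print(schedule)  # → [5.0, 7.5, 11.25, 16.875, 25.3125, 30.0, 30.0]
```

Ignoring request latency, the default `max_wait_seconds=300` works out to 13 polls, versus 60 at a fixed 5-second interval.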

```python
import time


def wait_for_scan(scan_id: str, max_wait_seconds: int = 300) -> dict:
    """
    Polls until the scan completes or max_wait_seconds is exceeded.
    Returns the completed report data.
    """
    url = f"{BASE_URL}/compliance/reports"
    params = {"scan_id": scan_id}
    start = time.monotonic()
    interval = 5  # start polling every 5 seconds

    print(f"⏳ Waiting for scan {scan_id} to complete...")

    while True:
        # Measure real elapsed time, so request latency counts toward the limit too
        elapsed = time.monotonic() - start
        if elapsed >= max_wait_seconds:
            break

        try:
            response = requests.get(url, headers=HEADERS, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            status = data.get("status")
            print(f"   Status: {status} (elapsed: {elapsed:.0f}s)")

            if status == "completed":
                print("✅ Scan complete.")
                return data
            elif status == "failed":
                raise RuntimeError(f"Scan failed: {data.get('error', 'Unknown error')}")

        except requests.exceptions.HTTPError as e:
            print(f"❌ Error fetching report: {e.response.status_code}")
            raise

        time.sleep(interval)
        interval = min(interval * 1.5, 30)  # exponential backoff, capped at 30s

    raise TimeoutError(f"Scan did not complete within {max_wait_seconds} seconds.")
```

**Expected output:**

```console
⏳ Waiting for scan scan_a3f9c21b to complete...
   Status: running (elapsed: 0s)
   Status: running (elapsed: 5s)
   Status: running (elapsed: 13s)
   Status: completed (elapsed: 24s)
✅ Scan complete.
```
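Because `wait_for_scan` calls `requests.get` and `time.sleep` directly, testing it means hitting the real API and actually waiting. A common refactor is to inject both. This sketch keeps the same status handling (`completed` returns, `failed` raises, anything else keeps waiting) and the same backoff, but `poll_until_complete`, its `fetch_status`/`sleep`/`clock` parameters, and `FakeClock` are hypothetical names for illustration.

```python
import time
from typing import Any, Callable, Dict


def poll_until_complete(
    fetch_status: Callable[[], Dict[str, Any]],
    max_wait_seconds: float = 300,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll fetch_status() with capped exponential backoff until the scan finishes."""
    start = clock()
    interval = 5.0
    while clock() - start < max_wait_seconds:
        data = fetch_status()
        status = data.get("status")
        if status == "completed":
            return data
        if status == "failed":
            raise RuntimeError(f"Scan failed: {data.get('error', 'Unknown error')}")
        sleep(interval)
        interval = min(interval * 1.5, 30)
    raise TimeoutError(f"Scan did not complete within {max_wait_seconds} seconds.")


class FakeClock:
    """Virtual clock for tests: sleeping just advances the counter."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


# Canned responses stand in for the HTTP call, so nothing real is requested or waited on.
responses = iter([{"status": "running"}, {"status": "running"}, {"status": "completed"}])
fake = FakeClock()
result = poll_until_complete(lambda: next(responses), sleep=fake.sleep, clock=fake.time)
print(result["status"], fake.now)  # → completed 12.5
```

In the real script, `fetch_status` would be a small closure around the `requests.get` call and `raise_for_status()` from `wait_for_scan`.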


---

## Step 3: Parse and Display the Report

Now for the part that replaces three weeks of spreadsheet archaeology:

```python
def display_report_summary(report: dict) -> list:
    """
    Prints a human-readable summary and returns a list of failed control IDs.
    """
    summary = report.get("summary", {})
    controls = report.get("controls", [])

    print("\n📋 COMPLIANCE REPORT SUMMARY")
    print(f"   Framework:  {report.get('framework')}")
    print(f"   Scan Date:  {report.get('scanned_at')}")
    print(f"   Passed:     {summary.get('passed', 0)}")
    print(f"   Failed:     {summary.get('failed', 0)}")
    print(f"   Warnings:   {summary.get('warnings', 0)}")
    print(f"   Score:      {summary.get('compliance_score')}%\n")

    failed_ids = []

    for control in controls:
        if control["status"] == "failed":
            print(f"   ❌ [{control['id']}] {control['name']}")
            print(f"      Severity: {control['severity']}")
            print(f"      Detail:   {control['detail']}\n")
            failed_ids.append(control["id"])

    return failed_ids
```

**Expected output:**

```plaintext
📋 COMPLIANCE REPORT SUMMARY
   Framework:  SOC2
   Scan Date:  2024-11-14T09:32:11Z
   Passed:     47
   Failed:     3
   Warnings:   5
   Score:      91%

   ❌ [CC6.1] Logical Access Controls
      Severity: high
      Detail:   3 IAM users have console access without MFA enabled.

   ❌ [CC7.2] System Monitoring
      Severity: medium
      Detail:   CloudTrail logging disabled in us-west-2.

   ❌ [A1.2] Availability Monitoring
      Severity: low
      Detail:   No uptime alerting configured for RDS cluster prod-db-01.
```


This is your audit evidence — timestamped, structured, and reproducible.
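To keep that evidence reproducible after the run, you can write the raw report to disk alongside a SHA-256 digest, so anyone can later confirm the file hasn't been altered. This is a standard-library sketch; `save_evidence` and the `<framework>_<UTC timestamp>.json` naming scheme are illustrative, and the `framework` key is the one `display_report_summary` already reads.

```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict


def save_evidence(report: Dict[str, Any], out_dir: Path) -> Path:
    """Write the report as canonical JSON plus a .sha256 sidecar; return the JSON path."""
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    # Canonical form (sorted keys, fixed separators) so identical reports hash identically.
    body = json.dumps(report, sort_keys=True, separators=(",", ":")).encode("utf-8")
    digest = hashlib.sha256(body).hexdigest()
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    path = out_dir / f"{report.get('framework', 'unknown')}_{stamp}.json"
    path.write_bytes(body)
    # "<digest>  <filename>" is the layout `sha256sum -c` can verify.
    sidecar = path.parent / (path.name + ".sha256")
    sidecar.write_text(f"{digest}  {path.name}\n")
    return path
```

Running `sha256sum -c SOC2_<timestamp>.json.sha256` in the evidence directory then confirms the file is byte-for-byte what the scan produced.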

---

## Step 4: Trigger Automated Remediation

For issues ComplianceWeave can fix programmatically (enabling MFA enforcement, activating logging, etc.), you can kick off remediation directly:

```python
def remediate_controls(control_ids: list) -> None:
    """
    Submits remediation requests for a list of failed control IDs.
    """
    if not control_ids:
        print("✅ No controls to remediate.")
        return

    print(f"\n🔧 Submitting remediation for {len(control_ids)} control(s)...")

    for control_id in control_ids:
        payload = {"control_id": control_id, "mode": "auto"}

        try:
            response = requests.post(
                f"{BASE_URL}/compliance/remediate",
                json=payload,
                headers=HEADERS,
                timeout=30,
            )
            response.raise_for_status()
            result = response.json()
            action = result.get("action_taken", "No action described")
            print(f"   ✅ {control_id}: {action}")

        except requests.exceptions.HTTPError as e:
            # Some controls require manual remediation (HTTP 422); that's expected
            if e.response.status_code == 422:
                print(f"   ⚠️  {control_id}: Requires manual remediation.")
            else:
                print(f"   ❌ {control_id}: Remediation failed ({e.response.status_code})")
        except requests.exceptions.RequestException as e:
            # Timeouts and connection errors: log them and move on to the next control
            print(f"   ❌ {control_id}: Request error ({type(e).__name__})")
```

**Expected output:**

```console
🔧 Submitting remediation for 3 control(s)...
   ✅ CC6.1: MFA enforcement policy applied to all IAM users.
   ✅ CC7.2: CloudTrail enabled in us-west-2.
   ⚠️  A1.2: Requires manual remediation.
```
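If this runs unattended, a tally is more useful than scrolling through logs. Here is a sketch that classifies outcomes with the same rule `remediate_controls` uses: `raise_for_status()` only raises for 4xx and 5xx codes, and a 422 means manual remediation is needed. `classify_remediation` and its `(control_id, status_code)` input shape are hypothetical; you would collect those pairs inside the remediation loop.

```python
from typing import Dict, Iterable, List, Tuple


def classify_remediation(results: Iterable[Tuple[str, int]]) -> Dict[str, List[str]]:
    """Group control IDs by outcome, mirroring the status handling in remediate_controls."""
    outcome: Dict[str, List[str]] = {"applied": [], "manual": [], "failed": []}
    for control_id, status_code in results:
        if status_code < 400:
            # raise_for_status() would not have raised, so the request succeeded
            outcome["applied"].append(control_id)
        elif status_code == 422:
            outcome["manual"].append(control_id)
        else:
            outcome["failed"].append(control_id)
    return outcome


summary = classify_remediation([("CC6.1", 200), ("CC7.2", 200), ("A1.2", 422)])
print({k: len(v) for k, v in summary.items()})  # → {'applied': 2, 'manual': 1, 'failed': 0}
```

The `manual` list doubles as a ready-made to-do list for whoever owns the controls ComplianceWeave couldn't fix on its own.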


---

## Putting It All Together

Replace the `__main__` block from Step 1 with this one to run the full pipeline end to end:

```python
if __name__ == "__main__":
    scope = {
        "cloud_provider": "aws",
        "regions": ["us-east-1", "us-west-2"],
        "services": ["ec2", "s3", "rds", "iam"],
    }

    scan_id = trigger_scan("SOC2", scope)
    report = wait_for_scan(scan_id)
    failed_control_ids = display_report_summary(report)
    remediate_controls(failed_control_ids)

    print("\n🎉 Audit prep complete. Your report is ready to export from the ComplianceWeave dashboard.")
```

---

## Next Steps

- **Schedule this script** via cron or a CI/CD pipeline to run weekly — catch regressions before auditors do.
- **Export reports** from the dashboard as PDF or JSON for direct submission to auditors.
- **Expand your scope** by adding more services or switching frameworks — the same script handles HIPAA and GDPR with a one-word change.
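When the script runs on a schedule or in CI, a nonzero exit code is what actually flags a regression. A minimal sketch of a gate, assuming you want to fail below a chosen score or on any high-severity failure; the 90% default and `compliance_gate` are illustrative, and the `summary`/`controls` keys are the ones `display_report_summary` reads.

```python
from typing import Any, Dict


def compliance_gate(report: Dict[str, Any], min_score: float = 90.0) -> int:
    """Return a process exit code: 0 if the report passes the gate, 1 otherwise."""
    score = report.get("summary", {}).get("compliance_score")
    high_failures = [
        c["id"]
        for c in report.get("controls", [])
        if c.get("status") == "failed" and c.get("severity") == "high"
    ]
    if score is None or score < min_score:
        print(f"Gate failed: compliance score {score} is below {min_score}")
        return 1
    if high_failures:
        print(f"Gate failed: high-severity controls failing: {', '.join(high_failures)}")
        return 1
    print("Gate passed.")
    return 0
```

Ending the `__main__` block with `sys.exit(compliance_gate(report))` hands the result to cron or your CI runner. With the sample report from Step 3, the gate would fail: 91% clears the threshold, but CC6.1 is a high-severity failure.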

The spreadsheet had a good run. It's retired now.