In-process AWS service testing for pytest. Use your existing boto3 code unchanged — the SDK provides pre-configured clients that point to local services instead of AWS. No credentials, no deploy, no waiting.
Install, declare your resources, and write a test. Services start in the same process as your tests — no external server, no ports, no Docker.
pip install local-web-services-python-sdk
# or
uv add local-web-services-python-sdk
# conftest.py
import pytest
@pytest.fixture(scope="session")
def lws_session_spec():
    """Declare the local AWS resources the session should provision."""
    spec = {}
    spec["tables"] = [{"name": "Orders", "partition_key": "id"}]
    spec["queues"] = ["OrderQueue"]
    spec["buckets"] = ["ReceiptsBucket"]
    return spec
# lws_session and lws_reset are provided automatically
# by the lws_testing pytest plugin — no imports needed
# test_orders.py
def test_create_order(lws_session):
    """Creating an order persists a row in the local DynamoDB table."""
    # Exercise the real application code, unmodified.
    from myapp.orders import create_order

    create_order(order_id="42", status="pending")

    # The helper wraps boto3 with assertion-friendly methods.
    orders_table = lws_session.dynamodb("Orders")
    orders_table.assert_item_exists({"id": {"S": "42"}})
def test_order_sends_queue_message(lws_session):
    """Creating an order publishes exactly one message to the local queue."""
    from myapp.orders import create_order

    create_order(order_id="99", status="pending")

    order_queue = lws_session.sqs("OrderQueue")
    order_queue.assert_message_count(1)
pytest
Services start once per session. State is wiped between tests automatically by the lws_reset fixture.
Your application code already uses boto3. The SDK returns standard boto3 clients pre-configured to talk to local services. Nothing in your application needs to change.
# myapp/orders.py — unchanged
import boto3, os
def create_order(order_id: str, status: str) -> None:
    """Persist a new order to DynamoDB and notify the order queue.

    Uses default boto3 clients with no endpoint configuration; under
    lws_testing these are transparently redirected to the in-process
    local services.
    """
    client = boto3.client("dynamodb")
    client.put_item(
        TableName="Orders",
        Item={
            "id": {"S": order_id},
            "status": {"S": status},
        },
    )
    # Queue URL comes from the environment — presumably set by deployment
    # in production and by the test harness under lws_testing (confirm).
    # Raises KeyError if unset.
    sqs = boto3.client("sqs")
    sqs.send_message(
        QueueUrl=os.environ["ORDER_QUEUE_URL"],
        MessageBody=order_id,
    )
# test_orders.py
def test_create_order(lws_session):
    """Exercise create_order end to end against the local services."""
    # The SDK patches the boto3 endpoint for you.
    # Application code runs exactly as written.
    from myapp.orders import create_order

    create_order("42", "pending")

    lws_session.dynamodb("Orders").assert_item_exists({"id": {"S": "42"}})
    lws_session.sqs("OrderQueue").assert_message_count(1)
Local services accept any credentials. No AWS account, no IAM users, no ~/.aws/credentials required.
Application code stays identical to production. The SDK redirects boto3 to local endpoints at test time — no dependency injection required.
Services run as threads inside your test process. No Docker, no ports, no startup scripts. Tests begin in milliseconds.
The SDK registers pytest fixtures automatically via a pytest11 entry point. No imports needed in your conftest — just define a session spec fixture.
Session-scoped LwsSession instance. Configure via lws_session_spec fixture. Services start once and are reused across all tests.
Function-scoped autouse fixture. Calls session.reset() before each test to wipe all DynamoDB tables, SQS queues, and S3 buckets. Enabled automatically.
Session-scoped session with auto-discovery from CDK project. Configure path via cdk_project_dir fixture (default ".").
Session-scoped session with auto-discovery from HCL project. Configure path via hcl_project_dir fixture (default ".").
# conftest.py
import pytest
@pytest.fixture(scope="session")
def lws_session_spec():
    """Full resource spec: tables, queues, buckets, secrets, parameters."""
    orders_table = {"name": "Orders", "partition_key": "id"}
    products_table = {"name": "Products", "partition_key": "sku", "sort_key": "version"}
    return {
        "tables": [orders_table, products_table],
        "queues": ["OrderQueue", "DeadLetterQueue"],
        "buckets": ["ReceiptsBucket"],
        "secrets": [{"name": "stripe-key", "value": "sk_test_..."}],
        "parameters": [{"name": "/app/config", "value": "test"}],
    }
Simplified wrappers with built-in assertions. No more writing assert response['Item']['id']['S'] == '42'.
# DynamoDB helper: CRUD in raw attribute-value syntax, plus assertions.
table = session.dynamodb("Orders")
table.put({"id": {"S": "1"}, "status": {"S": "pending"}})
item = table.get({"id": {"S": "1"}})
table.delete({"id": {"S": "1"}})
items = table.scan()
# Built-in assertions for use directly in tests.
table.assert_item_exists({"id": {"S": "1"}})
table.assert_item_count(5)
# SQS helper: send, receive, purge, and assert on the local queue.
queue = session.sqs("OrderQueue")
queue.send({"orderId": "42", "total": 99.99})  # dict → JSON
queue.send("plain text message")  # or raw string
messages = queue.receive(max_messages=10)
queue.purge()
queue.assert_message_count(3)
print(queue.url)  # http://localhost:.../OrderQueue
# S3 helper: put/get bytes or text, list by prefix, and assert.
bucket = session.s3("ReceiptsBucket")
bucket.put("receipts/001.pdf", b"PDF bytes")
bucket.put("config.json", '{"key": "value"}')  # auto-encodes
data = bucket.get("receipts/001.pdf")  # bytes
text = bucket.get_text("config.json")  # string
keys = bucket.list_keys(prefix="receipts/")
bucket.delete("config.json")
bucket.assert_object_exists("receipts/001.pdf")
bucket.assert_object_count(expected_count=5, prefix="receipts/")
Use directly as a context manager, or let the pytest fixtures manage the lifecycle for you.
from lws_testing import LwsSession

# Declare every resource up front; services start on entry and are
# torn down on exit of the with-block.
with LwsSession(
    tables=[{"name": "Orders", "partition_key": "id"}],
    queues=["OrderQueue"],
    buckets=["ReceiptsBucket"],
    topics=[{"name": "OrderEvents", "arn": "arn:aws:sns:us-east-1:000000000000:OrderEvents"}],
    state_machines=[{"name": "OrderWorkflow", "definition": {...}}],
    secrets=[{"name": "stripe-key", "value": "sk_test_..."}],
    parameters=[{"name": "/app/config", "value": "production"}],
) as session:
    # Standard boto3 client, pre-configured for the local endpoint.
    client = session.client("dynamodb")
    client.put_item(TableName="Orders", Item={"id": {"S": "1"}})
from lws_testing import LwsSession

# Reads the cdk.out/ cloud assembly produced by `cdk synth`
with LwsSession.from_cdk("../my-cdk-project") as session:
    dynamo = session.client("dynamodb")

# Parses .tf files directly — no terraform plan/apply needed
with LwsSession.from_hcl("../my-terraform-project") as session:
    s3 = session.client("s3")
# Supported service clients — each returns a standard boto3 client.
session.client("dynamodb")
session.client("sqs")
session.client("s3")
session.client("sns")
session.client("ssm")
session.client("secretsmanager")
session.client("stepfunctions")
session.reset()  # clear all tables, queues, buckets
Override specific AWS operation responses with a fluent builder. Inject throttling errors, custom status codes, or delays without changing application code.
# Inject a throttling error
session.fake("dynamodb").operation("PutItem").respond(
    status=400,
    body={"__type": "ProvisionedThroughputExceededException"},
)

# Add artificial latency
session.fake("sqs").operation("SendMessage").respond(status=200, delay_ms=500)

# Match on a specific request header
session.fake("s3")\
    .operation("GetObject")\
    .with_header("x-request-id", "test-123")\
    .respond(status=403, body={"__type": "AccessDenied"})

# Shorthand for service errors
session.fake("dynamodb").operation("DeleteItem").error(
    error_type="ValidationException",
    message="Item not found",
)

session.fake("dynamodb").clear()  # remove all fakes for a service
def test_handles_throttling(lws_session):
    """The app surfaces a faked DynamoDB throttling response as ClientError."""
    # These names must be in scope — neither was imported in the
    # original snippet, which would raise NameError at run time.
    import pytest
    from botocore.exceptions import ClientError
    # Import OUTSIDE pytest.raises so an ImportError cannot masquerade
    # as the expected ClientError.
    from myapp.orders import create_order

    lws_session.fake("dynamodb").operation("PutItem").respond(
        status=400,
        body={"__type": "ProvisionedThroughputExceededException"},
    )
    try:
        with pytest.raises(ClientError) as exc:
            create_order("42", "pending")
        assert "ProvisionedThroughputExceededException" in str(exc.value)
    finally:
        # Always remove the fake so it cannot leak into later tests,
        # even when the assertions above fail.
        lws_session.fake("dynamodb").clear()
Inject error rates, latency, connection resets, and timeouts into any service to verify your application's resilience.
# Chaos injection: compose rates/latency with the builder, then apply.
session.chaos("dynamodb").error_rate(0.3).latency(min_ms=50, max_ms=200).apply()
session.chaos("sqs").connection_reset_rate(0.1).timeout_rate(0.05).apply()
session.chaos("dynamodb").clear()
def test_resilient_to_latency(lws_session):
    """create_order still succeeds when DynamoDB responds slowly."""
    import time
    from myapp.orders import create_order

    lws_session.chaos("dynamodb").latency(min_ms=100, max_ms=500).apply()
    try:
        # perf_counter is monotonic; time.time() can jump backwards if
        # the system clock is adjusted mid-test.
        start = time.perf_counter()
        create_order("42", "pending")
        elapsed = time.perf_counter() - start
        assert elapsed >= 0.1  # latency was injected
    finally:
        # Clear chaos even on failure so it doesn't bleed into other tests.
        lws_session.chaos("dynamodb").clear()
Test that your application enforces least-privilege access correctly by switching IAM modes and defining named identities.
# IAM enforcement modes — switch per test or per session.
session.iam.mode("enforce").apply()   # HTTP 403 on policy violation
session.iam.mode("audit").apply()     # log violations, allow requests
session.iam.mode("disabled").apply()  # no checks (default)

# Identity used for requests unless overridden.
session.iam.default_identity("service-account").apply()

# Named identity with an allow-list of actions.
session.iam\
    .identity("readonly")\
    .allow(["dynamodb:GetItem", "dynamodb:Scan"])\
    .apply()

# Allow/deny scoped to resource ARNs, plus a permissions boundary.
session.iam\
    .identity("restricted")\
    .allow(["s3:GetObject"], resource="arn:aws:s3:::my-bucket/*")\
    .deny(["s3:DeleteObject"])\
    .boundary(["s3:*"], resource="*")\
    .apply()
Verify exactly which AWS service calls your application made — and that no unexpected calls or errors occurred.
def test_service_calls(lws_session):
    """Assert exactly which AWS calls create_order makes — no extras, no errors."""
    from myapp.orders import create_order

    with lws_session.capture_logs() as logs:
        # create_order requires both arguments (see myapp/orders.py);
        # omitting `status` raises TypeError before any AWS call is made.
        create_order(order_id="123", status="pending")

    logs.assert_called("dynamodb", "PutItem")
    logs.assert_called("sqs", "SendMessage")
    logs.assert_call_count("dynamodb", "Scan", expected_count=0)
    logs.assert_not_called("s3", "GetObject")
    logs.assert_no_errors()
Instead of declaring resources manually, let the SDK discover them from your CDK cloud assembly or HCL files.
Reads the cdk.out/ cloud assembly produced by `cdk synth`. Discovers DynamoDB tables, SQS queues, S3 buckets, SNS topics, Step Functions, SSM parameters, and Secrets Manager secrets.
LwsSession.from_cdk("../my-cdk-project")  # discovers resources from the synthesized assembly
Parses .tf files in your project directory. No terraform plan or terraform apply needed.
LwsSession.from_hcl("../my-terraform-project")  # discovers resources from .tf files
# Override the project directory via a pytest fixture
# conftest.py
@pytest.fixture(scope="session")
def cdk_project_dir():
    """Point CDK auto-discovery at the application's project directory."""
    project_dir = "../my-cdk-app"
    return project_dir
# Then use lws_session_from_cdk in your tests directly
def test_with_cdk_resources(lws_session_from_cdk):
    """Tables discovered from the CDK assembly start out empty."""
    my_table = lws_session_from_cdk.dynamodb("MyTable")
    my_table.assert_item_count(0)