What I’m trying to do:
Listen for events from Stripe (for example, a new customer or a new subscription) so that I can act on them to update data tables.
What I’ve tried and what’s not working:
I have a server module that picks up the events from Stripe, but it always gets stuck on the signature header. The code below prints a signature header (t=1718807390,v1=…, v0=…), but verification then fails later with an error (Invalid signature). So something must be wrong in some way.
Hopefully someone will be able to help.
Thanks,
Dave
Code Sample:
# Stripe API key used by the stripe library for API calls.
# NOTE: keep real keys out of source control; load them from a secret store
# rather than hard-coding them here.
stripe.api_key = "sk_test_..."

# Signing secret used to verify the Stripe-Signature header on incoming events.
# NOTE(review): this must be the secret of whichever sender actually delivers
# the events. The secret printed by `stripe listen` (Stripe CLI) only matches
# events forwarded by the CLI; a webhook endpoint registered in the Stripe
# Dashboard has its own `whsec_...` signing secret. Using the wrong one
# presumably produces "Invalid signature" -- confirm which sender is in use.
endpoint_secret = "whsec_..."
@anvil.server.http_endpoint("/webhook", methods=["POST"])
def stripe_webhook_handler(**kwargs):
    """Verify an incoming Stripe webhook event and dispatch on its type.

    The request body and the ``stripe-signature`` header are read from
    ``anvil.server.request`` and handed to ``stripe.Webhook.construct_event``
    together with the module-level ``endpoint_secret``.

    Args:
        **kwargs: Accepted for the endpoint signature; not used by the handler.

    Returns:
        anvil.server.HttpResponse: status 400 when the body or signature
        header is missing, the payload is invalid, or signature verification
        fails; status 200 with "Success" otherwise.
    """
    print("Webhook handler called")

    request = anvil.server.request

    # Check if the request body is None
    if request.body is None:
        print("Request body is None")
        return anvil.server.HttpResponse(400, "No request body received")

    # Keep the body as raw bytes: the signature is computed over the exact
    # bytes Stripe sent, so the handler must not transform them. Decoding is
    # left to construct_event, where a decode failure is a ValueError and is
    # reported below as an invalid payload instead of escaping uncaught.
    try:
        payload = request.body.get_bytes()
    except AttributeError as e:
        print(f"Error getting payload: {e}")
        return anvil.server.HttpResponse(400, "Error getting payload")

    # Read the Stripe-Signature header sent with the request
    sig_header = request.headers.get("stripe-signature")
    if sig_header is None:
        print("Stripe-Signature header not found")
        return anvil.server.HttpResponse(400, "Stripe-Signature header missing")
    print(f"Signature Header: {sig_header}")

    # Verify the signature and parse the event in one step
    try:
        event = stripe.Webhook.construct_event(payload, sig_header, endpoint_secret)
    except ValueError as e:
        print(f"Invalid payload: {e}")
        return anvil.server.HttpResponse(400, "Invalid payload")
    except stripe.error.SignatureVerificationError as e:
        print(f"Invalid signature: {e}")
        return anvil.server.HttpResponse(400, "Invalid signature")
    print(f"Event: {event}")

    # Handle the event
    event_type = event["type"]
    if event_type == "customer.created":
        customer = event["data"]["object"]
        # Process customer created event
        print(f"Customer created: {customer}")
    elif event_type == "customer.subscription.created":
        subscription = event["data"]["object"]
        # Process subscription created event
        print(f"Subscription created: {subscription}")
    else:
        print(f"Unhandled event type: {event_type}")

    return anvil.server.HttpResponse(200, "Success")