Loading...
Loading...
Send emails from your Python application using requests or httpx.
Install the requests library if you haven't already:
pip install requestsimport os
import requests
from typing import Optional, List, Dict, AnyNOTIFYN_API_KEY = os.environ.get('NOTIFYN_API_KEY')
NOTIFYN_API_URL = 'https://notifyn.net/api/v1/emails'def send_email(
from_email: str,
to: str | List[str],
subject: str,
html: Optional[str] = None,
text: Optional[str] = None,
reply_to: Optional[str] = None,
cc: Optional[List[str]] = None,
bcc: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Send an email using the Notifyn API.
Args:
from_email: Sender email address
to: Recipient email address(es)
subject: Email subject line
html: HTML content of the email
text: Plain text content of the email
reply_to: Reply-to email address
cc: CC recipients
bcc: BCC recipients
Returns:
API response as dictionary
Raises:
Exception: If the API request fails
"""
payload = {
'from': from_email,
'to': to,
'subject': subject,
}
if html:
payload['html'] = html
if text:
payload['text'] = text
if reply_to:
payload['replyTo'] = reply_to
if cc:
payload['cc'] = cc
if bcc:
payload['bcc'] = bcc
response = requests.post(
NOTIFYN_API_URL,
headers={
'Authorization': f'Bearer {NOTIFYN_API_KEY}',
'Content-Type': 'application/json',
},
json=payload,
)
if not response.ok:
error = response.json()
raise Exception(error.get('message', 'Failed to send email'))
return response.json()from notifyn import send_emailresult = send_email(
from_email='hello@yourdomain.com',
to='user@example.com',
subject='Welcome!',
html='<h1>Hello!</h1><p>Welcome to our service.</p>',
)print(f"Email sent: {result['id']}")from notifyn import send_email# Perfect for contact forms
result = send_email(
from_email='notifications@yourdomain.com',
to='you@yourdomain.com',
subject='New Contact Form Submission',
reply_to='customer@example.com',
html='''
<h2>New Message</h2>
<p><strong>From:</strong> John Doe</p>
<p><strong>Email:</strong> customer@example.com</p>
<p><strong>Message:</strong> I have a question...</p>
''',
)from flask import Flask, request, jsonify
from notifyn import send_emailapp = Flask(__name__)@app.route('/api/contact', methods=['POST'])
def contact():
data = request.json
name = data.get('name')
email = data.get('email')
message = data.get('message')
if not all([name, email, message]):
return jsonify({'error': 'All fields required'}), 400
try:
send_email(
from_email='notifications@yourdomain.com',
to='you@yourdomain.com',
reply_to=email,
subject=f'Contact Form: {name}',
html=f'''
<h2>New message from {name}</h2>
<p><strong>Email:</strong> {email}</p>
<p>{message}</p>
''',
)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500if __name__ == '__main__':
app.run(debug=True)import json
from django.http import JsonResponse
from django.views.decorators.http import require_POST
from django.views.decorators.csrf import csrf_exempt
from .notifyn import send_email@csrf_exempt
@require_POST
def contact_view(request):
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
name = data.get('name')
email = data.get('email')
message = data.get('message')
if not all([name, email, message]):
return JsonResponse({'error': 'All fields required'}, status=400)
try:
send_email(
from_email='notifications@yourdomain.com',
to='you@yourdomain.com',
reply_to=email,
subject=f'Contact Form: {name}',
html=f'<h2>From {name}</h2><p>{message}</p>',
)
return JsonResponse({'success': True})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, EmailStr
from notifyn import send_emailapp = FastAPI()class ContactForm(BaseModel):
name: str
email: EmailStr
message: str@app.post('/api/contact')
async def contact(form: ContactForm):
try:
result = send_email(
from_email='notifications@yourdomain.com',
to='you@yourdomain.com',
reply_to=form.email,
subject=f'Contact: {form.name}',
html=f'''
<h2>New message from {form.name}</h2>
<p><strong>Email:</strong> {form.email}</p>
<p>{form.message}</p>
''',
)
return {'success': True, 'id': result['id']}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))For async applications, use httpx:
import os
import httpx
from typing import Optional, List, Dict, AnyNOTIFYN_API_KEY = os.environ.get('NOTIFYN_API_KEY')
NOTIFYN_API_URL = 'https://notifyn.net/api/v1/emails'async def send_email_async(
from_email: str,
to: str | List[str],
subject: str,
html: Optional[str] = None,
text: Optional[str] = None,
reply_to: Optional[str] = None,
) -> Dict[str, Any]:
payload = {
'from': from_email,
'to': to,
'subject': subject,
}
if html:
payload['html'] = html
if text:
payload['text'] = text
if reply_to:
payload['replyTo'] = reply_to
async with httpx.AsyncClient() as client:
response = await client.post(
NOTIFYN_API_URL,
headers={
'Authorization': f'Bearer {NOTIFYN_API_KEY}',
'Content-Type': 'application/json',
},
json=payload,
)
if response.status_code != 200:
error = response.json()
raise Exception(error.get('message', 'Failed to send email'))
return response.json()# Usage with asyncio
import asyncioasync def main():
result = await send_email_async(
from_email='hello@yourdomain.com',
to='user@example.com',
subject='Hello!',
html='<p>Sent with async!</p>',
)
print(f"Email sent: {result['id']}")asyncio.run(main())