Skip to main content

Cloud Function Environment

Complete guide to the execution environment, request handling, and response types in CocoCloud functions.

Overview

CocoCloud functions execute in a sandboxed Python 3.10 environment with access to:

  • Request object (request) - Access to query params, payload, headers, user info
  • Database service (db) - Query and manage your data
  • Template renderer (render) - Render HTML with Jinja2
  • Environment object (env) - Project and environment info

📡 Function Execution

Execution URL

When you create a cloud function, you get an execution URL:

https://your-domain.com/functions/{function_id}/execute

HTTP Methods

GET Request:

# Query parameters in URL
curl "https://your-domain.com/functions/abc123/execute?name=John&age=25"

POST Request:

# JSON payload in body
curl -X POST https://your-domain.com/functions/abc123/execute \
-H "Content-Type: application/json" \
-d '{"name": "John", "age": 25}'

🎯 The request Object

The request object provides access to incoming request data.

Properties

def main():
# Get data from POST body or GET query params
name = request.get('name', 'Guest')
age = request.get('age', 0)

# Access all payload data
all_data = request.json()

# Get query parameters (GET requests)
page = request.query_params.get('page', '1')

# Access headers
auth_token = request.headers.get('authorization', '')

# Check HTTP method
method = request.method # 'GET' or 'POST'

# Request timestamp
timestamp = request.timestamp

# Current user (if authenticated)
user = request.user # AppUser object or None

# Project ID
project_id = request.proj

Methods

request.get(key, default=None)

Get a value from the payload (POST) or the query parameters (GET):

def main():
# From POST body: {"name": "John"}
# Or GET query: ?name=John
name = request.get('name', 'Anonymous')

return {"greeting": f"Hello, {name}!"}

request.json()

Get the entire payload as a dictionary:

def main():
data = request.json()
# Returns: {"name": "John", "age": 25, "email": "john@example.com"}

return {
"received": data,
"keys": list(data.keys())
}

request.send_mail()

Send email using CocoMailer integration:

def main():
# Send with template
result = request.send_mail(
to="user@example.com",
subject="Welcome!",
template_id="welcome-template",
context={
"username": "John",
"activation_link": "https://..."
}
)

# Send with plain body
result = request.send_mail(
to=["user1@example.com", "user2@example.com"],
subject="Notification",
body="<h1>Hello!</h1><p>This is your notification.</p>"
)

return {"email_sent": result}

User Authentication

Access authenticated user data:

def main():
user = request.user

if not user:
return {"error": "Authentication required"}, 401

# User properties
user_id = user.id
email = user.email
user_data = user.data # Custom user fields
roles = user.roles

return {
"user_id": user_id,
"email": email,
"roles": roles
}

🗄️ The db Object

Provides access to your project's database. See the Database API for the complete reference.

Quick Examples

def main():
# Query documents
posts = db.query("posts",
status="published",
views_gte="100",
populate=["author", "category"],
limit=20
)

# Get single document
post = db.find_one("posts", id="post-123")

# Create document
new_post = db.create_document("posts", {
"title": "My Post",
"content": "...",
"author_id": request.user.id
})

# Update document
db.update_document("posts", "post-123", {
"views": 150
})

# Delete document
db.delete_document("posts", "post-123")

# Query users
users = db.query_users(
role="premium",
limit=50
)

return {"posts": posts["data"]}

🎨 The render Object

Render HTML responses with Jinja2 templating.

render.render_html()

Render HTML with template variables:

def main():
# Get data
username = request.get('username', 'Guest')
posts = db.query("posts", limit=10)

# HTML template with Jinja2
html = '''
<!DOCTYPE html>
<html>
<head>
<title>{{ title }}</title>
<link rel="stylesheet" href="{{ static('css/style.css') }}">
</head>
<body>
<h1>Welcome, {{ username }}!</h1>

<div class="posts">
{% for post in posts %}
<article>
<h2>{{ post.title }}</h2>
<p>{{ post.content }}</p>
<span>By {{ post.author.name }}</span>
</article>
{% endfor %}
</div>

<script src="{{ static('js/app.js') }}"></script>
</body>
</html>
'''

# Render with context
return render.render_html(html, {
'title': 'My Blog',
'username': username,
'posts': posts['data']
})

Parameters:

  • html_content (str): HTML template string
  • context (dict): Variables to pass to template
  • status_code (int): HTTP status code (default: 200)

Returns: HTMLResponse object

render.render_template()

Render a stored template file:

def main():
# Render template stored in your project
return await render.render_template(
'index.html',
context={
'title': 'Home Page',
'user': request.user
}
)

Templates are fetched from:

https://storage/projects/{project_id}/__templates__/index.html

Jinja2 Features

Variables

<h1>{{ title }}</h1>
<p>{{ user.name }}</p>
<span>{{ post.views | default(0) }}</span>

Loops

<ul>
{% for item in items %}
<li>{{ item.name }}</li>
{% endfor %}
</ul>

{% for user in users %}
<div>{{ user.email }}</div>
{% else %}
<p>No users found</p>
{% endfor %}

Conditionals

{% if user %}
<p>Hello, {{ user.name }}!</p>
{% else %}
<p>Please log in</p>
{% endif %}

{% if posts|length > 0 %}
<h2>Recent Posts</h2>
{% endif %}

Filters

{{ text | upper }}
{{ text | lower }}
{{ text | title }}
{{ text | truncate(50) }}
{{ number | round(2) }}
{{ list | length }}
{{ date | default('N/A') }}

Static Files

<!-- CSS -->
<link rel="stylesheet" href="{{ static('css/main.css') }}" />
<link rel="stylesheet" href="{{ static('css/components/button.css') }}" />

<!-- JavaScript -->
<script src="{{ static('js/app.js') }}"></script>
<script src="{{ static('js/utils.js') }}"></script>

<!-- Images -->
<img src="{{ static('images/logo.png') }}" alt="Logo" />
<img src="{{ static('images/products/product1.jpg') }}" />

<!-- Fonts -->
<link href="{{ static('fonts/custom-font.woff2') }}" rel="stylesheet" />

Static files are served from:

https://storage/projects/{project_id}/static/css/main.css

📤 Response Types

JSON Response (Default)

Return a dictionary or list for automatic JSON response:

def main():
return {
"success": True,
"message": "Operation completed",
"data": {
"id": "123",
"name": "John"
}
}

Response:

{
"success": true,
"result": {
"success": true,
"message": "Operation completed",
"data": {
"id": "123",
"name": "John"
}
},
"output": "",
"error": null,
"execution_time": 0.045
}

HTML Response

Return HTML using render.render_html():

def main():
html = '''
<!DOCTYPE html>
<html>
<head>
<title>My Page</title>
</head>
<body>
<h1>Hello World!</h1>
</body>
</html>
'''

return render.render_html(html)

Response: Raw HTML (Content-Type: text/html)

String Response

Return a plain string:

def main():
return "Hello, World!"

Response:

{
"success": true,
"result": "Hello, World!",
"output": "",
"error": null,
"execution_time": 0.012
}

Status Codes

Return a tuple of the response body and an HTTP status code:

def main():
user = request.user

if not user:
return {"error": "Unauthorized"}, 401

if not user.is_admin:
return {"error": "Forbidden"}, 403

post = db.find_one("posts", id=request.get('post_id'))

if not post:
return {"error": "Not found"}, 404

return {"post": post}, 200

Error Handling

Errors are automatically caught and returned:

def main():
try:
result = some_operation()
return {"result": result}
except ValueError as e:
return {"error": str(e)}, 400
except Exception as e:
return {"error": "Internal server error"}, 500

Error Response:

{
"success": false,
"result": null,
"output": "",
"error": "Error message here",
"execution_time": 0.023
}

🌍 The env Object

Access environment and project information:

def main():
# Project ID
project_id = env.project_id

# Execution environment
runtime = env.runtime # "python3.10"

# Static files base URL
static_url = env.static_base_url

return {
"project": project_id,
"runtime": runtime
}

📦 Available Modules

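Cloud functions have access to the following Python standard library modules and classes without import statements; they are pre-loaded in the global scope. Note that the examples below still use explicit `from ... import ...` statements for individual members such as `urllib.parse.urlencode` and `functools.reduce`: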

Core Modules

def main():
# ✅ Available directly (no import needed)

# JSON handling
data = json.loads('{"key": "value"}')
text = json.dumps({"data": 123})

# Date and time
now = datetime.now()
tomorrow = now + timedelta(days=1)
expires = now + timedelta(minutes=5)
timestamp = time.time()

# Mathematics
result = math.sqrt(16)
rounded = math.ceil(4.3)

# Regular expressions
pattern = re.compile(r'\d+')
matches = re.findall(r'[a-z]+', text)

# Random numbers (regular)
num = random.randint(1, 100)
choice = random.choice(['a', 'b', 'c'])

# Cryptographically secure random
otp = ''.join([str(secrets.randbelow(10)) for _ in range(6)])
token = secrets.token_urlsafe(32)

# UUID generation
unique_id = str(uuid.uuid4())

# Hashing
hash_md5 = hashlib.md5(b"data").hexdigest()
hash_sha256 = hashlib.sha256(b"data").hexdigest()

# Base64 encoding/decoding
encoded = base64.b64encode(b"data").decode()
decoded = base64.b64decode(encoded)

# URL handling
from urllib.parse import urlencode, parse_qs
query_string = urlencode({'key': 'value'})
parsed = parse_qs('key=value&foo=bar')

# String constants
letters = string.ascii_letters
digits = string.digits
punctuation = string.punctuation

# Collections
counter = collections.Counter([1, 1, 2, 3, 3, 3])
groups = collections.defaultdict(list)

# Itertools
for pair in itertools.combinations([1, 2, 3], 2):
print(pair)

# Functools
from functools import reduce
total = reduce(lambda x, y: x + y, [1, 2, 3, 4])

Complete Module List

| Module | Description | Common Uses |
| --- | --- | --- |
| json | JSON encoding/decoding | Parse API responses, serialize data |
| datetime | Date and time class | Current time, timestamps |
| timedelta | Time duration class | Add/subtract time, expiration |
| time | Time utilities | Delays, timestamps, timing |
| math | Mathematical functions | Calculations, rounding, trigonometry |
| re | Regular expressions | Pattern matching, validation |
| random | Random number generation | Simple random operations |
| secrets | Cryptographically secure random | OTP generation, tokens, passwords |
| uuid | UUID generation | Unique identifiers |
| hashlib | Hash functions | MD5, SHA256, data integrity |
| base64 | Base64 encoding | Encode/decode binary data |
| urllib | URL utilities | URL parsing, encoding |
| string | String constants | ASCII letters, digits, punctuation |
| collections | Data structures | Counter, defaultdict, deque |
| itertools | Iterator tools | Combinations, permutations |
| functools | Higher-order functions | Reduce, partial, cache |

Practical Examples

OTP Generation (Secure)

def main():
# Generate 6-digit OTP (cryptographically secure)
otp = ''.join([str(secrets.randbelow(10)) for _ in range(6)])

# Alternative: alphanumeric OTP
chars = string.ascii_uppercase + string.digits
otp_code = ''.join(secrets.choice(chars) for _ in range(8))

# Secure token
auth_token = secrets.token_urlsafe(32)

return {"otp": otp, "token": auth_token}

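Password Hashing

Note: the example below applies a single, unsalted hash for illustration only. For passwords that will be stored, prefer a salted key-derivation function from the same module, such as hashlib.pbkdf2_hmac or hashlib.scrypt.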

def main():
password = request.get('password')

# Hash password with SHA256
hashed = hashlib.sha256(password.encode()).hexdigest()

# Or use SHA512 for better security
hashed = hashlib.sha512(password.encode()).hexdigest()

return {"hash": hashed}

URL Operations

def main():
# Parse URL parameters
from urllib.parse import parse_qs, urlencode, quote, unquote

# Encode parameters
params = urlencode({'name': 'John Doe', 'age': 30})
# Result: 'name=John+Doe&age=30'

# Parse query string
parsed = parse_qs('name=John&age=30')
# Result: {'name': ['John'], 'age': ['30']}

# URL encode/decode
encoded = quote('Hello World!')
decoded = unquote('Hello%20World%21')

return {"encoded": encoded, "decoded": decoded}

Data Validation with Regex

def main():
email = request.get('email')
phone = request.get('phone')

# Validate email
email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
is_valid_email = bool(re.match(email_pattern, email))

# Validate phone (US format)
phone_pattern = r'^\d{3}-\d{3}-\d{4}$'
is_valid_phone = bool(re.match(phone_pattern, phone))

# Extract numbers from text
numbers = re.findall(r'\d+', "Order #123 costs $45")
# Result: ['123', '45']

return {
"email_valid": is_valid_email,
"phone_valid": is_valid_phone,
"numbers": numbers
}

Collections and Counting

def main():
votes = request.get('votes', [])

# Count occurrences
vote_counts = collections.Counter(votes)
# Input: ['apple', 'banana', 'apple', 'orange', 'apple']
# Result: Counter({'apple': 3, 'banana': 1, 'orange': 1})

# Group items
items = db.query("products")["data"]
by_category = collections.defaultdict(list)

for item in items:
by_category[item['category']].append(item)

return {
"vote_counts": dict(vote_counts),
"by_category": dict(by_category)
}

Itertools Combinations

def main():
# Generate all combinations
items = ['A', 'B', 'C', 'D']

# Pairs
pairs = list(itertools.combinations(items, 2))
# Result: [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]

# Permutations (order matters)
perms = list(itertools.permutations(items, 2))

# Product (cartesian product)
products = list(itertools.product([1, 2], ['a', 'b']))
# Result: [(1,'a'), (1,'b'), (2,'a'), (2,'b')]

return {"pairs": pairs, "products": products}

🔐 Authentication

Query Parameter Auth

Pass the authentication token as a query parameter in the URL:

curl "https://your-domain.com/functions/abc123/execute?token=your-token&name=John"

def main():
token = request.query_params.get('token')

if not token:
return {"error": "Token required"}, 401

# Validate token
user = db.find_user(auth_token=token)

if not user:
return {"error": "Invalid token"}, 401

return {"user": user}

Header Auth

Pass authentication in headers:

curl -X POST https://your-domain.com/functions/abc123/execute \
-H "Authorization: Bearer your-token" \
-H "Content-Type: application/json" \
-d '{"data": "value"}'

def main():
auth_header = request.headers.get('authorization', '')

if not auth_header.startswith('Bearer '):
return {"error": "Invalid authorization"}, 401

token = auth_header.replace('Bearer ', '')

# Validate token
user = db.find_user(auth_token=token)

return {"user": user}

Built-in User

If the user is authenticated via CocoBase, the user object is available on the request:

def main():
user = request.user

if not user:
return {"error": "Please log in"}, 401

# User is automatically available
return {
"user_id": user.id,
"email": user.email,
"name": user.data.get('name')
}

💡 Complete Examples

Example 1: Dynamic Web Page

def main():
# Get page from query param
page = int(request.query_params.get('page', '1'))
per_page = 10

# Get posts from database
posts = db.query("posts",
status="published",
populate=["author", "category"],
sort="created_at",
order="desc",
limit=per_page,
offset=(page - 1) * per_page
)

# Render HTML
html = '''
<!DOCTYPE html>
<html>
<head>
<title>Blog - Page {{ page }}</title>
<link rel="stylesheet" href="{{ static('css/blog.css') }}">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body>
<header>
<h1>My Blog</h1>
<nav>
<a href="?page=1">Home</a>
<a href="/about">About</a>
</nav>
</header>

<main>
{% for post in posts %}
<article class="post">
<h2>{{ post.title }}</h2>
<div class="meta">
<span>By {{ post.author.name }}</span>
<span>{{ post.created_at }}</span>
<span class="category">{{ post.category.name }}</span>
</div>
<p>{{ post.excerpt }}</p>
<a href="?post={{ post.id }}">Read more →</a>
</article>
{% endfor %}

{% if not posts %}
<p>No posts found.</p>
{% endif %}
</main>

<footer>
<div class="pagination">
{% if page > 1 %}
<a href="?page={{ page - 1 }}">← Previous</a>
{% endif %}

<span>Page {{ page }} of {{ total_pages }}</span>

{% if has_more %}
<a href="?page={{ page + 1 }}">Next →</a>
{% endif %}
</div>
</footer>

<script src="{{ static('js/app.js') }}"></script>
</body>
</html>
'''

return render.render_html(html, {
'posts': posts['data'],
'page': page,
'total_pages': (posts['total'] + per_page - 1) // per_page,
'has_more': posts['has_more']
})

Example 2: API Endpoint with Auth

def main():
# Check authentication
token = request.headers.get('authorization', '').replace('Bearer ', '')

if not token:
return {"error": "Authorization required"}, 401

# Validate token
user = db.find_user(auth_token=token)

if not user:
return {"error": "Invalid token"}, 401

# Get action from payload
action = request.get('action')

if action == 'get_profile':
return {
"user": {
"id": user.id,
"email": user.email,
"name": user.data.get('name'),
"avatar": user.data.get('avatar')
}
}

elif action == 'update_profile':
updates = request.get('updates', {})

# Update user data
user_data = user.data or {}
user_data.update(updates)

db.update_app_user(user.id, data=user_data)

return {"success": True, "message": "Profile updated"}

else:
return {"error": "Unknown action"}, 400

Example 3: Form Handler

def main():
# Handle form submission
if request.method == 'POST':
# Get form data
name = request.get('name', '').strip()
email = request.get('email', '').strip()
message = request.get('message', '').strip()

# Validate
if not name or not email or not message:
return render.render_html('''
<!DOCTYPE html>
<html>
<body>
<h1>Error</h1>
<p>All fields are required.</p>
<a href="?">Go back</a>
</body>
</html>
''')

# Save to database
db.create_document('contact_submissions', {
'name': name,
'email': email,
'message': message,
'submitted_at': datetime.now().isoformat()
})

# Send notification email
request.send_mail(
to='admin@example.com',
subject=f'New contact form submission from {name}',
body=f'''
<h2>New Contact Form Submission</h2>
<p><strong>Name:</strong> {name}</p>
<p><strong>Email:</strong> {email}</p>
<p><strong>Message:</strong></p>
<p>{message}</p>
'''
)

# Show success
return render.render_html('''
<!DOCTYPE html>
<html>
<body>
<h1>Thank You!</h1>
<p>Your message has been sent.</p>
<a href="?">Submit another</a>
</body>
</html>
''')

# Show form
html = '''
<!DOCTYPE html>
<html>
<head>
<title>Contact Us</title>
<link rel="stylesheet" href="{{ static('css/form.css') }}">
</head>
<body>
<h1>Contact Us</h1>
<form method="POST">
<input type="text" name="name" placeholder="Your Name" required>
<input type="email" name="email" placeholder="Your Email" required>
<textarea name="message" placeholder="Your Message" required></textarea>
<button type="submit">Send Message</button>
</form>
</body>
</html>
'''

return render.render_html(html)

📝 Best Practices

1. Always Validate Input

def main():
email = request.get('email', '').strip()

if not email:
return {"error": "Email is required"}, 400

if '@' not in email:
return {"error": "Invalid email format"}, 400

# Process email...

2. Handle Errors Gracefully

def main():
try:
user_id = request.get('user_id')
user = db.find_user(id=user_id)

if not user:
return {"error": "User not found"}, 404

return {"user": user}

except Exception as e:
print(f"Error: {str(e)}")
return {"error": "Internal server error"}, 500

3. Use Pagination

def main():
page = int(request.get('page', '1'))
limit = int(request.get('limit', '20'))

# Enforce max limit
limit = min(limit, 100)

posts = db.query("posts",
limit=limit,
offset=(page - 1) * limit
)

return {
"posts": posts['data'],
"page": page,
"has_more": posts['has_more']
}

4. Cache Heavy Operations

# Store results in a collection
def main():
cache_key = f"stats_{datetime.now().date()}"

# Check cache
cached = db.find_one("cache", key=cache_key)

if cached:
return cached['data']

# Calculate stats (expensive)
stats = calculate_stats()

# Store in cache
db.create_document("cache", {
"key": cache_key,
"data": stats,
"expires_at": (datetime.now() + timedelta(hours=24)).isoformat()
})

return stats

5. Secure Sensitive Operations

def main():
# Require authentication
user = request.user

if not user:
return {"error": "Unauthorized"}, 401

# Check permissions
if 'admin' not in user.roles:
return {"error": "Forbidden"}, 403

# Perform admin operation
result = perform_admin_action()

return {"result": result}

🎯 Quick Reference

Request

request.get(key, default)        # Get from payload/query
request.json() # Get all payload
request.query_params # Query parameters dict
request.headers # Headers dict
request.method # 'GET' or 'POST'
request.user # Current user or None
request.send_mail(...) # Send email

Database

db.query(collection, **filters)           # Query documents
db.find_one(collection, **filters) # Get single doc
db.create_document(collection, data) # Create doc
db.update_document(collection, id, data) # Update doc
db.delete_document(collection, id) # Delete doc
db.query_users(**filters) # Query users
db.find_user(**filters) # Get single user

Render

render.render_html(html, context)         # Render HTML
render.render_template(name, context) # Render stored template
static('path/to/file') # Static file URL

Response Types

return {"key": "value"}                   # JSON response
return render.render_html(html) # HTML response
return "string" # String response
return {"error": "..."}, 404 # With status code

See Also