Skip to main content

Cloud Functions

Create serverless backend logic with full database access, request handling, and real-time capabilities using Python cloud functions.

What Are Cloud Functions?

Cloud functions are serverless Python functions that run in the cloud. They provide:
  • Full Database Access - Query and modify your data using the powerful Database API
  • HTTP Request Handling - Process GET/POST requests with query parameters and JSON payloads
  • HTML Rendering - Return dynamic HTML pages with Jinja2 templates
  • Email Integration - Send emails directly from your functions
  • Environment Context - Access user authentication, project info, and more

When to Use Cloud Functions

  • Custom API Endpoints - Build custom backend logic without deploying servers
  • Data Processing - Transform, aggregate, or validate data before storing
  • Dynamic Web Pages - Render server-side HTML with real-time data
  • Webhooks - Handle webhook events from external services
  • Scheduled Tasks - Run background jobs and cron tasks
  • Email Notifications - Send automated emails based on events

Getting Started

Basic Function Structure

Every cloud function has a main() function that serves as the entry point:
def main():
    return {"message": "Hello, World!"}

Execution URL

Each cloud function gets a unique execution URL:
https://api.cocobase.buzz/functions/{function_id}/execute
Call it with GET or POST requests:
# GET request with query parameters
curl "https://api.cocobase.buzz/functions/abc123/execute?name=John&age=25"

# POST request with JSON body
curl -X POST https://api.cocobase.buzz/functions/abc123/execute \
  -H "Content-Type: application/json" \
  -d '{"name": "John", "age": 25}'

Database API Access

The db object provides full access to your database. It’s automatically available in all cloud functions.

Query Documents

def main():
    # Simple query
    posts = db.query("posts",
        status="published",
        limit=10
    )

    return {"posts": posts["data"]}

Query with Filters

def main():
    # Advanced filtering
    products = db.query("products",
        price_gte="50",        # Greater than or equal
        price_lte="500",       # Less than or equal
        stock_gt="0",          # Greater than
        category_in="electronics,computers",  # In array
        populate=["category", "reviews"],
        sort="popularity",
        order="desc",
        limit=24
    )

    return {"products": products["data"]}

Get Single Document

def main():
    post_id = req.get("post_id")

    post = db.find_one("posts",
        id=post_id,
        populate=["author", "category"]
    )

    if not post:
        return {"error": "Post not found"}, 404

    return {"post": post}

Create Documents

def main():
    # Create a new document
    post = db.create_document("posts", {
        "title": "My First Post",
        "content": "Hello World!",
        "status": "draft",
        "author_id": req.user.id,
        "views": 0
    })

    return {"post": post}

Update Documents

def main():
    post_id = req.get("post_id")

    # Full replacement
    post = db.update_document("posts", post_id, {
        "title": "Updated Title",
        "content": "Updated content"
    })

    # Partial update (merges with existing)
    post = db.update_document_fields("posts", post_id, {
        "views": 150,
        "status": "published"
    })

    return {"post": post}

Delete Documents

def main():
    post_id = req.get("post_id")
    deleted = db.delete_document("posts", post_id)

    return {"deleted": deleted}

Request and Response Handling

The Request Object

Access incoming request data with the request object (also available as req):
def main():
    # Get data from POST body or GET query params
    name = req.get('name', 'Guest')
    age = req.get('age', 0)

    # Access all payload data
    all_data = req.json()

    # Get query parameters (GET requests)
    page = req.query_params.get('page', '1')

    # Access headers
    auth_token = req.headers.get('authorization', '')

    # Check HTTP method
    method = req.method  # 'GET' or 'POST'

    # Current user (if authenticated)
    user = req.user  # AppUser object or None

    return {
        "received": all_data,
        "method": method
    }

Response Types

JSON Response (Default)
def main():
    return {
        "success": True,
        "message": "Operation completed",
        "data": {"id": "123", "name": "John"}
    }
HTML Response
def main():
    html = '''
    <!DOCTYPE html>
    <html>
    <head>
        <title>My Page</title>
    </head>
    <body>
        <h1>Hello World!</h1>
    </body>
    </html>
    '''

    return render.render_html(html)
Status Codes
def main():
    user = req.user

    if not user:
        return {"error": "Unauthorized"}, 401

    if not user.is_admin:
        return {"error": "Forbidden"}, 403

    return {"success": True}, 200

Error Handling

def main():
    try:
        result = some_operation()
        return {"result": result}
    except ValueError as e:
        return {"error": str(e)}, 400
    except Exception as e:
        return {"error": "Internal server error"}, 500

Environment Variables

Access environment and project information:
def main():
    # Project ID
    project_id = env.project_id

    # Execution environment
    runtime = env.runtime  # "python3.10"

    # Static files base URL
    static_url = env.static_base_url

    return {
        "project": project_id,
        "runtime": runtime
    }

Execution Context

User Authentication

Access authenticated user data:
def main():
    user = req.user

    if not user:
        return {"error": "Authentication required"}, 401

    # User properties
    user_id = user.id
    email = user.email
    user_data = user.data  # Custom user fields
    roles = user.roles

    return {
        "user_id": user_id,
        "email": email,
        "roles": roles
    }

HTTP Methods

Route based on HTTP method:
def main():
    method = req.method

    if method == "GET":
        # List posts
        posts = db.query("posts",
            status="published",
            limit=20
        )
        return {"posts": posts["data"]}

    elif method == "POST":
        # Create post
        user = req.user
        if not user:
            return {"error": "Authentication required"}, 401

        data = req.json()
        post = db.create_document("posts", {
            "title": data["title"],
            "content": data["content"],
            "author_id": user.id,
            "status": "draft"
        })
        return {"post": post}

    else:
        return {"error": "Method not supported"}, 405

Database Queries in Cloud Functions

Comparison Operators

Use operator suffixes to filter data:
OperatorSuffixExampleDescription
Equal(none) or _eqstatus="published"Exact match
Not Equal_nestatus_ne="draft"Not equal
Greater Than_gtprice_gt="100"Greater than
Greater or Equal_gteage_gte="18"Greater than or equal
Less Than_ltprice_lt="1000"Less than
Less or Equal_ltestock_lte="10"Less than or equal
Contains_containstitle_contains="python"String contains
Starts With_startswithname_startswith="john"String starts with
Ends With_endswithemail_endswith="@gmail.com"String ends with
In Array_instatus_in="published,draft"Value in list
Not In Array_notincategory_notin="spam,nsfw"Value not in list
Is Null_isnulldeleted_at_isnull="true"Check if null
Examples:
# Greater than
products = db.query("products", price_gt="50")

# Contains (case-insensitive)
users = db.query_users(name_contains="john")

# In array
posts = db.query("posts",
    status_in="published,featured,trending"
)

# Multiple operators
products = db.query("products",
    price_gte="50",
    price_lte="500",
    stock_gt="0",
    category_ne="discontinued"
)

Boolean Logic - OR Queries

Simple OR:
# Posts with status = published OR featured
posts = db.query("posts", **{
    "[or]status": "published",
    "[or]status_2": "featured"
})
Named OR Groups:
# (category=tech OR category=programming) AND (status=published OR status=featured)
posts = db.query("posts", **{
    "[or:cats]category": "tech",
    "[or:cats]category_2": "programming",
    "[or:status]status": "published",
    "[or:status]status_2": "featured"
})
Search Across Multiple Fields:
# Posts where title OR content contains keyword
posts = db.query("posts", **{
    "[or:search]title_contains": "python",
    "[or:search]content_contains": "python"
})

Relationships and Population

Auto-Relationship Detection: Relationships are automatically detected based on field naming:
  • {field}id - Single reference
  • {field}_ids - Multiple references
# Populate single relationship
posts = db.query("posts",
    populate=["author"],
    limit=10
)

# Populate multiple relationships
posts = db.query("posts",
    populate=["author", "category", "tags"],
    limit=10
)

# Deep (nested) population
posts = db.query("posts",
    populate=["author.company.location"],
    limit=10
)
Filter by Related Data:
# Get posts where author has role=admin
posts = db.query("posts", **{
    "author.role": "admin"
}, populate=["author"], limit=20)

# Multiple relationship filters
posts = db.query("posts", **{
    "author.verified": "true",
    "category.status": "active"
}, populate=["author", "category"])

User Queries

Query users with the same powerful features:
# Query users
users = db.query_users(
    role="premium",
    age_gte="18",
    email_endswith="@gmail.com",
    populate=["referred_by"],
    sort="created_at",
    order="desc",
    limit=50
)

# Find single user
user = db.find_user(
    id="user-123",
    populate=["company", "manager"]
)

# By email
user = db.find_user(email="[email protected]")

User Relationships

Manage relationships between users:
# Get followers
followers = db.get_user_relationships(
    user_id="user-123",
    relationship_type="followers",
    limit=50
)

# Add relationship
db.add_user_relationship(
    user_id="user-123",
    related_user_id="user-456",
    relationship_type="following"
)

# Remove relationship
db.remove_user_relationship(
    user_id="user-123",
    related_user_id="user-456",
    relationship_type="following"
)

# Get user's collections
posts = db.get_user_collections(
    user_id="user-123",
    collection_name="posts",
    filters={"status": "published"},
    populate=["category"],
    limit=20
)

Operators and Filtering

# Search in title or description
products = db.query("products", **{
    "[or]name_contains": "laptop",
    "[or]description_contains": "laptop"
})

# Email domain filtering
company_users = db.query_users(
    email_endswith="@mycompany.com"
)

# Name prefix search
users = db.query_users(
    name_startswith="John"
)

Range Queries

# Price range
products = db.query("products",
    price_gte="10",
    price_lte="100",
    stock_gt="0"
)

# Date range
orders = db.query("orders",
    created_at_gte="2024-01-01",
    created_at_lt="2024-02-01"
)

# Age range
users = db.query_users(
    age_gte="18",
    age_lte="65"
)

List Filtering

# Multiple values (any match)
posts = db.query("posts",
    category_in="tech,science,tutorial"
)

# Exclude values
users = db.query_users(
    role_notin="admin,superadmin"
)

Null Checks

# Find items without deletion timestamp
active_posts = db.query("posts",
    deleted_at_isnull="true"
)

# Find users with referrer
referred_users = db.query_users(
    referred_by_id_isnull="false"
)

Sorting and Pagination

# Sort by field
posts = db.query("posts",
    status="published",
    sort="created_at",
    order="desc",  # or "asc"
    limit=20,
    offset=0
)

# Pagination helper
def get_page(page=1, per_page=20):
    offset = (page - 1) * per_page
    result = db.query("posts",
        status="published",
        limit=per_page,
        offset=offset,
        sort="created_at",
        order="desc"
    )
    return {
        "posts": result["data"],
        "page": page,
        "per_page": per_page,
        "total": result["total"],
        "has_more": result["has_more"]
    }

Real-World Examples

def main():
    # Get filters from request
    category = req.get("category")
    min_price = req.get("min_price", "0")
    max_price = req.get("max_price", "10000")
    search = req.get("search", "")

    # Build filters
    filters = {
        "status": "active",
        "stock_gt": "0",
        "price_gte": min_price,
        "price_lte": max_price
    }

    if category:
        filters["category_id"] = category

    if search:
        filters["[or]name_contains"] = search
        filters["[or]description_contains"] = search

    # Query products
    products = db.query("products", **filters,
        populate=["category", "reviews"],
        sort="popularity",
        order="desc",
        limit=24
    )

    return {
        "products": products["data"],
        "total": products["total"],
        "has_more": products["has_more"]
    }

Social Media: User Feed

def main():
    user_id = req.get("user_id")

    # Get users this user follows
    following = db.get_user_relationships(user_id, "following")
    following_ids = [u["id"] for u in following["data"]]

    # Build OR filter for posts from followed users
    filters = {}
    for idx, followed_id in enumerate(following_ids[:50]):
        filters[f"[or:authors]author_id_{idx}"] = followed_id

    # Query posts
    feed = db.query("posts", **{
        **filters,
        "status": "published"
    },
    populate=["author", "category"],
    sort="created_at",
    order="desc",
    limit=30)

    return {"feed": feed["data"]}
def main():
    keyword = req.get("keyword", "")
    category = req.get("category")
    author_id = req.get("author_id")
    tags = req.get("tags", "").split(",") if req.get("tags") else []

    # Build filters
    filters = {"status": "published"}

    # Search in title or content
    if keyword:
        filters["[or:search]title_contains"] = keyword
        filters["[or:search]content_contains"] = keyword

    # Filter by category
    if category:
        filters["category_id"] = category

    # Filter by author
    if author_id:
        filters["author_id"] = author_id

    # Filter by tags (any tag matches)
    if tags:
        for idx, tag in enumerate(tags):
            filters[f"[or:tags]tag_ids_{idx}"] = tag

    # Query posts
    posts = db.query("posts", **filters,
        populate=["author", "category", "tags"],
        sort="created_at",
        order="desc",
        limit=20
    )

    return {
        "posts": posts["data"],
        "total": posts["total"]
    }

User Dashboard

def main():
    user_id = req.get("user_id")

    # Get user info
    user = db.find_user(id=user_id, populate=["company"])

    # Get user's posts
    posts = db.get_user_collections(
        user_id, "posts",
        filters={"status": "published"},
        limit=10
    )

    # Get followers/following
    followers = db.get_user_relationships(user_id, "followers", limit=5)
    following = db.get_user_relationships(user_id, "following", limit=5)

    # Get recent activity
    comments = db.get_user_collections(
        user_id, "comments",
        populate=["post"],
        limit=5
    )

    return {
        "user": user,
        "stats": {
            "posts": posts["total"],
            "followers": followers["total"],
            "following": following["total"],
            "comments": comments["total"]
        },
        "recent_posts": posts["data"],
        "recent_comments": comments["data"]
    }

Dynamic Web Page

def main():
    # Get page from query param
    page = int(req.query_params.get('page', '1'))
    per_page = 10

    # Get posts from database
    posts = db.query("posts",
        status="published",
        populate=["author", "category"],
        sort="created_at",
        order="desc",
        limit=per_page,
        offset=(page - 1) * per_page
    )

    # Render HTML
    html = '''
    <!DOCTYPE html>
    <html>
    <head>
        <title>Blog - Page {{ page }}</title>
        <link rel="stylesheet" href="{{ static('css/blog.css') }}">
    </head>
    <body>
        <header>
            <h1>My Blog</h1>
        </header>

        <main>
            {% for post in posts %}
            <article class="post">
                <h2>{{ post.title }}</h2>
                <div class="meta">
                    <span>By {{ post.author.name }}</span>
                    <span>{{ post.created_at }}</span>
                </div>
                <p>{{ post.excerpt }}</p>
                <a href="?post={{ post.id }}">Read more →</a>
            </article>
            {% endfor %}
        </main>

        <footer>
            <div class="pagination">
                {% if page > 1 %}
                <a href="?page={{ page - 1 }}">← Previous</a>
                {% endif %}

                <span>Page {{ page }}</span>

                {% if has_more %}
                <a href="?page={{ page + 1 }}">Next →</a>
                {% endif %}
            </div>
        </footer>
    </body>
    </html>
    '''

    return render.render_html(html, {
        'posts': posts['data'],
        'page': page,
        'has_more': posts['has_more']
    })

Best Practices

1. Always Use Limits

# Good
posts = db.query("posts", limit=20)

# Bad (could return thousands)
posts = db.query("posts")

2. Use Indexes for Performance

# Query on indexed fields
posts = db.query("posts",
    status="published",  # Should be indexed
    user_id="user-123",  # Should be indexed
    sort="created_at",   # Should be indexed
    limit=20
)

3. Selective Population

# Good - only populate what you need
posts = db.query("posts",
    populate=["author"],
    select=["id", "title", "author"]
)

# Bad - populating everything is slow
posts = db.query("posts",
    populate=["author", "category", "tags", "comments", "likes"]
)

4. Clear Field Naming

# Good - clear intent
{
  "author_id": "user-123",        # Obviously a user
  "product_ids": ["p1", "p2"],    # Obviously products
  "referred_by_id": "user-456"    # Obviously a user
}

# Bad - ambiguous
{
  "related_id": "...",   # Related to what?
  "linked_ids": ["..."]  # Linked to what?
}

5. Handle Pagination

def get_posts(page=1, per_page=20):
    offset = (page - 1) * per_page

    result = db.query("posts",
        status="published",
        limit=per_page,
        offset=offset,
        sort="created_at",
        order="desc"
    )

    return {
        "posts": result["data"],
        "page": page,
        "per_page": per_page,
        "total": result["total"],
        "has_more": result["has_more"]
    }

6. Error Handling

def main():
    try:
        user_id = req.get("user_id")

        if not user_id:
            return {"error": "user_id is required"}, 400

        posts = db.query("posts",
            author_id=user_id,
            limit=20
        )

        return {"posts": posts["data"]}

    except Exception as e:
        print(f"Error: {str(e)}")
        return {"error": "Internal server error"}, 500

7. Validate Input

def main():
    email = req.get('email', '').strip()

    if not email:
        return {"error": "Email is required"}, 400

    if '@' not in email:
        return {"error": "Invalid email format"}, 400

    # Process email...

Intellisense Setup

Monaco Editor Integration

Enable IntelliSense for cloud functions in your code editor: Install Dependencies:
npm install @monaco-editor/react monaco-editor
Use CloudFunctionEditor Component:
import CloudFunctionEditor from "@/components/CloudFunctionEditor";

function MyPage() {
  const [code, setCode] = useState(
    'def main():\n    return {"message": "Hello"}'
  );

  return (
    <CloudFunctionEditor
      value={code}
      onChange={setCode}
      apiBaseUrl="https://your-api.com"
      height="600px"
    />
  );
}

Available Endpoints

Your backend provides IntelliSense data:
  • GET /intellisense/python-stubs - Type definitions for request, db, render objects
  • GET /intellisense/python-examples - Code examples and patterns
  • GET /intellisense/operators - Query operator documentation

Features Provided

  • Autocomplete - Type request., db., or render. to see available methods
  • Code Snippets - Quick templates for common patterns
  • Hover Documentation - Hover over globals to see documentation
  • Examples Toolbar - One-click insertion of complete code patterns

Available Python Modules

Cloud functions have access to these Python standard library modules without import statements:
ModuleDescriptionCommon Uses
jsonJSON encoding/decodingParse API responses, serialize data
datetimeDate and time classCurrent time, timestamps
timedeltaTime duration classAdd/subtract time, expiration
timeTime utilitiesDelays, timestamps, timing
mathMathematical functionsCalculations, rounding
reRegular expressionsPattern matching, validation
secretsCryptographically secure randomOTP generation, tokens, passwords
uuidUUID generationUnique identifiers
hashlibHash functionsMD5, SHA256, data integrity
base64Base64 encodingEncode/decode binary data
stringString constantsASCII letters, digits, punctuation
collectionsData structuresCounter, defaultdict, deque

Examples

OTP Generation (Secure):
def main():
    # Generate 6-digit OTP (cryptographically secure)
    otp = ''.join([str(secrets.randbelow(10)) for _ in range(6)])

    # Send via email or SMS
    return {"otp": otp}
Password Hashing:
def main():
    password = req.get('password')

    # Hash password with SHA256
    hashed = hashlib.sha256(password.encode()).hexdigest()

    return {"hash": hashed}
Email Validation:
def main():
    email = req.get('email')

    # Validate email format
    email_pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    is_valid = bool(re.match(email_pattern, email))

    if not is_valid:
        return {"error": "Invalid email format"}, 400

    return {"valid": True}

Next Steps