Update local state immediately, then reconcile with server:
Copy
// Client sends movement
function movePlayer(x, y) {
  // Update local state immediately (prediction) so input feels instant
  players[myId].x = x;
  players[myId].y = y;

  // Send to server
  connection.send({ action: 'move', x: x, y: y });
}

// Server confirms position (reconciliation)
connection.on('position_confirmed', (data) => {
  const me = players[myId];
  const threshold = 5; // per-axis drift tolerated before snapping

  // Only update if the server position differs significantly.
  // Both axes are checked: testing x alone would leave a y-only
  // desync uncorrected.
  const drifted =
    Math.abs(me.x - data.x) > threshold ||
    Math.abs(me.y - data.y) > threshold;

  if (drifted) {
    me.x = data.x;
    me.y = data.y;
  }
});
Never trust client data - always validate on the server:
Copy
async def on_message():
    """Handle a client message; a 'move' is validated before it is applied.

    Returns an ``{'error': ...}`` dict when the move is rejected, and
    ``None`` when the move is accepted or the action is not 'move'.
    """
    import math  # function-scope import keeps the snippet self-contained

    action = request.get('action')
    if action != 'move':
        return None

    x = request.get('x', 0)
    y = request.get('y', 0)

    # ✅ Validate types: the values come from the client, and a string or
    # None would otherwise raise TypeError in the range check below.
    if not all(isinstance(value, (int, float)) for value in (x, y)):
        return {'error': 'Invalid coordinates'}

    # ✅ Validate coordinates (NaN and infinities also fail this check)
    if not (0 <= x <= 800 and 0 <= y <= 600):
        return {'error': 'Invalid coordinates'}

    # ✅ Validate speed (anti-cheat)
    player = room.state['players'][session.player_id]
    distance = math.hypot(x - player['x'], y - player['y'])
    max_speed = 10  # Max pixels per tick
    if distance > max_speed:
        return {'error': 'Movement too fast'}

    # Update position
    player['x'] = x
    player['y'] = y
Implement Rate Limiting
Protect against spam and abuse:
Copy
async def on_message():
    """Broadcast a chat message, allowing at most one per second per session.

    Returns an ``{'error': ...}`` dict when the sender is rate limited.
    """
    if request.get('action') != 'send_message':
        return None

    # Check rate limit: compare now against the last accepted message
    previous = session.get('last_message_time', 0)
    now = time.time()
    elapsed = now - previous
    if elapsed < 1.0:  # 1 message per second
        return {'error': 'Please wait before sending another message'}

    session.set('last_message_time', now)

    # Process message
    payload = {
        'type': 'new_message',
        'text': request.get('text', ''),
    }
    await room.broadcast(payload)
CocoCloud has built-in rate limiting of 60 messages/second per player, but you should implement stricter limits for specific actions.
Authorize Critical Actions
Verify player permissions before executing critical actions:
Copy
async def on_message():
    """Start the game on the host's request, after authorization checks.

    Returns an ``{'error': ...}`` dict when a check fails; otherwise moves
    the room into the 'playing' phase and broadcasts ``game_started``.
    """
    action = request.get('action')
    if action != 'start_game':
        return None

    # ✅ Check if player is room host
    if session.player_id != room.state.get('host_id'):
        return {'error': 'Only host can start the game'}

    # ✅ Check game state.
    # The key is 'game_phase' to match the state created in on_connect and
    # sent in the welcome payload; reading 'phase' would raise KeyError.
    if room.state['game_phase'] != 'lobby':
        return {'error': 'Game already started'}

    # ✅ Check minimum players
    if room.get_player_count() < 2:
        return {'error': 'Need at least 2 players'}

    # Start game
    room.state['game_phase'] = 'playing'
    await room.broadcast({'type': 'game_started'})
Sanitize User Content
Clean user-generated content before broadcasting:
Copy
async def on_message():
    """Sanitize a chat message and broadcast it to the room.

    Returns an ``{'error': ...}`` dict when the message is not text, is
    empty after trimming, or is longer than 500 characters.
    """
    action = request.get('action')
    if action != 'send_message':
        return None

    text = request.get('text', '')

    # ✅ Type check: client input may be any JSON value, and a non-string
    # would crash on .strip() below.
    if not isinstance(text, str):
        return {'error': 'Message must be text'}

    # ✅ Remove leading/trailing whitespace first, so padding does not
    # count toward the length limit
    text = text.strip()

    # ✅ Check for empty message
    if not text:
        return {'error': 'Message cannot be empty'}

    # ✅ Length limit
    if len(text) > 500:
        return {'error': 'Message too long'}

    # ✅ (Optional) Filter profanity
    # text = profanity_filter.clean(text)

    await room.broadcast({
        'type': 'new_message',
        'user': room.state['players'][session.player_id]['name'],
        'text': text,
    })
async def on_connect():
    """Create the room state on first use, then register the joining player."""
    # ✅ Proper initialization: only build the state when it is still empty
    if not room.state:
        initial_state = {
            'players': {},
            'game_phase': 'lobby',
            'round': 1,
            'created_at': time.time(),
        }
        room.state = initial_state

    # Now safe to use room.state
    new_player = {
        'name': request.get('player_name', 'Guest'),
        'score': 0,
    }
    room.state['players'][session.player_id] = new_player
Clean Up Disconnected Players
Remove player data when they disconnect:
Copy
async def on_disconnect():
    """Remove the leaving player, announce it, and destroy the room if empty."""
    players = room.state['players']
    leaving_id = session.player_id

    # ✅ Clean up player data
    if leaving_id in players:
        # Read the name before deleting so it can go in the announcement
        leaving_name = players[leaving_id]['name']
        del players[leaving_id]

        notice = {
            'type': 'player_left',
            'player_id': leaving_id,
            'player_name': leaving_name,
        }
        await room.broadcast(notice)

    # ✅ Destroy empty rooms
    if room.get_player_count() != 0:
        return
    room.stop_game_loop()
    room.destroy()
Use Session Data for Player-Specific Info
Store per-player data in session, not room state:
Copy
# ❌ Bad: Mixing session data in room stateroom.state['players'][session.player_id] = { 'name': 'Alice', 'temp_variable': 123, # Don't put session-only data here 'last_action': time.time()}# ✅ Good: Separate concerns# Room state (shared across all players)room.state['players'][session.player_id] = { 'name': 'Alice', 'score': 100, 'x': 400, 'y': 300}# Session data (player-specific, not broadcast)session.set('temp_variable', 123)session.set('last_action', time.time())
Handle State Synchronization
Ensure all players have consistent state:
Copy
async def on_connect():
    """Send the joining player a full snapshot of the shared room state."""
    # ... player joins room ...

    # ✅ Send full state to new player
    return {
        'type': 'welcome',
        'your_id': session.player_id,
        'room_state': {
            'players': room.state['players'],
            'game_phase': room.state['game_phase'],
            'round': room.state['round'],
            'timer': room.state.get('timer', 0),
        },
    }


async def on_tick():
    """Broadcast the current timer value to every player in the room."""
    # ✅ Broadcast only changes (delta updates): just the timer field,
    # not the whole room state.
    # The timer is read with a default of 0, matching on_connect, so a
    # tick that runs before any timer is set does not raise KeyError.
    await room.broadcast({
        'type': 'timer_update',
        'timer': int(room.state.get('timer', 0)),
    })
async def on_message():
    """Run 'process_data' requests and turn failures into error payloads.

    Returns a success dict with the result, an ``{'error': ...}`` dict when
    processing fails, or ``None`` for any other action.
    """
    try:
        if request.get('action') != 'process_data':
            return None

        # Process data (might fail)
        payload = request.get('data')
        outcome = complex_calculation(payload)
        return {'status': 'success', 'result': outcome}

    except ValueError:
        # Return user-friendly error
        return {'error': 'Invalid data format'}

    except Exception as exc:
        # Log error for debugging; the client only sees a generic message
        print(f"Unexpected error: {exc}")
        return {'error': 'Something went wrong'}
Provide Clear Error Messages
Help users understand what went wrong:
Copy
# ❌ Bad: Vague errorreturn {'error': 'Invalid'}# ✅ Good: Specific errorreturn { 'error': 'Invalid bet amount', 'details': f'Bet must be between $1 and ${player["balance"]}'}
Handle Reconnections
Allow players to reconnect and resume their session:
Copy
async def on_connect():
    """Restore a returning player from a reconnect token, or admit a new one.

    Returns a 'reconnected' payload with the restored player data when the
    token maps to a known player; otherwise a 'welcome' payload carrying a
    freshly issued reconnect token.
    """
    # Check for reconnection token
    reconnect_token = request.get('reconnect_token')

    if reconnect_token:
        # Try to restore previous session
        tokens = room.state.get('reconnect_tokens', {})
        old_player_id = tokens.get(reconnect_token)
        players = room.state['players']

        if old_player_id and old_player_id in players:
            # Move the player data to the new id. Popping before inserting
            # means a reconnect that reuses the same id does not delete
            # the entry that was just written.
            player_data = players.pop(old_player_id)
            players[session.player_id] = player_data

            # Re-point the token at the new id so the same token still
            # works on a later reconnect; otherwise it would keep
            # referring to the removed old id.
            tokens[reconnect_token] = session.player_id

            await room.broadcast({
                'type': 'player_reconnected',
                'player_id': session.player_id,
                'player_name': player_data['name'],
            })

            return {
                'type': 'reconnected',
                'your_id': session.player_id,
                'player_data': player_data,
            }

    # New connection (normal flow)
    # ... rest of on_connect logic ...

    # Generate reconnect token
    reconnect_token = str(uuid.uuid4())
    if 'reconnect_tokens' not in room.state:
        room.state['reconnect_tokens'] = {}
    room.state['reconnect_tokens'][reconnect_token] = session.player_id

    return {
        'type': 'welcome',
        'your_id': session.player_id,
        'reconnect_token': reconnect_token,
    }
Room-based architecture scales horizontally - each room runs independently. Design your rooms to be self-contained.
Limit Room Size
Don’t put too many players in one room:
Copy
async def on_connect():
    """Join the requested room (default 'lobby') with a 10-player cap."""
    requested_room = request.get('room_id', 'lobby')

    # ✅ Enforce maximum players
    room.join(requested_room, max_players=10)
    # Room will reject connection if full
Recommended limits:
Fast-paced games: 2-10 players
MMO zones: 20-50 players
Chat rooms: 50-100 users
Implement Room Sharding
Split large player bases across multiple rooms:
Copy
async def on_connect():
    """Join the requested room, or pick/create one when none is requested."""
    # Auto-assign to least populated room
    room_id = request.get('room_id')

    if not room_id:
        # Find room with space
        available_rooms = []  # Query active rooms
        if available_rooms:
            room_id = available_rooms[0]
        else:
            # Nothing available: create a new uniquely named room
            room_id = f'room-{uuid.uuid4()}'

    room.join(room_id, max_players=10)
Clean Up Inactive Rooms
Destroy rooms when empty:
Copy
async def on_disconnect():
    """Drop the leaving player's entry and tear the room down once empty."""
    roster = room.state['players']
    leaving_id = session.player_id

    # Remove player
    if leaving_id in roster:
        del roster[leaving_id]

    # ✅ Destroy room if empty
    if room.get_player_count() != 0:
        return
    room.stop_game_loop()
    room.destroy()
Use Database for Persistence
Store important data in the database, not just room state:
Copy
async def on_message():
    """Persist the current player's progress on a 'save_progress' request.

    Returns ``{'status': 'saved'}`` after the update, or ``None`` for any
    other action.
    """
    if request.get('action') != 'save_progress':
        return None

    player = room.state['players'][session.player_id]

    # Snapshot of the fields to persist, stamped with the save time
    progress = {
        'level': player['level'],
        'score': player['score'],
        'inventory': player['inventory'],
        'last_save': time.time(),
    }

    # ✅ Save to database for persistence
    await db.collection('player_progress').update(session.player_id, progress)

    return {'status': 'saved'}
async def on_tick():
    """Run one game tick and warn when it takes longer than 50 ms."""
    # perf_counter() is a monotonic, high-resolution clock intended for
    # measuring short durations; time.time() follows the wall clock, which
    # can jump when the system time is adjusted and skew the measurement.
    start_time = time.perf_counter()

    # Your game logic here
    # ...

    tick_duration = time.perf_counter() - start_time

    # Warn if tick is too slow
    if tick_duration > 0.05:  # 50ms (should be < tick interval)
        print(f"[WARNING] Slow tick: {tick_duration*1000:.2f}ms")