Code from github to self hosted with git tea in AWS
This commit is contained in:
258
mini-dropbox/CODE/app/files.py
Normal file
258
mini-dropbox/CODE/app/files.py
Normal file
@@ -0,0 +1,258 @@
|
||||
# === FILE: app/files.py ===
|
||||
import os
|
||||
import uuid
|
||||
import secrets
|
||||
from datetime import datetime
|
||||
from flask import Blueprint, render_template, request, jsonify, session, redirect, url_for
|
||||
from werkzeug.utils import secure_filename
|
||||
from app.auth import login_required
|
||||
from app import db, s3
|
||||
|
||||
|
||||
files_bp = Blueprint('files', __name__, url_prefix='/files')
|
||||
|
||||
|
||||
# Maximum file size: 100MB
|
||||
MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB in bytes
|
||||
|
||||
|
||||
def format_file_size(size_bytes):
|
||||
"""Convert bytes to human-readable format."""
|
||||
if size_bytes < 1024:
|
||||
return f"{size_bytes} B"
|
||||
elif size_bytes < 1024 * 1024:
|
||||
return f"{size_bytes / 1024:.2f} KB"
|
||||
elif size_bytes < 1024 * 1024 * 1024:
|
||||
return f"{size_bytes / (1024 * 1024):.2f} MB"
|
||||
else:
|
||||
return f"{size_bytes / (1024 * 1024 * 1024):.2f} GB"
|
||||
|
||||
|
||||
def get_file_category(file_type):
|
||||
"""Determine file category based on MIME type."""
|
||||
if not file_type:
|
||||
return 'other'
|
||||
|
||||
file_type = file_type.lower()
|
||||
|
||||
if file_type.startswith('image/'):
|
||||
return 'image'
|
||||
elif file_type == 'application/pdf':
|
||||
return 'pdf'
|
||||
elif file_type.startswith('video/'):
|
||||
return 'video'
|
||||
elif file_type.startswith('audio/'):
|
||||
return 'audio'
|
||||
elif file_type in ['application/zip', 'application/x-zip-compressed', 'application/x-rar-compressed', 'application/x-7z-compressed']:
|
||||
return 'archive'
|
||||
elif file_type in ['application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
|
||||
'application/vnd.ms-excel', 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||||
'application/vnd.ms-powerpoint', 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
|
||||
'text/plain', 'text/csv']:
|
||||
return 'document'
|
||||
else:
|
||||
return 'other'
|
||||
|
||||
|
||||
def format_date(dt):
|
||||
"""Format datetime to readable string."""
|
||||
if not dt:
|
||||
return ''
|
||||
|
||||
if isinstance(dt, str):
|
||||
try:
|
||||
dt = datetime.fromisoformat(dt)
|
||||
except:
|
||||
return dt
|
||||
|
||||
return dt.strftime('%b %d, %Y')
|
||||
|
||||
|
||||
@files_bp.route('/')
|
||||
@login_required
|
||||
def dashboard():
|
||||
"""Display the main dashboard with user's files."""
|
||||
user_id = session['user']['id']
|
||||
|
||||
# Get all files for the user
|
||||
files = db.get_files_by_user(user_id)
|
||||
|
||||
# Enhance file data with formatted information
|
||||
for file in files:
|
||||
file['size_formatted'] = format_file_size(file['file_size'] or 0)
|
||||
file['category'] = get_file_category(file['file_type'])
|
||||
file['date_formatted'] = format_date(file['uploaded_at'])
|
||||
|
||||
return render_template(
|
||||
'dashboard.html',
|
||||
user=session['user'],
|
||||
files=files
|
||||
)
|
||||
|
||||
|
||||
@files_bp.route('/upload', methods=['POST'])
|
||||
@login_required
|
||||
def upload():
|
||||
"""Handle file upload."""
|
||||
try:
|
||||
# Check if file is present
|
||||
if 'file' not in request.files:
|
||||
return jsonify({'success': False, 'message': 'No file provided'}), 400
|
||||
|
||||
file = request.files['file']
|
||||
|
||||
if file.filename == '':
|
||||
return jsonify({'success': False, 'message': 'No file selected'}), 400
|
||||
|
||||
# Get file size
|
||||
file.seek(0, os.SEEK_END)
|
||||
file_size = file.tell()
|
||||
file.seek(0)
|
||||
|
||||
# Validate file size
|
||||
if file_size > MAX_FILE_SIZE:
|
||||
return jsonify({'success': False, 'message': 'File size exceeds 100MB limit'}), 400
|
||||
|
||||
# Get user ID
|
||||
user_id = session['user']['id']
|
||||
|
||||
# Secure the filename
|
||||
original_name = file.filename
|
||||
safe_filename = secure_filename(original_name)
|
||||
|
||||
# Generate unique S3 key
|
||||
unique_id = uuid.uuid4()
|
||||
s3_key = f"{user_id}/{unique_id}_{safe_filename}"
|
||||
|
||||
# Get content type
|
||||
content_type = file.content_type or 'application/octet-stream'
|
||||
|
||||
# Upload to S3
|
||||
s3.upload_file(file, s3_key, content_type)
|
||||
|
||||
# Save metadata to database
|
||||
db.add_file(
|
||||
user_id=user_id,
|
||||
filename=safe_filename,
|
||||
original_name=original_name,
|
||||
s3_key=s3_key,
|
||||
file_size=file_size,
|
||||
file_type=content_type
|
||||
)
|
||||
|
||||
return jsonify({'success': True, 'message': 'File uploaded successfully'}), 200
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error uploading file: {e}")
|
||||
return jsonify({'success': False, 'message': 'Failed to upload file'}), 500
|
||||
|
||||
|
||||
@files_bp.route('/download/<int:file_id>')
|
||||
@login_required
|
||||
def download(file_id):
|
||||
"""Generate presigned URL and redirect for file download."""
|
||||
try:
|
||||
user_id = session['user']['id']
|
||||
|
||||
# Get file from database (verifies ownership)
|
||||
file = db.get_file(file_id, user_id)
|
||||
|
||||
if not file:
|
||||
return jsonify({'success': False, 'message': 'File not found'}), 404
|
||||
|
||||
# Generate presigned URL (15 minutes)
|
||||
presigned_url = s3.generate_presigned_url(file['s3_key'], expiry=900)
|
||||
|
||||
# Redirect to presigned URL
|
||||
return redirect(presigned_url)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error downloading file: {e}")
|
||||
return jsonify({'success': False, 'message': 'Failed to download file'}), 500
|
||||
|
||||
|
||||
@files_bp.route('/delete/<int:file_id>', methods=['POST'])
|
||||
@login_required
|
||||
def delete(file_id):
|
||||
"""Delete a file."""
|
||||
try:
|
||||
user_id = session['user']['id']
|
||||
|
||||
# Get file from database (verifies ownership)
|
||||
file = db.get_file(file_id, user_id)
|
||||
|
||||
if not file:
|
||||
return jsonify({'success': False, 'message': 'File not found'}), 404
|
||||
|
||||
# Delete from S3
|
||||
s3.delete_file(file['s3_key'])
|
||||
|
||||
# Delete from database
|
||||
db.delete_file(file_id, user_id)
|
||||
|
||||
return jsonify({'success': True, 'message': 'File deleted successfully'}), 200
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error deleting file: {e}")
|
||||
return jsonify({'success': False, 'message': 'Failed to delete file'}), 500
|
||||
|
||||
|
||||
@files_bp.route('/share/<int:file_id>', methods=['POST'])
|
||||
@login_required
|
||||
def share(file_id):
|
||||
"""Generate a shareable link for a file."""
|
||||
try:
|
||||
user_id = session['user']['id']
|
||||
|
||||
# Get file from database (verifies ownership)
|
||||
file = db.get_file(file_id, user_id)
|
||||
|
||||
if not file:
|
||||
return jsonify({'success': False, 'message': 'File not found'}), 404
|
||||
|
||||
# Generate share token
|
||||
share_token = secrets.token_urlsafe(32)
|
||||
|
||||
# Save token to database
|
||||
db.set_share_token(file_id, user_id, share_token)
|
||||
|
||||
# Generate 7-day presigned URL
|
||||
share_url = s3.generate_share_url(file['s3_key'], expiry=604800)
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'share_url': share_url
|
||||
}), 200
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error sharing file: {e}")
|
||||
return jsonify({'success': False, 'message': 'Failed to generate share link'}), 500
|
||||
|
||||
|
||||
@files_bp.route('/preview/<int:file_id>')
|
||||
@login_required
|
||||
def preview(file_id):
|
||||
"""Generate a preview URL for images and PDFs."""
|
||||
try:
|
||||
user_id = session['user']['id']
|
||||
|
||||
# Get file from database (verifies ownership)
|
||||
file = db.get_file(file_id, user_id)
|
||||
|
||||
if not file:
|
||||
return jsonify({'success': False, 'message': 'File not found'}), 404
|
||||
|
||||
# Check if file is previewable (image or PDF)
|
||||
file_type = file.get('file_type', '').lower()
|
||||
|
||||
if file_type.startswith('image/') or file_type == 'application/pdf':
|
||||
# Return CloudFront URL for preview
|
||||
preview_url = s3.get_cloudfront_url(file['s3_key'])
|
||||
return jsonify({'success': True, 'preview_url': preview_url}), 200
|
||||
else:
|
||||
# Not previewable, redirect to download
|
||||
return redirect(url_for('files.download', file_id=file_id))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error generating preview: {e}")
|
||||
return jsonify({'success': False, 'message': 'Failed to generate preview'}), 500
|
||||
Reference in New Issue
Block a user