Diffstat (limited to 'backend')
-rw-r--r--  backend/app.py                                                             77
-rw-r--r--  backend/server/Dockerfile (renamed from backend/Dockerfile)                 8
-rw-r--r--  backend/server/app.py                                                      145
-rw-r--r--  backend/server/requirements.txt (renamed from backend/requirements.txt)     4
-rw-r--r--  backend/worker/Dockerfile                                                   17
-rw-r--r--  backend/worker/requirements.txt                                              5
-rw-r--r--  backend/worker/worker.py                                                   129
7 files changed, 306 insertions, 79 deletions
diff --git a/backend/app.py b/backend/app.py
deleted file mode 100644
index 4ed4954..0000000
--- a/backend/app.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import os
-from flask import Flask, request, jsonify
-from flask_cors import CORS
-import boto3
-import cv2
-import tempfile
-import numpy as np
-
-app = Flask(__name__)
-CORS(app)
-
-# Configure AWS S3
-s3 = boto3.client('s3')
-BUCKET_NAME_ORIGINAL = "original-images-rapid-macaw"
-BUCKET_NAME_PROCESSED = "processed-images-rapid-macaw"
-
-@app.route('/upload', methods=['POST'])
-def upload_file():
-    if 'image' not in request.files:
-        return jsonify({'error': 'No file provided'}), 400
-
-    file = request.files['image']
-    operation = request.form.get('operation', 'edge_detection')  # Default to edge detection
-    if file and allowed_file(file.filename):
-        # Save the file temporarily
-        temp_file = tempfile.NamedTemporaryFile(delete=False)
-        file.save(temp_file.name)
-
-        # Upload to S3 original bucket
-        with open(temp_file.name, "rb") as img_data:
-            s3.put_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename, Body=img_data, ContentType="image/png")
-
-        # Fetch the image from the original bucket
-        original_img_obj = s3.get_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename)
-        original_img_data = original_img_obj['Body'].read()
-
-        # Process the image
-        processed_image_path = process_image(original_img_data, operation)
-
-        # Upload processed image to S3 processed bucket
-        processed_filename = f"processed_{file.filename}"
-        with open(processed_image_path, "rb") as processed_img_data:
-            s3.put_object(Bucket=BUCKET_NAME_PROCESSED, Key=processed_filename, Body=processed_img_data, ContentType="image/png")
-
-        # Clean up temporary files
-        os.remove(temp_file.name)
-        os.remove(processed_image_path)
-
-        processed_file_url = f'https://{BUCKET_NAME_PROCESSED}.s3.amazonaws.com/{processed_filename}'
-
-        return jsonify({'message': 'File processed and uploaded successfully', 'processed_file': processed_file_url}), 200
-    else:
-        return jsonify({'error': 'Invalid file type'}), 400
-
-def allowed_file(filename):
-    return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'png', 'jpg', 'jpeg'}
-
-def process_image(image_data, operation):
-    # Convert image data to numpy array
-    nparr = np.frombuffer(image_data, np.uint8)
-    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
-
-    processed_img = img
-
-    if operation == 'edge_detection':
-        processed_img = cv2.Canny(img, 100, 200)
-    elif operation == 'color_inversion':
-        processed_img = cv2.bitwise_not(img)
-
-    # Save processed image to a temporary path
-    output_path = os.path.join(tempfile.gettempdir(), f'processed_image.png')
-    cv2.imwrite(output_path, processed_img)
-
-    return output_path
-
-if __name__ == '__main__':
-    app.run(host='0.0.0.0', port=5000, debug=True)
diff --git a/backend/Dockerfile b/backend/server/Dockerfile
index ff49235..8601d37 100644
--- a/backend/Dockerfile
+++ b/backend/server/Dockerfile
@@ -1,7 +1,10 @@
 # Use an official Python runtime as a parent image
 FROM python:3.9-slim
 
-# Set the working directory to /app
+# Install dependencies for OpenCV using apt-get
+RUN apt-get update && apt-get install -y libgl1 libglib2.0-0 curl
+
+# Set the working directory
 WORKDIR /app
 
 # Copy the current directory contents into the container at /app
@@ -13,6 +16,9 @@ RUN pip install --no-cache-dir -r requirements.txt
 # Make port 5000 available to the world outside this container
 EXPOSE 5000
 
+# Define environment variable
+ENV FLASK_APP=app.py
+
 # Run app.py when the container launches
 CMD ["python", "app.py"]
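One way to try the rebuilt server image locally is the Docker SDK for Python (docker-py); this is a minimal sketch under the assumptions that the Docker daemon is running, the repository root is the working directory, and the "image-server" tag is made up here:

    import docker  # third-party SDK: pip install docker

    client = docker.from_env()

    # Build from the new backend/server directory, which contains the Dockerfile above.
    image, _ = client.images.build(path="backend/server", tag="image-server")

    # Run it, publishing the Flask port to the host. Note that boto3 inside the
    # container still needs AWS credentials (e.g. passed via environment=...).
    container = client.containers.run("image-server", ports={"5000/tcp": 5000}, detach=True)
    print(container.short_id)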
diff --git a/backend/server/app.py b/backend/server/app.py
new file mode 100644
index 0000000..1b4e80e
--- /dev/null
+++ b/backend/server/app.py
@@ -0,0 +1,145 @@
+import pika
+import json
+from flask import Flask, request, jsonify
+from flask_cors import CORS
+import boto3
+import tempfile
+import os
+import base64
+import cv2
+import numpy as np
+
+app = Flask(__name__)
+CORS(app)
+
+s3 = boto3.client('s3')
+BUCKET_NAME_ORIGINAL = "original-images-allowing-griffon"
+BUCKET_NAME_PROCESSED = "processed-images-allowing-griffon"
+
+def allowed_file(filename):
+    return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'png', 'jpg', 'jpeg'}
+
+def split_image(image_data, num_parts):
+    img = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
+    height, width, _ = img.shape
+    part_height = height // num_parts
+    parts = []
+
+    for i in range(num_parts):
+        # The last part absorbs any leftover rows when height is not divisible by num_parts.
+        part_img = img[i * part_height: (i + 1) * part_height if i != num_parts - 1 else height, :, :]
+        _, buffer = cv2.imencode('.png', part_img)
+        part_data = buffer.tobytes()
+        parts.append(part_data)
+
+    return parts, width, height, part_height
+
+def publish_task(part_data, filename, part_num, operation, callback_queue):
+    connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbit-mq-server-public-ip>'))
+    channel = connection.channel()
+    channel.queue_declare(queue='image_tasks')
+
+    task = {
+        'part_data': base64.b64encode(part_data).decode('utf-8'),
+        'filename': filename,
+        'part_num': part_num,
+        'operation': operation,
+        'callback_queue': callback_queue
+    }
+    channel.basic_publish(exchange='', routing_key='image_tasks', body=json.dumps(task))
+    connection.close()
+    print(f"Published task for part {part_num}")
+
+def merge_parts(filename, num_parts, width, height, part_height):
+    merged_img = np.zeros((height, width, 3), dtype=np.uint8)
+
+    for i in range(num_parts):
+        part_key = f"{filename}_part_{i}"
+        part_obj = s3.get_object(Bucket=BUCKET_NAME_PROCESSED, Key=part_key)
+        part_data = part_obj['Body'].read()
+        part_img = cv2.imdecode(np.frombuffer(part_data, np.uint8), cv2.IMREAD_COLOR)
+
+        if part_img is None:
+            print(f"Failed to decode part {i}")
+            continue
+
+        start_row = i * part_height
+        end_row = (i + 1) * part_height if i != num_parts - 1 else height
+        merged_img[start_row:end_row, :, :] = part_img
+
+    merged_filename = f"processed_{filename}"
+    _, buffer = cv2.imencode('.jpg', merged_img)
+    merged_data = buffer.tobytes()
+
+    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+        temp_file.write(merged_data)
+        temp_file.seek(0)  # Ensure the file pointer is at the beginning
+        s3.put_object(Bucket=BUCKET_NAME_PROCESSED, Key=merged_filename, Body=temp_file.read(), ContentType="image/jpeg")
+        os.remove(temp_file.name)
+
+    return merged_filename
+
+
+@app.route('/health', methods=['GET'])
+def health_check():
+    return jsonify({'status': 'ok'}), 200
+
+@app.route('/upload', methods=['POST'])
+def upload_file():
+    if 'image' not in request.files:
+        return jsonify({'error': 'No file provided'}), 400
+
+    file = request.files['image']
+    operation = request.form.get('operation', 'edge_detection')
+    num_parts = int(request.form.get('num_parts', 8))  # Default to 8 parts
+
+    if file and allowed_file(file.filename):
+        temp_file = tempfile.NamedTemporaryFile(delete=False)
+        file.save(temp_file.name)
+
+        with open(temp_file.name, "rb") as img_data:
+            s3.put_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename, Body=img_data, ContentType="image/png")
+
+        original_img_obj = s3.get_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename)
+        original_img_data = original_img_obj['Body'].read()
+
+        parts, width, height, part_height = split_image(original_img_data, num_parts)
+        callback_queue = f"{file.filename}_callback"
+
+        # Declare callback queue
+        connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbit-mq-server-public-ip>'))
+        channel = connection.channel()
+        channel.queue_declare(queue=callback_queue)
+        connection.close()
+
+        for i, part_data in enumerate(parts):
+            publish_task(part_data, file.filename, i, operation, callback_queue)
+
+        os.remove(temp_file.name)
+
+        # Wait for completion notifications
+        def on_completion(ch, method, properties, body):
+            nonlocal num_parts_processed
+            num_parts_processed += 1
+            print(f"Part {num_parts_processed} received")
+            if num_parts_processed == num_parts:
+                merged_filename = merge_parts(file.filename, num_parts, width, height, part_height)
+                processed_file_url = f'https://{BUCKET_NAME_PROCESSED}.s3.amazonaws.com/{merged_filename}'
+                ch.stop_consuming()
+                response_queue.put(processed_file_url)
+
+        import queue
+        response_queue = queue.Queue()
+        num_parts_processed = 0
+
+        connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbit-mq-server-public-ip>'))
+        channel = connection.channel()
+        channel.basic_consume(queue=callback_queue, on_message_callback=on_completion, auto_ack=True)
+        channel.start_consuming()
+
+        processed_file_url = response_queue.get()
+        return jsonify({'message': 'File processed and uploaded successfully', 'processed_file': processed_file_url}), 200
+    else:
+        return jsonify({'error': 'Invalid file type'}), 400
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0', port=5000)
diff --git a/backend/requirements.txt b/backend/server/requirements.txt
index bf3d422..eb7d129 100644
--- a/backend/requirements.txt
+++ b/backend/server/requirements.txt
@@ -1,5 +1,7 @@
 flask
 flask-cors
 boto3
-opencv-python-headless
+opencv-python
 numpy
+pika
+
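A quick way to exercise the new endpoints is a small client; the following is a sketch using the requests library, where the server address is a placeholder and photo.png is a hypothetical local file:

    import requests

    SERVER = "http://<server-public-ip>:5000"  # placeholder for the deployed host

    # Liveness probe served by /health
    print(requests.get(f"{SERVER}/health").json())  # {'status': 'ok'}

    # Upload an image; 'operation' and 'num_parts' come from the form data,
    # defaulting to 'edge_detection' and 8 parts respectively.
    with open("photo.png", "rb") as f:
        resp = requests.post(
            f"{SERVER}/upload",
            files={"image": f},
            data={"operation": "color_inversion", "num_parts": "4"},
        )
    print(resp.json())  # contains the 'processed_file' URL in the processed bucket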
diff --git a/backend/worker/Dockerfile b/backend/worker/Dockerfile
new file mode 100644
index 0000000..8752e1c
--- /dev/null
+++ b/backend/worker/Dockerfile
@@ -0,0 +1,17 @@
+# Use an official Python runtime as a parent image
+FROM python:3.9-slim
+
+# Install dependencies for OpenCV using apt-get
+RUN apt-get update && apt-get install -y libgl1 libglib2.0-0
+
+# Set the working directory
+WORKDIR /worker
+
+# Copy the current directory contents into the container at /worker
+COPY . /worker
+
+# Install any needed packages specified in requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Run worker.py when the container launches
+CMD ["python", "worker.py"]
diff --git a/backend/worker/requirements.txt b/backend/worker/requirements.txt
new file mode 100644
index 0000000..0491cb0
--- /dev/null
+++ b/backend/worker/requirements.txt
@@ -0,0 +1,5 @@
+boto3
+opencv-python
+numpy
+pika
+
diff --git a/backend/worker/worker.py b/backend/worker/worker.py
new file mode 100644
index 0000000..9a8f32b
--- /dev/null
+++ b/backend/worker/worker.py
@@ -0,0 +1,129 @@
+import pika
+import json
+import boto3
+import cv2
+import numpy as np
+import tempfile
+import os
+import base64
+import hashlib
+import logging
+import uuid
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+s3 = boto3.client('s3')
+BUCKET_NAME_PROCESSED = "processed-images-allowing-griffon"
+
+def hash_image(image_data):
+    return hashlib.sha256(image_data).hexdigest()
+
+def process_image(part_data, operation):
+    try:
+        nparr = np.frombuffer(part_data, np.uint8)
+        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
+
+        if img is None:
+            logger.error("Failed to decode image data.")
+            return None
+
+        if operation == 'edge_detection':
+            processed_img = cv2.Canny(img, 100, 200)
+        elif operation == 'color_inversion':
+            processed_img = cv2.bitwise_not(img)
+        elif operation == 'grayscale':
+            processed_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
+        elif operation == 'blur':
+            processed_img = cv2.GaussianBlur(img, (15, 15), 0)
+        elif operation == 'sharpen':
+            kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
+            processed_img = cv2.filter2D(img, -1, kernel)
+        elif operation == 'brightness_increase':
+            processed_img = cv2.convertScaleAbs(img, alpha=1.1, beta=30)
+        elif operation == 'contrast_increase':
+            lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
+            l, a, b = cv2.split(lab)
+            l = cv2.equalizeHist(l)
+            lab = cv2.merge((l, a, b))
+            processed_img = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
+        elif operation == "sharpening":
+            kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
+            processed_img = cv2.filter2D(img, -1, kernel)
+        else:
+            logger.error(f"Unknown operation: {operation}")
+            return None
+
+        unique_id = uuid.uuid4()
+        output_path = os.path.join(tempfile.gettempdir(), f'processed_image_part_{unique_id}.jpg')
+        success = cv2.imwrite(output_path, processed_img)
+
+        if not success:
+            logger.error("Failed to write processed image to file.")
+            return None
+
+        return output_path
+    except Exception as e:
+        logger.exception("Exception occurred during image processing.")
+        return None
+
+def compute_md5(file_path):
+    hash_md5 = hashlib.md5()
+    with open(file_path, "rb") as f:
+        for chunk in iter(lambda: f.read(4096), b""):
+            hash_md5.update(chunk)
+    return hash_md5.digest()
+
+def callback(ch, method, properties, body):
+    task = json.loads(body)
+    part_data = base64.b64decode(task['part_data'])
+    filename = task['filename']
+    part_num = task['part_num']
+    operation = task['operation']
+    callback_queue = task['callback_queue']
+
+    processed_part_path = process_image(part_data, operation)
+
+    if processed_part_path is None or not os.path.exists(processed_part_path):
+        logger.error(f"Processed file {processed_part_path} does not exist.")
+        ch.basic_ack(delivery_tag=method.delivery_tag)
+        return
+
+    try:
+        processed_filename = f"{filename}_part_{part_num}"
+        with open(processed_part_path, "rb") as processed_part_data:
+            file_content = processed_part_data.read()
+
+        md5_hash = compute_md5(processed_part_path)
+        base64_md5_hash = base64.b64encode(md5_hash).decode('utf-8')
+
+        s3.put_object(Bucket=BUCKET_NAME_PROCESSED, Key=processed_filename, Body=file_content, ContentType="image/jpeg", ContentMD5=base64_md5_hash)
+        logger.info(f"Uploaded {processed_filename} to S3 bucket {BUCKET_NAME_PROCESSED}")
+
+        if os.path.exists(processed_part_path):
+            os.remove(processed_part_path)
+            logger.info(f"Removed processed file {processed_part_path}")
+
+        # Notify completion
+        connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbitmq-public-ip>'))
+        channel = connection.channel()
+        channel.basic_publish(exchange='', routing_key=callback_queue, body='Completed')
+        connection.close()
+        logger.info("Notification sent to callback queue.")
+
+    except Exception as e:
+        logger.exception("Exception occurred during S3 upload or notification.")
+
+    finally:
+        ch.basic_ack(delivery_tag=method.delivery_tag)
+
+def start_worker():
+    connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbitmq-public-ip>'))
+    channel = connection.channel()
+    channel.queue_declare(queue='image_tasks')
+    channel.basic_consume(queue='image_tasks', on_message_callback=callback)
+    channel.start_consuming()
+
+if __name__ == '__main__':
+    start_worker()
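For reference, the message the worker consumes from the image_tasks queue mirrors the dictionary built by publish_task in backend/server/app.py. This is a sketch of publishing one task by hand, useful for smoke-testing a worker in isolation; the RabbitMQ host and image bytes are placeholders:

    import base64
    import json
    import pika

    # Same JSON shape that publish_task produces and callback() unpacks.
    task = {
        "part_data": base64.b64encode(b"<png bytes>").decode("utf-8"),
        "filename": "photo.png",
        "part_num": 0,
        "operation": "grayscale",
        "callback_queue": "photo.png_callback",
    }

    connection = pika.BlockingConnection(pika.ConnectionParameters("<rabbitmq-public-ip>"))
    channel = connection.channel()
    channel.queue_declare(queue="image_tasks")
    channel.basic_publish(exchange="", routing_key="image_tasks", body=json.dumps(task))
    connection.close()

After processing, the worker uploads the part to the processed bucket and publishes a plain 'Completed' message to the named callback queue, which is what the server's on_completion handler counts.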
