Diffstat (limited to 'backend')
-rw-r--r--  backend/app.py                                                            |  77
-rw-r--r--  backend/server/Dockerfile (renamed from backend/Dockerfile)               |   8
-rw-r--r--  backend/server/app.py                                                     | 145
-rw-r--r--  backend/server/requirements.txt (renamed from backend/requirements.txt)   |   4
-rw-r--r--  backend/worker/Dockerfile                                                 |  17
-rw-r--r--  backend/worker/requirements.txt                                           |   5
-rw-r--r--  backend/worker/worker.py                                                  | 129
7 files changed, 306 insertions, 79 deletions
diff --git a/backend/app.py b/backend/app.py
deleted file mode 100644
index 4ed4954..0000000
--- a/backend/app.py
+++ /dev/null
@@ -1,77 +0,0 @@
-import os
-from flask import Flask, request, jsonify
-from flask_cors import CORS
-import boto3
-import cv2
-import tempfile
-import numpy as np
-
-app = Flask(__name__)
-CORS(app)
-
-# Configure AWS S3
-s3 = boto3.client('s3')
-BUCKET_NAME_ORIGINAL = "original-images-rapid-macaw"
-BUCKET_NAME_PROCESSED = "processed-images-rapid-macaw"
-
-@app.route('/upload', methods=['POST'])
-def upload_file():
- if 'image' not in request.files:
- return jsonify({'error': 'No file provided'}), 400
-
- file = request.files['image']
- operation = request.form.get('operation', 'edge_detection') # Default to edge detection
- if file and allowed_file(file.filename):
- # Save the file temporarily
- temp_file = tempfile.NamedTemporaryFile(delete=False)
- file.save(temp_file.name)
-
- # Upload to S3 original bucket
- with open(temp_file.name, "rb") as img_data:
- s3.put_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename, Body=img_data, ContentType="image/png")
-
- # Fetch the image from the original bucket
- original_img_obj = s3.get_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename)
- original_img_data = original_img_obj['Body'].read()
-
- # Process the image
- processed_image_path = process_image(original_img_data, operation)
-
- # Upload processed image to S3 processed bucket
- processed_filename = f"processed_{file.filename}"
- with open(processed_image_path, "rb") as processed_img_data:
- s3.put_object(Bucket=BUCKET_NAME_PROCESSED, Key=processed_filename, Body=processed_img_data, ContentType="image/png")
-
- # Clean up temporary files
- os.remove(temp_file.name)
- os.remove(processed_image_path)
-
- processed_file_url = f'https://{BUCKET_NAME_PROCESSED}.s3.amazonaws.com/{processed_filename}'
-
- return jsonify({'message': 'File processed and uploaded successfully', 'processed_file': processed_file_url}), 200
- else:
- return jsonify({'error': 'Invalid file type'}), 400
-
-def allowed_file(filename):
- return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'png', 'jpg', 'jpeg'}
-
-def process_image(image_data, operation):
- # Convert image data to numpy array
- nparr = np.frombuffer(image_data, np.uint8)
- img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
-
- processed_img = img
-
- if operation == 'edge_detection':
- processed_img = cv2.Canny(img, 100, 200)
- elif operation == 'color_inversion':
- processed_img = cv2.bitwise_not(img)
-
- # Save processed image to a temporary path
- output_path = os.path.join(tempfile.gettempdir(), f'processed_image.png')
- cv2.imwrite(output_path, processed_img)
-
- return output_path
-
-if __name__ == '__main__':
- app.run(host='0.0.0.0', port=5000, debug=True)
diff --git a/backend/Dockerfile b/backend/server/Dockerfile
index ff49235..8601d37 100644
--- a/backend/Dockerfile
+++ b/backend/server/Dockerfile
@@ -1,7 +1,10 @@
# Use an official Python runtime as a parent image
FROM python:3.9-slim
-# Set the working directory to /app
+# Install dependencies for OpenCV using apt-get
+RUN apt-get update && apt-get install -y libgl1 libglib2.0-0 curl
+
+# Set the working directory
WORKDIR /app
# Copy the current directory contents into the container at /app
@@ -13,6 +16,9 @@ RUN pip install --no-cache-dir -r requirements.txt
# Make port 5000 available to the world outside this container
EXPOSE 5000
+# Define environment variable
+ENV FLASK_APP=app.py
+
# Run app.py when the container launches
CMD ["python", "app.py"]
diff --git a/backend/server/app.py b/backend/server/app.py
new file mode 100644
index 0000000..1b4e80e
--- /dev/null
+++ b/backend/server/app.py
@@ -0,0 +1,145 @@
+import pika
+import json
+from flask import Flask, request, jsonify
+from flask_cors import CORS
+import boto3
+import tempfile
+import os
+import base64
+import cv2
+import numpy as np
+
+app = Flask(__name__)
+CORS(app)
+
+s3 = boto3.client('s3')
+BUCKET_NAME_ORIGINAL = "original-images-allowing-griffon"
+BUCKET_NAME_PROCESSED = "processed-images-allowing-griffon"
+
+def allowed_file(filename):
+ return '.' in filename and filename.rsplit('.', 1)[1].lower() in {'png', 'jpg', 'jpeg'}
+
+def split_image(image_data, num_parts):
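+    # Split the decoded image into num_parts horizontal bands; the last band
+    # absorbs any remainder rows so the full height is covered.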
+ img = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
+ height, width, _ = img.shape
+ part_height = height // num_parts
+ parts = []
+
+ for i in range(num_parts):
+ part_img = img[i * part_height: (i + 1) * part_height if i != num_parts - 1 else height, :, :]
+ _, buffer = cv2.imencode('.png', part_img)
+ part_data = buffer.tobytes()
+ parts.append(part_data)
+
+ return parts, width, height, part_height
+
+def publish_task(part_data, filename, part_num, operation, callback_queue):
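+    # Encode the band as base64 so it fits in a JSON message, then publish it
+    # to the shared 'image_tasks' queue along with its routing metadata.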
+    connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbit-mq-server-public-ip>'))
+ channel = connection.channel()
+ channel.queue_declare(queue='image_tasks')
+
+ task = {
+ 'part_data': base64.b64encode(part_data).decode('utf-8'),
+ 'filename': filename,
+ 'part_num': part_num,
+ 'operation': operation,
+ 'callback_queue': callback_queue
+ }
+ channel.basic_publish(exchange='', routing_key='image_tasks', body=json.dumps(task))
+ connection.close()
+ print(f"Published task for part {part_num}")
+
+def merge_parts(filename, num_parts, width, height, part_height):
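+    # Fetch each processed band from the processed bucket, stitch the bands
+    # back together in order, and upload the merged image under a new key.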
+ merged_img = np.zeros((height, width, 3), dtype=np.uint8)
+
+ for i in range(num_parts):
+ part_key = f"{filename}_part_{i}"
+ part_obj = s3.get_object(Bucket=BUCKET_NAME_PROCESSED, Key=part_key)
+ part_data = part_obj['Body'].read()
+ part_img = cv2.imdecode(np.frombuffer(part_data, np.uint8), cv2.IMREAD_COLOR)
+
+ if part_img is None:
+ print(f"Failed to decode part {i}")
+ continue
+
+ start_row = i * part_height
+ end_row = (i + 1) * part_height if i != num_parts - 1 else height
+ merged_img[start_row:end_row, :, :] = part_img
+
+ merged_filename = f"processed_{filename}"
+ _, buffer = cv2.imencode('.jpg', merged_img)
+ merged_data = buffer.tobytes()
+
+ with tempfile.NamedTemporaryFile(delete=False) as temp_file:
+ temp_file.write(merged_data)
+ temp_file.seek(0) # Ensure the file pointer is at the beginning
+        s3.put_object(Bucket=BUCKET_NAME_PROCESSED, Key=merged_filename, Body=temp_file.read(), ContentType="image/jpeg")
+ os.remove(temp_file.name)
+
+ return merged_filename
+
+
+@app.route('/health', methods=['GET'])
+def health_check():
+ return jsonify({'status': 'ok'}), 200
+
+@app.route('/upload', methods=['POST'])
+def upload_file():
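+    # Fan-out/fan-in: store the original in S3, split it into bands, publish
+    # one task per band to RabbitMQ, then block on a per-file callback queue
+    # until every band is processed and the merged result can be returned.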
+ if 'image' not in request.files:
+ return jsonify({'error': 'No file provided'}), 400
+
+ file = request.files['image']
+ operation = request.form.get('operation', 'edge_detection')
+    num_parts = int(request.form.get('num_parts', 8)) # Default to 8 parts
+
+ if file and allowed_file(file.filename):
+ temp_file = tempfile.NamedTemporaryFile(delete=False)
+ file.save(temp_file.name)
+
+ with open(temp_file.name, "rb") as img_data:
+ s3.put_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename, Body=img_data, ContentType="image/png")
+
+ original_img_obj = s3.get_object(Bucket=BUCKET_NAME_ORIGINAL, Key=file.filename)
+ original_img_data = original_img_obj['Body'].read()
+
+ parts, width, height, part_height = split_image(original_img_data, num_parts)
+ callback_queue = f"{file.filename}_callback"
+
+ # Declare callback queue
+        connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbit-mq-server-public-ip>'))
+ channel = connection.channel()
+ channel.queue_declare(queue=callback_queue)
+ connection.close()
+
+ for i, part_data in enumerate(parts):
+ publish_task(part_data, file.filename, i, operation, callback_queue)
+
+ os.remove(temp_file.name)
+
+ # Wait for completion notifications
+ def on_completion(ch, method, properties, body):
+ nonlocal num_parts_processed
+ num_parts_processed += 1
+ print(f"Part {num_parts_processed} received")
+ if num_parts_processed == num_parts:
+ merged_filename = merge_parts(file.filename, num_parts, width, height, part_height)
+ processed_file_url = f'https://{BUCKET_NAME_PROCESSED}.s3.amazonaws.com/{merged_filename}'
+ ch.stop_consuming()
+ response_queue.put(processed_file_url)
+
+ import queue
+ response_queue = queue.Queue()
+ num_parts_processed = 0
+
+        connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbit-mq-server-public-ip>'))
+ channel = connection.channel()
+ channel.basic_consume(queue=callback_queue, on_message_callback=on_completion, auto_ack=True)
+ channel.start_consuming()
+
+ processed_file_url = response_queue.get()
+ return jsonify({'message': 'File processed and uploaded successfully', 'processed_file': processed_file_url}), 200
+ else:
+ return jsonify({'error': 'Invalid file type'}), 400
+
+if __name__ == '__main__':
+ app.run(host='0.0.0.0', port=5000)
diff --git a/backend/requirements.txt b/backend/server/requirements.txt
index bf3d422..eb7d129 100644
--- a/backend/requirements.txt
+++ b/backend/server/requirements.txt
@@ -1,5 +1,7 @@
flask
flask-cors
boto3
-opencv-python-headless
+opencv-python
numpy
+pika
+
diff --git a/backend/worker/Dockerfile b/backend/worker/Dockerfile
new file mode 100644
index 0000000..8752e1c
--- /dev/null
+++ b/backend/worker/Dockerfile
@@ -0,0 +1,17 @@
+# Use an official Python runtime as a parent image
+FROM python:3.9-slim
+
+# Install dependencies for OpenCV using apt-get
+RUN apt-get update && apt-get install -y libgl1 libglib2.0-0
+
+# Set the working directory
+WORKDIR /worker
+
+# Copy the current directory contents into the container at /worker
+COPY . /worker
+
+# Install any needed packages specified in requirements.txt
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Run worker.py when the container launches
+CMD ["python", "worker.py"]
diff --git a/backend/worker/requirements.txt b/backend/worker/requirements.txt
new file mode 100644
index 0000000..0491cb0
--- /dev/null
+++ b/backend/worker/requirements.txt
@@ -0,0 +1,5 @@
+boto3
+opencv-python
+numpy
+pika
+
diff --git a/backend/worker/worker.py b/backend/worker/worker.py
new file mode 100644
index 0000000..9a8f32b
--- /dev/null
+++ b/backend/worker/worker.py
@@ -0,0 +1,129 @@
+import pika
+import json
+import boto3
+import cv2
+import numpy as np
+import tempfile
+import os
+import base64
+import hashlib
+import logging
+import uuid
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+s3 = boto3.client('s3')
+BUCKET_NAME_PROCESSED = "processed-images-allowing-griffon"
+
+def hash_image(image_data):
+ return hashlib.sha256(image_data).hexdigest()
+
+def process_image(part_data, operation):
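+    # Apply the requested OpenCV operation to one image part and write the
+    # result to a uniquely named temp file; returns the path, or None on error.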
+ try:
+ nparr = np.frombuffer(part_data, np.uint8)
+ img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
+
+ if img is None:
+ logger.error("Failed to decode image data.")
+ return None
+
+ if operation == 'edge_detection':
+ processed_img = cv2.Canny(img, 100, 200)
+ elif operation == 'color_inversion':
+ processed_img = cv2.bitwise_not(img)
+ elif operation == 'grayscale':
+ processed_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
+ elif operation == 'blur':
+ processed_img = cv2.GaussianBlur(img, (15, 15), 0)
+ elif operation == 'sharpen':
+ kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
+ processed_img = cv2.filter2D(img, -1, kernel)
+ elif operation == 'brightness_increase':
+ processed_img = cv2.convertScaleAbs(img, alpha=1.1, beta=30)
+ elif operation == 'contrast_increase':
+ lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
+ l, a, b = cv2.split(lab)
+ l = cv2.equalizeHist(l)
+ lab = cv2.merge((l, a, b))
+ processed_img = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
+ elif operation == "sharpening":
+ kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
+            processed_img = cv2.filter2D(img, -1, kernel)
+ else:
+ logger.error(f"Unknown operation: {operation}")
+ return None
+
+ unique_id = uuid.uuid4()
+ output_path = os.path.join(tempfile.gettempdir(), f'processed_image_part_{unique_id}.jpg')
+ success = cv2.imwrite(output_path, processed_img)
+
+ if not success:
+ logger.error("Failed to write processed image to file.")
+ return None
+
+ return output_path
+ except Exception as e:
+ logger.exception("Exception occurred during image processing.")
+ return None
+
+def compute_md5(file_path):
+ hash_md5 = hashlib.md5()
+ with open(file_path, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ return hash_md5.digest()
+
+def callback(ch, method, properties, body):
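+    # One message per image part: decode it, process it, upload the result to
+    # S3, notify the publisher via its callback queue, then ack the task.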
+ task = json.loads(body)
+ part_data = base64.b64decode(task['part_data'])
+ filename = task['filename']
+ part_num = task['part_num']
+ operation = task['operation']
+ callback_queue = task['callback_queue']
+
+ processed_part_path = process_image(part_data, operation)
+
+ if processed_part_path is None or not os.path.exists(processed_part_path):
+ logger.error(f"Processed file {processed_part_path} does not exist.")
+ ch.basic_ack(delivery_tag=method.delivery_tag)
+ return
+
+ try:
+ processed_filename = f"{filename}_part_{part_num}"
+ with open(processed_part_path, "rb") as processed_part_data:
+ file_content = processed_part_data.read()
+
+ md5_hash = compute_md5(processed_part_path)
+ base64_md5_hash = base64.b64encode(md5_hash).decode('utf-8')
+
+        s3.put_object(Bucket=BUCKET_NAME_PROCESSED, Key=processed_filename, Body=file_content, ContentType="image/jpeg", ContentMD5=base64_md5_hash)
+ logger.info(f"Uploaded {processed_filename} to S3 bucket {BUCKET_NAME_PROCESSED}")
+
+ if os.path.exists(processed_part_path):
+ os.remove(processed_part_path)
+ logger.info(f"Removed processed file {processed_part_path}")
+
+ # Notify completion
+        connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbitmq-public-ip>'))
+ channel = connection.channel()
+ channel.basic_publish(exchange='', routing_key=callback_queue, body='Completed')
+ connection.close()
+ logger.info("Notification sent to callback queue.")
+
+ except Exception as e:
+ logger.exception("Exception occurred during S3 upload or notification.")
+
+ finally:
+ ch.basic_ack(delivery_tag=method.delivery_tag)
+
+def start_worker():
+    connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbitmq-public-ip>'))
+ channel = connection.channel()
+ channel.queue_declare(queue='image_tasks')
+ channel.basic_consume(queue='image_tasks', on_message_callback=callback)
+ channel.start_consuming()
+
+if __name__ == '__main__':
+ start_worker()