1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import pika
import json
import boto3
import cv2
import numpy as np
import tempfile
import os
import base64
import hashlib
import logging
import uuid
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
s3 = boto3.client('s3')
BUCKET_NAME_PROCESSED = "processed-images-allowing-griffon"
def hash_image(image_data):
return hashlib.sha256(image_data).hexdigest()
def process_image(part_data, operation):
try:
nparr = np.frombuffer(part_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
logger.error("Failed to decode image data.")
return None
if operation == 'edge_detection':
processed_img = cv2.Canny(img, 100, 200)
elif operation == 'color_inversion':
processed_img = cv2.bitwise_not(img)
elif operation == 'grayscale':
processed_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
elif operation == 'blur':
processed_img = cv2.GaussianBlur(img, (15, 15), 0)
elif operation == 'sharpen':
kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
processed_img = cv2.filter2D(img, -1, kernel)
elif operation == 'brightness_increase':
processed_img = cv2.convertScaleAbs(img, alpha=1.1, beta=30)
elif operation == 'contrast_increase':
lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
l, a, b = cv2.split(lab)
l = cv2.equalizeHist(l)
lab = cv2.merge((l, a, b))
processed_img = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
elif operation == "sharpening":
kernel = np.array([[-1, -1, -1], [-1, 9, -1], [-1, -1, -1]])
result = cv2.filter2D(image, -1, kernel)
else:
logger.error(f"Unknown operation: {operation}")
return None
unique_id = uuid.uuid4()
output_path = os.path.join(tempfile.gettempdir(), f'processed_image_part_{unique_id}.jpg')
success = cv2.imwrite(output_path, processed_img)
if not success:
logger.error("Failed to write processed image to file.")
return None
return output_path
except Exception as e:
logger.exception("Exception occurred during image processing.")
return None
def compute_md5(file_path):
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.digest()
def callback(ch, method, properties, body):
task = json.loads(body)
part_data = base64.b64decode(task['part_data'])
filename = task['filename']
part_num = task['part_num']
operation = task['operation']
callback_queue = task['callback_queue']
processed_part_path = process_image(part_data, operation)
if processed_part_path is None or not os.path.exists(processed_part_path):
logger.error(f"Processed file {processed_part_path} does not exist.")
ch.basic_ack(delivery_tag=method.delivery_tag)
return
try:
processed_filename = f"{filename}_part_{part_num}"
with open(processed_part_path, "rb") as processed_part_data:
file_content = processed_part_data.read()
md5_hash = compute_md5(processed_part_path)
base64_md5_hash = base64.b64encode(md5_hash).decode('utf-8')
s3.put_object(Bucket=BUCKET_NAME_PROCESSED, Key=processed_filename, Body=file_content, ContentType="image/jpg", ContentMD5=base64_md5_hash)
logger.info(f"Uploaded {processed_filename} to S3 bucket {BUCKET_NAME_PROCESSED}")
if os.path.exists(processed_part_path):
os.remove(processed_part_path)
logger.info(f"Removed processed file {processed_part_path}")
# Notify completion
connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbitmq-public-ip'))
channel = connection.channel()
channel.basic_publish(exchange='', routing_key=callback_queue, body='Completed')
connection.close()
logger.info("Notification sent to callback queue.")
except Exception as e:
logger.exception("Exception occurred during S3 upload or notification.")
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def start_worker():
connection = pika.BlockingConnection(pika.ConnectionParameters('<rabbitmq-public-ip'))
channel = connection.channel()
channel.queue_declare(queue='image_tasks')
channel.basic_consume(queue='image_tasks', on_message_callback=callback)
channel.start_consuming()
if __name__ == '__main__':
start_worker()
|