Open LevGun opened 2 weeks ago
It might be useful, but not for all scenarios. I compared the two methods — using the exact same last quality and last dimension for the updated compression as for the original compression — on 1 MB photos compressed down to 15 kB. The results are not significant, even though I gave the exact numbers for the compression of the same photo, but maybe I'm missing something. Sharing the results and code.
gokselozgur@admins-MacBook-Air-2 skycompress % python3 compress_image.py
2024-06-25 17:28:19,059 - compress_image - INFO - image compression took 0.04838116595055908 seconds
2024-06-25 17:28:19,059 - compress_image - INFO - Compressed image saved as compressed_frame.jpg 11 0.20500000000000002
gokselozgur@admins-MacBook-Air-2 skycompress % python3 2_compress_image.py
2024-06-25 17:28:25,620 - 2_compress_image - INFO - image compression took 0.03800662502180785 seconds
2024-06-25 17:28:25,620 - 2_compress_image - INFO - Compression completed and image saved as compressed_frame.jpg
SkyCompress looks for the best fit in terms of width/height and quality when compressing. It uses a binary search algorithm to find the best ratio between dimension and quality during compression, so the number of iterations is small. For instance, it takes 6 iterations to compress a 1 MB photo.
I prepared a comparison script that runs both scripts 100 times and generates a plot of their times.
Here is the graph and compare script
compare.py
import time
import subprocess
import matplotlib.pyplot as plt
def time_script(script_name):
    """Run *script_name* under python3 and return its wall-clock duration in seconds."""
    started = time.perf_counter()
    subprocess.run(['python3', script_name], check=True)
    return time.perf_counter() - started
def run_and_collect_data(script_name, num_runs=100):
    """Time *script_name* `num_runs` times and return the list of durations."""
    return [time_script(script_name) for _ in range(num_runs)]
# Run the scripts and collect data
# (100 runs each, back-to-back; each run includes interpreter start-up cost)
script1_times = run_and_collect_data('compress_image.py')
script2_times = run_and_collect_data('2_compress_image.py')
# Plot the results
# Overlay the per-run durations so the two implementations can be compared visually.
plt.figure(figsize=(12, 6))
plt.plot(script1_times, label='Script 1')
plt.plot(script2_times, label='Script 2')
plt.xlabel('Run')
plt.ylabel('Time (seconds)')
plt.title('Performance Comparison of Script 1 and Script 2')
plt.legend()
plt.show()
original compression
import logging
import time
from pathlib import Path
import cv2 # type: ignore
import numpy as np # type: ignore
import numpy.typing as npt # type: ignore
# Module logger named after this file's stem (e.g. "compress_image").
# Fix: the original assigned LOGGER = logging.getLogger(__name__) and then
# immediately overwrote it on the next line — the dead assignment is removed.
LOGGER = logging.getLogger(Path(__file__).resolve().stem)
LOGGER.setLevel(logging.INFO)  # for debugging set logging to DEBUG
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
# NOTE(review): the handler is attached to the ROOT logger, so every logger in
# the process that propagates will emit through it — confirm this is intended.
logging.getLogger().addHandler(ch)

# Search state shared with main() for reporting: the best parameters found by
# the last compress_image() call and the total number of search iterations.
best_quality = 100
best_dimension = 1.0
count = 0
def compress_image(original_img: npt.NDArray[np.uint8], byte_limit: int) -> bytearray:
    """
    Compress an image to fit within a set number of bytes.

    Runs a coupled binary search over JPEG quality (0-100) and a resize
    scale factor (0.1-1.0), tracking the highest-quality combination whose
    encoded size stays under the limit.

    Inputs
        original_img = RGB image to be compressed, as a numpy array
        byte_limit = Int defining max number of bytes that output image
                     should be (e.g. 340 for Iridium, 3800 for FiPy)
    Outputs
        jpeg_data = Compressed image, encoded as a jpeg format byte array;
                    an empty bytearray on failure
    """
    # main() reads these globals after the call, so keep them updated.
    # (The original placed the docstring AFTER `global count`, which made it a
    # plain statement instead of the function docstring — fixed here.)
    global count
    global best_quality, best_dimension

    start_time = time.perf_counter()
    # Fix: dropped the initial full-quality encode — its result was discarded
    # before the search began, and `byte_limit = byte_limit` was a no-op.
    min_quality, max_quality = 0, 100
    min_dimension, max_dimension = 0.1, 1.0
    best_quality = min_quality
    best_dimension = min_dimension
    try:
        while min_quality <= max_quality and min_dimension <= max_dimension:
            count += 1
            # Probe the midpoint of both windows; quality and dimension are
            # tuned together (both move in the same direction each step).
            mid_quality = (min_quality + max_quality) // 2
            mid_dimension = (min_dimension + max_dimension) / 2
            new_img = cv2.resize(original_img, (0, 0), fx=mid_dimension, fy=mid_dimension)
            _, jpeg_data = cv2.imencode('.jpg', new_img, [int(cv2.IMWRITE_JPEG_QUALITY), mid_quality])
            out_image_size = len(jpeg_data)
            if out_image_size == byte_limit:
                # Exact hit: record the winning parameters (the original left
                # the globals stale here) and return early as a bytearray, as
                # the signature declares.
                best_quality, best_dimension = mid_quality, mid_dimension
                return bytearray(jpeg_data)
            elif out_image_size < byte_limit:
                # Fits: remember the best (highest) quality seen, search upward.
                if mid_quality > best_quality:
                    best_quality = mid_quality
                    best_dimension = mid_dimension
                min_quality = mid_quality + 1
                min_dimension = mid_dimension + 0.01
            else:
                # Too big: search downward.
                max_quality = mid_quality - 1
                max_dimension = mid_dimension - 0.01
        # Recreate the image with the best parameters found
        best_img = cv2.resize(original_img, (0, 0), fx=best_dimension, fy=best_dimension)
        _, best_jpeg_data = cv2.imencode('.jpg', best_img, [int(cv2.IMWRITE_JPEG_QUALITY), best_quality])
        end_time = time.perf_counter()
        LOGGER.info(f'image compression took {end_time - start_time} seconds')
        return bytearray(best_jpeg_data)
    except Exception as e:
        LOGGER.warning(f'Failed to compress: \n {e}')
        return bytearray(b'')
def main():
    """Script entry point: compress frame.jpg to the byte budget and save it."""
    # Load the source frame from disk.
    source_path = 'frame.jpg'
    frame = cv2.imread(source_path)
    # Target size for the compressed output.
    byte_limit = 15000  # Set your byte limit here
    payload = compress_image(frame, byte_limit)
    # Persist the compressed bytes and report the chosen search parameters.
    with open('compressed_frame.jpg', 'wb') as out_file:
        out_file.write(payload)
    LOGGER.info(f'Compressed image saved as compressed_frame.jpg {best_quality} {best_dimension} {count}')


if __name__ == '__main__':
    main()
Updated one
import logging
import time
from pathlib import Path
import cv2 # type: ignore
import numpy as np # type: ignore
import numpy.typing as npt # type: ignore
# Configure logging.
# Fix: the original assigned LOGGER = logging.getLogger(__name__) and then
# immediately overwrote it on the next line — the dead assignment is removed.
LOGGER = logging.getLogger(Path(__file__).resolve().stem)
LOGGER.setLevel(logging.INFO)  # for debugging set logging to DEBUG
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
ch.setFormatter(formatter)
# NOTE(review): the handler is attached to the ROOT logger, so every logger in
# the process that propagates will emit through it — confirm this is intended.
logging.getLogger().addHandler(ch)

# Warm-start state: quality/dimension chosen by the previous compress_image()
# call (reused as the starting point for the next call) and iteration counter.
last_quality_chosen = 11
last_dimension_chosen = 0.2
count = 0
def compress_image(original_img: npt.NDArray[np.uint8], byte_limit: int, start_quality: 'int | None' = None, start_dimension: 'float | None' = None) -> bytearray:
    """
    Compress an image to fit within a set number of bytes.

    First performs a warm-start encode using the caller-supplied (or last
    remembered) quality/dimension; if that already lands within the size
    tolerance it is returned immediately, skipping the search entirely.
    Otherwise falls back to a coupled binary search over JPEG quality
    (0-100) and resize scale (0.1-1.0).

    Inputs:
    - original_img: RGB image to be compressed, as a numpy array
    - byte_limit: Int defining max number of bytes that output image should be
    - start_quality: Initial quality setting for compression (optional;
      defaults to the value chosen on the previous call)
    - start_dimension: Initial dimension scaling factor (optional; defaults
      to the value chosen on the previous call)
    Outputs:
    - jpeg_data: Compressed image, encoded as a jpeg format byte array;
      an empty bytearray on failure
    """
    global last_quality_chosen, last_dimension_chosen, count

    start_time = time.perf_counter()
    jpeg_quality = start_quality if start_quality is not None else last_quality_chosen
    dimension_scaling = start_dimension if start_dimension is not None else last_dimension_chosen
    tolerance = 50  # acceptable distance (bytes) from the limit for early exit
    try:
        # Warm-start attempt. Fix: the original computed this encode and then
        # discarded the result, so the reuse feature was pure overhead — now a
        # close-enough warm start short-circuits the binary search.
        new_img = cv2.resize(original_img, (0, 0), fx=dimension_scaling, fy=dimension_scaling)
        _, jpeg_data = cv2.imencode('.jpg', new_img, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality])
        out_image_size = len(jpeg_data)
        if out_image_size <= byte_limit and abs(out_image_size - byte_limit) <= tolerance:
            last_quality_chosen = jpeg_quality
            last_dimension_chosen = dimension_scaling
            return bytearray(jpeg_data)

        min_quality, max_quality = 0, 100
        min_dimension, max_dimension = 0.1, 1.0
        best_quality = min_quality
        best_dimension = min_dimension
        while min_quality <= max_quality and min_dimension <= max_dimension:
            count += 1
            mid_quality = (min_quality + max_quality) // 2
            mid_dimension = (min_dimension + max_dimension) / 2
            new_img = cv2.resize(original_img, (0, 0), fx=mid_dimension, fy=mid_dimension)
            _, jpeg_data = cv2.imencode('.jpg', new_img, [int(cv2.IMWRITE_JPEG_QUALITY), mid_quality])
            out_image_size = len(jpeg_data)
            if abs(out_image_size - byte_limit) <= tolerance:  # Early termination condition
                last_quality_chosen = mid_quality
                last_dimension_chosen = mid_dimension
                # Fix: return a bytearray, as the signature declares.
                return bytearray(jpeg_data)
            elif out_image_size < byte_limit:
                # Fits: remember the best (highest) quality seen, search upward.
                if mid_quality > best_quality:
                    best_quality = mid_quality
                    best_dimension = mid_dimension
                min_quality = mid_quality + 1
                min_dimension = mid_dimension + 0.01
            else:
                # Too big: search downward.
                max_quality = mid_quality - 1
                max_dimension = mid_dimension - 0.01
        # Re-encode with the best parameters found by the search.
        best_img = cv2.resize(original_img, (0, 0), fx=best_dimension, fy=best_dimension)
        _, best_jpeg_data = cv2.imencode('.jpg', best_img, [int(cv2.IMWRITE_JPEG_QUALITY), best_quality])
        end_time = time.perf_counter()
        LOGGER.info(f'image compression took {end_time - start_time} seconds')
        last_quality_chosen = best_quality
        last_dimension_chosen = best_dimension
        return bytearray(best_jpeg_data)
    except Exception as e:
        LOGGER.warning(f'Failed to compress: \n {e}')
        return bytearray(b'')
def main():
    """Script entry point: compress frame.jpg to the byte budget and save it."""
    # Target size for the compressed output.
    byte_limit = 15000  # Example byte limit, adjust as needed
    # Load the source frame; bail out early if it cannot be read.
    frame = cv2.imread('frame.jpg')
    if frame is None:
        LOGGER.error('Failed to load image')
        return
    payload = compress_image(frame, byte_limit)
    # Persist the compressed bytes and report how many iterations were used.
    with open('compressed_frame.jpg', 'wb') as out_file:
        out_file.write(payload)
    LOGGER.info(f'Compression completed and image saved as compressed_frame.jpg {count}')


if __name__ == '__main__':
    main()
For every new image being compressed, SkyCompress iterates from the start until it finds the optimal values. Assuming that the optimal values change very little over time, it may be beneficial to add the option to reuse the values and thus decrease compression times significantly.
As concrete changes to allow this, I would recommend:
- Introduce module-level variables `last_quality_chosen` and `last_dimension_chosen`, and update these every time `compress_image(...)` is run.
- Give `compress_image(...)` two new optional parameters called `start_quality` and `start_dimension`, to which the above module variables can be passed. The default values for these parameters would be the start values being used at present.