"""Manual smoke test: upload, download and delete one object via S3Adapter."""

import asyncio
import os

from dotenv import load_dotenv

from adapters.s3_adapter import S3Adapter


async def test_s3():
    """Round-trip a small object through the configured bucket.

    Connection settings are read from the environment after ``load_dotenv()``
    has been called: ``MINIO_ENDPOINT`` (defaults to
    ``http://localhost:9000``), ``MINIO_ACCESS_KEY``, ``MINIO_SECRET_KEY`` and
    ``MINIO_BUCKET``.

    Progress and the outcome of each step are printed; nothing is returned.
    The function returns early when a required setting is missing or when the
    upload reports failure.
    """
    load_dotenv()

    endpoint = os.getenv("MINIO_ENDPOINT", "http://localhost:9000")
    access_key = os.getenv("MINIO_ACCESS_KEY")
    secret_key = os.getenv("MINIO_SECRET_KEY")
    bucket = os.getenv("MINIO_BUCKET")

    # These three have no default; report which are absent instead of handing
    # None to the adapter.
    required = {
        "MINIO_ACCESS_KEY": access_key,
        "MINIO_SECRET_KEY": secret_key,
        "MINIO_BUCKET": bucket,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        print(f"Missing required environment variables: {', '.join(missing)}")
        return

    print(f"Connecting to {endpoint}, bucket: {bucket}")
    s3 = S3Adapter(endpoint, access_key, secret_key, bucket)

    test_filename = "test_connection.txt"
    test_data = b"Hello MinIO!"

    print("Uploading...")
    # NOTE(review): the adapter methods are assumed to return a truthy value
    # on success -- confirm against S3Adapter.
    success = await s3.upload_file(test_filename, test_data)
    if not success:
        print("Upload failed!")
        return
    print("Upload successful!")

    try:
        print("Downloading...")
        data = await s3.get_file(test_filename)
        if data == test_data:
            print("Download successful and data matches!")
        else:
            print(f"Download mismatch: {data}")
    finally:
        # Always attempt the delete once the upload succeeded, so a failing
        # download does not leave the test object behind in the bucket.
        print("Deleting...")
        deleted = await s3.delete_file(test_filename)
        if deleted:
            print("Delete successful!")
        else:
            print("Delete failed!")


if __name__ == "__main__":
    asyncio.run(test_s3())