For instance, document the additional functions shown below.
from os import W_OK, access
from pathlib import Path
import tempfile
from zipfile import is_zipfile
from otdf_python.go import Slice_string
from otdf_python.gotdf_python import EncryptionConfig, DecryptionConfig, TokenAuth, DecryptFilePE, EncryptFilePE
# Path to the README.md located in the same directory as this file.
# It is passed as the plaintext input file to 'EncryptFilePE()' below.
README_PATH = Path(__file__).parent / "README.md"
def get_jwt_for_test_pe() -> str:
    """Return the placeholder JWT used as the access token in this example.

    The value is a fixed dummy token for demonstration purposes only.
    In a real-world scenario, replace it with a valid JWT.
    """
    demo_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
    return demo_token
def get_encrypt_config(data_attributes: list | None = None):
    """Build an 'EncryptionConfig'; the result can only be used for encryption.

    Args:
        data_attributes: Optional list of attribute strings. When a list is
            given it is wrapped in a Go 'Slice_string' and bound to the
            'DataAttributes' field. For any other value (including the
            default None) the field is not passed at all.

    Returns:
        The constructed 'EncryptionConfig' object.

    NOTE(review): 'platformEndpoint' and 'testing_credentials' are not
    defined in the visible part of this file -- confirm they are in scope.
    """
    print("Preparing 'EncryptionConfig' object")

    if not isinstance(data_attributes, list):
        # No attribute list from the caller: leave 'DataAttributes' unbound
        # and take every connection setting from 'testing_credentials'.
        config = EncryptionConfig(
            ClientId=testing_credentials.TDF_NPE_CLIENT,
            ClientSecret=testing_credentials.TDF_NPE_CLIENT_SECRET,
            PlatformEndpoint=testing_credentials.PLATFORM_ENDPOINT,
            TokenEndpoint=testing_credentials.OIDC_AUTH_URL,
            KasUrl=testing_credentials.KAS_URL,
        )
    else:
        # Convert the caller's Python list into a Go string slice first.
        attribute_slice = Slice_string(data_attributes)
        config = EncryptionConfig(
            ClientId="opentdf-sdk",
            ClientSecret="secret",
            PlatformEndpoint=platformEndpoint,
            TokenEndpoint="http://localhost:8888/auth/realms/opentdf/protocol/openid-connect/token",
            KasUrl=f"http://{platformEndpoint}/kas",
            # FIXME: Be careful with binding the 'DataAttributes' field.
            #
            # In golang it is initialized as []string, but passing
            # DataAttributes=None or DataAttributes=[] from Python will fail,
            # which is why a 'Slice_string' is handed over instead.
            DataAttributes=attribute_slice,
        )

    # NOTE: Structs from golang can be printed, e.g. print(config)
    print("Returning 'EncryptionConfig'")
    return config
def get_decrypt_config():
    """Build a 'DecryptionConfig'; the result can only be used for decryption.

    The client id, client secret and token endpoint are hard-coded, while
    the platform endpoint and KAS URL are derived from 'platformEndpoint'.
    The 'DataAttributes' field is deliberately not bound.

    Returns:
        The constructed 'DecryptionConfig' object.
    """
    # FIXME: Be careful with binding the 'DataAttributes' field on this struct.
    #
    # In golang it is initialized as []string, but passing
    # DataAttributes=None or DataAttributes=[] from Python will fail,
    # so the keyword is omitted entirely here.
    #
    # NOTE: Structs from golang can be printed with print(...). The output is
    # a single-line string of the form
    # gotdf_python.<Struct>{ClientId=opentdf-sdk, ClientSecret=secret, KasUrl=http://localhost:8080/kas, PlatformEndpoint=localhost:8080, TokenEndpoint=..., handle=1}
    return DecryptionConfig(
        ClientId="opentdf-sdk",
        ClientSecret="secret",
        PlatformEndpoint=platformEndpoint,
        TokenEndpoint="http://localhost:8888/auth/realms/opentdf/protocol/openid-connect/token",
        KasUrl=f"http://{platformEndpoint}/kas",
    )
def make_decrypt_config_and_pe_token(access_token: str):
    """Create a 'TokenAuth' and a decryption-only config for 'access_token'.

    The config comes from 'get_decrypt_config()', and its 'ClientId' is
    reused as the 'NpeClientId' of the 'TokenAuth'. Both objects are echoed
    to stdout, which includes the access token itself.

    Args:
        access_token: The JWT to place in the 'TokenAuth' object.

    Returns:
        A '(token_auth, config)' tuple.
    """
    config = get_decrypt_config()
    config_summary = f"Config (gopy): {config.ClientId=}, {config.KasUrl=}, {config.PlatformEndpoint=}, {config.TokenEndpoint=} "
    print(config_summary)

    print("Instantiating 'TokenAuth' (gopy) object.")
    token_auth = TokenAuth(AccessToken=access_token, NpeClientId=config.ClientId)

    token_summary = f"TokenAuth (gopy): {token_auth.NpeClientId=}, {token_auth.AccessToken=} "
    print(token_summary)

    return token_auth, config
def make_encrypt_config_and_pe_token(access_token: str):
    """Create a 'TokenAuth' and an encryption-only config for 'access_token'.

    The config comes from 'get_encrypt_config()' called without data
    attributes; the 'TokenAuth' uses the fixed client id "opentdf-sdk".

    Args:
        access_token: The JWT to place in the 'TokenAuth' object.

    Returns:
        A '(token_auth, config)' tuple.
    """
    # The config is built first so its log output precedes the token setup.
    encrypt_config = get_encrypt_config()

    # NOTE(review): the client id is hard-coded here rather than read from
    # the config's 'ClientId' -- confirm the two are meant to match.
    pe_token = TokenAuth(AccessToken=access_token, NpeClientId="opentdf-sdk")

    return pe_token, encrypt_config
def test_pe_e2e_file_nil_attributes():
    """Round-trip the README through 'EncryptFilePE()' and 'DecryptFilePE()'.

    Everything happens inside a temporary directory: the README is
    encrypted to 'some-file.tdf' without data attributes, the result is
    checked to exist, be larger than 2500 bytes and be a zip file, and it
    is then decrypted to 'output-file.md', which must exist afterwards.

    Raises:
        ValueError: If the encrypted file pre-exists, is missing after
            encryption, or has unexpected content.
        RuntimeError: If the decryption destination is not writable.
    """
    print("Testing PE file encryption, via 'gotdf_python'")
    with tempfile.TemporaryDirectory() as tmp_dir:
        print("Created temporary directory", tmp_dir)
        workspace = Path(tmp_dir)

        # ---- Encryption ----
        encrypted_path = workspace / "some-file.tdf"
        if encrypted_path.exists():
            encrypted_path.unlink()
        if encrypted_path.exists():
            raise ValueError(
                "The output path should not exist before calling 'EncryptFile()'."
            )

        token_auth, config = make_encrypt_config_and_pe_token(
            access_token=get_jwt_for_test_pe()
        )
        outputFilePath = EncryptFilePE(
            inputFilePath=str(README_PATH),
            outputFilePath=str(encrypted_path),
            config=config,
            token=token_auth,
        )
        print(f"The output file was written to destination path: {outputFilePath}")

        if not encrypted_path.exists():
            raise ValueError("The output file does not exist!")
        # The result must be larger than 2500 bytes AND a valid zip archive.
        if encrypted_path.stat().st_size <= 2500 or not is_zipfile(encrypted_path):
            raise ValueError("The output file has unexpected content!")

        # ---- Decryption ----
        # This file shouldn't exist yet; 'DecryptFilePE()' will create it.
        decrypted_file_output = workspace / "output-file.md"
        assert not decrypted_file_output.exists()

        decrypt_token_auth, decrypt_config = make_decrypt_config_and_pe_token(
            access_token=get_jwt_for_test_pe()
        )

        # Refuse to continue when the target directory cannot be written to.
        if not access(decrypted_file_output.parent, W_OK):
            raise RuntimeError(
                f"The destination path {decrypted_file_output=} is not writable"
            )

        decrypted_result = DecryptFilePE(
            inputFilePath=outputFilePath,
            outputFilePath=str(decrypted_file_output),
            config=decrypt_config,
            token=decrypt_token_auth,
        )
        assert Path(decrypted_result).exists()
For instance, document the additional functions shown below.