Open tuan-h opened 3 months ago
Your approach of subclassing AthenaAccessKeyProfileMapping to call sts for temporary credentials is viable and aligns with the existing mechanisms in the Cosmos repository. The provided code already includes a method, _get_temporary_credentials, that retrieves temporary short-lived credentials using the AwsGenericHook from Airflow. This method is used within the profile property to set aws_access_key_id, aws_secret_access_key, and aws_session_token.
Here is the relevant part of the code that confirms this approach:
class AthenaAccessKeyProfileMapping(BaseProfileMapping):
    # ... other parts of the class ...

    @property
    def profile(self) -> dict[str, Any | None]:
        "Gets profile. The password is stored in an environment variable."
        self.temporary_credentials = self._get_temporary_credentials()  # type: ignore
        profile = {
            **self.mapped_params,
            **self.profile_args,
            "aws_access_key_id": self.temporary_credentials.access_key,
            "aws_secret_access_key": self.get_env_var_format("aws_secret_access_key"),
            "aws_session_token": self.get_env_var_format("aws_session_token"),
        }
        return self.filter_null(profile)

    def _get_temporary_credentials(self):  # type: ignore
        """
        Helper function to retrieve temporary short lived credentials
        Returns an object including access_key, secret_key and token
        """
        from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook

        hook = AwsGenericHook(self.conn_id)  # type: ignore
        credentials = hook.get_credentials()
        return credentials
Additionally, the tests provided in the repository validate the functionality of the AthenaAccessKeyProfileMapping class, ensuring that it correctly claims the connection type and sets the profile values. The tests use mock objects to simulate the AWS credentials and verify that the profile mapping behaves as expected [1][2].
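As a rough illustration of that mocking pattern (not the repository's actual tests), the sketch below patches the credential lookup on a small stand-in class. MiniMapping, credential_values, and build_values_with_fake_credentials are hypothetical names made up for this example; only unittest.mock comes from the standard library.

```python
from unittest.mock import MagicMock, patch


class MiniMapping:
    """Hypothetical, trimmed-down stand-in; not the real Cosmos class."""

    def _get_temporary_credentials(self):
        # In the real mapping this is where the Airflow hook would be called.
        raise RuntimeError("would contact AWS in a real run")

    def credential_values(self) -> dict:
        # Read the three fields the profile needs from the credentials object.
        creds = self._get_temporary_credentials()
        return {
            "aws_access_key_id": creds.access_key,
            "aws_secret_access_key": creds.secret_key,
            "aws_session_token": creds.token,
        }


def build_values_with_fake_credentials() -> dict:
    # A mock object plays the role of the AWS credentials.
    fake = MagicMock(access_key="fake-key", secret_key="fake-secret", token="fake-token")
    # Replace the lookup for the duration of the call, so no AWS access is needed.
    with patch.object(MiniMapping, "_get_temporary_credentials", return_value=fake):
        return MiniMapping().credential_values()
```

Patching the lookup method rather than the hook keeps the example independent of Airflow; a real test would more likely patch the hook itself.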
However, your concern about the deployed Airflow running with an IAM role that already has a session is valid. The get_session_token call might fail due to session limitations. To address this, you can check whether a session token is already present and call get_session_token only if it is not:
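def _get_temporary_credentials(self):  # type: ignore
    """
    Helper function to retrieve temporary short lived credentials
    Returns an object including access_key, secret_key and token
    """
    from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
    from airflow.providers.amazon.aws.hooks.sts import StsHook
    from botocore.credentials import ReadOnlyCredentials

    hook = AwsGenericHook(self.conn_id)  # type: ignore
    credentials = hook.get_credentials()
    if not credentials.token:
        # No session token yet (e.g. static access keys): request one from STS.
        sts_hook = StsHook(self.conn_id)  # type: ignore
        duration = self.profile_args.get("duration_seconds", 3600)
        response = sts_hook.conn.get_session_token(DurationSeconds=duration)
        # get_session_token returns a dict; convert it so callers can still
        # read .access_key, .secret_key and .token.
        sts_credentials = response["Credentials"]
        credentials = ReadOnlyCredentials(
            access_key=sts_credentials["AccessKeyId"],
            secret_key=sts_credentials["SecretAccessKey"],
            token=sts_credentials["SessionToken"],
        )
    return credentials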
This way, the code will only request a new session token if one is not already present, avoiding the session limitation issue [1].
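To sanity-check that decision logic without Airflow or AWS access, it can be pulled out into a standalone function, as in the minimal sketch below. Credentials, FakeStsClient, and resolve_credentials are hypothetical names invented for this illustration, and the key values are placeholders. The only part taken from the real API is the shape of the get_session_token response: a dict whose Credentials entry holds AccessKeyId, SecretAccessKey, and SessionToken.

```python
from typing import NamedTuple, Optional


class Credentials(NamedTuple):
    """Hypothetical stand-in for the object returned by hook.get_credentials()."""

    access_key: str
    secret_key: str
    token: Optional[str] = None


class FakeStsClient:
    """Hypothetical stub that mimics the response shape of STS get_session_token."""

    def __init__(self) -> None:
        self.calls = 0

    def get_session_token(self, DurationSeconds: int = 3600) -> dict:
        self.calls += 1
        return {
            "Credentials": {
                "AccessKeyId": "ASIA-TEMP",
                "SecretAccessKey": "temp-secret",
                "SessionToken": "temp-token",
            }
        }


def resolve_credentials(current: Credentials, sts_client, duration: int = 3600) -> Credentials:
    """Return current if it already carries a session token, else ask STS for one."""
    if current.token:
        # Already a session (e.g. an IAM role): do not call get_session_token.
        return current
    response = sts_client.get_session_token(DurationSeconds=duration)["Credentials"]
    return Credentials(
        access_key=response["AccessKeyId"],
        secret_key=response["SecretAccessKey"],
        token=response["SessionToken"],
    )
```

With role-style credentials that already include a token, the stub's call counter stays at zero; with static keys, it is called once and the returned object carries the new token.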
Hey @tuan-h, would you like to submit a PR?
@pankajastro Yes, I can submit a PR based on the bot's suggestion if you think that's ok. Specifically, using AwsGenericHook first and, if the token is not present, falling back to StsHook.
When defining an AWS connection like so in airflow_settings.yaml:

Running a dag with AthenaAccessKeyProfileMapping returns this error:

And that's because AwsGenericHook returns frozen_credentials as defined above, i.e. no session token, when calling get_credentials in AthenaAccessKeyProfileMapping.

To make the dag run successfully, I have to use the AWS CLI command aws sts get-session-token and then define the connection like this:

However, this is annoying to do every time a session expires. To fix this issue, I subclassed AthenaAccessKeyProfileMapping to make the call to sts in code:

I'm not sure if this is the best solution. Additionally, I don't think this will work with a deployed Airflow that runs with an IAM role that already has a session. That is, it will fail to call get_session_token due to session limitation: