axman6 / amazonka-s3-streaming

Provides a conduit-based interface for uploading data to S3 using the Multipart API

Question about streaming file down and back up to S3 #21

Closed: krcurtis closed this issue 3 years ago

krcurtis commented 4 years ago

I'm clearly doing something wrong, but I don't understand what the error No instance for (MonadAWS (ResourceT IO)) is telling me. I'm trying to download a file from S3 (using amazonka-s3) and then stream it back up to S3 (using amazonka-s3-streaming).

Here's the code I'm trying to compile:

alt_source :: RsBody -> ConduitT () ByteString (ResourceT IO) ()
alt_source  (RsBody body) = body

upload_sink :: (MonadUnliftIO m, MonadAWS m, MonadFail m) => Text -> Text -> ConduitT ByteString Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
upload_sink b k = streamUpload Nothing (createMultipartUpload (BucketName b) (ObjectKey k))

simple_download_upload :: Region  -> Text -> Text -> Text -> Text -> IO () 
simple_download_upload r from_bucket from_key to_bucket to_key =  do
    lgr <- newLogger Debug stdout
    env <- newEnv Discover <&> set envLogger lgr . set envRegion r

    runResourceT . (runAWST env) $ do
        rs <- send (getObject (BucketName from_bucket) (ObjectKey from_key))
        liftIO $ runConduitRes ((alt_source ((view gorsBody) rs)) .|  (upload_sink to_bucket to_key)) --  >>= liftIO . either print print)
        say $ "Successfully re-upload: " <> from_bucket <> " - " <> from_key <> " to " <> to_bucket <> " " <> to_key

But I get a compiler error message about:

No instance for (MonadAWS (ResourceT IO)) arising from a use of `upload_sink'

Do you have any suggestions? Thanks!

peironggg commented 3 years ago

Hi, I am facing a similar problem. Could I check whether you have found a solution?

krcurtis commented 3 years ago

I ended up saving to a file, and then doing the upload. At the time, I thought the compiler error might be because of nested ResourceT monad transformers that I was not handling properly, but I did not find a solution.
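
For reference, the file-based workaround looked roughly like this (an untested sketch reusing upload_sink from my original snippet; sinkFile and sourceFile are from Data.Conduit.Binary in conduit-extra, and the temp path is just a placeholder):

save_then_upload :: Region -> Text -> Text -> Text -> Text -> IO ()
save_then_upload r from_bucket from_key to_bucket to_key = do
    lgr <- newLogger Debug stdout
    env <- newEnv Discover <&> set envLogger lgr . set envRegion r
    let tmp = "/tmp/s3-transfer.part"   -- placeholder temporary file
    runResourceT . runAWST env $ do
        rs <- send (getObject (BucketName from_bucket) (ObjectKey from_key))
        -- drain the response body into the local file
        sinkBody (view gorsBody rs) (sinkFile tmp)
        -- stream the file back up through the multipart upload sink
        result <- runConduit (sourceFile tmp .| upload_sink to_bucket to_key)
        liftIO (either print print result)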

peironggg commented 3 years ago

Yes, I think you are right: the class constraints the library places on m make it almost impossible to use streamUpload in anything other than IO.

Thanks. My original solution was to save it to a file and then upload it, but I wanted to make the process a bit more elegant by using Conduit. Seems like I will go back to that 😆

ivanbakel commented 3 years ago

This is happening because alt_source is constraining the conduit type too much - the compiler has to choose a type to instantiate m with in the call to upload_sink, and it can infer that m must be the same as the conduit monad in alt_source (because of the type of .|). But once m is ResourceT IO, then it's no longer possible to solve the constraint MonadAWS m - this is what the compiler is telling you in the error.
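
For reference, the signature of (.|) from Data.Conduit shows why both sides of a pipeline must agree on a single monad:

(.|) :: Monad m
     => ConduitT a b m ()  -- upstream
     -> ConduitT b c m r   -- downstream: note the same m
     -> ConduitT a c m r

With alt_source pinned to ResourceT IO, unification forces m ~ ResourceT IO for upload_sink too, and ResourceT IO has no MonadAWS instance.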

The solution is this:

  1. Swap runConduitRes to just runConduit - the conduit is already being run in a ResourceT, because you're using runResourceT on the do block.
  2. Drop the liftIO, since you can run the conduit in the original monad.
  3. Change the conduit monad of alt_source to have the correct type - AWST (ResourceT IO) instead of ResourceT IO - using transPipe lift:

runConduit ((transPipe lift (alt_source ((view gorsBody) rs))) .|  (upload_sink to_bucket to_key))
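
Here transPipe lift reinterprets every step of the response-body conduit in the larger monad; its signature in Data.Conduit is:

transPipe :: Monad m => (forall a. m a -> n a) -> ConduitT i o m r -> ConduitT i o n r

and lift :: ResourceT IO a -> AWST (ResourceT IO) a is exactly the natural transformation needed here.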

In general, you can also make alt_source more polymorphic:

alt_source :: (MonadIO m, MonadResource m) => RsBody -> ConduitT () ByteString m ()
alt_source  (RsBody body) = transPipe liftResourceT body

So you aren't constrained by a concrete conduit monad type.

krcurtis commented 3 years ago

Thanks! That fixed my compile error. Here's the code updated with ivanbakel's changes, for others who may have the same question:

-- Stream the S3 response body in any MonadResource-capable conduit monad
alt_source :: (MonadIO m, MonadResource m) => RsBody -> ConduitT () ByteString m ()
alt_source (RsBody body) = transPipe liftResourceT body

-- Sink that performs a multipart upload to the given bucket and key
upload_sink :: (MonadUnliftIO m, MonadAWS m, MonadFail m) => Text -> Text -> ConduitT ByteString Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
upload_sink b k = streamUpload Nothing (createMultipartUpload (BucketName b) (ObjectKey k))

simple_download_upload :: Region -> Text -> Text -> Text -> Text -> IO ()
simple_download_upload r from_bucket from_key to_bucket to_key = do
    lgr <- newLogger Debug stdout
    env <- newEnv Discover <&> set envLogger lgr . set envRegion r

    runResourceT . runAWST env $ do
        rs <- send (getObject (BucketName from_bucket) (ObjectKey from_key))
        -- both conduits now run in AWST (ResourceT IO), so no liftIO is needed
        runConduit (alt_source (view gorsBody rs) .| upload_sink to_bucket to_key)
        say $ "Successfully re-uploaded: " <> from_bucket <> " - " <> from_key <> " to " <> to_bucket <> " " <> to_key

When I ran it, I got an error about:

*** Exception: SerializeError (SerializeError' {_serializeAbbrev = Abbrev "S3", _serializeStatus = Status {statusCode = 200, statusMessage = "OK"}, _serializeBody = Nothing, _serializeMessage = "Failed reading: Failure parsing StorageClass from value: 'intelligent_tiering'. Accepted values: onezone_ia, reduced_redundancy, standard, standard_ia"})

which I think is unrelated: the S3 service is returning the newer intelligent_tiering storage class, which the version of the amazonka package I'm using doesn't know how to parse.