Closed thoven87 closed 1 month ago
Do you have the logs for a failed job?
The jobs don't fail, they get stuck with status processing. The jobs stay in the queue until the server restarts. After restarting the server, all jobs get processed without failure.
OK, then the logs for a job that is stuck in the processing state. If you search your logs for the job's UUID, it should give us an idea of what happened.
By the way, I have written a soak test for the job queue and run it against both Postgres and Redis, and I cannot get it to fail. Perhaps you can expand on the test to get it to fail. Here is the Redis version:
import Jobs
import JobsRedis
import Logging
import NIOCore
import NIOPosix
import RediStack
import ServiceLifecycle

var logger = Logger(label: "Soak")
logger.logLevel = .debug

let redis = try RedisConnectionPool(
    configuration: .init(
        initialServerConnectionAddresses: [.makeAddressResolvingHost("localhost", port: 6379)],
        maximumConnectionCount: .maximumActiveConnections(2),
        connectionFactoryConfiguration: .init(
            connectionDefaultLogger: logger,
            tcpClient: nil
        ),
        minimumConnectionCount: 0,
        connectionBackoffFactor: 2,
        initialConnectionBackoffDelay: .milliseconds(100)
    ),
    boundEventLoop: MultiThreadedEventLoopGroup.singleton.any()
)

let jobQueue = JobQueue(
    .redis(
        redis,
        configuration: .init(
            pendingJobInitialization: .remove,
            processingJobsInitialization: .remove,
            failedJobsInitialization: .remove
        )
    ),
    numWorkers: 4,
    logger: logger
)

struct MyJob: JobParameters {
    static var jobName = "Test"
    let sleep: Int
}

struct MyError: Error {}

jobQueue.registerJob(parameters: MyJob.self, maxRetryCount: 4) { parameters, _ in
    try await Task.sleep(for: .milliseconds(parameters.sleep))
    if Int.random(in: 0..<100) < 3 {
        throw MyError()
    }
}

try await withThrowingTaskGroup(of: Void.self) { group in
    let serviceGroup = ServiceGroup(
        configuration: .init(
            services: [jobQueue],
            gracefulShutdownSignals: [.sigterm, .sigint],
            logger: logger
        )
    )
    group.addTask {
        try await serviceGroup.run()
    }
    group.addTask {
        for _ in 0..<100_000 {
            try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20)))
            try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10)))
        }
    }
    group.addTask {
        for _ in 0..<100_000 {
            try await jobQueue.push(MyJob(sleep: Int.random(in: 1..<20)))
            try await Task.sleep(for: .milliseconds(Int.random(in: 1..<10)))
        }
    }
    try await group.next()
    try await group.next()
    try await Task.sleep(for: .seconds(1))
    await serviceGroup.triggerGracefulShutdown()
}

let promise = redis.eventLoop.makePromise(of: Void.self)
redis.close(promise: promise)
try await promise.futureResult.get()
I created a POC, https://github.com/thoven87/swift-job-stuck/tree/main, which should help reproduce the issue I am facing.
To run the job, you'll need SMTP credentials. I can send credentials by DM on Discord if needed.
Closing this issue, as it seems to be an issue with SMTPKitten.
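Because the symptom was a job that never returns rather than one that fails, one possible mitigation is to bound the job body with a timeout so that a stalled call surfaces as a thrown error and goes through the queue's normal retry handling. The following is only a sketch under assumptions: `withTimeout` and `TimeoutError` are hypothetical helpers, not part of the Jobs library or SMTPKitten, and the approach only helps if the stalled call responds to task cancellation, since a task group still waits for its child tasks before returning.

```swift
/// Hypothetical error type thrown when the deadline passes first.
struct TimeoutError: Error {}

/// Hypothetical helper: runs `operation` and throws `TimeoutError`
/// if it has not finished within `duration`.
func withTimeout<T: Sendable>(
    _ duration: Duration,
    operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        // Child 1: the real work.
        group.addTask { try await operation() }
        // Child 2: the deadline.
        group.addTask {
            try await Task.sleep(for: duration)
            throw TimeoutError()
        }
        // Cancel whichever child is still running once the first one finishes.
        // Note: the group still awaits the cancelled child, so the operation
        // must cooperate with cancellation for this to return promptly.
        defer { group.cancelAll() }
        guard let result = try await group.next() else {
            throw TimeoutError()
        }
        return result
    }
}

// Possible use inside a job handler (sendEmail is a placeholder name):
//
// jobQueue.registerJob(parameters: MyJob.self, maxRetryCount: 4) { parameters, _ in
//     try await withTimeout(.seconds(30)) { try await sendEmail(parameters) }
// }
```

With a wrapper like this, a hung send would show up in the logs as a failed attempt for that job's UUID instead of a job that sits in the processing state until the server restarts.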
I wanted to report this issue here for visibility. I first experienced it with the Postgres driver, for which I opened this PR.
After a week or so, I started experiencing the same symptoms again. I decided to try the Redis queue driver, and it had the same issue too. I am now convinced the issue is in this repository.
The issue is as follows:
1 - Push a message to the job queue. Sometimes the messages get stuck in the processing state and are never processed.
2 - Once the server is restarted, all the messages in the queue get processed.
3 - The process repeats.
Environments
swift-driver version: 1.113 Apple Swift version 6.0 (swiftlang-6.0.0.7.6 clang-1600.0.24.1)
Target: arm64-apple-macosx15.0
Swift 5.10.1 Docker