marblejs / marble

Marble.js - functional reactive Node.js framework for building server-side applications, based on TypeScript and RxJS.
https://marblejs.com
MIT License

RabbitMq client still a Promise on App start up #322

Open paulvitic opened 3 years ago

paulvitic commented 3 years ago

Describe the bug

Following the Marble docs for creating an AMQP publisher, I receive the following error on app start-up:

(node:90349) UnhandledPromiseRejectionWarning: TypeError: rabbitMqClient.send is not a function

This happens because the resolved RabbitMQ client is still a Promise, even though it was eagerly bound to the context.
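For readers unfamiliar with this failure mode, here is a minimal, dependency-free sketch of how a value that is typed as a client but is actually a Promise produces this kind of TypeError. It does not use Marble at all: `FakeClient`, `trySend`, and `isThenable` are hypothetical names invented for illustration, and the forced cast only imitates a lookup whose static type hides the Promise.

```typescript
// Hypothetical stand-in for a messaging client; this is not Marble's MessagingClient.
interface FakeClient {
  send(message: unknown): string;
}

const resolvedClient: FakeClient = {
  send: message => `sent ${JSON.stringify(message)}`,
};

// A Promise of the client, force-cast so the compiler believes it is the client.
// Assumption: this imitates a lookup whose static type hides the Promise.
const unresolvedClient = Promise.resolve(resolvedClient) as unknown as FakeClient;

// Calls send and returns the error message instead of throwing.
function trySend(client: FakeClient): string {
  try {
    return client.send({ type: 'HELLO', payload: 'John' });
  } catch (error) {
    return (error as Error).message;
  }
}

// Runtime check that tells a Promise-like value apart from a ready client.
function isThenable(value: unknown): boolean {
  return typeof (value as { then?: unknown } | null)?.then === 'function';
}
```

Calling `trySend(unresolvedClient)` reports that `send` is not a function, because a Promise has no `send` method. A check such as `isThenable` at the point of use can confirm whether the looked-up value is still pending.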

To Reproduce

Amqp publisher

export const AmqpClientToken = createContextToken<MessagingClient>('MessagingClient');
export const amqpClient = messagingClient({
    transport: Transport.AMQP,
    options: {
        host: 'amqp://localhost:5672',
        queue: 'hello_queue'
    },
});

Http effect

export const getRoot$ = r.pipe(
    r.matchPath('/'),
    r.matchType('GET'),
    r.useEffect((req$, ctx) => {
        const rabbitMqClient = useContext(AmqpClientToken)(ctx.ask);

        return req$.pipe(
            mergeMapTo(rabbitMqClient.send({ type: 'HELLO', payload: 'John' })),
            mapTo({ status: HttpStatus.ACCEPTED }),
        );
    }),
);

App

const httpServerListener = httpListener({
    middlewares: [
        logger$({ silent: isTestEnv() }),
        bodyParser$(),
    ],
    effects: [
        getRoot$
    ],
});

export const server = createServer({
    port: getPortEnv(),
    listener: httpServerListener,
    dependencies: [
        bindEagerlyTo(AmqpClientToken)(amqpClient),
    ],
});

export const main: IO.IO<void> = async () =>
    await (await server)();

Expected behavior

App starts up.

JozefFlakus commented 3 years ago

@paulvitic thanks for reporting an issue.

  1. Is the RabbitMQ server established?
  2. Do you have, or can you create, a repository that reproduces the problem?

paulvitic commented 3 years ago

Hello @JozefFlakus, thanks for the response.

  1. Yes, the RabbitMQ server is established.
  2. I am afraid I have not committed my code to a public repository yet.

I am using the amqp client dependency inside an event store implementation that conforms to:

export interface EventStore {
    append(event: DomainEvent): TaskEither<Error, void>;
    stream(id: string): Observable<DomainEvent>;
}

DomainEvent is just an extension of Marble's Event struct.

Here is how I am dealing with it now:

export const EventStoreFake: Reader<Context, EventStore> = createReader(ctx => {
    const rabbitMqClient = useContext(AmqpClientToken)(ctx);
    const store: Record<string, DomainEvent[]> = {};
    const log = (event: DomainEvent) => (streamId: string) => pipe(
        store,
        lookup(streamId),
        fold(() => { store[streamId] = [event] }, events => { events.push(event) })
    )
    return {
        append(event: DomainEvent): TaskEither<Error, void> {
            let streamId = pipe(
                fromNullable(event.payload),
                mapOption(payload => payload.streamId)
            )
            return pipe(
                fromOption(() => toError('stream id not found'))(streamId),
                // 👈🏼 here I await the client
                chainFirst(_streamId => tryCatch(async () => (await rabbitMqClient).emit(event), toError)),
                map(log(event))
            )
        },
        stream(id: string): Observable<DomainEvent> {
            return pipe(
                store,
                lookup(id),
                fold(() => from([]), events => from(events))
            )
        }
    }
});

JozefFlakus commented 3 years ago

Are you able to create a minimal, reproducible repository that shows the problem? It is really hard to tell what is wrong based on the snippet that you posted here. It looks correct.

Are you sure that the queue options are correct? E.g., let's say that hello_queue already exists.

paulvitic commented 3 years ago

I will try. By the way, to be clear: the above implementation works when I await the rabbitMqClient.
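The workaround described here relies on the fact that `await` passes a non-Promise value through unchanged. A minimal sketch of that idea, independent of Marble, is shown below; `EmittingClient`, `MaybePending`, `emitWith`, and `demoClient` are hypothetical names used only for illustration, and the sketch makes no claim about what Marble's context actually returns.

```typescript
// Hypothetical stand-in for a messaging client; this is not Marble's MessagingClient.
interface EmittingClient {
  emit(event: { type: string }): Promise<boolean>;
}

// Either a ready value or a Promise of one.
type MaybePending<T> = T | Promise<T>;

// Awaiting a non-Promise value returns it unchanged, so a single code path
// covers both the resolved and the still-pending case.
async function emitWith(
  client: MaybePending<EmittingClient>,
  event: { type: string },
): Promise<boolean> {
  const ready = await client;
  return ready.emit(event);
}

// Demo client that records the types of the events it was asked to emit.
const emittedTypes: string[] = [];
const demoClient: EmittingClient = {
  emit: async event => {
    emittedTypes.push(event.type);
    return true;
  },
};
```

Both `emitWith(demoClient, event)` and `emitWith(Promise.resolve(demoClient), event)` reach the same `emit` call, which is why awaiting the looked-up client sidesteps the error without explaining why the value was a Promise in the first place.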

JozefFlakus commented 3 years ago

@paulvitic any updates on this topic?

robert-gruner commented 2 years ago

Unfortunately, this issue seems stale. Anyway, @JozefFlakus, we tried to get Marble running with RabbitMQ and failed as well. We even checked out the Marble source code and ran "yarn" and "yarn build". Running "yarn test:integration" suggests that there might be a real problem with messaging, since the tests are red. The script downloaded the latest "rabbitmq:management". Maybe you can have a look?