eventuate-foundation / eventuate-common

Other
12 stars 20 forks source link

Blocking code in reactive package #124

Open bekirtaskin opened 2 years ago

bekirtaskin commented 2 years ago

In EventuateCommonReactiveJdbcOperations class, There is a block call in columnToJson method. `public String columnToJson(EventuateSchema eventuateSchema, String column) {

BiFunction<String, List<Object>, List<Map<String, Object>>> selectCallback =
        (sql, params) -> reactiveJdbcStatementExecutor
                .query(sql, params.toArray())
                .collectList()
                .block(Duration.ofMillis(blockingTimeoutForRetrievingMetadata));

return eventuateSqlDialect.castToJson("?",
        eventuateSchema, "message", column, selectCallback);

}`

Blockhoud would not let it run.

I had to change the code a bit to get rid of blocking code part.

`import static io.eventuate.common.jdbc.EventuateJdbcOperationsUtils.MESSAGE_AUTO_GENERATED_ID_COLUMN;

import java.util.HashMap; import java.util.Map;

import io.eventuate.common.id.IdGenerator; import io.eventuate.common.jdbc.EventuateJdbcOperationsUtils; import io.eventuate.common.jdbc.EventuateSchema; import io.eventuate.common.jdbc.sqldialect.EventuateSqlDialect; import io.eventuate.common.json.mapper.JSonMapper; import io.eventuate.common.reactive.jdbc.EventuateCommonReactiveJdbcOperations; import io.eventuate.common.reactive.jdbc.EventuateReactiveJdbcStatementExecutor; import reactor.core.publisher.Mono;

public class EventuateCommonReactiveOperations extends EventuateCommonReactiveJdbcOperations {

private static final String COLUMN_TYPE_SQL = "select data_type from information_schema.columns where table_schema = ? and table_name = ? and column_name = ?";
private static final String MESSAGE_TABLE_NAME = "message";

private EventuateJdbcOperationsUtils eventuateJdbcOperationsUtils;
private EventuateReactiveJdbcStatementExecutor reactiveJdbcStatementExecutor;
private EventuateSqlDialect eventuateSqlDialect;

public EventuateCommonReactiveOperations(
    EventuateJdbcOperationsUtils eventuateJdbcOperationsUtils,
    EventuateReactiveJdbcStatementExecutor reactiveJdbcStatementExecutor,
    EventuateSqlDialect eventuateSqlDialect,
    int blockingTimeoutForRetrievingMetadata) {
super(eventuateJdbcOperationsUtils, reactiveJdbcStatementExecutor, eventuateSqlDialect, blockingTimeoutForRetrievingMetadata);
this.eventuateJdbcOperationsUtils = eventuateJdbcOperationsUtils;
this.reactiveJdbcStatementExecutor = reactiveJdbcStatementExecutor;
this.eventuateSqlDialect = eventuateSqlDialect;
}

@Override
public Mono<String> insertIntoMessageTable(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    EventuateSchema eventuateSchema) {

return insertIntoMessageTableImpl(idGenerator, payload, destination, headers, eventuateSchema, false);
}

@Override
public Mono<String> insertPublishedMessageIntoMessageTable(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    EventuateSchema eventuateSchema) {

return insertIntoMessageTableImpl(idGenerator, payload, destination, headers, eventuateSchema, true);
}

private Mono<String> insertIntoMessageTableImpl(
    IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    EventuateSchema eventuateSchema,
    boolean published) {
if (idGenerator.databaseIdRequired()) {
    return insertIntoMessageTableDatabaseIdImpl(idGenerator,
        payload, destination, headers, published, eventuateSchema);
} else {
    return insertIntoMessageTableApplicationIdImpl(idGenerator,
        payload, destination, headers, published, eventuateSchema);
}
}

private Mono<String> insertIntoMessageTableApplicationIdImpl(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    boolean published,
    EventuateSchema eventuateSchema) {
headers = new HashMap<>(headers);
String messageId = idGenerator.genId(null).asString();
headers.put("ID", messageId);
String serializedHeaders = JSonMapper.toJson(headers);
return this
    .createInsertQueryIntoMessageTableApplicationIdSql(eventuateSchema)
    .flatMap(sql -> reactiveJdbcStatementExecutor.update(
        sql,
        messageId,
        destination,
        serializedHeaders,
        payload,
        eventuateJdbcOperationsUtils.booleanToInt(published)))
    .map(rowsUpdated -> messageId);
}

private Mono<String> insertIntoMessageTableDatabaseIdImpl(IdGenerator idGenerator,
    String payload,
    String destination,
    Map<String, String> headers,
    boolean published,
    EventuateSchema eventuateSchema) {
String serializedHeaders = JSonMapper.toJson(headers);
return this
    .createInsertQueryIntoMessageTableDbIdSql(eventuateSchema)
    .flatMap(sql -> reactiveJdbcStatementExecutor.insertAndReturnId(
        sql,
        MESSAGE_AUTO_GENERATED_ID_COLUMN,
        destination,
        serializedHeaders,
        payload,
        eventuateJdbcOperationsUtils.booleanToInt(published)))
    .map(id -> idGenerator.genId(id).asString());
}

public Mono<String> createInsertQueryIntoMessageTableApplicationIdSql(final EventuateSchema eventuateSchema) {
return Mono.zip(
    this.getHeadersColumnType(eventuateSchema),
    this.getPayloadColumnType(eventuateSchema),
    (headersJson, payloadJson) -> {
        String sql = "insert into %s(id, destination, headers, payload, creation_time, published) values(?, ?, %s, %s, %s, ?)";
        return String.format(sql,
            eventuateSchema.qualifyTable(MESSAGE_TABLE_NAME),
            headersJson,
            payloadJson,
            eventuateSqlDialect.getCurrentTimeInMillisecondsExpression());
    });
}

public Mono<String> createInsertQueryIntoMessageTableDbIdSql(final EventuateSchema eventuateSchema) {
return Mono.zip(
    this.getHeadersColumnType(eventuateSchema),
    this.getPayloadColumnType(eventuateSchema),
    (headersJson, payloadJson) -> {
        String sql = "insert into %s(id, destination, headers, payload, creation_time, published) values('', ?, %s, %s, %s, ?)";
        return String.format(sql,
            eventuateSchema.qualifyTable(MESSAGE_TABLE_NAME),
            headersJson,
            payloadJson,
            eventuateSqlDialect.getCurrentTimeInMillisecondsExpression());
    });
}

private Mono<String> getHeadersColumnType(final EventuateSchema eventuateSchema) {
return reactiveJdbcStatementExecutor
    .query(COLUMN_TYPE_SQL, eventuateSchema.getEventuateDatabaseSchema(), MESSAGE_TABLE_NAME, "headers")
    .last()
    .map(r -> r.get("data_type"))
    .map(columnType -> String.format("?::%s", columnType))
    .defaultIfEmpty("?");
}

private Mono<String> getPayloadColumnType(final EventuateSchema eventuateSchema) {
return reactiveJdbcStatementExecutor
    .query(COLUMN_TYPE_SQL, eventuateSchema.getEventuateDatabaseSchema(), MESSAGE_TABLE_NAME, "payload")
    .last()
    .map(r -> r.get("data_type"))
    .map(columnType -> String.format("?::%s", columnType))
    .defaultIfEmpty("?");
}`