apache / incubator-seata

:fire: Seata is an easy-to-use, high-performance, open source distributed transaction solution.
https://seata.apache.org/
Apache License 2.0

Seata: primary key cannot be recognized for PostgreSQL tables partitioned at the database level #4430

Open wilche2 opened 2 years ago

wilche2 commented 2 years ago

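After a table is partitioned in PostgreSQL, the parent table no longer has a primary key; the primary keys live on the individual partition tables. As a result, the following error occurs during execution. I hope a solution can be found.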

[2022-03-03 18:14:54.511][ERROR] [DubboServerHandler-192.168.17.245:21883-thread-7]i.s.r.d.s.s.c.AbstractTableMetaCache[63]- get table meta error:Failed to fetch schema of t_message
java.sql.SQLException: Failed to fetch schema of t_message
    at io.seata.rm.datasource.sql.struct.cache.PostgresqlTableMetaCache.fetchSchema(PostgresqlTableMetaCache.java:67)
    at io.seata.rm.datasource.sql.struct.cache.AbstractTableMetaCache.lambda$getTableMeta$0(AbstractTableMetaCache.java:61)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2039)
    at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2037)
    at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2020)
    at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:112)
    at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
    at io.seata.rm.datasource.sql.struct.cache.AbstractTableMetaCache.getTableMeta(AbstractTableMetaCache.java:59)
    at io.seata.rm.datasource.AbstractConnectionProxy.prepareStatement(AbstractConnectionProxy.java:117)
    at org.springframework.jdbc.core.JdbcTemplate$SimplePreparedStatementCreator.createPreparedStatement(JdbcTemplate.java:1555)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:614)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:861)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:916)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:926)
    at org.springframework.jdbc.core.JdbcTemplate$$FastClassBySpringCGLIB$$a4771cb8.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88)
    at com.mashang.vxiao.common.filter.SqlExecutionLogAspect.execute(SqlExecutionLogAspect.java:35)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at org.springframework.jdbc.core.JdbcTemplate$$EnhancerBySpringCGLIB$$5695ef69.update(<generated>)
    at com.mashang.vxiao.base.dao.BaseDao.addObject(BaseDao.java:282)
    at com.mashang.vxiao.business.message.service.impl.MessageService.saveMessage(MessageService.java:1105)
    at com.mashang.vxiao.business.message.service.impl.MessageService.create(MessageService.java:1238)
    at com.mashang.vxiao.business.message.service.impl.MessageService$$FastClassBySpringCGLIB$$a4580151.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.mashang.vxiao.business.message.service.impl.MessageService$$EnhancerBySpringCGLIB$$d7eef9d7.create(<generated>)
    at com.mashang.vxiao.business.message.generic.impl.MessageCreateService.doCreate(MessageCreateService.java:237)
    at com.mashang.vxiao.business.message.generic.impl.MessageCreateService.create(MessageCreateService.java:135)
    at com.mashang.vxiao.business.message.generic.impl.MessageGenericService.lambda$doCreateOperate$7(MessageGenericService.java:170)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
    at com.mashang.vxiao.business.message.generic.impl.MessageGenericService.doCreateOperate(MessageGenericService.java:169)
    at com.mashang.vxiao.business.message.generic.impl.MessageGenericService.create(MessageGenericService.java:96)
    at com.mashang.vxiao.business.message.generic.impl.MessageGenericService$$FastClassBySpringCGLIB$$443d9a60.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at com.mashang.vxiao.business.message.generic.impl.MessageGenericService$$EnhancerBySpringCGLIB$$69846664.create(<generated>)
    at org.apache.dubbo.common.bytecode.Wrapper347.invokeMethod(Wrapper347.java)
    at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:47)
    at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:84)
    at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56)
    at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
    at com.mashang.vxiao.logtrack.dubbo.provider.TraceDubboProviderFilter.invoke(TraceDubboProviderFilter.java:33)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at com.mashang.vxiao.common.dubbo.filter.DevFilter.invoke(DevFilter.java:24)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at com.alibaba.dubbo.rpc.Invoker$CompatibleInvoker.invoke(Invoker.java:55)
    at io.seata.integration.dubbo.alibaba.AlibabaDubboTransactionPropagationFilter.invoke(AlibabaDubboTransactionPropagationFilter.java:45)
    at com.alibaba.dubbo.rpc.Filter.invoke(Filter.java:29)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at io.seata.integration.dubbo.ApacheDubboTransactionPropagationFilter.invoke(ApacheDubboTransactionPropagationFilter.java:69)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.filter.ExceptionFilter.invoke(ExceptionFilter.java:52)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.filter.TimeoutFilter.invoke(TimeoutFilter.java:46)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter.invoke(TraceFilter.java:77)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:89)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.filter.ContextFilter.invoke(ContextFilter.java:129)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.filter.GenericFilter.invoke(GenericFilter.java:152)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.filter.ClassLoaderFilter.invoke(ClassLoaderFilter.java:38)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.filter.EchoFilter.invoke(EchoFilter.java:41)
    at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:83)
    at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol$1.reply(DubboProtocol.java:145)
    at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:100)
    at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:175)
    at org.apache.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:51)
    at org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:57)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.seata.common.exception.ShouldNeverHappenException: Could not found any index in the table: t_message
    at io.seata.rm.datasource.sql.struct.cache.PostgresqlTableMetaCache.resultSetMetaToSchema(PostgresqlTableMetaCache.java:172)
    at io.seata.rm.datasource.sql.struct.cache.PostgresqlTableMetaCache.fetchSchema(PostgresqlTableMetaCache.java:63)
    ... 99 common frames omitted
wilche2 commented 2 years ago

One more detail. Seata version:

        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.3.0</version>
        </dependency>

PostgreSQL version: PostgreSQL 10.13 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-39), 64-bit

funky-eyes commented 2 years ago

Why not use ShardingSphere?

wilche2 commented 2 years ago

Why not use ShardingSphere?

That is not my decision to make.

funky-eyes commented 2 years ago

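Why not use ShardingSphere?

That is not my decision to make.

How do you perform CRUD operations on the partitions? How is this handled at the data source level?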

wilche2 commented 2 years ago

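We locate each partition table through the partitionid column that PostgreSQL partitions on; there is no extra handling at the data source level.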

funky-eyes commented 2 years ago

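We locate each partition table through the partitionid column that PostgreSQL partitions on; there is no extra handling at the data source level.

Actually, fetching the metadata of any one partition table should be enough. I'm not very familiar with PostgreSQL. Could you write out examples of the CRUD statements you normally use? I'd like to check whether there is anything unusual about them.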

wilche2 commented 2 years ago

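We locate each partition table through the partitionid column that PostgreSQL partitions on; there is no extra handling at the data source level.

Actually, fetching the metadata of any one partition table should be enough. I'm not very familiar with PostgreSQL. Could you write out examples of the CRUD statements you normally use? I'd like to check whether there is anything unusual about them.

Query: select * from t_message where id = 678594911 and partitionid = 0
partitionid = business field % 30, because we split the data into 30 tables

Inserts, updates, and deletes include partitionid in the same way.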
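
As an illustration of the routing rule described above, here is a minimal sketch in Java. It assumes the business key is a `long`; the class name `PartitionRouting` and its members are hypothetical and not taken from the reporter's code base. Using `Math.floorMod` rather than `%` is a defensive choice of this sketch so that negative keys still map into the range 0 to 29; whether the real system does this is unknown.

```java
final class PartitionRouting {

    // From the thread: the data is split across 30 partition tables.
    static final int PARTITION_COUNT = 30;

    // Parameterized form of the query quoted in the thread.
    static final String SELECT_SQL =
            "select * from t_message where id = ? and partitionid = ?";

    // Computes partitionid = business field % 30.
    // floorMod keeps the result non-negative even for negative keys (assumption of this sketch).
    static int partitionId(long businessKey) {
        return (int) Math.floorMod(businessKey, (long) PARTITION_COUNT);
    }

    public static void main(String[] args) {
        long businessKey = 61L; // hypothetical business key
        System.out.println("partitionid = " + partitionId(businessKey));
        System.out.println(SELECT_SQL);
    }
}
```

The computed value would then be bound as the second parameter of the statement, so every read and write carries the partition column.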
wilche2 commented 2 years ago

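We locate each partition table through the partitionid column that PostgreSQL partitions on; there is no extra handling at the data source level.

Actually, fetching the metadata of any one partition table should be enough. I'm not very familiar with PostgreSQL. Could you write out examples of the CRUD statements you normally use? I'd like to check whether there is anything unusual about them.

Query: select * from t_message where id = 678594911 and partitionid = 0
partitionid = business field % 30, because we split the data into 30 tables

Inserts, updates, and deletes include partitionid in the same way.

It is worth mentioning that PostgreSQL 11 supports a global primary key on partitioned tables, so the primary key can be queried on both the parent table and the child tables.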

l81893521 commented 2 years ago

Could you debug the method io.seata.rm.datasource.sql.struct.cache.PostgresqlTableMetaCache#resultSetMetaToSchema? Try changing the tableName in

dbmd.getColumns(null, schemaName, tableName, "%");
dbmd.getIndexInfo(null, schemaName, tableName, false, true);
ResultSet rsPrimary = dbmd.getPrimaryKeys(null, schemaName, tableName);

to the name of a child table, and see whether the metadata can be retrieved.
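
The experiment suggested above can be sketched outside of Seata as follows. This is only an illustration under stated assumptions, not Seata's implementation or its eventual fix: the class `PartitionMetaProbe`, its methods, and the child table name `t_message_0` used in the usage note are hypothetical. The only JDBC call used is the standard `DatabaseMetaData.getPrimaryKeys`, whose result set exposes a `COLUMN_NAME` column.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class PartitionMetaProbe {

    // Reads primary key column names through the standard JDBC metadata API.
    static List<String> primaryKeys(DatabaseMetaData dbmd, String schemaName, String tableName)
            throws SQLException {
        List<String> columns = new ArrayList<>();
        try (ResultSet rs = dbmd.getPrimaryKeys(null, schemaName, tableName)) {
            while (rs.next()) {
                columns.add(rs.getString("COLUMN_NAME"));
            }
        }
        return columns;
    }

    // Adapts the JDBC call to a plain function; SQL failures surface as unchecked exceptions.
    static Function<String, List<String>> jdbcLookup(DatabaseMetaData dbmd, String schemaName) {
        return tableName -> {
            try {
                return primaryKeys(dbmd, schemaName, tableName);
            } catch (SQLException e) {
                throw new IllegalStateException("metadata lookup failed for " + tableName, e);
            }
        };
    }

    // Returns the first candidate table for which the lookup reports a primary key.
    static Optional<String> firstTableWithPrimaryKey(
            List<String> candidates, Function<String, List<String>> lookup) {
        for (String table : candidates) {
            if (!lookup.apply(table).isEmpty()) {
                return Optional.of(table);
            }
        }
        return Optional.empty();
    }
}
```

With a live connection, one could pass `jdbcLookup(connection.getMetaData(), schemaName)` as the lookup and a candidate list such as the parent `t_message` followed by a child like `t_message_0`. On the PostgreSQL 10 setup described in this thread, the parent would presumably return no primary key and the child would be selected.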

wilche2 commented 2 years ago

Could you debug the method io.seata.rm.datasource.sql.struct.cache.PostgresqlTableMetaCache#resultSetMetaToSchema? Try changing the tableName in

dbmd.getColumns(null, schemaName, tableName, "%");
dbmd.getIndexInfo(null, schemaName, tableName, false, true);
ResultSet rsPrimary = dbmd.getPrimaryKeys(null, schemaName, tableName);

to the name of a child table, and see whether the metadata can be retrieved.

That does work.

l81893521 commented 2 years ago

Would it be convenient to contact me on DingTalk? My DingTalk ID is wpzfkml.

wilche2 commented 2 years ago

Would it be convenient to contact me on DingTalk? My DingTalk ID is wpzfkml.

I've sent a request; please accept it.