prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

Majority of TPCDS queries failed with "Failed to fetched data from" errors at high SF on large clusters #20032

Closed yingsu00 closed 1 year ago

yingsu00 commented 1 year ago

Many queries failed at higher scale factors:
SF 10TB on a large cluster with 16 r5.8xlarge workers (https://oss.ahana.dev/benchto-run-details?runs=397)
SF 100TB on an xlarge cluster with 32 r5.16xlarge workers (https://oss.ahana.dev/benchto-run-details?runs=398)

All of the failed queries report the same error:

Operator::isBlocked failed for [operator: Exchange, plan node ID: 1465]: Failed to fetched data from 10.78.43.252:8585 /v1/task/20230629_203031_00426_dikw7.6.0.16.0/results/8/0 - Exhausted retries: AsyncSocketException: connect failed, type = Socket not open, errno = 111 (Connection refused)
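The "Exhausted retries" wording indicates the exchange client retried the fetch a bounded number of times before failing the whole operator. A generic sketch of that pattern (hypothetical names; this is not the actual Velox exchange-source code):

```python
import time


def fetch_with_retries(fetch, max_retries=3, backoff_s=0.0):
    """Retry a fetch that may hit a refused connection; re-raise after
    max_retries as an 'Exhausted retries' error. Illustrative only."""
    last_err = None
    for attempt in range(max_retries):
        try:
            return fetch()
        except ConnectionRefusedError as e:  # errno 111
            last_err = e
            time.sleep(backoff_s * (2 ** attempt))
    raise RuntimeError(f"Exhausted retries: {last_err}")


# A fetch that always fails stands in for a worker that has gone away.
def dead_worker_fetch():
    raise ConnectionRefusedError(111, "Connection refused")


try:
    fetch_with_retries(dead_worker_fetch, max_retries=3)
except RuntimeError as e:
    print(str(e).startswith("Exhausted retries"))  # True
```

The key point is that once retries are exhausted, the failure is surfaced to `Operator::isBlocked`, which fails the task rather than the single fetch.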

There was no pattern in the failed nodes' IPs, so we can rule out a single-node failure.
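errno 111 (ECONNREFUSED) is what a TCP connect returns when the target host is reachable but nothing is listening on the port, i.e. the worker process is gone while the machine is still up. It is easy to reproduce locally (assuming nothing is listening on the chosen port):

```python
import errno
import socket

# Connect to a local port where (presumably) nothing is listening; the
# kernel answers with RST and connect() fails with ECONNREFUSED (111),
# the same errno seen in the exchange failure above.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.connect(("127.0.0.1", 1))  # port 1 is almost never in use
except OSError as e:
    print(e.errno == errno.ECONNREFUSED)  # True on a typical host
finally:
    s.close()
```

This distinguishes the failure mode from a network partition or a hung worker, which would typically surface as a timeout instead of a refused connection.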

Example query: Q55 (20230630_000636_00000_dikw7)

SELECT
  "i_brand_id" "brand_id"
, "i_brand" "brand"
, "sum"("ss_ext_sales_price") "ext_price"
FROM
  date_dim
, store_sales
, item
WHERE ("d_date_sk" = "ss_sold_date_sk")
   AND ("ss_item_sk" = "i_item_sk")
   AND ("i_manager_id" = 28)
   AND ("d_moy" = 11)
   AND ("d_year" = 1999)
GROUP BY "i_brand", "i_brand_id"
ORDER BY "ext_price" DESC, "i_brand_id" ASC
LIMIT 100

Error message and stack trace:

{"type":"VeloxRuntimeError",
 "message":" Operator::isBlocked failed for [operator: Exchange, plan node ID: 1465]: Failed to fetched data from 10.78.43.252:8585 /v1/task/20230629_203031_00426_dikw7.6.0.16.0/results/8/0 - Exhausted retries: AsyncSocketException: connect failed, type = Socket not open, errno = 111 (Connection refused)",
 "suppressed":[],"stack":["Unknown.# 0  _ZN8facebook5velox7process10StackTraceC1Ei(Unknown Source)",
 "Unknown.# 1  _ZN8facebook5velox14VeloxExceptionC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_(Unknown Source)",
 "Unknown.# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_(Unknown Source)",
 "Unknown.# 3  _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE.cold(Unknown Source)",
 "Unknown.# 4  _ZN8facebook5velox4exec6Driver3runESt10shared_ptrIS2_E(Unknown Source)",
 "Unknown.# 5  _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallIZN8facebook5velox4exec6Driver7enqueueESt10shared_ptrIS9_EEUlvE_EEvRNS1_4DataE(Unknown Source)",
 "Unknown.# 6  _ZN5folly6detail8function14FunctionTraitsIFvvEEclEv(Unknown Source)",
 "Unknown.# 7  _ZN5folly18ThreadPoolExecutor7runTaskERKSt10shared_ptrINS0_6ThreadEEONS0_4TaskE(Unknown Source)",
 "Unknown.# 8  _ZN5folly21CPUThreadPoolExecutor9threadRunESt10shared_ptrINS_18ThreadPoolExecutor6ThreadEE(Unknown Source)",
 "Unknown.# 9  _ZSt13__invoke_implIvRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEERPS1_JRS4_EET_St21__invoke_memfun_derefOT0_OT1_DpOT2_(Unknown Source)",
 "Unknown.# 10 _ZSt8__invokeIRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEJRPS1_RS4_EENSt15__invoke_resultIT_JDpT0_EE4typeEOSC_DpOSD_(Unknown Source)",
 "Unknown.# 11 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EE6__callIvJEJLm0ELm1EEEET_OSt5tupleIJDpT0_EESt12_Index_tupleIJXspT1_EEE(Unknown Source)",
 "Unknown.# 12 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EEclIJEvEET0_DpOT_(Unknown Source)",
 "Unknown.# 13 _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallISt5_BindIFMNS_18ThreadPoolExecutorEFvSt10shared_ptrINS7_6ThreadEEEPS7_SA_EEEEvRNS1_4DataE(Unknown Source)",
 "Unknown.# 14 0x0000000000000000(Unknown Source)",
 "Unknown.# 15 start_thread(Unknown Source)",
 "Unknown.# 16 clone(Unknown Source)"],"errorCode":{"code":65536,"name":"GENERIC_INTERNAL_ERROR",
 "type":"INTERNAL_ERROR",
 "retriable":false},"errorCause":"UNKNOWN"}

The query has 5 stages (0-4); stages 3 and 4 finished successfully.

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 0 [SINGLE]                                                                                                                                                                                                                                                                                                                                                       >
     Output layout: [i_brand_id, i_brand, sum]                                                                                                                                                                                                                                                                                                                             >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                                                                                                        >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                                                                                                         >
     - Output[brand_id, brand, ext_price] => [i_brand_id:integer, i_brand:varchar(50), sum:decimal(38,2)]                                                                                                                                                                                                                                                                  >
             brand_id := i_brand_id (1:35)                                                                                                                                                                                                                                                                                                                                 >
             brand := i_brand (1:61)                                                                                                                                                                                                                                                                                                                                       >
             ext_price := sum (1:81)                                                                                                                                                                                                                                                                                                                                       >
         - TopN[100 by (sum DESC_NULLS_LAST, i_brand_id ASC_NULLS_LAST)] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]                                                                                                                                                                                                                                   >
             - LocalExchange[SINGLE] () => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]                                                                                                                                                                                                                                                                    >
                 - RemoteSource[1] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]                                                                                                                                                                                                                                                                         >
                                                                                                                                                                                                                                                                                                                                                                           >
 Fragment 1 [HASH]                                                                                                                                                                                                                                                                                                                                                         >
     Output layout: [i_brand, i_brand_id, sum]                                                                                                                                                                                                                                                                                                                             >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                                                                                                        >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                                                                                                         >
     - TopNPartial[100 by (sum DESC_NULLS_LAST, i_brand_id ASC_NULLS_LAST)] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]                                                                                                                                                                                                                                >
         - Aggregate(FINAL)[i_brand, i_brand_id] => [i_brand:varchar(50), i_brand_id:integer, sum:decimal(38,2)]                                                                                                                                                                                                                                                           >
                 sum := "presto.default.sum"((sum_20)) (1:81)                                                                                                                                                                                                                                                                                                              >
             - LocalExchange[HASH] (i_brand, i_brand_id) => [i_brand:varchar(50), i_brand_id:integer, sum_20:varbinary]                                                                                                                                                                                                                                                    >
                 - RemoteSource[2] => [i_brand:varchar(50), i_brand_id:integer, sum_20:varbinary]                                                                                                                                                                                                                                                                          >
                                                                                                                                                                                                                                                                                                                                                                           >
 Fragment 2 [SOURCE]                                                                                                                                                                                                                                                                                                                                                       >
     Output layout: [i_brand, i_brand_id, sum_20]                                                                                                                                                                                                                                                                                                                          >
     Output partitioning: HASH [i_brand, i_brand_id]                                                                                                                                                                                                                                                                                                                       >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                                                                                                         >
     - Aggregate(PARTIAL)[i_brand, i_brand_id] => [i_brand:varchar(50), i_brand_id:integer, sum_20:varbinary]                                                                                                                                                                                                                                                              >
             sum_20 := "presto.default.sum"((ss_ext_sales_price)) (1:81)                                                                                                                                                                                                                                                                                                   >
         - InnerJoin[("ss_item_sk" = "i_item_sk")] => [ss_ext_sales_price:decimal(7,2), i_brand_id:integer, i_brand:varchar(50)]                                                                                                                                                                                                                                           >
                 Estimates: {rows: 46351051 (3.50GB), cpu: 15300513743569.34, memory: 5458621.00, network: 5458621.00}                                                                                                                                                                                                                                                     >
                 Distribution: REPLICATED                                                                                                                                                                                                                                                                                                                                  >
             - InnerJoin[("ss_sold_date_sk" = "d_date_sk")] => [ss_item_sk:bigint, ss_ext_sales_price:decimal(7,2)]                                                                                                                                                                                                                                                        >
                     Estimates: {rows: 4576801864 (490.19GB), cpu: 15218156819337.52, memory: 8449.70, network: 8449.70}                                                                                                                                                                                                                                                   >
                     Distribution: REPLICATED                                                                                                                                                                                                                                                                                                                              >
                 - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100000_parquet_varchar.store_sales{}]'}, grouped = false] => [ss_sold_date_sk:bigint, ss_item_sk:bigint, ss_ext_sales_price:decimal(7,2)>
                         Estimates: {rows: 288002140051 (30.12TB), cpu: 7568710790041.00, memory: 0.00, network: 0.00}                                                                                                                                                                                                                                                     >
                         LAYOUT: tpcds_sf100000_parquet_varchar.store_sales{}                                                                                                                                                                                                                                                                                              >
                         ss_ext_sales_price := ss_ext_sales_price:decimal(7,2):15:REGULAR (1:137)                                                                                                                                                                                                                                                                          >
                         ss_sold_date_sk := ss_sold_date_sk:bigint:0:REGULAR (1:137)                                                                                                                                                                                                                                                                                       >
                         ss_item_sk := ss_item_sk:bigint:2:REGULAR (1:137)                                                                                                                                                                                                                                                                                                 >
                 - LocalExchange[HASH] (d_date_sk) => [d_date_sk:bigint]                                                                                                                                                                                                                                                                                                   >
                         Estimates: {rows: 30 (3.40kB), cpu: 545.14, memory: 0.00, network: 8449.70}                                                                                                                                                                                                                                                                       >
                     - RemoteSource[3] => [d_date_sk:bigint]                                                                                                                                                                                                                                                                                                               >
             - LocalExchange[HASH] (i_item_sk) => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]                                                                                                                                                                                                                                                              >
                     Estimates: {rows: 5008 (396.65kB), cpu: 351623.96, memory: 0.00, network: 5450171.30}                                                                                                                                                                                                                                                                 >
                 - RemoteSource[4] => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]                                                                                                                                                                                                                                                                          >
                                                                                                                                                                                                                                                                                                                                                                           >
 Fragment 3 [SOURCE]                                                                                                                                                                                                                                                                                                                                                       >
     Output layout: [d_date_sk]                                                                                                                                                                                                                                                                                                                                            >
     Output partitioning: BROADCAST []                                                                                                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                                                                                                         >
     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=date_dim, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100000_parquet_varchar.date_dim{domains={d_moy=[ [["11"]] ], d_year=[ [["1999"]] ]}}]'}, grouped = false] => [d_date_sk:bigint]                        >
             Estimates: {rows: 30 (272B), cpu: 272.57, memory: 0.00, network: 0.00}                                                                                                                                                                                                                                                                                        >
             LAYOUT: tpcds_sf100000_parquet_varchar.date_dim{domains={d_moy=[ [["11"]] ], d_year=[ [["1999"]] ]}}                                                                                                                                                                                                                                                          >
             d_date_sk := d_date_sk:bigint:0:REGULAR (1:126)                                                                                                                                                                                                                                                                                                               >
             d_moy:int:8:REGULAR                                                                                                                                                                                                                                                                                                                                           >
                 :: [["11"]]                                                                                                                                                                                                                                                                                                                                               >
             d_year:int:6:REGULAR                                                                                                                                                                                                                                                                                                                                          >
                 :: [["1999"]]                                                                                                                                                                                                                                                                                                                                             >
                                                                                                                                                                                                                                                                                                                                                                           >
 Fragment 4 [SOURCE]                                                                                                                                                                                                                                                                                                                                                       >
     Output layout: [i_item_sk, i_brand_id, i_brand]                                                                                                                                                                                                                                                                                                                       >
     Output partitioning: BROADCAST []                                                                                                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                                                                                                         >
     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=item, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100000_parquet_varchar.item{domains={i_manager_id=[ [["28"]] ]}}]'}, grouped = false] => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]       >
             Estimates: {rows: 5008 (171.69kB), cpu: 175811.98, memory: 0.00, network: 0.00}                                                                                                                                                                                                                                                                               >
             LAYOUT: tpcds_sf100000_parquet_varchar.item{domains={i_manager_id=[ [["28"]] ]}}                                                                                                                                                                                                                                                                              >
             i_brand_id := i_brand_id:int:7:REGULAR (1:151)                                                                                                                                                                                                                                                                                                                >
             i_brand := i_brand:varchar(50):8:REGULAR (1:151)                                                                                                                                                                                                                                                                                                              >
             i_item_sk := i_item_sk:bigint:0:REGULAR (1:151)                                                                                                                                                                                                                                                                                                               >
             i_manager_id:int:20:REGULAR                                                                                                                                                                                                                                                                                                                                   >
                 :: [["28"]]                                                                                                                                                                                                                                                                                                                                               >
                                                                                                                                                                                                                                                                                                                                                                           >
                                                                                                                                                                                                                                                                                                                                                                           >
(1 row)

All remaining tasks in stages 0, 1, and 2 failed or were aborted.

In stage 2, there are 28 tasks in total; 26 were aborted and 2 failed.
Example aborted task in stage 2: 20230630_000636_00000_dikw7.2.0.20.0
Example failed task in stage 2: 20230630_000636_00000_dikw7.2.0.20.0
Example failed task in stage 1: 20230630_000636_00000_dikw7.1.0.0.0
AbortedTasks.zip

The aborted task's stack trace:

"# 0  _ZN8facebook5velox7process10StackTraceC1Ei",
 "# 1  _ZN8facebook5velox14VeloxExceptionC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_",
 "# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorEPKcEEvRKNS1_18VeloxCheckFailArgsET0_",
 "# 3  _ZN8facebook5velox4exec4Task9terminateENS1_9TaskStateE",
 "# 4  _ZN8facebook6presto11TaskManager10deleteTaskERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEb",
 "# 5  _ZNSt17_Function_handlerIFvPN8proxygen11HTTPMessageERSt6vectorISt10unique_ptrIN5folly5IOBufESt14default_deleteIS6_EESaIS9_EEPNS0_15ResponseHandlerEEZN8facebook6presto12TaskResource10deleteTaskES2_RKS3_INSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaISO_EEEUlS2_RKSB_SE_E_E9_M_invokeERKSt9_Any_dataOS2_SC_OSE_",
 "# 6  _ZNSt17_Function_handlerIFvPN8proxygen11HTTPMessageERSt6vectorISt10unique_ptrIN5folly5IOBufESt14default_deleteIS6_EESaIS9_EEPNS0_15ResponseHandlerESt10shared_ptrIN8facebook6presto4http27CallbackRequestHandlerStateEEEZNSI_22CallbackRequestHandler4wrapESt8functionIFvS2_SC_SE_EEEUlS2_SC_SE_SK_E_E9_M_invokeERKSt9_Any_dataOS2_SC_OSE_OSK_",
 "# 7  _ZN8facebook6presto4http22CallbackRequestHandler5onEOMEv",
 "# 8  _ZN8proxygen21RequestHandlerAdaptor5onEOMEv",
 "# 9  _ZN8proxygen15HTTPTransaction17processIngressEOMEv",
 "# 10 _ZN8proxygen15HTTPTransaction12onIngressEOMEv",
 "# 11 _ZN8proxygen11HTTPSession17onMessageCompleteEmb",
 "# 12 _ZN8proxygen26PassThroughHTTPCodecFilter17onMessageCompleteEmb",
 "# 13 _ZN8proxygen11HTTP1xCodec17onMessageCompleteEv",
 "# 14 _ZN8proxygen11HTTP1xCodec19onMessageCompleteCBEPNS_11http_parserE",
 "# 15 _ZN8proxygen27http_parser_execute_optionsEPNS_11http_parserEPKNS_20http_parser_settingsEhPKcm",
 "# 16 _ZN8proxygen11HTTP1xCodec13onIngressImplERKN5folly5IOBufE",
 "# 17 _ZN8proxygen11HTTP1xCodec9onIngressERKN5folly5IOBufE",
 "# 18 _ZN8proxygen26PassThroughHTTPCodecFilter9onIngressERKN5folly5IOBufE",
 "# 19 _ZN8proxygen11HTTPSession15processReadDataEv",
 "# 20 _ZN8proxygen11HTTPSession17readDataAvailableEm",
 "# 21 _ZN5folly11AsyncSocket17processNormalReadEv",
 "# 22 _ZN5folly11AsyncSocket10handleReadEv",
 "# 23 _ZN5folly11AsyncSocket7ioReadyEt",
 "# 24 _ZN5folly11AsyncSocket9IoHandler12handlerReadyEt",
 "# 25 _ZN5folly12EventHandler16libeventCallbackEisPv",
 "# 26 0x0000000000000000",
 "# 27 event_base_loop",
 "# 28 _ZN12_GLOBAL__N_116EventBaseBackend18eb_event_base_loopEi",
 "# 29 _ZN5folly9EventBase8loopMainEib",
 "# 30 _ZN5folly9EventBase8loopBodyEib",
 "# 31 _ZN5folly9EventBase4loopEv",
 "# 32 _ZN5folly9EventBase11loopForeverEv",
 "# 33 _ZN5folly20IOThreadPoolExecutor9threadRunESt10shared_ptrINS_18ThreadPoolExecutor6ThreadEE",
 "# 34 _ZSt13__invoke_implIvRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEERPS1_JRS4_EET_St21__invoke_memfun_derefOT0_OT1_DpOT2_",
 "# 35 _ZSt8__invokeIRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEJRPS1_RS4_EENSt15__invoke_resultIT_JDpT0_EE4typeEOSC_DpOSD_",
 "# 36 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EE6__callIvJEJLm0ELm1EEEET_OSt5tupleIJDpT0_EESt12_Index_tupleIJXspT1_EEE",
 "# 37 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EEclIJEvEET0_DpOT_",
 "# 38 _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallISt5_BindIFMNS_18ThreadPoolExecutorEFvSt10shared_ptrINS7_6ThreadEEEPS7_SA_EEEEvRNS1_4DataE",
 "# 39 0x0000000000000000",
 "# 40 start_thread",
 "# 41 clone" ${database}.${schema}.
ashokku2022 commented 1 year ago

@gopukrishnasIBM is looking into it, as he ran the IBM TPCDS workload on a higher SF with Linsong's help.

aditi-pandit commented 1 year ago

This could be similar to previous issues we've seen, where a TaskManager failure escalates into other failures in error-handling code paths.

@majetideepak : Any ideas ?

majetideepak commented 1 year ago

Operator::isBlocked failed for [operator: Exchange, plan node ID: 1465]: Failed to fetched data from 10.78.43.252:8585 /v1/task/20230629_203031_00426_dikw7.6.0.16.0/results/8/0 - Exhausted retries: AsyncSocketException: connect failed, type = Socket not open, errno = 111 (Connection refused)

These are the error messages you see in Prestissimo when a worker tries to fetch data from another Prestissimo worker that has likely crashed. We have to look at the logs on the node (10.78.43.252) where the Prestissimo worker crashed to determine the root cause.

More details would help as well: do the queries fail immediately, or after executing for some time? Was it an OOM that killed the Prestissimo process? You can check the kernel logs on that node. Can we reproduce this failure consistently with some minimal query?
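To answer the OOM question, the kernel logs on the suspect node are the place to look; when the Linux OOM killer fires it logs an "Out of memory: Killed process" line. A minimal sketch of the grep to run (the sample log line here is illustrative, not taken from this cluster; on the node itself, feed the pattern from `dmesg -T` or `journalctl -k` instead of a file):

```shell
# Count OOM-killer activity in a captured kernel log.
# kern.log is a placeholder for a log copied off the node.
cat > kern.log <<'EOF'
[Mon Aug 14 21:40:20 2023] Out of memory: Killed process 9 (presto_server) total-vm:241234567kB
EOF
grep -ciE 'out of memory|oom-kill|killed process' kern.log
# → 1
```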

yingsu00 commented 1 year ago

@majetideepak It's reproducible on any cluster with 16 or 32 nodes and running TPCDS 10TB. Q55 was the simplest query I found to reproduce the error.

gopukrishnasIBM commented 1 year ago

@czentgr @majetideepak @yzhang1991 @MuralidharVVK1 @ashokku2022 Linsong shared 3 coordinator logs and 65 worker node logs with me, which can be found at (https://s3-oss-jenkins.ahana.dev/oss-prestodb/). After examining these, I have some questions and observations. Two of the three coordinators were idle, while one was actively working.

Among the 65 worker logs, 25 contain only a few log lines indicating a connection failure. I'm curious why there are no subsequent retry attempts.

2023-08-14T20:31:05.739Z I0814 20:31:05.739145     9 PrestoServer.cpp:341] Starting with node memory 230GB
2023-08-14T20:31:05.756Z I0814 20:31:05.756304     9 PrestoServer.cpp:456] STARTUP: Registering catalog oss_glue using connector hive-hadoop2, cache enabled: 1, cache size: 0
2023-08-14T20:31:05.756Z I0814 20:31:05.756423     9 PrestoServer.cpp:169] STARTUP: Starting server at :::8585 (10.78.139.79)
2023-08-14T20:31:05.758Z I0814 20:31:05.758229    20 Announcer.cpp:125] Discovery service changed to 172.20.196.247:8585
2023-08-14T20:31:05.760Z I0814 20:31:05.760804    20 Announcer.cpp:148] Announcement succeeded: 202
2023-08-14T20:31:05.761Z I0814 20:31:05.761909     9 PrestoServer.cpp:275] STARTUP: Starting all periodic tasks...
2023-08-14T20:31:05.762Z I0814 20:31:05.762070     9 HttpServer.cpp:125] STARTUP: proxygen::HTTPServer::start()
2023-08-14T20:31:35.766Z I0814 20:31:35.766618    20 Announcer.cpp:148] Announcement succeeded: 202
2023-08-14T20:32:05.773Z W0814 20:32:05.773288    20 Announcer.cpp:154] Announcement failed: AsyncSocketException: connect failed, type = Socket not open, errno = 111 (Connection refused)

The first error I saw on the worker side is an out-of-memory error:

2023-08-14T21:40:11.965Z W0814 21:40:11.965792   643 MmapAllocator.cpp:429] [MEM] Exceeded memory allocator limit when adding 512 new pages for total allocation of 106496 pages, the memory allocator capacity is 60293120
2023-08-14T21:40:11.965Z W0814 21:40:11.965801   745 MmapAllocator.cpp:79] [MEM] Exceeding memory allocator limit when allocate 64 pages with capacity of 60293120
2023-08-14T21:40:12.972Z W0814 21:40:12.971920   672 MmapAllocator.cpp:79] [MEM] Exceeding memory allocator limit when allocate 128 pages with capacity of 60293120
2023-08-14T21:40:13.021Z W0814 21:40:13.021198   684 MmapAllocator.cpp:429] [MEM] Exceeded memory allocator limit when adding 512 new pages for total allocation of 125952 pages, the memory allocator capacity is 60293120
2023-08-14T21:40:13.977Z W0814 21:40:13.977497   708 MmapAllocator.cpp:79] [MEM] Exceeding memory allocator limit when allocate 128 pages with capacity of 60293120
2023-08-14T21:40:14.022Z W0814 21:40:14.022696   749 MmapAllocator.cpp:429] [MEM] Exceeded memory allocator limit when adding 512 new pages for total allocation of 129024 pages, the memory allocator capacity is 60293120
2023-08-14T21:40:14.092Z W0814 21:40:14.092751   816 MmapAllocator.cpp:88] [MEM] Exceeded memory allocator limit when allocate 64 pages with capacity of 60293120

and on the coordinator side:

2023-08-14T21:47:29.507Z 2023-08-14T21:47:29.505Z   ERROR   remote-task-callback-1642   com.facebook.presto.execution.StageExecutionStateMachine    Stage execution 20230814_214520_00015_geneu.26.0 failed
2023-08-14T21:47:29.507Z VeloxRuntimeError:  Exceeded memory pool cap of 163.84GB with max 163.84GB when requesting 1.00MB, memory manager cap is 230.00GB, requestor 'op.4742.4.1.LocalPartition' with current usage 1.88MB
2023-08-14T21:47:29.507Z 20230814_214520_00015_geneu usage 163.84GB peak 163.84GB
2023-08-14T21:47:29.507Z     task.20230814_214520_00015_geneu.16.0.10.0 usage 198.00MB peak 708.00MB
2023-08-14T21:47:29.507Z         node.root usage 167.00MB peak 676.00MB
2023-08-14T21:47:29.507Z             op.root.0.13.PartitionedOutput usage 14.41MB peak 39.95MB

Because of this, I suspect that the issue might be related to memory.
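For reference, the allocator capacity in those warnings lines up with the configured node memory, assuming 4 KiB machine pages (an assumption; the page size is not printed in the log):

```shell
# 60293120 pages * 4096 bytes per page = 230 GiB, matching the
# "Starting with node memory 230GB" startup line in the worker log.
echo $(( 60293120 * 4096 / 1024 / 1024 / 1024 ))
# → 230
```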

Another issue I found in the logs is a "dequeueLocked" failure. On the worker side:

2023-08-14T22:02:41.957Z I0814 22:02:41.957504  1451 Task.cpp:1647] Terminating task 20230814_220048_00037_geneu.16.0.37.0 with state Failed after running for 112513 ms.
2023-08-14T22:02:41.957Z E0814 22:02:41.957602  1436 Exceptions.h:69] Line: ../../velox/velox/exec/ExchangeQueue.cpp:100, Function:dequeueLocked, Expression:  Failed to fetch data from 10.78.174.48:8585 /v1/task/20230814_220048_00037_geneu.17.0.17.0/results/37/0 - Exhausted retries: ingress timeout, streamID=1, Source: RUNTIME, ErrorCode: INVALID_STATE
2023-08-14T22:02:41.957Z E0814 22:02:41.957715  1446 Exceptions.h:69] Line: ../../velox/velox/exec/ExchangeQueue.cpp:100, Function:dequeueLocked, Expression:  Failed to fetch data from 10.78.174.48:8585 /v1/task/20230814_220048_00037_geneu.17.0.17.0/results/37/0 - Exhausted retries: ingress timeout, streamID=1, Source: RUNTIME, ErrorCode: INVALID_STATE
2023-08-14T22:02:41.957Z E0814 22:02:41.957813  1457 Exceptions.h:69] Line: ../../velox/velox/exec/ExchangeQueue.cpp:100, Function:dequeueLocked, Expression:  Failed to fetch data from 10.78.174.48:8585 /v1/task/20230814_220048_00037_geneu.17.0.17.0/results/37/0 - Exhausted retries: ingress timeout, streamID=1, Source: RUNTIME, ErrorCode: INVALID_STATE

coordinator side:

2023-08-14T22:11:24.515Z VeloxRuntimeError:  Failed to fetch data from 10.78.183.226:8585 /v1/task/20230814_220920_00041_geneu.15.0.23.0/results/29/0 - Exhausted retries: ingress timeout, streamID=1
2023-08-14T22:11:24.515Z    at Unknown.# 0  _ZN8facebook5velox7process10StackTraceC1Ei(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 1  _ZN8facebook5velox14VeloxExceptionC2EPKcmS3_St17basic_string_viewIcSt11char_traitsIcEES7_S7_S7_bNS1_4TypeES7_(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 2  _ZN8facebook5velox6detail14veloxCheckFailINS0_17VeloxRuntimeErrorERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEEEvRKNS1_18VeloxCheckFailArgsET0_(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 3  _ZN8facebook5velox4exec13ExchangeQueue13dequeueLockedEPbPN5folly10SemiFutureINS4_4UnitEEE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 4  _ZN8facebook5velox4exec14ExchangeClient4nextEPbPN5folly10SemiFutureINS4_4UnitEEE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 5  _ZN8facebook5velox4exec8Exchange9isBlockedEPN5folly10SemiFutureINS3_4UnitEEE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 6  _ZN8facebook5velox4exec6Driver11runInternalERSt10shared_ptrIS2_ERS3_INS1_13BlockingStateEERS3_INS0_9RowVectorEE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 7  _ZN8facebook5velox4exec6Driver3runESt10shared_ptrIS2_E(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 8  _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallIZN8facebook5velox4exec6Driver7enqueueESt10shared_ptrIS9_EEUlvE_EEvRNS1_4DataE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 9  _ZN5folly6detail8function14FunctionTraitsIFvvEEclEv(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 10 _ZN5folly18ThreadPoolExecutor7runTaskERKSt10shared_ptrINS0_6ThreadEEONS0_4TaskE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 11 _ZN5folly21CPUThreadPoolExecutor9threadRunESt10shared_ptrINS_18ThreadPoolExecutor6ThreadEE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 12 _ZSt13__invoke_implIvRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEERPS1_JRS4_EET_St21__invoke_memfun_derefOT0_OT1_DpOT2_(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 13 _ZSt8__invokeIRMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEJRPS1_RS4_EENSt15__invoke_resultIT_JDpT0_EE4typeEOSC_DpOSD_(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 14 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EE6__callIvJEJLm0ELm1EEEET_OSt5tupleIJDpT0_EESt12_Index_tupleIJXspT1_EEE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 15 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EEclIJEvEET0_DpOT_(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 16 _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallISt5_BindIFMNS_18ThreadPoolExecutorEFvSt10shared_ptrINS7_6ThreadEEEPS7_SA_EEEEvRNS1_4DataE(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 17 0x0000000000000000(Unknown Source)
2023-08-14T22:11:24.515Z    at Unknown.# 18 start_thread(Unknown Source)

I also came across SIGSEGV errors in different worker logs. One example is the worker with node.ip=10.78.161.169:

2023-08-14T22:09:21.375Z I0814 22:09:21.375032    29 TaskManager.cpp:457] Starting task 20230814_220920_00041_geneu.27.0.3.0 with 16 max drivers.
2023-08-14T22:09:21.388Z I0814 22:09:21.388250    26 TaskManager.cpp:503] No more splits for 20230814_220920_00041_geneu.26.0.0.0 for node 3234
2023-08-14T22:09:21.395Z I0814 22:09:21.395074    29 TaskManager.cpp:503] No more splits for 20230814_220920_00041_geneu.27.0.3.0 for node 50
2023-08-14T22:09:21.395Z I0814 22:09:21.395102    29 TaskManager.cpp:503] No more splits for 20230814_220920_00041_geneu.27.0.3.0 for node 3233
2023-08-14T22:10:56.663Z *** Aborted at 1692051056 (Unix time, try 'date -d @1692051056') ***
2023-08-14T22:10:56.663Z *** Signal 11 (SIGSEGV) (0x7d284f35ac00) received by PID 9 (pthread TID 0x7d47d17fb700) (linux TID 235) (code: address not mapped to object), stack trace: ***
2023-08-14T22:10:56.663Z (error retrieving stack trace)
2023-08-14T22:19:28.355Z I0814 22:19:28.355716    20 Announcer.cpp:161] Announcement succeeded: HTTP 202

coordinator:

2023-08-14T22:14:54.253Z 2023-08-14T22:14:54.253Z   WARN    TaskInfoFetcher-20230814_220920_00041_geneu.27.0.3.0-4757   com.facebook.presto.server.RequestErrorTracker  Error getting info for task 20230814_220920_00041_geneu.27.0.3.0: Server refused connection: http://10.78.161.169:8585/v1/task/20230814_220920_00041_geneu.27.0.3.0?summarize: http://10.78.161.169:8585/v1/task/20230814_220920_00041_geneu.27.0.3.0

In the plan,

 Fragment 2 [SOURCE]                                                                                                                                                                                                                                                                                                                                                       >
     Output layout: [i_brand, i_brand_id, sum_20]                                                                                                                                                                                                                                                                                                                          >
     Output partitioning: HASH [i_brand, i_brand_id]                                                                                                                                                                                                                                                                                                                       >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                                                                                                         >
     - Aggregate(PARTIAL)[i_brand, i_brand_id] => [i_brand:varchar(50), i_brand_id:integer, sum_20:varbinary]                                                                                                                                                                                                                                                              >
             sum_20 := "presto.default.sum"((ss_ext_sales_price)) (1:81)                                                                                                                                                                                                                                                                                                   >
         - InnerJoin[("ss_item_sk" = "i_item_sk")] => [ss_ext_sales_price:decimal(7,2), i_brand_id:integer, i_brand:varchar(50)]                                                                                                                                                                                                                                           >
                 Estimates: {rows: 46351051 (3.50GB), cpu: 15300513743569.34, memory: 5458621.00, network: 5458621.00}                                                                                                                                                                                                                                                     >
                 Distribution: REPLICATED                                                                                                                                                                                                                                                                                                                                  >
             - InnerJoin[("ss_sold_date_sk" = "d_date_sk")] => [ss_item_sk:bigint, ss_ext_sales_price:decimal(7,2)]                                                                                                                                                                                                                                                        >
                     Estimates: {rows: 4576801864 (490.19GB), cpu: 15218156819337.52, memory: 8449.70, network: 8449.70}                                                                                                                                                                                                                                                   >
                     Distribution: REPLICATED                                                                                                                                                                                                                                                                                                                              >
                 - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100000_parquet_varchar.store_sales{}]'}, grouped = false] => [ss_sold_date_sk:bigint, ss_item_sk:bigint, ss_ext_sales_price:decimal(7,2)>
                         Estimates: {rows: 288002140051 (30.12TB), cpu: 7568710790041.00, memory: 0.00, network: 0.00}                                                                                                                                                                                                                                                     >
                         LAYOUT: tpcds_sf100000_parquet_varchar.store_sales{}                                                                                                                                                                                                                                                                                              >
                         ss_ext_sales_price := ss_ext_sales_price:decimal(7,2):15:REGULAR (1:137)                                                                                                                                                                                                                                                                          >
                         ss_sold_date_sk := ss_sold_date_sk:bigint:0:REGULAR (1:137)                                                                                                                                                                                                                                                                                       >
                         ss_item_sk := ss_item_sk:bigint:2:REGULAR (1:137)                                                                                                                                                                                                                                                                                                 >
                 - LocalExchange[HASH] (d_date_sk) => [d_date_sk:bigint]                                                                                                                                                                                                                                                                                                   >
                         Estimates: {rows: 30 (3.40kB), cpu: 545.14, memory: 0.00, network: 8449.70}                                                                                                                                                                                                                                                                       >
                     - RemoteSource[3] => [d_date_sk:bigint]                                                                                                                                                                                                                                                                                                               >
             - LocalExchange[HASH] (i_item_sk) => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]                                                                                                                                                                                                                                                              >
                     Estimates: {rows: 5008 (396.65kB), cpu: 351623.96, memory: 0.00, network: 5450171.30}                                                                                                                                                                                                                                                                 >
                 - RemoteSource[4] => [i_item_sk:bigint, i_brand_id:integer, i_brand:varchar(50)]

How can we distinguish between the probe table and the build table? Could using the larger table as the build side potentially lead to an out-of-memory error?

karteekmurthys commented 1 year ago

The build sides of the inner joins in stage 2 of query 55 are pretty small: the scans from stage 3 and stage 4 feed the HashBuild side of the inner joins. The stage 2 slowdown mostly comes from the TableScan of store_sales and the HashProbe operator of the inner join "ss_item_sk" = "i_item_sk".

I ran the query twice and was not able to reproduce the issue:

presto:tpcds_sf100000_parquet_varchar> SELECT "i_brand_id" "brand_id" , "i_brand" "brand" , "sum"("ss_ext_sales_price") "ext_price" FROM date_dim , store_sales , item WHERE ("d_date_sk" = "ss_sold_date_sk") AND ("ss_item_sk" = "i_item_sk") AND ("i_manager_id" = 28) AND ("d_moy" = 11) AND ("d_year" = 1999) GROUP BY "i_brand", "i_brand_id" ORDER BY "ext_price" DESC, "i_brand_id" ASC LIMIT 100;
 brand_id |        brand         |   ext_price   
----------+----------------------+---------------
  2003001 | exportiimporto #1    | 5964127572.62 
  5004001 | edu packscholar #1   | 5576632210.47 
  5002001 | importoscholar #1    | 5567732557.60 
  2002001 | importoimporto #1    | 5449225119.18 
  4004001 | edu packedu pack #1  | 5312633690.87 
  4001001 | amalgedu pack #1     | 5279164166.79 
  3004001 | edu packexporti #1   | 5261083386.89 
  3002001 | importoexporti #1    | 5209729864.53 
  2004001 | edu packimporto #1   | 5180197512.20 
  1001001 | amalgamalg #1        | 5143336367.63 
  3001001 | amalgexporti #1      | 5084697742.14 
  4002001 | importoedu pack #1   | 5044387871.94 
  3003001 | exportiexporti #1    | 4814667457.27 
  5001001 | amalgscholar #1      | 4719865362.20 
  4003001 | exportiedu pack #1   | 4707612519.32 
  1003001 | exportiamalg #1      | 4627904623.74 
  5003001 | exportischolar #1    | 4575805261.31 
  2001001 | amalgimporto #1      | 4486235392.55 
  1002001 | importoamalg #1      | 4180718016.42 
  1004001 | edu packamalg #1     | 4101025071.39 
  3003002 | exportiexporti #2    | 3192322918.02 
  4001002 | amalgedu pack #2     | 3006853728.39 
  5001002 | amalgscholar #2      | 2852154932.45 
  5002002 | importoscholar #2    | 2833324097.64 
  5004002 | edu packscholar #2   | 2796275436.75 
  2001002 | amalgimporto #2      | 2762719006.14 
yingsu00 commented 1 year ago

Does this cluster have 64 workers? It's different from the Benchto cluster, which had 16 nodes, so it's possible the issue doesn't reproduce there. Can you run other queries?

karteekmurthys commented 1 year ago

Scale Factor: sf100000

Screenshot 2023-08-24 at 4 01 06 PM
karteekmurthys commented 1 year ago

I enabled the glog signal handler in Prestissimo: https://github.com/prestodb/presto/commit/2978dc802ab9e53d674fce0cb942f53eb7a5e14a

Cluster: https://large-0-284-d7f5be4-b40n.oss-prestodb.cp.ahana.cloud/ui/
This helped generate a stack trace:

2023-08-30T14:11:53.899-07:00 | I0830 21:11:53.899216 20 TaskManager.cpp:662] cleanOldTasks: Cleaned 1 old task(s) in 0 ms
2023-08-30T14:11:57.540-07:00 *** Aborted at 1693429917 (unix time) try "date -d @1693429917" if you are using GNU date ***
2023-08-30T14:11:57.542-07:00 PC: @ 0x0 (unknown)
2023-08-30T14:11:57.542-07:00 *** SIGSEGV (@0x7d4758940a80) received by PID 9 (TID 0x7d4f975fe700) from PID 1486097024; stack trace: ***
2023-08-30T14:11:57.543-07:00 @ 0x7f55adbc448e google::(anonymous namespace)::FailureSignalHandler()
2023-08-30T14:11:57.544-07:00 @ 0x7f55aa267cf0 (unknown)
2023-08-30T14:11:57.549-07:00 @ 0x3cd708d facebook::velox::exec::HashTable<>::insertForGroupBy()
2023-08-30T14:11:57.552-07:00 @ 0x3ce20b2 facebook::velox::exec::HashTable<>::insertBatch()
2023-08-30T14:11:57.556-07:00 @ 0x3ceb080 facebook::velox::exec::HashTable<>::rehash()
2023-08-30T14:11:57.559-07:00 @ 0x3ceb312 facebook::velox::exec::HashTable<>::checkSize()
2023-08-30T14:11:57.561-07:00 @ 0x3ceb3dc facebook::velox::exec::HashTable<>::groupProbe()
2023-08-30T14:11:57.564-07:00 @ 0x3ca8586 facebook::velox::exec::GroupingSet::addInputForActiveRows()
2023-08-30T14:11:57.567-07:00 @ 0x3ca8d05 facebook::velox::exec::GroupingSet::addInput()
2023-08-30T14:11:57.572-07:00 @ 0x3caffee facebook::velox::exec::HashAggregation::addInput()
2023-08-30T14:11:57.578-07:00 @ 0x3b7c8eb facebook::velox::exec::Driver::runInternal()
2023-08-30T14:11:57.581-07:00 @ 0x3b7d697 facebook::velox::exec::Driver::run()
2023-08-30T14:11:57.583-07:00 @ 0x3b7d83f _ZN5folly6detail8function14FunctionTraitsIFvvEE9callSmallIZN8facebook5velox4exec6Driver7enqueueESt10shared_ptrIS9_EEUlvE_EEvRNS1_4DataE
2023-08-30T14:11:57.584-07:00 @ 0x7f55abff6bd9 folly::detail::function::FunctionTraits<>::operator()()
2023-08-30T14:11:57.588-07:00 @ 0x7f55a757a954 folly::ThreadPoolExecutor::runTask()
2023-08-30T14:11:57.590-07:00 @ 0x7f55a74fb330 folly::CPUThreadPoolExecutor::threadRun()
2023-08-30T14:11:57.592-07:00 @ 0x7f55a7583055 std::__invoke_impl<>()
2023-08-30T14:11:57.594-07:00 @ 0x7f55a7582583 std::__invoke<>()
2023-08-30T14:11:57.596-07:00 @ 0x7f55a75818b3 _ZNSt5_BindIFMN5folly18ThreadPoolExecutorEFvSt10shared_ptrINS1_6ThreadEEEPS1_S4_EE6__callIvJEJLm0ELm1EEEET_OSt5tupleIJDpT0_EESt12_Index_tupleIJXspT1_EEE
2023-08-30T14:11:57.599-07:00 @ 0x7f55a7580956 std::_Bind<>::operator()<>()
2023-08-30T14:11:57.602-07:00 @ 0x7f55a757f91a folly::detail::function::FunctionTraits<>::callSmall<>()
2023-08-30T14:11:57.603-07:00 @ 0x7f55a55e1b23 (unknown)
2023-08-30T14:11:57.604-07:00 @ 0x7f55aa25d1ca start_thread

The above stack trace is from one of the workers. All workers seem to receive SIGSEGV:

1
2023-08-30T14:11:57.542-07:00
*** SIGSEGV (@0x7d4758940a80) received by PID 9 (TID 0x7d4f975fe700) from PID 1486097024; stack trace: ***
[worker.10.78.173.169.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.173.169.presto-server$3Fstart$3D2023-08-30T21$253A11$253A57.542Z)
932483864676:/ahana/oss-prestodb/large-0-284-d7f5be4-b40n
2
2023-08-30T14:11:56.530-07:00
*** SIGSEGV (@0x7d180c04d300) received by PID 9 (TID 0x7d1ffe1fc700) from PID 201642752; stack trace: ***
[worker.10.78.174.173.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.174.173.presto-server$3Fstart$3D2023-08-30T21$253A11$253A56.530Z)
932483864676:/ahana/oss-prestodb/large-0-284-d7f5be4-b40n
3
2023-08-30T14:11:56.431-07:00
*** SIGSEGV (@0x7d1ddeb1fb00) received by PID 9 (TID 0x7d2548bfd700) from PID 18446744073150790400; stack trace: ***
[worker.10.78.149.220.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.149.220.presto-server$3Fstart$3D2023-08-30T21$253A11$253A56.431Z)
932483864676:/ahana/oss-prestodb/large-0-284-d7f5be4-b40n
4
2023-08-30T14:11:56.234-07:00
*** SIGSEGV (@0x7cef66c83480) received by PID 9 (TID 0x7cf73e1fc700) from PID 1724396672; stack trace: ***
[worker.10.78.181.82.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.181.82.presto-server$3Fstart$3D2023-08-30T21$253A11$253A56.234Z)
932483864676:/ahana/oss-prestodb/large-0-284-d7f5be4-b40n
5
2023-08-30T14:11:56.049-07:00
*** SIGSEGV (@0x7d163edc5280) received by PID 9 (TID 0x7d1dc21fc700) from PID 1054626432; stack trace: ***
[worker.10.78.135.166.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.135.166.presto-server$3Fstart$3D2023-08-30T21$253A11$253A56.049Z)
932483864676:/ahana/oss-prestodb/large-0-284-d7f5be4-b40n
6
2023-08-30T14:11:55.701-07:00
*** SIGSEGV (@0x7ddba492a800) received by PID 9 (TID 0x7de35cdfa700) from PID 18446744072175659008; stack trace: ***
[worker.10.78.170.201.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.170.201.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.701Z)
932483864676:/ahana/oss-prestodb/large-0-284-d7f5be4-b40n
7
2023-08-30T14:11:55.640-07:00
*** SIGSEGV (@0x7d625b3e7780) received by PID 9 (TID 0x7d6a60dfa700) from PID 1530820480; stack trace: ***
[worker.10.78.148.215.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.148.215.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.640Z)
8
2023-08-30T14:11:55.532-07:00
*** SIGSEGV (@0x7d7544b5d800) received by PID 9 (TID 0x7d7d2d5fe700) from PID 1152768000; stack trace: ***
[worker.10.78.163.56.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.163.56.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.532Z)
9
2023-08-30T14:11:55.443-07:00
*** SIGSEGV (@0x7d929a0e6d80) received by PID 9 (TID 0x7d993e1fc700) from PID 18446744071999221120; stack trace: ***
[worker.10.78.158.21.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.158.21.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.443Z)
10
2023-08-30T14:11:55.434-07:00
*** SIGSEGV (@0x7d8e8855d480) received by PID 9 (TID 0x7d96affff700) from PID 18446744071701910656; stack trace: ***
[worker.10.78.176.59.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.176.59.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.434Z)
11
2023-08-30T14:11:55.359-07:00
*** SIGSEGV (@0x7d69834a5200) received by PID 9 (TID 0x7d714e1fc700) from PID 18446744071617270272; stack trace: ***
[worker.10.78.165.25.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.165.25.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.359Z)
12
2023-08-30T14:11:55.168-07:00
*** SIGSEGV (@0x7d1a2ab1fb00) received by PID 9 (TID 0x7d20babfd700) from PID 716307200; stack trace: ***
[worker.10.78.168.1.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.168.1.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.168Z)
13
2023-08-30T14:11:55.071-07:00
*** SIGSEGV (@0x7dd882df7e00) received by PID 9 (TID 0x7ddf8cdfa700) from PID 18446744071610269184; stack trace: ***
[worker.10.78.147.183.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.147.183.presto-server$3Fstart$3D2023-08-30T21$253A11$253A55.071Z)
14
2023-08-30T14:11:54.919-07:00
*** SIGSEGV (@0x7cf339951900) received by PID 9 (TID 0x7cfa84dfa700) from PID 966072576; stack trace: ***
[worker.10.78.156.44.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.156.44.presto-server$3Fstart$3D2023-08-30T21$253A11$253A54.919Z)
15
2023-08-30T14:11:54.872-07:00
*** SIGSEGV (@0x7d6500fbec00) received by PID 9 (TID 0x7d6cb57fb700) from PID 16509952; stack trace: ***
[worker.10.78.149.36.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.149.36.presto-server$3Fstart$3D2023-08-30T21$253A11$253A54.872Z)
16
2023-08-30T14:11:54.779-07:00
*** SIGSEGV (@0x7d27cae04780) received by PID 9 (TID 0x7d3039fff700) from PID 18446744072818280320; stack trace: ***
[worker.10.78.128.255.presto-server](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Fahana$252Foss-prestodb$252Flarge-0-284-d7f5be4-b40n/log-events/worker.10.78.128.255.presto-server$3Fstart$3D2023-08-30T21$253A11$253A54.779Z)

I spot-checked a few workers; they all have the same stack trace.

aditi-pandit commented 1 year ago

@mbasmanova, @xiaoxmeng: Any ideas about this crash?

mbasmanova commented 1 year ago

To debug something like this, try to simplify the query while still reproducing the crash. Once you have a simple query, reduce the amount of data it processes using a filter on "$path". Once you have a simple repro that processes a single file, you can implement the query in a unit test and use standard debugging tools.
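As an illustration of the `"$path"` technique, the Hive connector exposes a hidden `"$path"` column that can pin a scan to a single file. A sketch against the table from this issue (the S3 path is a hypothetical placeholder):

```sql
-- Pin the repro to one file via the Hive connector's hidden "$path" column.
-- The S3 path below is a hypothetical placeholder.
SELECT "count"(DISTINCT "cs_order_number")
FROM tpcds_sf100000_parquet_varchar.catalog_sales
WHERE "$path" = 's3://my-bucket/catalog_sales/part-00000.parquet'
```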

aditi-pandit commented 1 year ago

@mbasmanova: We don't see these errors at lower TPC-DS scale factors, including sf1K. It shows up only at the 10K scale factor (so 10 TB of data). We can try to determine the number of rows that triggers the HashTable rehash here. But we just wanted to check whether you have seen such a failure before and can share some clues.

karteekmurthys commented 1 year ago

This was the query that failed:

SELECT
    "count"(DISTINCT "cs_order_number") "order count"
     , "sum"("cs_ext_ship_cost") "total shipping cost"
     , "sum"("cs_net_profit") "total net profit"
FROM
    tpcds_sf100000_parquet_varchar.catalog_sales cs1
   , tpcds_sf100000_parquet_varchar.date_dim
   , tpcds_sf100000_parquet_varchar.customer_address
   , tpcds_sf100000_parquet_varchar.call_center
WHERE ("d_date" BETWEEN CAST('2002-2-01' AS DATE) AND (CAST('2002-2-01' AS DATE) + INTERVAL  '60' DAY))
  AND ("cs1"."cs_ship_date_sk" = "d_date_sk")
  AND ("cs1"."cs_ship_addr_sk" = "ca_address_sk")
  AND ("ca_state" = 'GA')
  AND ("cs1"."cs_call_center_sk" = "cc_call_center_sk")
  AND ("cc_county" IN ('Williamson County', 'Williamson County', 'Williamson County', 'Williamson County', 'Williamson County'))
  AND (EXISTS (
    SELECT *
    FROM
        tpcds_sf100000_parquet_varchar.catalog_sales cs2
    WHERE ("cs1"."cs_order_number" = "cs2"."cs_order_number")
      AND ("cs1"."cs_warehouse_sk" <> "cs2"."cs_warehouse_sk")
))
  AND (NOT (EXISTS (
    SELECT *
    FROM
        tpcds_sf100000_parquet_varchar.catalog_returns cr1
    WHERE ("cs1"."cs_order_number" = "cr1"."cr_order_number")
)))
ORDER BY "count"(DISTINCT "cs_order_number") ASC
LIMIT 100

Stages 1 and 2 failed. Stage 1 contains the aggregations, and stage 2 is just a table scan.

 Fragment 1 [HASH]                                                                                                                                                                        >
     Output layout: [count_317, sum_316, sum_318]                                                                                                                                         >
     Output partitioning: SINGLE []                                                                                                                                                       >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                        >
     - Aggregate(PARTIAL) => [count_317:bigint, sum_316:varbinary, sum_318:varbinary]                                                                                                     >
             count_317 := "presto.default.count"((cs_order_number)) (mask = cs_order_number_313) (29:10)                                                                                  >
             sum_316 := "presto.default.sum"((cs_net_profit)) (4:8)                                                                                                                       >
             sum_318 := "presto.default.sum"((cs_ext_ship_cost)) (3:8)                                                                                                                    >
         - MarkDistinct[distinct=cs_order_number:bigint marker=cs_order_number_313] => [cs_order_number:bigint, cs_ext_ship_cost:decimal(7,2), cs_net_profit:decimal(7,2), cs_order_number>
             - LocalExchange[HASH] (cs_order_number) => [cs_order_number:bigint, cs_ext_ship_cost:decimal(7,2), cs_net_profit:decimal(7,2)]                                               >
                 - FilterProject[filterPredicate = not(COALESCE(subquerytrue, BOOLEAN'false')), projectLocality = LOCAL] => [cs_order_number:bigint, cs_ext_ship_cost:decimal(7,2), cs_net>
                     - RightJoin[("cr_order_number" = "cs_order_number")] => [subquerytrue:boolean, cs_order_number:bigint, cs_ext_ship_cost:decimal(7,2), cs_net_profit:decimal(7,2)]    >
                             Distribution: PARTITIONED                                                                                                                                    >
                         - Project[projectLocality = LOCAL] => [subquerytrue:boolean, cr_order_number:bigint]                                                                             >
                                 Estimates: {source: CostBasedSourceInfo, rows: 9217045187 (1021.50GB), cpu: 620010869893.00, memory: 82953406683.00, network: 129655843209.00}           >
                                 subquerytrue := BOOLEAN'true'                                                                                                                            >
                             - Aggregate[cr_order_number] => [cr_order_number:bigint]                                                                                                     >
                                     Estimates: {source: CostBasedSourceInfo, rows: 9217045187 (1021.50GB), cpu: 518623372836.00, memory: 82953406683.00, network: 129655843209.00}       >
                                 - LocalExchange[HASH] (cr_order_number) => [cr_order_number:bigint]                                                                                      >
                                         Estimates: {source: CostBasedSourceInfo, rows: 14406204801 (1.56TB), cpu: 388967529627.00, memory: 0.00, network: 129655843209.00}               >
                                     - RemoteSource[2] => [cr_order_number:bigint]                                                                                                        >
                         - LocalExchange[HASH] (cs_order_number) => [cs_order_number:bigint, cs_ext_ship_cost:decimal(7,2), cs_net_profit:decimal(7,2)]                                   >
                             - FilterProject[filterPredicate = (count_315) > (BIGINT'0'), projectLocality = LOCAL] => [cs_order_number:bigint, cs_ext_ship_cost:decimal(7,2), cs_net_profi>
                                 - Aggregate(FINAL)[cs_ship_date_sk, cs_ship_addr_sk, cs_call_center_sk, cs_warehouse_sk, cs_order_number, cs_ext_ship_cost, cs_net_profit, d_date, ca_sta>
                                         count_315 := "presto.default.count"((count_319))                                                                                                 >
                                     - LocalExchange[HASH] (cs_ship_date_sk, cs_ship_addr_sk, cs_call_center_sk, cs_warehouse_sk, cs_order_number, cs_ext_ship_cost, cs_net_profit, d_date>
                                         - Aggregate(PARTIAL)[cs_ship_date_sk, cs_ship_addr_sk, cs_call_center_sk, cs_warehouse_sk, cs_order_number, cs_ext_ship_cost, cs_net_profit, d_da>
                                                 count_319 := "presto.default.count"((non_null))                                                                                          >
                                             - RightJoin[("cs_order_number_26" = "cs_order_number") AND (cs_warehouse_sk) <> (cs_warehouse_sk_23)] => [non_null:boolean, cs_ship_date_sk:b>
                                                     Estimates: {source: CostBasedSourceInfo, rows: 28966521 (3.21GB), cpu: 29698269979850.87, memory: 589553296.57, network: 287509835213>
                                                     Distribution: PARTITIONED                                                                                                            >
                                                 - RemoteSource[3] => [cs_order_number_26:bigint, cs_warehouse_sk_23:bigint, non_null:boolean]                                            >
                                                 - LocalExchange[HASH] (cs_order_number) => [cs_ship_date_sk:bigint, cs_ship_addr_sk:bigint, cs_call_center_sk:bigint, cs_warehouse_sk:big>
                                                         Estimates: {source: CostBasedSourceInfo, rows: 3358884 (381.19MB), cpu: 18484753684194.89, memory: 243464742.30, network: 5593233>
                                                     - AssignUniqueId => [cs_ship_date_sk:bigint, cs_ship_addr_sk:bigint, cs_call_center_sk:bigint, cs_warehouse_sk:bigint, cs_order_numbe>
                                                             Estimates: {source: CostBasedSourceInfo, rows: 3358884 (381.19MB), cpu: 18484407595640.62, memory: 243464742.30, network: 559>
                                                         - RemoteSource[4] => [cs_ship_date_sk:bigint, cs_ship_addr_sk:bigint, cs_call_center_sk:bigint, cs_warehouse_sk:bigint, cs_order_>
                                                                                                                                                                                          >
 Fragment 2 [SOURCE]                                                                                                                                                                      >
     Output layout: [cr_order_number]                                                                                                                                                     >
     Output partitioning: HASH [cr_order_number]                                                                                                                                          >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                        >
     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf100000_parquet_varchar, tableName=catalog_returns, analyzePartitionValues=Optional.e>
             Estimates: {source: CostBasedSourceInfo, rows: 14406204801 (120.75GB), cpu: 129655843209.00, memory: 0.00, network: 0.00}                                                    >
             LAYOUT: tpcds_sf100000_parquet_varchar.catalog_returns{}                                                                                                                     >
             cr_order_number := cr_order_number:bigint:16:REGULAR (26:9)
ethanyzhang commented 1 year ago

Karteek is trying to track down the line of code that caused the issue with a debug build. @majetideepak to sync offline.

karteekmurthys commented 1 year ago

I enabled a debug build:

2023-09-06T13:46:17.750-07:00  PC: @ 0x0 (unknown)
2023-09-06T13:46:17.750-07:00  *** SIGSEGV (@0x7d23f4cde500) received by PID 9 (TID 0x7d2b5dbfd700) from PID 18446744073521718528; stack trace: ***
2023-09-06T13:46:17.752-07:00  @ 0x7f3183a7048e google::(anonymous namespace)::FailureSignalHandler()
2023-09-06T13:46:17.753-07:00  @ 0x7f3180113cf0 (unknown)
2023-09-06T13:46:17.775-07:00  @ 0x70736af facebook::velox::exec::BaseHashTable::loadTags()
2023-09-06T13:46:17.809-07:00  @ 0x7082620 facebook::velox::exec::HashTable<>::insertForGroupBy()

With addr2line we could pinpoint the exact line of code where the segfault occurred:

[root@presto-worker-799d46f88-mb99l app]# addr2line -spiCe /usr/local/bin/presto_server 0x70736af
emmintrin.h:703
 (inlined by) HashTable.h:289
  loadTags(uint8_t* tags, int32_t tagIndex) {
    // Cannot use xsimd::batch::unaligned here because we need to skip TSAN.
    auto src = tags + tagIndex;
#if XSIMD_WITH_SSE2
    return TagVector(_mm_loadu_si128(reinterpret_cast<__m128i const*>(src))); <---- Causing segfault.
#elif XSIMD_WITH_NEON
    return TagVector(vld1q_u8(src));
#endif
  }
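For reference, the 16-byte load can be sketched portably (an illustration, not the Velox implementation): `_mm_loadu_si128` unconditionally reads 16 bytes starting at `tags + tagIndex`, so if `tagIndex` is corrupted, the load touches memory outside the tags allocation and can SIGSEGV.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Portable sketch of the 16-byte tag load that loadTags() performs with
// _mm_loadu_si128 (an illustration, not the Velox code). The load always
// reads 16 bytes starting at tags + tagIndex, so tagIndex must leave at
// least 16 valid bytes in the tags allocation; a corrupted (e.g.
// wrapped-negative) tagIndex makes this read unmapped memory.
struct TagVector {
  uint8_t bytes[16];
};

inline TagVector loadTagsPortable(const uint8_t* tags, int64_t tagIndex) {
  TagVector v;
  std::memcpy(v.bytes, tags + tagIndex, sizeof v.bytes);  // 16-byte read
  return v;
}
```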
karteekmurthys commented 1 year ago

I simplified the query as follows to reproduce the issue:

SELECT
    "count"(DISTINCT "cs_order_number") "order count"
 FROM
    tpcds_sf100000_parquet_varchar.catalog_sales cs1
 WHERE
NOT (EXISTS (
    SELECT *
    FROM
        tpcds_sf100000_parquet_varchar.catalog_returns cr1
    WHERE ("cs1"."cs_order_number" = "cr1"."cr_order_number")
))
ORDER BY "count"(DISTINCT "cs_order_number") ASC
LIMIT 100;

2023-09-06T16:13:31.977-07:00  PC: @ 0x0 (unknown)
2023-09-06T16:13:31.977-07:00  *** SIGSEGV (@0x7dc751ff6700) received by PID 9 (TID 0x7dcdbebfd700) from PID 1375692544; stack trace: ***
2023-09-06T16:13:31.978-07:00  @ 0x7fd432da448e google::(anonymous namespace)::FailureSignalHandler()
2023-09-06T16:13:31.979-07:00  @ 0x7fd42f447cf0 (unknown)
2023-09-06T16:13:32.002-07:00  @ 0x70736af facebook::velox::exec::BaseHashTable::loadTags()
2023-09-06T16:13:32.036-07:00  @ 0x7082620 facebook::velox::exec::HashTable<>::insertForGroupBy()
2023-09-06T16:13:32.067-07:00  @ 0x708201c facebook::velox::exec::HashTable<>::insertBatch()
2023-09-06T16:13:32.080-07:00  @ 0x7081738 facebook::velox::exec::HashTable<>::rehash()
2023-09-06T16:13:32.097-07:00  @ 0x7081f18 facebook::velox::exec::HashTable<>::checkSize()
2023-09-06T16:13:32.127-07:00  @ 0x707d2b8 facebook::velox::exec::HashTable<>::groupProbe()
2023-09-06T16:13:32.138-07:00  @ 0x7036b18 facebook::velox::exec::GroupingSet::addInputForAct
karteekmurthys commented 1 year ago

Thanks to @czentgr for pointing out the issue with loadTags().

The failure in HashTable::loadTags() was due to a downcast of the address offset from int64_t to int32_t. For large-dataset queries with billions of rows as unique keys, the offset can easily exceed the int32 max and cause a segfault. PR https://github.com/facebookincubator/velox/pull/6485 addresses the issue. I tested a query that used to fail on a 16-node cluster built from this branch: https://github.com/prestodb/presto/pull/20762. The query passed:

presto> SELECT
     ->     "count"(DISTINCT "cs_order_number") "order count"
     ->      , "sum"("cs_ext_ship_cost") "total shipping cost"
     ->      , "sum"("cs_net_profit") "total net profit"
     -> FROM
     ->     tpcds_sf100000_parquet_varchar.catalog_sales cs1
     ->    , tpcds_sf100000_parquet_varchar.date_dim
     ->    , tpcds_sf100000_parquet_varchar.customer_address
     ->    , tpcds_sf100000_parquet_varchar.call_center
     -> WHERE ("d_date" BETWEEN CAST('2002-2-01' AS DATE) AND (CAST('2002-2-01' AS DATE) + INTERVAL  '60' DAY))
     ->   AND ("cs1"."cs_ship_date_sk" = "d_date_sk")
     ->   AND ("cs1"."cs_ship_addr_sk" = "ca_address_sk")
     ->   AND ("ca_state" = 'GA')
     ->   AND ("cs1"."cs_call_center_sk" = "cc_call_center_sk")
     ->   AND ("cc_county" IN ('Williamson County', 'Williamson County', 'Williamson County', 'Williamson County', 'Williamson County'))
     ->   AND (EXISTS (
     ->     SELECT *
     ->     FROM
     ->         tpcds_sf100000_parquet_varchar.catalog_sales cs2
     ->     WHERE ("cs1"."cs_order_number" = "cs2"."cs_order_number")
     ->       AND ("cs1"."cs_warehouse_sk" <> "cs2"."cs_warehouse_sk")
     -> ))
     ->   AND (NOT (EXISTS (
     ->     SELECT *
     ->     FROM
     ->         tpcds_sf100000_parquet_varchar.catalog_returns cr1
     ->     WHERE ("cs1"."cs_order_number" = "cr1"."cr_order_number")
     -> )))
     -> ORDER BY "count"(DISTINCT "cs_order_number") ASC
     ->     LIMIT 100;
 order count | total shipping cost | total net profit 
-------------+---------------------+------------------
     1636473 | 7349832475.20       | -1526555543.44   
(1 row)
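For reference, the wraparound behind the crash can be demonstrated in isolation. A minimal sketch with hypothetical helper names, not the Velox code: once the byte offset into the tags array exceeds INT32_MAX, truncating it to int32_t wraps it negative, and the subsequent `tags + offset` pointer arithmetic lands far outside the hash table's allocation.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical illustration of the downcast bug: one tag byte per row,
// so with billions of distinct group-by keys the offset exceeds
// INT32_MAX and a narrowing cast wraps it to a negative value.
inline int64_t tagOffsetFixed(int64_t row) {
  return row;  // offset kept in 64 bits, as in the fix
}

inline int32_t tagOffsetBuggy(int64_t row) {
  return static_cast<int32_t>(row);  // truncates once row >= 2^31
}
```

With roughly 3 billion unique keys, `tagOffsetBuggy` returns a negative offset, which matches the SIGSEGV observed in `loadTags()`.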