hail-is / hail

Cloud-native genomic dataframes and batch computing
https://hail.is
MIT License
977 stars 245 forks source link

[query] matrix multiply fails #13688

Closed danking closed 9 months ago

danking commented 1 year ago

What happened?

Cal, Lindo, Leo, report that Hail’s matrix multiplication fails on certain pipelines for no obvious reason. I think this is only in QoB/lowered code.

Version

0.2.122

Relevant log output

No response

danking commented 1 year ago

Lindo reports an error with a side-length of 177860. Cal reports a side length of 544768. The square of both is larger than 2^32.

danking commented 1 year ago

Cal is LD prune. Lindo is in PCA.

danking commented 1 year ago
FatalError                                Traceback (most recent call last)
/tmp/ipykernel_10092/1120523425.py in <cell line: 8>()
      6 # Connection refused: qc-sw-nvpv.c.covid-19-wgs-analysis.internal/10.128.0.182:7337
      7 # https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/pc_related.2E.20how.20slow.3F
----> 8 relatedness_ht = hl.pc_relate(hq_mt_subset.GT, 0.01, k=10, statistics='kin',
      9                               include_self_kinship=include_kinself, block_size=2048)
     10 

<decorator-gen-1794> in pc_relate(call_expr, min_individual_maf, k, scores_expr, min_kinship, statistics, block_size, include_self_kinship)

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    585     def wrapper(__original_func: Callable[..., T], *args, **kwargs) -> T:
    586         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 587         return __original_func(*args_, **kwargs_)
    588 
    589     return wrapper

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/methods/relatedness/pc_relate.py in pc_relate(call_expr, min_individual_maf, k, scores_expr, min_kinship, statistics, block_size, 
include_self_kinship)
    314 
    315     if k and scores_expr is None:
--> 316         _, scores, _ = hwe_normalized_pca(call_expr, k, compute_loadings=False)
    317         scores_expr = scores[mt.col_key].scores
    318     elif not k and scores_expr is not None:

<decorator-gen-1778> in hwe_normalized_pca(call_expr, k, compute_loadings)

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    585     def wrapper(__original_func: Callable[..., T], *args, **kwargs) -> T:
    586         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 587         return __original_func(*args_, **kwargs_)
    588 
    589     return wrapper

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/methods/pca.py in hwe_normalized_pca(call_expr, k, compute_loadings)
     99         return _hwe_normalized_blanczos(call_expr, k, compute_loadings)
    100 
--> 101     return pca(hwe_normalize(call_expr),
    102                k,
    103                compute_loadings)

<decorator-gen-1780> in pca(entry_expr, k, compute_loadings)

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    585     def wrapper(__original_func: Callable[..., T], *args, **kwargs) -> T:
    586         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 587         return __original_func(*args_, **kwargs_)
    588 
    589     return wrapper

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/methods/pca.py in pca(entry_expr, k, compute_loadings)
    209         'k': k,
    210         'computeLoadings': compute_loadings
--> 211     })).persist())
    212 
    213     g = t.index_globals()

<decorator-gen-1340> in persist(self, storage_level)

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    585     def wrapper(__original_func: Callable[..., T], *args, **kwargs) -> T:
    586         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 587         return __original_func(*args_, **kwargs_)
    588 
    589     return wrapper

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/table.py in persist(self, storage_level)
   2110             Persisted table.
   2111         """
-> 2112         return Env.backend().persist(self)
   2113 
   2114     def unpersist(self) -> 'Table':

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/backend/backend.py in persist(self, dataset)
    167         from hail.context import TemporaryFilename
    168         tempfile = TemporaryFilename(prefix=f'persist_{type(dataset).__name__}')
--> 169         persisted = dataset.checkpoint(tempfile.__enter__())
    170         self._persisted_locations[persisted] = (tempfile, dataset)
    171         return persisted

<decorator-gen-1330> in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals)

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    585     def wrapper(__original_func: Callable[..., T], *args, **kwargs) -> T:
    586         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 587         return __original_func(*args_, **kwargs_)
    588 
    589     return wrapper

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/table.py in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals)
   1329 
   1330         if not _read_if_exists or not hl.hadoop_exists(f'{output}/_SUCCESS'):
-> 1331             self.write(output=output, overwrite=overwrite, stage_locally=stage_locally, _codec_spec=_codec_spec)
   1332             _assert_type = self._type
   1333             _load_refs = False

<decorator-gen-1332> in write(self, output, overwrite, stage_locally, _codec_spec)

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    585     def wrapper(__original_func: Callable[..., T], *args, **kwargs) -> T:
    586         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 587         return __original_func(*args_, **kwargs_)
    588 
    589     return wrapper

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/table.py in write(self, output, overwrite, stage_locally, _codec_spec)
   1375         hl.current_backend().validate_file_scheme(output)
   1376 
-> 1377         Env.backend().execute(ir.TableWrite(self._tir, ir.TableNativeWriter(output, overwrite, stage_locally, _codec_spec)))
   1378 
   1379     @typecheck_method(output=str,

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     80             return (value, timings) if timed else value
     81         except FatalError as e:
---> 82             raise e.maybe_user_error(ir) from None
     83 
     84     async def _async_execute(self, ir, timed=False):

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     74         # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     75         try:
---> 76             result_tuple = self._jbackend.executeEncode(jir, stream_codec, timed)
     77             (result, timings) = (result_tuple._1(), result_tuple._2())
     78             value = ir.typ._from_encoding(result)

/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/opt/conda/miniconda3/lib/python3.10/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     33             tpl = Env.jutils().handleForPython(e.java_exception)
     34             deepest, full, error_id = tpl._1(), tpl._2(), tpl._3()
---> 35             raise fatal_error_from_java_error_triplet(deepest, full, error_id) from None
     36         except pyspark.sql.utils.CapturedException as e:
     37             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: ArrayIndexOutOfBoundsException: Index 177860 out of bounds for length 177860

Java stack trace:
java.lang.ArrayIndexOutOfBoundsException: Index 177860 out of bounds for length 177860
    at org.netlib.blas.Dcopy.dcopy(blas.f)
    at org.netlib.arpack.Dsaitr.dsaitr(arpack.f)
    at org.netlib.arpack.Dsaup2.dsaup2(arpack.f)
    at org.netlib.arpack.Dsaupd.dsaupd(arpack.f)
    at dev.ludovic.netlib.arpack.F2jARPACK.dsaupdK(F2jARPACK.java:189)
    at dev.ludovic.netlib.arpack.AbstractARPACK.dsaupd(AbstractARPACK.java:560)
    at dev.ludovic.netlib.arpack.F2jARPACK.dsaupd(F2jARPACK.java:30)
    at dev.ludovic.netlib.arpack.AbstractARPACK.dsaupd(AbstractARPACK.java:536)
    at dev.ludovic.netlib.arpack.F2jARPACK.dsaupd(F2jARPACK.java:30)
    at org.apache.spark.mllib.linalg.EigenValueDecomposition$.symmetricEigs(EigenValueDecomposition.scala:106)
    at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeSVD(RowMatrix.scala:385)
    at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeSVD(RowMatrix.scala:311)
    at org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix.computeSVD(IndexedRowMatrix.scala:231)
    at is.hail.methods.PCA.execute(PCA.scala:41)
    at is.hail.expr.ir.functions.WrappedMatrixToTableFunction.execute(RelationalFunctions.scala:52)
    at is.hail.expr.ir.TableToTableApply.execute(TableIR.scala:3379)
    at is.hail.expr.ir.TableIR.analyzeAndExecute(TableIR.scala:61)
    at is.hail.expr.ir.Interpret$.run(Interpret.scala:865)
    at is.hail.expr.ir.Interpret$.alreadyLowered(Interpret.scala:59)
    at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:20)
    at is.hail.expr.ir.LowerOrInterpretNonCompilable$.rewrite$1(LowerOrInterpretNonCompilable.scala:58)
    at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:63)
    at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:67)
    at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$3(LoweringPass.scala:16)
    at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
    at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$1(LoweringPass.scala:16)
    at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
    at is.hail.expr.ir.lowering.LoweringPass.apply(LoweringPass.scala:14)
    at is.hail.expr.ir.lowering.LoweringPass.apply$(LoweringPass.scala:13)
    at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:62)
    at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1(LoweringPipeline.scala:22)
    at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1$adapted(LoweringPipeline.scala:20)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:20)
    at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:50)
    at is.hail.backend.spark.SparkBackend._execute(SparkBackend.scala:462)
    at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$2(SparkBackend.scala:498)
    at is.hail.backend.ExecuteContext$.$anonfun$scoped$3(ExecuteContext.scala:75)
    at is.hail.utils.package$.using(package.scala:635)
    at is.hail.backend.ExecuteContext$.$anonfun$scoped$2(ExecuteContext.scala:75)
    at is.hail.utils.package$.using(package.scala:635)
    at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
    at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:63)
    at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:350)
    at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$1(SparkBackend.scala:495)
    at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
    at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:494)
    at jdk.internal.reflect.GeneratedMethodAccessor109.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)

Hail version: 0.2.122-be9d88a80695
Error summary: ArrayIndexOutOfBoundsException: Index 177860 out of bounds for length 177860
danking commented 1 year ago

@patrick-schultz I assume no news on this front? I feel like you've got a lot on your plate right now; does it make sense to transfer this or something to someone else?

patrick-schultz commented 1 year ago

Another set of eyes on this would be great. My current thoughts on this:

I only looked at the failure in PCA. I was never able to reproduce. My next step to try to reproduce was to run PCA on Lindo's full dataset on dataproc (can't use batch because the error is in spark PCA).

I did look carefully through the stack trace, trying to understand what could possibly be happening. The number 177860 from the error isn't either matrix dimension, which is 210234 by 8893. Everything in org.apache.spark.mllib.linalg.EigenValueDecomposition$.symmetricEigs(EigenValueDecomposition.scala:106) is independent of the number of rows, so only the number 8893 of cols should be relevent.

I wrote a simple test to execute spark PCA with 8893 rows in scala, so I could step through with a debugger:

    var mt = rangeMatrix(10000, 8893)
    mt = MatrixMapEntries(mt, InsertFields(Ref("g", mt.typ.entryType), Seq("a" -> F64(1))))
    val t = MatrixToTableApply(mt, PCA("a", 10, false))
    val n = TableToValueApply(t, ForceCountTable())
    assertEvalsTo(n, 8893L)

The array v in symmetricEigs has length 177860 = 8893*20, and I didn't find anything else with that size. The only line I could find that could generate an exception that looks like this is line 555 of dev.ludovic.netlib.arpack.AbstractARPACK.dsaupd

  public void dsaupd(org.netlib.util.intW ido, String bmat, int n, String which, int nev, org.netlib.util.doubleW tol, double[] resid, int offsetresid, int ncv, double[] v, int offsetv, int ldv, int[] iparam, int offsetiparam, int[] ipntr, int offsetipntr, double[] workd, int offsetworkd, double[] workl, int offsetworkl, int lworkl, org.netlib.util.intW info) {
    if (debug) System.err.println("dsaupd");
    checkArgument("DSAUPD", 2, lsame("I", bmat) || lsame("G", bmat));
    checkArgument("DSAUPD", 3, n >= 0);
    checkArgument("DSAUPD", 4, lsame("LA", which) || lsame("SA", which) || lsame("LM", which) || lsame("SM", which) || lsame("BE", which));
    checkArgument("DSAUPD", 5, 0 < nev && nev < n);
    checkArgument("DSAUPD", 15, lworkl >= Math.pow(ncv, 2)+ 8 * ncv);
    requireNonNull(ido);
    requireNonNull(tol);
    requireNonNull(resid);
    requireNonNull(v);
    requireNonNull(iparam);
    requireNonNull(ipntr);
    requireNonNull(workd);
    requireNonNull(info);
    checkIndex(offsetresid + n - 1, resid.length);
    checkIndex(offsetv + n * ncv - 1, v.length); // HERE
    checkIndex(offsetiparam + 11 - 1, iparam.length);
    checkIndex(offsetipntr + 11 - 1, ipntr.length);
    checkIndex(offsetworkd + 3 * n - 1, workd.length);
    checkIndex(offsetworkl + lworkl - 1, workl.length);
    dsaupdK(ido, bmat, n, which, nev, tol, resid, offsetresid, ncv, v, offsetv, ldv, iparam, offsetiparam, ipntr, offsetipntr, workd, offsetworkd, workl, offsetworkl, lworkl, info);
  }

where v.length = 177860, n = 8893, and ncv = 20. checkIndex is

  private void checkIndex(int index, int length) {
    if (index < 0 || index >= length) {
      throw new IndexOutOfBoundsException(String.format("Index %s out of bounds for length %s", index, length));
    }
  }

which a) shouldn't throw in this case, b) throws an IndexOutOfBoundsException, not an ArrayIndexOutOfBoundsException, and c) isn't the root of the stacktrace

    at org.netlib.blas.Dcopy.dcopy(blas.f)
    at org.netlib.arpack.Dsaitr.dsaitr(arpack.f)
    at org.netlib.arpack.Dsaup2.dsaup2(arpack.f)
    at org.netlib.arpack.Dsaupd.dsaupd(arpack.f)
    at dev.ludovic.netlib.arpack.F2jARPACK.dsaupdK(F2jARPACK.java:189)
    at dev.ludovic.netlib.arpack.AbstractARPACK.dsaupd(AbstractARPACK.java:560)
    at dev.ludovic.netlib.arpack.F2jARPACK.dsaupd(F2jARPACK.java:30)
    at dev.ludovic.netlib.arpack.AbstractARPACK.dsaupd(AbstractARPACK.java:536)
    at dev.ludovic.netlib.arpack.F2jARPACK.dsaupd(F2jARPACK.java:30)
    at org.apache.spark.mllib.linalg.EigenValueDecomposition$.symmetricEigs(EigenValueDecomposition.scala:106)

So it looks like somehow bad indices are making it through all these checks and a bad array access is hapenning deep in blas?

So I'm at a complete loss. If anybody else could take a stab at it with a fresh perspective, I'd be happy to discuss, but I'm low on bandwidth to keep digging myself.

danking commented 11 months ago

Leo's is a matrix multiplication problem and he includes a tiny reproducer:

https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/.60pc_relate.60.20crash.20with.200.2E2.2E110

danking commented 11 months ago

todo:

  1. Determine versions of LAPACK and BLAS in use in Dataproc, laptop, and QoB
  2. Try to reproduce in Dataproc
danking commented 11 months ago

Cal's bug was in 0.2.112 which is the version before Chris' fix landed (0.2.113) https://github.com/hail-is/hail/commit/4d5f1704a5cafc3b5d2cde35a307f260e8376be1 .

I think that leaves only Lindo's bug as likely still present.

danking commented 10 months ago

OK, so, this really feels like bad data.

We just merged https://github.com/hail-is/hail/commit/98adcce1d07001995b0819fd6afe161bf34ba840 which fixed https://github.com/hail-is/hail/issues/13979 . Google Cloud Storage's Java library was very rarely returning just flat-out bad data.

The frequency of occurrence on one particularly large pipeline appears to be 1/30000 tasks (0.003% or 3 in 100,000). The tasks were reading two files, the larger of which was 131MiB. The Java library reads in 8MiB chunks so that's at least 17 network requests per partition. That puts the frequency of this closer to 1 in 1,000,000 requests or 1 in 10TiB of data read.

Before we had Zstandard, it seems that this data corruption either (a) was unnoticed (b) caused a rare decoding error or (c) caused segfaults. After we added Zstandard (0.2.119), decompression often failed due to corrupt data. It seems to me that Zstandard more aggressively verifies integrity than LZ4 does.

OK, so, when was this bug introduced in Hail? As far as I can tell, this new code path was added in google-cloud-storage 2.17.0 almost one year ago: https://github.com/googleapis/java-storage/commit/94cd2887f22f6d1bb82f9929b388c27c63353d77 . We upgraded to 2.17.1 (😭 ) in Hail 0.2.109 https://github.com/hail-is/hail/commit/fec0cc2263c04c00e02cef5dda8ec46916717152 . All of the attempts above could have been plagued by this rare transient data corruption error.

OK, action items:

Once the first action item is successfully completed, I will close this issue. For the second action item, I have created a separate ticket.

danking commented 9 months ago

Resolved as of 0.2.127.

https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/pc_relate.20crash.200.2E2.2E122-be9d88a80695/near/416091480