chris-twiner opened this issue 1 year ago
PR: #5
@garretwilson - hihi, pinging in case you haven't seen this
I had seen it, but lots of things got in front of it on the priority. Thanks for the ping. The more people want things, the higher the things get on my priority list. 😆
I'll try to address this before July 15, 2023, how's that? If I miss that feel free to ping me again.
OK, @chris-twiner. I set aside some time for this. Let's dig into this a little further. When you say "it doesn't support streaming", what are you wanting to do exactly? I assume you're referring to Spark Streaming. Bear with me a second; I haven't touched Hadoop and Spark in a year, and I wasn't working with streaming when I was working with Spark. So what exactly about Spark (or whatever you're working with) requires an `AbstractFileSystem`?

If Spark is using some sort of dependency on `AbstractFileSystem` to get something done, then that doesn't sound like a good thing to begin with. A better approach would be to fix the `FileSystem` APIs to handle whatever Spark needs. (Then again, if Apache were committed to keeping the `FileSystem` APIs up-to-date, then we wouldn't need this project to begin with, huh? 😅) Anyway, please just give me a bit more context so that I can decide the best path to take to address your needs while not creating a monstrous hack.
Secondly, if I understand the proposal correctly, I'm against creating a new class named `BareStreamingLocalFileSystem` that simply wraps `BareLocalFileSystem`. In fact this new class `BareStreamingLocalFileSystem` would have nothing at all related to streaming. Its only reason for existence would be to get `AbstractFileSystem` into the hierarchy. So a better name would be `BareLocalFileSystemDecoratorSoSparkSeesAnAbstractFileSystemSystem`. That reflects its purpose. (It also illustrates the hackiness.)
So explain a little more about exactly what other component is needing `AbstractFileSystem`, and what this other component is attempting to do. Thanks.
Hi, correct: it's for Spark Structured Streaming, which uses `AbstractFileSystem` under the hood, not `FileSystem`, and as you note, Hadoop/Spark aren't likely to change `org.apache.hadoop.fs.FileContext`.

The use case is to be able to stream from and to files on Windows using Spark Streaming; in particular it's the checkpointing code which requires it. This is the kind of stack you get. There may be others, but if you use checkpointing it's definitely hit:
```
at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:584)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:123)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
```
This was discovered in migrating frameless to use bare-naked. The code added was needed by these tests which use checkpoints.
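For reference, here is a minimal sketch (not taken from the frameless tests; app name and paths are illustrative) of the kind of checkpointed streaming query that exercises this code path:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class CheckpointRepro {
  public static void main(final String[] args) throws Exception {
    final SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("checkpoint-repro") // illustrative
        .getOrCreate();

    // Any streaming source will do; "rate" is a built-in test source.
    final Dataset<Row> source = spark.readStream().format("rate").load();

    // The checkpointLocation is what sends Spark through
    // FileContext/AbstractFileSystem (CheckpointFileManager, HDFSMetadataLog).
    final StreamingQuery query = source.writeStream()
        .format("parquet")
        .option("path", "C:/tmp/stream-out")                // illustrative path
        .option("checkpointLocation", "C:/tmp/stream-ckpt") // illustrative path
        .start();

    query.processAllAvailable();
    query.stop();
  }
}
```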
Huh, it's just creating a temporary file. I wonder why it needs an `AbstractFileSystem`. (The documentation of the `FileContext` doesn't seem to provide any answers.)
Before just slapping on kludges, let's try to get to the heart of what's going on. @chris-twiner, where does the actual error show up for you? Do you tell it to use the `BareLocalFileSystem` as the `fs.file.impl`, and then it throws a `ClassCastException` or something? Please let me know exactly what happens, and where it's happening, and in response to what. A stack trace of the actual error would be helpful.

The other question is: what file system is it using to begin with, which works with Winutils? It can't be the `LocalFileSystem`, because that's not an `AbstractFileSystem`. What file system implementation does it use if Winutils is installed? (This last question is important.)
I'm guessing (just from browsing through the code) that this method in `AbstractFileSystem` may be related:
```java
/**
 * Create a file system instance for the specified uri using the conf. The
 * conf is used to find the class name that implements the file system. The
 * conf is also passed to the file system for its configuration.
 *
 * @param uri URI of the file system
 * @param conf Configuration for the file system
 *
 * @return Returns the file system for the given URI
 *
 * @throws UnsupportedFileSystemException file system for <code>uri</code> is
 *           not found
 */
public static AbstractFileSystem createFileSystem(URI uri, Configuration conf)
    throws UnsupportedFileSystemException {
  final String fsImplConf = String.format("fs.AbstractFileSystem.%s.impl",
      uri.getScheme());

  Class<?> clazz = conf.getClass(fsImplConf, null);
  if (clazz == null) {
    throw new UnsupportedFileSystemException(String.format(
        "%s=null: %s: %s",
        fsImplConf, NO_ABSTRACT_FS_ERROR, uri.getScheme()));
  }
  return (AbstractFileSystem) newInstance(clazz, uri, conf);
}
```
It looks like there's a separate configuration for setting the `fs.AbstractFileSystem.file.impl` or something. What a mess.

What I'm getting at, though, is that we may provide the other class as just an implementation of `AbstractFileSystem` (not mentioning anything about streaming) and set this configuration parameter instead, as a less hacky way of doing things.
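To make that concrete, here is a rough sketch of the two configuration keys side by side. The `AbstractFileSystem` class named in the second key does not exist in the library and is purely hypothetical, and the `BareLocalFileSystem` package name is assumed from the library:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import com.globalmentor.apache.hadoop.fs.BareLocalFileSystem; // package name assumed

public final class LocalFsConfig {
  public static Configuration bareLocalConf() {
    final Configuration conf = new Configuration();
    // FileSystem-based access for file:// (what the library already provides):
    conf.setClass("fs.file.impl", BareLocalFileSystem.class, FileSystem.class);
    // FileContext/AbstractFileSystem-based access for file:// (what Spark Streaming's
    // checkpointing resolves via createFileSystem() above); this class is hypothetical:
    conf.set("fs.AbstractFileSystem.file.impl", "com.example.BareLocalFs");
    return conf;
  }
}
```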
This is just after a few minutes of glancing through the code. I look forward to getting the other info.
Correct, all paths lead there for streaming, even mkdir:

```
Caused by: java.io.IOException: Cannot run program "C:\Dev\bin\hadoop-3.2.0\bin\winutils.exe": CreateProcess error=193, %1 is not a valid Win32 application
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:934)
at org.apache.hadoop.util.Shell.run(Shell.java:900)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:677)
at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1356)
at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)
at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:219)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:809)
at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:805)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:812)
at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.mkdirs(CheckpointFileManager.scala:324)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:67)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:139)
at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:322)
at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:439)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:404)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:231)
```
Hence my adding that documentation change to the PR.
But @chris-twiner, one of the important questions I'm asking is whether this works with Winutils installed. I think the stack trace you just showed me is a different error, saying that Winutils isn't present. Does this actually work if Winutils is installed?

My concern is why we would have to do something special with my library but not with the Winutils approach, since they both extend the same classes.
The other thing I'm seeing in your latest stack trace is this:

```
at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)
```

How did this `DelegateToFileSystem` get into the mix? It wasn't in the stack trace you gave me earlier. Any idea where it's coming from? Did you perhaps add a workaround for Winutils like you are suggesting for my library?
I'm beginning to suspect this ticket is really something like this: "Your library works great, but we notice that neither your library nor Winutils works with Spark Streaming. To make it work with Winutils we added a `DelegateToFileSystem` wrapper. If we do the same with your library it will make your library even better by working with Spark Streaming." Is that what happened?
Help me understand if this is something broken with my library so that it can be brought up to the same functionality as Winutils, or if you're proposing an improvement for a problem with Winutils as well. See what I'm asking?
Yes, it works with Winutils. It doesn't work without the code and configuration in the PR, due to the `createFileSystem` code you note above delegating to an `AbstractFileSystem`.

Your library, as it currently stands and as noted in the first comment, does not support Spark Streaming. This PR adds that support.
> Yes, it works with Winutils.
Then maybe Spark Streaming is adding something into the mix that my library is overriding. I need to find out how the `DelegateToFileSystem` is getting into the stack trace when you are using Spark Streaming with Winutils.
Please send me some simple instructions for what you are doing with Spark Streaming to get that stack trace. Do you just install Spark Streaming and then issue some command? Help me out a bit to know how to reproduce this, and I'll investigate further.
Spark's `AbstractFileContextBasedCheckpointFileManager` creates the `FileContext` via `FileContext.getFileContext`. The returned FS is `org.apache.hadoop.fs.local.LocalFs`, which extends `ChecksumFs`, which extends `FilterFs`, which has the underlying "myFs", which is a `RawLocalFs` instance.

`RawLocalFs` extends `DelegateToFileSystem`, which provides the `mkdir` function. That's why that's there; it's default Hadoop behavior for `file:///` from `FileContext.getFileContext`.
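To illustrate, here is a small sketch (with an illustrative path) that just traces the default resolution described above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public final class DefaultLocalFcDemo {
  public static void main(final String[] args) throws Exception {
    // With a default Configuration, file:// resolves to LocalFs (a ChecksumFs/FilterFs
    // whose wrapped "myFs" is RawLocalFs, a DelegateToFileSystem around
    // RawLocalFileSystem), so mkdir ultimately calls RawLocalFileSystem.mkdirs
    // and, on Windows, Shell/Winutils for setPermission.
    final FileContext fc = FileContext.getFileContext(new Configuration());
    fc.mkdir(new Path("file:///C:/tmp/ckpt"), FsPermission.getDirDefault(), true); // illustrative path
  }
}
```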
I've attached a simple test project. One test, `runWithoutStreamingFS`, will fail if Winutils is not present. The other will pass regardless of Winutils.
Thanks, @chris-twiner . This is exactly the sort of information I was needing to dig into it deeper.
One intriguing thing is that the two file systems are `LocalFs` and `RawLocalFs`, the latter of which extends `DelegateToFileSystem`; that's where it was coming from, as you mention. But the odd thing is that my library extends `LocalFileSystem` and `RawLocalFileSystem` instead. I don't recall the exact `XXXFs` vs `XXXFileSystem` distinction, but I seem to remember researching it at the time. Isn't the `XXXFileSystem` form the newer one? But it looks like they have different hierarchies, which explains what Spark Streaming doesn't like here.
I assume that I used the default classes at the time. Or maybe I used the most recent hierarchy, not knowing that anything needed `AbstractFileSystem` in the hierarchy. I don't remember. But now that we have these subtleties in hand, I'll look into it and see what I can dig up.
The `FileSystem` implementation doesn't support streaming (must be an `AbstractFileSystem`). Simply wrapping `DelegateToFileSystem` seems to be enough.
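For illustration, a minimal sketch of what such a wrapper might look like; the class name is hypothetical, the `BareLocalFileSystem` package name is assumed from the library, and this is not the PR's actual code. It mirrors how Hadoop's own `RawLocalFs` delegates to `RawLocalFileSystem`:

```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;

import com.globalmentor.apache.hadoop.fs.BareLocalFileSystem; // package name assumed

/**
 * Hypothetical AbstractFileSystem for the file:// scheme that delegates all
 * FileContext calls to the Winutils-free BareLocalFileSystem.
 */
public class BareLocalFs extends DelegateToFileSystem {

  public BareLocalFs(final URI theUri, final Configuration conf)
      throws IOException, URISyntaxException {
    // scheme "file", no authority required, delegate everything to BareLocalFileSystem
    super(theUri, new BareLocalFileSystem(), conf, "file", false);
  }
}
```

Such a class would then be wired in via `fs.AbstractFileSystem.file.impl`, as in the configuration sketch earlier in the thread.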