munhouiani / Deep-Packet

PyTorch implementation of Deep Packet: a novel approach for encrypted traffic classification using deep learning
MIT License
183 stars 56 forks

Some discussion about the code #1

Closed cmh14 closed 4 years ago

cmh14 commented 4 years ago

Hello author, I'm also preparing to reproduce the Deep Packet paper, and this project has been a great help to me. While actually running your code I ran into a few points I'd like to discuss with you.

In preprocessing.py: arr = np.pad(arr, pad_width=(0, pad_width), mode='constant', constant_values=0). When I run it, I'm told that mode= is a required argument, so I added it; I'm not sure whether that is correct.

For Parallel(n_jobs=njob): if I use the default n_jobs=-1, CPU and memory usage stay at 100% and after a while I get a MemoryError; limiting it to 4 or 6 avoids the error.

In ml/model.py, the training_step() function contains the line y_hat = self(x). My understanding is that this is equivalent to y_hat = self.forward(x). Is that right? I'd appreciate your answer.

munhouiani commented 4 years ago

Hi,

In preprocessing.py: arr = np.pad(arr, pad_width=(0, pad_width), mode='constant', constant_values=0). When I run it, I'm told that mode= is a required argument, so I added it; I'm not sure whether that is correct.

I'm not sure which numpy version you are using, but according to the official documentation the default value of mode is 'constant', so what you wrote is fine:

numpy.pad(array, pad_width, mode='constant', **kwargs)

For Parallel(n_jobs=njob): if I use the default n_jobs=-1, CPU and memory usage stay at 100% and after a while I get a MemoryError; limiting it to 4 or 6 avoids the error.

Passing -1 as the n_jobs parameter means using all cores, so the error is probably caused by running out of memory. The root cause is that the read_pcap function in utils.py uses rdpcap, which reads all the packets into memory; if you replace rdpcap with PcapReader, the error should go away.
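For reference, here is a minimal streaming sketch, assuming scapy's PcapReader (the function name read_pcap_streaming is made up for illustration; the repo's actual reader is read_pcap in utils.py):

from scapy.utils import PcapReader

def read_pcap_streaming(path):
    # Yield packets one at a time instead of loading the whole capture
    # into memory the way rdpcap does.
    reader = PcapReader(str(path))
    try:
        for packet in reader:
            yield packet
    finally:
        reader.close()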

In ml/model.py, the training_step() function contains the line y_hat = self(x). My understanding is that this is the same as y_hat = self.forward(x). Is that right?

Yes. This model inherits from PyTorch Lightning's LightningModule, and self(x) is in fact a call to self.forward(x).
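For illustration only, a minimal sketch (the class name, layer size and class count below are hypothetical, not the repo's actual model) of how a LightningModule's training_step typically goes through self(x):

import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl

class TinyClassifier(pl.LightningModule):
    # Hypothetical toy model, only to show that self(x) dispatches to forward(x).
    def __init__(self):
        super().__init__()
        self.net = nn.Linear(1500, 17)  # 1500-byte packet vector -> 17 classes (example sizes)

    def forward(self, x):
        return self.net(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)  # nn.Module.__call__ ends up invoking self.forward(x)
        loss = F.cross_entropy(y_hat, y)
        return {'loss': loss}

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters())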

cmh14 commented 4 years ago

Hello, thank you very much for helping out of your busy schedule. I had some things to deal with over the last couple of days so I wasn't working on this; today I got to create_train_test_set.py, and in the main function I saw the following code:

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
memory_gb = psutil.virtual_memory().available // 1024 // 1024 // 1024
spark = (
    SparkSession
    .builder
    .master('local[*]')
    .config('spark.driver.memory', f'{memory_gb}g')
    .config('spark.driver.host', '127.0.0.1')
    .getOrCreate()
)

# prepare final schema
schema = Unischema(
    'data_schema', [
        UnischemaField('feature', np.float32, (1, 1500), CompressedNdarrayCodec(), False),
        UnischemaField('label', np.int32, (), ScalarCodec(LongType()), False),
    ]
)

This looks like setup for multi-threaded processing, is that right? Since I have never worked in this area before, I'd like to ask whether there are official example tutorials, or, if it's convenient, whether you could briefly explain roughly what each of these statements does. I haven't finished writing everything yet, so I haven't actually run or debugged this .py file.

I would really appreciate your help.

cmh14 commented 4 years ago

WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/C:/Users/DELL/Anaconda3/envs/pytorch/Lib/site-packages/p yspark/jars/spark-unsafe_2.11-2.4.5.jar) to method java.nio.Bits.unaligned() WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 20/04/24 16:56:24 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:379) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:394) at org.apache.hadoop.util.Shell.(Shell.java:387) at org.apache.hadoop.util.StringUtils.(StringUtils.java:80) at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:611) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:273) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261) at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:791) at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:761) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:634) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2422) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2422) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2422) at org.apache.spark.SecurityManager.(SecurityManager.scala:79) at org.apache.spark.deploy.SparkSubmit.secMgr$lzycompute$1(SparkSubmit.scala:348) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$secMgr$1(SparkSubmit.scala:348) at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$7.apply(SparkSubmit.scala:356) at org.apache.spark.deploy.SparkSubmit$$anonfun$prepareSubmitEnvironment$7.apply(SparkSubmit.scala:356) at scala.Option.map(Option.scala:146) at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:355) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:774) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 20/04/24 16:56:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where ap plicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 
Traceback (most recent call last): File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\utils.py", line 63, in deco return f(*a, **kw) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o39.parquet. : java.lang.IllegalArgumentException: Unsupported class file major version 56 at org.apache.xbean.asm6.ClassReader.(ClassReader.java:166) at org.apache.xbean.asm6.ClassReader.(ClassReader.java:148) at org.apache.xbean.asm6.ClassReader.(ClassReader.java:136) at org.apache.xbean.asm6.ClassReader.(ClassReader.java:237) at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49) at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517) at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134) at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500) at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175) at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238) at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631) at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355) at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307 ) at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306 ) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) at org.apache.spark.SparkContext.clean(SparkContext.scala:2326) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.RDD.collect(RDD.scala:989) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:241) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:127) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.(InMemoryFileIndex.scala:67) at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$createInMemory FileIndex(DataSource.scala:533) at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:371) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:645) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:567) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:835)

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "create_train_test_set.py", line 177, in main() File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 829, in call return self.main(args, kwargs) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 782, in main rv = self.invoke(ctx) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 1066, in invoke return ctx.invoke(self.callback, ctx.params) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 610, in invoke return callback(args, *kwargs) File "create_train_test_set.py", line 158, in main df = spark.read.parquet(f'{source_data_dir_path.absolute().as_uri()}/.parquet') File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\readwriter.py", line 316, in parquet return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths))) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 1257, in call File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\utils.py", line 79, in deco raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.IllegalArgumentException: 'Unsupported class file major version 56'

I get this error about the runtime environment when running create_train_test_set.py; I guess my environment isn't set up properly.

munhouiani commented 4 years ago

pyspark and petastorm are used here to process the data. When I implemented this paper, I also thought about how to chain the data processing and the pytorch training together, under the assumption that the data could be very large.

The main idea is: PySpark (split train and test) -> produce parquet files usable by petastorm -> petastorm as the data reader for pytorch -> pytorch does the training

You can take a look at the pyspark and petastorm tutorials.
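As a rough illustration of that flow, a hedged sketch only (output_url and rowgroup_size_mb are placeholders; spark, schema, train_df and row_generator refer to the objects discussed in this thread, and this is not the repo's exact code):

from petastorm import make_reader
from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.pytorch import DataLoader
from petastorm.unischema import dict_to_spark_row

output_url = 'file:///tmp/train.parquet'  # placeholder output location
rowgroup_size_mb = 256

# 1. Spark side: write the train (or test) split as a petastorm-compatible parquet dataset
with materialize_dataset(spark, output_url, schema, rowgroup_size_mb):
    rows_rdd = (
        train_df.rdd
        .map(row_generator)                           # row -> {'feature': ..., 'label': ...}
        .map(lambda x: dict_to_spark_row(schema, x))  # encode with the Unischema codecs
    )
    spark.createDataFrame(rows_rdd, schema.as_spark_schema()) \
        .write.mode('overwrite').parquet(output_url)

# 2. PyTorch side: petastorm reads the parquet back and serves batches for training
with DataLoader(make_reader(output_url), batch_size=16) as loader:
    for batch in loader:
        features, labels = batch['feature'], batch['label']
        # feed features/labels to the model here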

pyspark.sql.utils.IllegalArgumentException: 'Unsupported class file major version 56'

You may need to pin the Java version. There is a related solution here; you can try it according to your environment setup:

https://stackoverflow.com/questions/53583199/pyspark-error-unsupported-class-file-major-version-55
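If it helps, one option is to pin JAVA_HOME from inside the script before the SparkSession is created; a hedged sketch (the JDK path below is only an example, point it at your own Java 8 installation):

import os

# Example path only; use the directory of your own Java 8 installation.
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_241'
os.environ['PATH'] = os.path.join(os.environ['JAVA_HOME'], 'bin') + os.pathsep + os.environ['PATH']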

cmh14 commented 4 years ago

Hello, the Google Drive download link you provided for the pre-split training and test data has expired. Would it be convenient for you to share a new one?

munhouiani commented 4 years ago

New link: https://drive.google.com/file/d/1_O2LPs3RixaErigJ_WL1Ecq83VXCXptq/view?usp=sharing

cmh14 commented 4 years ago

Hello author, after I switched my system's Java environment to Java 8, create_train_test_set.py now runs, but it starts to fail and stops when it reaches stage 11. Looking at the error output, the more informative parts are the following:

cmh14 commented 4 years ago

Traceback (most recent call last): File "create_train_test_set.py", line 178, in main() File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 829, in call return self.main(args, kwargs) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 782, in main rv = self.invoke(ctx) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 1066, in invoke return ctx.invoke(self.callback, ctx.params) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 610, in invoke return callback(args, *kwargs) File "create_train_test_set.py", line 164, in main under_sampling=under_sampling, data_dir_path=application_data_dir_path) File "create_train_test_set.py", line 88, in create_train_test_for_task save_train(spark, train_df, data_dir_path, schema) File "create_train_test_set.py", line 106, in save_train save_parquet(spark, df, path, schema) File "create_train_test_set.py", line 120, in save_parquet .parquet(output_path) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\readwriter.py", line 844, in parquet self._jwrite.parquet(path) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 1257, in call answer, self.gateway_client, self.target_id, self.name) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\utils.py", line 63, in deco return f(a, **kw) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o153.parquet.

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:53960) Traceback (most recent call last): File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 1145, in send_command self.socket.sendall(command.encode("utf-8")) ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host.

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 985, in send_command response = connection.send_command(command) File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 1149, in send_command "Error while sending", e, proto.ERROR_ON_SEND) py4j.protocol.Py4JNetworkError: Error while sending

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it. ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:53960) Traceback (most recent call last): File "C:\Users\hek09\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 929, in _get_connection connection = self.deque.pop() IndexError: pop from an empty deque

cmh14 commented 4 years ago

At the moment my environment runs the splitting without any problem, but it fails at the writing/saving stage, and stage 10 suddenly eats up roughly 80 GB on my C drive. Is that normal? I have very little experience in this area and would like to ask for your advice; have you run into anything similar before?

cmh14 commented 4 years ago

I just ran a test: I took a single parquet file out of processed_data and used it as the -s input to create_train_test_set.py, and the error output is as follows: [Stage 9:> (0 + 12) / 200]20/06/07 19:20:42 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 408) org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(File FormatWriter.scala:232) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582) ...
18 more [Stage 9:> (0 + 13) / 200]20/06/07 19:20:42 WARN TaskSetManager: Lost task 1.0 in stage 9.0 (TI D 408, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(File FormatWriter.scala:232) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582) ... 18 more

20/06/07 19:20:42 ERROR TaskSetManager: Task 1 in stage 9.0 failed 1 times; aborting job 20/06/07 19:20:42 ERROR FileFormatWriter: Aborting job b0aa29b9-042f-4362-9d54-de936645d883. org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 1 times, most recent failure: Lost task 1.0 in stage 9. 0 (TID 408, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(File FormatWriter.scala:232) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582) ... 18 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(File FormatWriter.scala:232) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582) ... 
18 more Traceback (most recent call last): File "create_train_test_set.py", line 178, in main() File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 829, in call return self.main(args, kwargs) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 782, in main rv = self.invoke(ctx) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 1066, in invoke return ctx.invoke(self.callback, ctx.params) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\click\core.py", line 610, in invoke return callback(args, *kwargs) File "create_train_test_set.py", line 164, in main under_sampling=under_sampling, data_dir_path=application_data_dir_path) File "create_train_test_set.py", line 88, in create_train_test_for_task save_train(spark, train_df, data_dir_path, schema) File "create_train_test_set.py", line 106, in save_train save_parquet(spark, df, path, schema) File "create_train_test_set.py", line 120, in save_parquet .parquet(output_path) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\readwriter.py", line 844, in parquet self._jwrite.parquet(path) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 1257, in call answer, self.gateway_client, self.target_id, self.name) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\utils.py", line 63, in deco return f(a, **kw) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o153.parquet. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 1 times, most recent failure: Lost task 1.0 in stage 9.0 (TID 408, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(File FormatWriter.scala:232) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582) ... 18 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167) ... 33 more Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:490) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:479) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:597) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(File FormatWriter.scala:232) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 
1 more Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:582) ... 18 more

20/06/07 19:20:42 WARN TaskSetManager: Lost task 12.0 in stage 9.0 (TID 418, localhost, executor driver): TaskKilled (Stage cancelled)

(pytorch) D:\pythonproject\deep_traffic_cmh14>SUCCESS: The process with PID 6896 (child process of PID 17816) has been terminated. SUCCESS: The process with PID 11856 (child process of PID 18328) has been terminated. SUCCESS: The process with PID 17816 (child process of PID 8512) has been terminated. SUCCESS: The process with PID 18328 (child process of PID 8512) has been terminated. SUCCESS: The process with PID 8512 (child process of PID 13868) has been terminated. SUCCESS: The process with PID 13868 (child process of PID 18196) has been terminated. SUCCESS: The process with PID 18196 (child process of PID 13820) has been terminated.

My guess is that with only about 10,000 packets as input it should no longer be a performance problem; there must still be something wrong with my environment configuration. Do I need to start any other services when running this code?

cmh14 commented 4 years ago

Hi, after stepping through the execution flow of your code to debug it, I think the error probably occurs after the change_df_schema() function runs. For the two operations inside that function, I tried testing each step and printing the intermediate result:

rows_rdd = (
    spark_df1
    .rdd
    .map(row_generator)
    .map(lambda x: dict_to_spark_row(schema, x))
)
print(rows_rdd.first())
df = spark.createDataFrame(
    rows_rdd,
    schema.as_spark_schema()
)
print(df.head(5))

Both of the print statements above raise errors. The main error for the first one is:

Traceback (most recent call last): File "D:/pythonproject/deep_traffic_cmh14/create_train_test_set.py", line 256, in print(rows_rdd.first()) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\rdd.py", line 1378, in first rs = self.take(1) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\rdd.py", line 1360, in take res = self.context.runJob(self, takeUpToNumLeft, p) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\context.py", line 1069, in runJob sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 1257, in call answer, self.gateway_client, self.target_id, self.name) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\utils.py", line 63, in deco return f(*a, **kw) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.

The main error for the second one is:

Traceback (most recent call last): File "D:/pythonproject/deep_traffic_cmh14/create_train_test_set.py", line 261, in print(df.head(5)) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\dataframe.py", line 1256, in head return self.take(n) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\dataframe.py", line 573, in take return self.limit(num).collect() File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\dataframe.py", line 534, in collect sock_info = self._jdf.collectToPython() File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\java_gateway.py", line 1257, in call answer, self.gateway_client, self.target_id, self.name) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\pyspark\sql\utils.py", line 63, in deco return f(*a, **kw) File "C:\Users\DELL\Anaconda3\envs\pytorch\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o165.collectToPython.

Could you please take a look and see what the possible causes of this error might be?

munhouiani commented 4 years ago

Replying to some of your earlier questions:

... but my code starts to fail and stop running when it reaches stage 11 ...

Basically Spark plans the work by itself, so I don't actually know in advance what each stage does concretely, and therefore I don't know how to help you look into stage 11.

... and stage 10 suddenly eats up about 80 GB on my C drive, is that normal ...

Yes, that's normal. Since this dataset is fairly large, Spark tends to produce a lot of intermediate files while processing the data, and they can use up most of the space at once, so make sure your disk is big enough.
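If the C drive is too small, you can also point Spark's scratch space at a larger drive via the standard spark.local.dir setting; a hedged sketch (D:\spark_tmp is just an example directory):

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .master('local[*]')
    .config('spark.local.dir', 'D:\\spark_tmp')  # any directory on a drive with enough free space
    .getOrCreate()
)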

... both of the print statements above raise errors; the main error for the first one is: ...

I really can't see any error in the code; it may be environment-related (I'm not entirely sure whether petastorm supports Windows, and dict_to_spark_row is petastorm code).

Let me first make sure your pyspark is fine: spark_df1.head(5) does return the contents of the dataframe, right?

cmh14 commented 4 years ago

Would it be convenient if I sent you the create_train_test_set.py I'm currently working on so you could take a look? (It's essentially your version, with comments and step-by-step debugging added.)

munhouiani commented 4 years ago

You can send it to my mailbox: mhwong2007 [at] gmail.com

cmh14 commented 4 years ago

Hello, I've sent it to you. Thanks for taking time out of your busy schedule to have a look.

cmh14 commented 4 years ago

rows_rdd = spark_df1.rdd
print(rows_rdd.first())
rows_rdd = rows_rdd.map(row_generator)
print(rows_rdd.first())
rows_rdd = rows_rdd.map(lambda x: dict_to_spark_row(schema, x))
print(rows_rdd.first())

I split the problematic part into the three steps above. The first two print without any problem, but the last print fails. So is the problem caused by the dict_to_spark_row(schema, x) function?

saduoduo commented 4 years ago

Hello, I've been trying to reproduce this paper recently. Could I ask about your environment configuration, for example the versions of important libraries such as pytorch, petastorm and python? I'm running into a problem when executing the program, but it comes from the libraries rather than the code itself. Looking forward to your reply, thank you.

Traceback (most recent call last): File "/home/ma/tina/Deep-Packet-master/evaluation.py", line 112, in num_class=len(ID_TO_TRAFFIC) File "/home/ma/tina/Deep-Packet-master/ml/metrics.py", line 19, in confusion_matrix for batch in dataloader: File "/home/ma/anaconda3/envs/torch13/lib/python3.6/site-packages/petastorm/pytorch.py", line 121, in iter for row in self.reader: File "/home/ma/anaconda3/envs/torch13/lib/python3.6/site-packages/petastorm/reader.py", line 588, in next return self._results_queue_reader.read_next(self._workers_pool, self.schema, self.ngram) File "/home/ma/anaconda3/envs/torch13/lib/python3.6/site-packages/petastorm/py_dict_reader_worker.py", line 60, in read_next rows_as_dict = list(reversed(workers_pool.get_results())) File "/home/ma/anaconda3/envs/torch13/lib/python3.6/site-packages/petastorm/workers_pool/process_pool.py", line 257, in get_results raise pickle_serialized TypeError: read() got an unexpected keyword argument 'open_file_func'

munhouiani commented 4 years ago

@saduoduo Hi, you can refer to this environment.yml:

name: base
channels:
  - plotly
  - pytorch
  - conda-forge
  - defaults
dependencies:
  - _libgcc_mutex=0.1=main
  - appdirs=1.4.3=py_1
  - arrow-cpp=0.15.1=py37h7cd5009_5
  - asn1crypto=1.3.0=py37_0
  - attrs=19.3.0=py_0
  - backcall=0.1.0=py_0
  - black=19.10b0=py37_0
  - blas=1.0=mkl
  - bleach=3.1.4=pyh9f0ad1d_0
  - bokeh=2.0.1=py37_0
  - boost-cpp=1.71.0=h7b6447c_0
  - brotli=1.0.7=he6710b0_0
  - bzip2=1.0.8=h7b6447c_0
  - c-ares=1.15.0=h7b6447c_1001
  - ca-certificates=2020.1.1=0
  - certifi=2019.11.28=py37_1
  - cffi=1.14.0=py37h2e261b9_0
  - chardet=3.0.4=py37_1003
  - click=7.1.1=py_0
  - cloudpickle=1.3.0=py_0
  - conda=4.8.3=py37_0
  - conda-package-handling=1.6.0=py37h7b6447c_0
  - cpuonly=1.0=0
  - cryptography=2.8=py37h1ba5d50_0
  - cycler=0.10.0=py37_0
  - cytoolz=0.10.1=py37h7b6447c_0
  - dask=2.13.0=py_0
  - dask-core=2.13.0=py_0
  - dbus=1.13.12=h746ee38_0
  - decorator=4.4.2=py_0
  - defusedxml=0.6.0=py_0
  - distributed=2.13.0=py37_0
  - double-conversion=3.1.5=he6710b0_1
  - entrypoints=0.3=py37hc8dfbb8_1001
  - expat=2.2.6=he6710b0_0
  - fontconfig=2.13.0=h9420a91_0
  - freetype=2.9.1=h8a8886c_1
  - fsspec=0.7.1=py_0
  - gflags=2.2.2=he6710b0_0
  - glib=2.63.1=h5a9c865_0
  - glog=0.4.0=he6710b0_0
  - grpc-cpp=1.26.0=hf8bcb03_0
  - gst-plugins-base=1.14.0=hbbd80ab_1
  - gstreamer=1.14.0=hb453b48_1
  - heapdict=1.0.1=py_0
  - icu=58.2=h9c2bf20_1
  - idna=2.8=py37_0
  - importlib-metadata=1.6.0=py37hc8dfbb8_0
  - importlib_metadata=1.6.0=0
  - intel-openmp=2020.0=166
  - ipykernel=5.2.0=py37h43977f1_1
  - ipython=7.13.0=py37hc8dfbb8_2
  - ipython_genutils=0.2.0=py_1
  - ipywidgets=7.5.1=py_0
  - jedi=0.16.0=py37hc8dfbb8_1
  - jinja2=2.11.1=py_0
  - joblib=0.14.1=py_0
  - jpeg=9b=h024ee3a_2
  - json5=0.9.0=py_0
  - jsonschema=3.2.0=py37hc8dfbb8_1
  - jupyter=1.0.0=py37_7
  - jupyter_client=6.1.2=py_0
  - jupyter_console=6.1.0=py_0
  - jupyter_core=4.6.3=py37hc8dfbb8_1
  - jupyterlab=1.2.6=pyhf63ae98_0
  - jupyterlab_code_formatter=1.2.2=py_0
  - jupyterlab_server=1.1.0=py_0
  - kiwisolver=1.1.0=py37he6710b0_0
  - ld_impl_linux-64=2.33.1=h53a641e_7
  - libboost=1.71.0=h97c9712_0
  - libedit=3.1.20181209=hc058e9b_0
  - libevent=2.1.8=h1ba5d50_0
  - libffi=3.2.1=hd88cf55_4
  - libgcc-ng=9.1.0=hdf63c60_0
  - libgfortran-ng=7.3.0=hdf63c60_0
  - libpng=1.6.37=hbc83047_0
  - libprotobuf=3.11.2=hd408876_0
  - libsodium=1.0.17=h516909a_0
  - libstdcxx-ng=9.1.0=hdf63c60_0
  - libtiff=4.1.0=h2733197_0
  - libuuid=1.0.3=h1bed415_2
  - libuv=1.34.0=h516909a_0
  - libxcb=1.13=h1bed415_1
  - libxml2=2.9.9=hea5a465_1
  - locket=0.2.0=py37_1
  - lz4-c=1.8.1.2=h14c3975_0
  - markupsafe=1.1.1=py37h8f50634_1
  - matplotlib=3.1.3=py37_0
  - matplotlib-base=3.1.3=py37hef1b27d_0
  - mistune=0.8.4=py37h516909a_1000
  - mkl=2020.0=166
  - mkl-service=2.3.0=py37he904b0f_0
  - mkl_fft=1.0.15=py37ha843d7b_0
  - mkl_random=1.1.0=py37hd6b4f25_0
  - msgpack-python=1.0.0=py37hfd86e86_1
  - mypy_extensions=0.4.3=py37hc8dfbb8_1
  - nbconvert=5.6.1=py37_0
  - nbformat=5.0.4=py_0
  - ncurses=6.2=he6710b0_0
  - ninja=1.9.0=py37hfd86e86_0
  - nodejs=10.13.0=he6710b0_0
  - notebook=6.0.3=py37_0
  - numpy=1.18.1=py37h4f9e942_0
  - numpy-base=1.18.1=py37hde5b4d6_1
  - olefile=0.46=py37_0
  - openssl=1.1.1f=h7b6447c_0
  - packaging=20.1=py_0
  - pandas=1.0.3=py37h0573a6f_0
  - pandoc=2.9.2=0
  - pandocfilters=1.4.2=py_1
  - parso=0.6.2=py_0
  - partd=1.1.0=py_0
  - pathspec=0.7.0=py_0
  - pcre=8.43=he6710b0_0
  - pexpect=4.8.0=py37hc8dfbb8_1
  - pickleshare=0.7.5=py37hc8dfbb8_1001
  - pillow=7.0.0=py37hb39fc2d_0
  - pip=20.0.2=py37_1
  - plotly=4.5.4=py_0
  - prometheus_client=0.7.1=py_0
  - prompt-toolkit=3.0.5=py_0
  - prompt_toolkit=2.0.9=py37_0
  - psutil=5.7.0=py37h7b6447c_0
  - ptyprocess=0.6.0=py_1001
  - pyarrow=0.15.1=py37h0573a6f_0
  - pycosat=0.6.3=py37h7b6447c_0
  - pycparser=2.19=py37_0
  - pygments=2.6.1=py_0
  - pyopenssl=19.1.0=py37_0
  - pyparsing=2.4.6=py_0
  - pyqt=5.9.2=py37h05f1152_2
  - pyrsistent=0.16.0=py37h8f50634_0
  - pysocks=1.7.1=py37_0
  - python=3.7.6=h0371630_2
  - python-dateutil=2.8.1=py_0
  - python_abi=3.7=1_cp37m
  - pytorch=1.4.0=py3.7_cpu_0
  - pytz=2019.3=py_0
  - pyyaml=5.3.1=py37h7b6447c_0
  - pyzmq=19.0.0=py37hac76be4_1
  - qt=5.9.7=h5867ecd_1
  - qtconsole=4.7.2=py_0
  - qtpy=1.9.0=py_0
  - re2=2019.08.01=he6710b0_0
  - readline=7.0=h7b6447c_5
  - regex=2020.4.4=py37h8f50634_0
  - requests=2.22.0=py37_1
  - retrying=1.3.3=py37_2
  - ruamel_yaml=0.15.87=py37h7b6447c_0
  - scikit-learn=0.22.1=py37hd81dba3_0
  - scipy=1.4.1=py37h0b6359f_0
  - seaborn=0.10.0=py_0
  - send2trash=1.5.0=py_0
  - setuptools=45.2.0=py37_0
  - sip=4.19.8=py37hf484d3e_0
  - six=1.14.0=py37_0
  - snappy=1.1.7=hbae5bb6_3
  - sortedcontainers=2.1.0=py37_0
  - sqlite=3.31.1=h7b6447c_0
  - tblib=1.6.0=py_0
  - terminado=0.8.3=py37hc8dfbb8_1
  - testpath=0.4.4=py_0
  - thrift-cpp=0.11.0=h02b749d_3
  - tk=8.6.8=hbc83047_0
  - toml=0.10.0=py_0
  - toolz=0.10.0=py_0
  - torchvision=0.5.0=py37_cpu
  - tornado=6.0.4=py37h8f50634_1
  - tqdm=4.42.1=py_0
  - traitlets=4.3.3=py37hc8dfbb8_1
  - typed-ast=1.4.1=py37h516909a_0
  - typing_extensions=3.7.4.1=py37hc8dfbb8_3
  - uriparser=0.9.3=he6710b0_1
  - urllib3=1.25.8=py37_0
  - wcwidth=0.1.9=pyh9f0ad1d_0
  - webencodings=0.5.1=py_1
  - wheel=0.34.2=py37_0
  - widgetsnbextension=3.5.1=py37_0
  - xz=5.2.4=h14c3975_4
  - yaml=0.1.7=had09818_2
  - zeromq=4.3.2=he1b5a44_2
  - zict=2.0.0=py_0
  - zipp=3.1.0=py_0
  - zlib=1.2.11=h7b6447c_3
  - zstd=1.3.7=h0b5b093_0
  - pip:
    - absl-py==0.9.0
    - aiohttp==3.6.2
    - async-timeout==3.0.1
    - autobahn==20.4.1
    - cachetools==4.0.0
    - dill==0.3.1.1
    - diskcache==4.1.0
    - future==0.18.2
    - google-auth==1.13.1
    - google-auth-oauthlib==0.4.1
    - grpcio==1.28.1
    - jupyter-server-proxy==1.3.2
    - markdown==3.2.1
    - multidict==4.7.5
    - oauthlib==3.1.0
    - petastorm==0.8.2
    - protobuf==3.11.3
    - py4j==0.10.7
    - pyasn1==0.4.8
    - pyasn1-modules==0.2.8
    - pyspark==2.4.5
    - python-graphviz==0.13.2
    - pytorch-lightning==0.7.1
    - pyx==0.15
    - requests-oauthlib==1.3.0
    - rsa==4.0
    - scapy==2.4.3
    - simpervisor==0.3
    - tensorboard==2.2.0
    - tensorboard-plugin-wit==1.6.0.post2
    - txaio==20.4.1
    - vpython==7.6.1
    - werkzeug==1.0.1
    - yarl==1.4.2
prefix: /opt/conda