stanford-mast / pocket

Elastic ephemeral storage

Problem for registering jobs on NVMe servers #12

Closed charles-typ closed 4 years ago

charles-typ commented 4 years ago

Hi,

I tried to run jobs using DRAM storage servers, and reads and writes succeeded. But when I switch to NVMe servers, Kubernetes successfully launches a new NVMe container, yet that container fails to connect to the controller's listening port.

Some logs below:

-------------------------- REGISTER JOB -------------------------------- 1579750512.3086464
received hints  test1-122643 0 1 0 0
connected to 10.1.0.10:9070
connected to 10.1.48.29:50030
generate weightmask for  test1-122643 1 72.07207207207207 0
jobid test1-122643 is throughput-bound
KUBERNETES: launch 1 extra nodes, wait for them to come up and assing proper weights [0.009009009009009009]
KUBERNETES: launch flash datanode........
controller.py:249: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
  job = yaml.load(f)
Job created. status='{'active': None,
 'completion_time': None,
 'conditions': None,
 'failed': None,
 'start_time': None,
 'succeeded': None}'
Wait for flash datanode to start...

At some point, the NVMe datanode is launched, and the metadata server returns its capacity:

cpu  net  DRAM_GB  Flash_GB  blacklisted  reserved
datanodeip_port                                                     
10.1.48.29:50030  0.0  0.0      0.0      -1.0          0.0       0.0
Capacity usage for Tier 0 : 753662 free blocks out of 753664 ( 0.0002653702445652174 % )
Capacity usage for Tier 1 : 1703936 free blocks out of 1703936 ( 0.0 % )
Datanode usage: 
                   cpu  net  blacklisted
datanodeip_port                        
10.1.48.29:50030  0.0  0.0          0.0
**********
                  cpu  net  DRAM_GB  Flash_GB  blacklisted  reserved
datanodeip_port                                                     
10.1.48.29:50030  0.0  0.0      0.0      -1.0          0.0       0.0
Capacity usage for Tier 0 : 753662 free blocks out of 753664 ( 0.0002653702445652174 % )
Capacity usage for Tier 1 : 1703936 free blocks out of 1703936 ( 0.0 % )
**********
                  cpu  net  DRAM_GB  Flash_GB  blacklisted  reserved
datanodeip_port                                                     
10.1.48.29:50030  0.0  0.0      0.0      -1.0          0.0       0.0
Capacity usage for Tier 0 : 753662 free blocks out of 753664 ( 0.0002653702445652174 % )
Capacity usage for Tier 1 : 1703936 free blocks out of 1703936 ( 0.0 % )
Datanode usage: 
                   cpu  net  blacklisted
datanodeip_port                        
10.1.48.29:50030  2.0  0.0          0.0

And if you check the pods, you can see that the container is indeed running:

(base) ubuntu@ip-10-1-47-178:~/pocket/deploy$ kubectl get pod -o wide
NAME                                          READY   STATUS    RESTARTS   AGE     IP           NODE                                        NOMINATED NODE   READINESS GATES
pocket-datanode-dram-job0-h4drj               1/1     Running   0          9m16s   10.1.48.29   ip-10-1-48-29.us-west-2.compute.internal    <none>           <none>
pocket-datanode-nvme-job0-d9cml               1/1     Running   0          7m13s   10.1.2.3     ip-10-1-74-206.us-west-2.compute.internal   <none>           <none>
pocket-namenode-deployment-845648cb89-sg82q   1/1     Running   0          12m     10.1.0.10    ip-10-1-43-131.us-west-2.compute.internal   <none>           <none>

The code I'm using for registering a job is:

>>> import pocket
>>> jobid = "test1"
>>> namenode_ip = "10.1.0.10"
>>> jobid = pocket.register_job(jobid, capacityGB=1, latency_sensitive=0)

Also, I'm wondering whether the controller lacks support for SSD (NVMe) nodes.

Thanks for your time!

anakli commented 4 years ago

This commit added support to the controller for autoscaling the NVMe tier.

Can you first check if the cluster works for you with an NVMe datanode when you spin it up manually, without the controller? Use the following steps to do this:

  1. Edit pocketcluster.k8s.local.yaml to set the i3.2xlarge instance maxSize and minSize to 1.
  2. Run ./setup_cluster.sh and wait for the cluster to come up.
  3. python patch_cluster.py
  4. ./add_ip_routes.sh
  5. python deploy_pocket_namenode.py
  6. python create_reflex_job.py nvme 1
  7. Run the latency microbenchmark and see if it succeeds.
charles-typ commented 4 years ago

Hi!

Thank you so much for the response!

I did the steps exactly as you mentioned, and the latency microbenchmark fails with a timeout error according to the CloudWatch logs.

Therefore, I tried to test it manually instead of using Lambda.

I did:

  1. Edit pocketcluster.k8s.local.yaml to set the i3.2xlarge instance maxSize and minSize to 1.
  2. Run ./setup_cluster.sh and wait for the cluster to come up.
  3. python patch_cluster.py
  4. ./add_ip_routes.sh
  5. python deploy_pocket_namenode.py
  6. SSH to the i3.2xlarge instance and perform, one by one, the exact steps from the Dockerfile on that instance. From this I could see that the ReFlex server runs properly and the Crail datanode also runs properly.

After setting all these up, I tried to run the following script, which is copied exactly from the microbenchmark:


>>> import pocket
>>> jobid = "test"
>>> namenode_ip = "10.1.0.10"
>>> p = pocket.connect(namenode_ip, 9070)
connected to 10.1.0.10:9070
>>> dir = jobid+"microbenchmark"
>>> pocket.create_dir(p, dir, "")

This create_dir call fails with a timeout error, and on the i3.2xlarge instance I could see that the ReFlex system caught an exception and went down. The exact line for the exception is here: header->magic has the value 0, while the sizeof statement evaluates to 24.

After digging through the call stack, I think the problem is this: when I connect to the metadata server and try to create the directory, the metadata server returns the IP address and port of a ReFlex container, but the Pocket client seems to interpret it as a DRAM container and uses the NaRPC client to send requests, so the ReFlex server hits this error because it receives the wrong header.

In more detail: this line, which is called during the creation of the directory, returns a NaRPC client instead of a ReFlex client.

One quick question I want to ask:

How does the metadata server distinguish a DRAM node from a ReFlex node, or does it distinguish them at all?

Ultimately, our goal is to run Pocket with some DRAM servers and some NVMe servers and try to reproduce the rightsizing of resource allocations described in the paper, i.e., after the DRAM servers fill up, Pocket should continue onto the NVMe servers. But currently we still have problems setting up even one NVMe server and reading/writing from it. Do you have any suggestions?

Thank you very much for your time and consideration!

anakli commented 4 years ago

The metadata server distinguishes a DRAM node from an NVMe node based on the port number. The port number for DRAM servers is defined here as 50020, and the port number for ReFlex servers is defined here as 1234.

Can you double check that when you run the datanode, you specify the storage tier explicitly? ReFlex is not the default storage tier. You can specify the tier as follows: ./bin/crail datanode -t com.ibm.crail.storage.reflex.ReFlexStorageTier
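
To make this concrete, here is a minimal sketch, not Pocket's actual client code (the constant and helper names are mine), of how a client can tell which protocol to speak to a datanode purely from the port the metadata server hands back:

DRAM_NARPC_PORT = 50020   # DRAM datanodes speak NaRPC on this port
REFLEX_NVME_PORT = 1234   # NVMe datanodes speak the ReFlex protocol on this port

def protocol_for(datanode_addr: str) -> str:
    """Map an 'ip:port' string to the RPC protocol the client should use."""
    port = int(datanode_addr.rsplit(":", 1)[1])
    if port == REFLEX_NVME_PORT:
        return "reflex"
    if port == DRAM_NARPC_PORT:
        return "narpc"
    raise ValueError("unrecognized datanode port: %d" % port)

# If the client picks "narpc" for a ReFlex datanode (or vice versa), the server
# receives a request header it does not understand, which matches the
# header magic/size mismatch reported above.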

charles-typ commented 4 years ago

Hi @anakli

Thanks for the kind support with setting up the NVMe nodes. I tried to launch the ReFlex server as you suggested, and although the IX process would still abort, I made some fixes in the client code to get it working. Now I'm trying to reproduce some results from the paper and have a few questions.

I want to run an experiment using Pocket with a mix of DRAM and NVMe servers. I expect that when all DRAM nodes are filled with data, Pocket will spill newly arriving data to the NVMe nodes instead.

This assumption comes from the paper's Section 4.1, "Rightsizing application allocation", middle of paragraph 3:

Hence, Pocket fills the job’s DRAM resources before spilling to other storage tiers, in order of increasing storage latency.

When I register a job with the Pocket controller, I have a problem allocating both DRAM and NVMe nodes at the same time. I found some relevant code commented out here. After uncommenting it, I still could not use DRAM and NVMe nodes at the same time. Is there any way to make this code work?

Here is a simple, concrete example of what I want to achieve:

  1. Launch one DRAM node (60 GB) and one NVMe node (110 GB).
  2. Register a job with a 100 GB capacity hint (more than the DRAM capacity but less than the total capacity).
  3. Keep writing data via the Pocket API and see that Pocket fills up the DRAM node first, then automatically switches to writing to the NVMe node.

Does Pocket support this kind of scenario? If so, could you please give some advice on how to make the controller work?

Thank you sincerely for your time and support!

anakli commented 4 years ago

The current default policy maps latency-sensitive jobs to use the DRAM tier and other jobs to use the NVMe tier. Therefore, the default policy does not split data belonging to a single job across storage tiers, though this can be done with alternative policies in Pocket. The general mechanism that Pocket implements for data steering is weight maps. The controller assigns each job a weight map, which is an array containing the fraction of that job's data (from 0 to 1) to send to each storage node in the system. These nodes can belong to different storage tiers, which is how you can split a job's data across tiers. The current controller implementation sets static weight maps for jobs when they register (since we target short-lived jobs), but you can also experiment with adjusting weight maps dynamically to do things like write to NVMe only after the DRAM nodes fill up.
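
As a purely illustrative sketch (this is not the controller's code; the addresses, weights, and helper name are made up), a weight map and the weighted round-robin selection it drives could look like this:

# One entry per datanode in the system; entries may point at different tiers.
weightmap = {
    "10.1.48.29:50030": 0.75,   # DRAM datanode gets 75% of this job's data
    "10.1.2.3:1234":    0.25,   # NVMe (ReFlex) datanode gets 25%
}

def pick_datanode(weightmap, sent_counts):
    """Largest-deficit weighted round-robin over the job's weight map."""
    total = sum(sent_counts.values()) or 1
    node = max(weightmap, key=lambda n: weightmap[n] - sent_counts.get(n, 0) / total)
    sent_counts[node] = sent_counts.get(node, 0) + 1
    return node

# Writing four blocks sends three to the DRAM node and one to NVMe:
# counts = {}; [pick_datanode(weightmap, counts) for _ in range(4)]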

You can also experiment with writing to multiple tiers in a single job by associating different sub-directories that a job writes to with different weight maps (e.g., you can have DRAM directories and NVMe directories and directories mapped to different ratios of DRAM and NVMe). A Pocket weight map is always associated with a directory (see here). By default, the controller currently sets a weight map for each job's top-level directory, which is why in the default policy, all of a job's data is mapped to a single tier (see here).
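
For instance (the directory names and node addresses below are made up, just to illustrate the per-directory association):

dir_weightmaps = {
    "/job0":       {"10.1.48.29:50030": 1.0},                       # DRAM only
    "/job0-spill": {"10.1.2.3:1234": 1.0},                          # NVMe only
    "/job0-mixed": {"10.1.48.29:50030": 0.5, "10.1.2.3:1234": 0.5}, # 50/50 split
}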

The reason uncommenting the code you pointed out in the controller did not enable data spilling for your job is that it only takes care of spinning up NVMe nodes when you run out of DRAM nodes; the data steering itself happens via the weight maps. Hope this helps!

Ives66 commented 4 years ago

Hi, I am running into the same problem described in the comment above, and it confuses me a lot. What should I change in the client code to fix it? Thanks.

anakli commented 4 years ago

Hi Ives66, if you would like to split data for a single job between DRAM and NVMe nodes, you can modify the generate_weightmask function in controller.py. The controller generates a weightmask for each job when the job registers. This weightmask controls how data is written across nodes for a particular job. The job's weightmask has a value between 0 and 1 for each datanode in the cluster, representing what fraction of the job's data should go to that datanode (this is used to select a datanode via weighted round-robin when writing data for that job). The weights in each job's weightmask should sum to 1.

Currently, the default policy uses DRAM exclusively for latency-sensitive jobs and Flash for non-latency-sensitive jobs. You can modify the generate_weightmask policy by adding a third category of jobs whose data can span both DRAM and Flash.

If you'd like to make the job use all DRAM first until it fills up and then switch to using Flash, you will need dynamic weightmask adjustment. The controller tracks how much capacity is used/remaining on each datanode (this information is received in heartbeats from each datanode), so the controller has the information to know when a tier fills up. You can use this as a signal to adjust the weightmasks dynamically while jobs are running when a tier fills up.
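
A rough sketch of what that could look like (this is not code from controller.py; the node lists, the 95% threshold, and the helper names are assumptions for illustration):

DRAM_NODES  = ["10.1.48.29:50030"]   # example DRAM datanode
FLASH_NODES = ["10.1.2.3:1234"]      # example NVMe (ReFlex) datanode

def generate_mixed_weightmask(dram_fraction):
    """Split a job's data between tiers; all weights sum to 1."""
    weightmask = {}
    for node in DRAM_NODES:
        weightmask[node] = dram_fraction / len(DRAM_NODES)
    for node in FLASH_NODES:
        weightmask[node] = (1.0 - dram_fraction) / len(FLASH_NODES)
    return weightmask

def maybe_rebalance(weightmask, dram_utilization):
    """Heartbeat-driven adjustment: once DRAM is nearly full, steer new writes to Flash."""
    if dram_utilization > 0.95:
        return generate_mixed_weightmask(dram_fraction=0.0)
    return weightmask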

Ives66 commented 4 years ago

Thanks a lot!