Ray Source-code Reading
Summary
This article introduces the source code of ray(1.8.x+).
To be translated
Oh Sorry!
This blog has’t been translated to English, please wait for a little while…
重点问题
函数信息如何序列化? 答:使用cloudpickle序列化:详见下文cloudpickle
如何维护函数池并实现全局可用 答:
- 将函数信息序列化后放到redis处:详见下文
run_function_on_all_workers
- 在需要的时候取回来
- 将函数信息序列化后放到redis处:详见下文
如何处理用户参数(args和kwargs)? 答:
[key1, value1, key2, value2]
以此类推;如果参数里有objectRef
,则会先获取对应value然后取回。如何处理嵌套函数(一个函数里调用另一个函数) 答:通过序列化闭包直接完成
如何处理函数依赖(一个函数的参数为另一个函数的结果) 答:即上文处理函数参数中
objectRef
的部分如何打包运行时环境 答:并没有打包,但可以通过
runtime_env
参数处理运行时环境,也可以传pip config等帮助初始化环境。如何处理函数中用到的全局变量 答:
- 在序列化函数的时候,直接将全局变量作为
global_ref
并序列化到函数中,反序列化后直接拿到这个数值。详见下文类及全局变量 - 同spark一样,ray的全局变量可读,但写不安全。
- 在序列化函数的时候,直接将全局变量作为
ray.init
init(address: Optional[str] = None, *, ……)
注意到第二个参数就是*,在此之后的参数将只能通过key-value形式传递
init支持三种模式
ray.init()
——本地运行,启动所有关联的进程(包括Redis, a raylet(local scheduler), a plasma store, a plasma manager, and some workers)ray.init(address="localhost:6379")
——连接到一个本地的clusterray.init(address="ray://123.45.67.89:10001")
——连接到一个remote ray集群
local_mode
参数可以支持让代码串行,便于debuginclude_dashboard
参数为True可以让ray起一个本地的dashboard显示状态_enable_object_reconstruction
可以让ray在分布式环境下,丢失object后重新运行获得它的task来再次获得object重点X:在第三种模式中,会启动一个
ray.client
,解析目前参数并传给它初始化时还会尝试增加
file descriptor limit
,获取更大空间如果传进了
runtime_env
,启动ray.job_config.JobConfig()
并将runtime_env传给它创建或连接初始结点
- 在第一种模式中,会创建一个新集群,全局变量
_global_node
为头结点(传入head=True) - 在第二或者第三种模式中,全局变量
_global_node
为子结点(传入head=False),并且connect_only = True
- 在第一种模式中,会创建一个新集群,全局变量
之后,如果有
job_config
的话,会调用working_dir_pkg.rewrite_runtime_env_uris
来重写URI重点√:调用
connect()
function正式连接,详见下文。最后返回
dict(_global_node.address_info, node_id=global_worker.core_worker.get_current_node_id().hex())
def connect(node, ……)
- 连接当前node到
raylet
,Plasma
, 和Redis
- 调用
node.create_redis_client()
创建 redis client作为worker的属性 - 调用
_initialize_global_state
lazy初始化global state(lazy即只是存储了redis的address和密码),并没有真的初始化 - 如果是
driver
mode的话,初始化一个jobid - 检查当前结点ray、python版本和集群ray、python版本是否一致(需要后者在redis中有version数据),不一致会报错
- 填充field到
job_config
中 - 重点X:使用当前参数,初始化一个coreworker——
worker.core_worker = ray._raylet.CoreWorker()
,并拿到gcs_client
- 重点X:启动一个import thread——
import_thread.ImportThread
- 根据mode,启动
listener thread
和log thread
- 重点X:如果非交互模式(正常脚本模式),调用
worker.run_function_on_all_workers
封装一个匿名函数sys.path.insert(1, script_directory)
传进去,从这里我们可以看出ray其实是直接把脚本copy到working_dir的,然后用函数meta信息和上下文直接import对应函数执行。该函数具体信息见ray.worker。 - 如果client模式,
job_config
没带有working_dir
的,同上处理 - 之后将所有
worker.cached_functions_to_run
调用run_function_on_all_workers
- 最后加一个
tracing
来跟踪
ray.remote
- 感想:ray的api设计还是很优雅的
- ray在调用函数如
func.remote
后,返回ObjectRef
对象。 - 当ray.get()的时候,拿着
ObjectRef
去ObjectStore
里面找,如果有就返回值,如果没有就阻塞直到返回值。
def remote(*args, **kwargs)
- 可以支持function/actor,可以手动分配资源
@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
- 当actor对象被 delete后,会完成当前任务再杀死,如果很急着关闭,可以调用
ray.kill()
- 首先拿到global worker(每个进程一个),然后进行判断,如果没有传参直接
make_decorator(worker=worker)(args[0])
,有传参则解析传参然后make_decorator
并将参数传入。 - 目前支持的参数有
|
|
def make_decorator(参数略)
- 用inspect判断是函数还是类,如果是函数检查参数,并返回
ray.remote_function.RemoteFunction
- 如果是类,检查参数,并返回
ray.actor.make_actor()
class RemoteFunction
__init__()
:- 首先检查是否协程,目前不支持async def function
- 对于传入的function,加上
_inject_tracing_into_function
包裹以获取函数trace - 重点√:使用
ParsedRuntimeEnv
来parse本地pip/conda config文件,详细见下文 - 重点√:使用
ray._private.signature.extract_signature
来extract(类)函数的signature,实际为inspect.signature
获取function的参数和默认值。 - 供用户调用的
.remote
由_remote_proxy
调用_remote
生成
__call__()
:不允许直接调用,报错返回options()
:给用户的调整参数的方式,会重新wrap一个RemoteFunction
给用户_remote
:- 首先判断是不是
client_mode
,如果是,调用client_mode_convert_function
返回ClientRemoteFunction
重点X - 如果不是,拿到process的global_worker并调用
worker.check_connected()
检查连接 - 之后检查这个函数是否被exported到这个process(使用
_last_export_session_and_job
与worker.current_session_and_job
作比较)。如果没有,重点√:使用ray自定义的cloudpickle序列化该函数,标记_last_export_session_and_job
和_function_descriptor
属性,并用worker.function_actor_manager.export(self)
export。cloudpickle详见下文。 - 整理参数,
check_placement_group_index
检查放置的group的index - 调用
resources_from_resource_arguments
整理所有参数为字典 - 重点X,拿到当前worker的
parent_env
,连同当前task的parsed_env
调用override_task_or_actor_runtime_env
获取merged_env
- 之后,根据是否cross_language 处理参数(如
flatten_args
,将signature和新传入的参数处理为单个list,如flatten_args([1, 2, 3], {"a": 4})
处理为[None, 1, None, 2, None, 3, "a", 4]
,None
为dummy,指没有keyword的positional param) - 最后,调用
worker.core_worker.submit_task
提交任务,拿到object_refs
,流程结束。
- 首先判断是不是
ParsedRuntimeEnv
类主要是运行环境相关的处理和依赖,没有包括全局变量等,使用
json
进行序列化和反序列化目前支持的字段有:
1 2 3 4 5 6 7 8
known_fields: Set[str] = { "working_dir", # 指定worker的工作目录,可以是一个目录也可以是一个archive压缩包,会将archive压缩包解压到指定工作目录 "conda", "pip", # pip requirements.txt或conda yaml "uris", # 所需uri路径(uri是一个zip包) "containers", # 支持docker镜像 "env_vars", # 环境变量 "excludes", "_ray_release", "_ray_commit", "_inject_current_ray", "plugins" }
ray.worker
- 重要属性
node (ray.node.Node)
:worker所在结点mode
:SCRIPT_MODE, LOCAL_MODE, WORKER_MODE
中的一个cached_functions_to_run (List)
:worker要运行的函数列表memory_monitor
:memory_monitor.MemoryMonitor()
——在低内存环境处理报错信息function_actor_manager
:重点X 管理functions/actors的export/load
- 感想:类设计也很优雅,@property处理对外对内的变量,便于调用和维护
worker.run_function_on_all_workers
- 注意这个函数并非我们直接的函数,而是封装过只接受一个worker info的参数。
- 如果还没
ray.init()
,就加入到cache里 - 如果已经init了,那就序列化(ray自带的
cloudpickle
)这个函数 - 重点X:直接在driver尝试运行这个函数,
function({"worker": self})
,以校验这个函数是否正常。(注释写明:如果一个有问题的函数,最好在被exported之前被发现) self.redis_client.setnx(b"Lock:" + key, 1)
尝试set(set if not exist),如果返回1,则加key成功,如果返回0,则没有加key成功(已经被用过了)来判断函数是否已经exported。
例子:
|
|
- 重点√:
check_oversized_function()
校验序列化函数是否太大,如果太大发送一个warning。 - 重点√:使用redis的hset(设置哈希表键值),保存相关信息,随后rpush(推送到队列尾)。
|
|
cloudpickle——重写原生pickle
- 目的:序列化lambda和嵌套函数;正确处理
main
module;处理其他不能被序列化的objects - 没有反序列化部分,这一部分用原生pickle做
- 非常非常厉害的序列化工具:可以直接序列化类、类变量和类方法,全局变量也直接被序列化,甚至能够直接嵌套序列化,将用到的东西一起序列化,形成完整的package。
CloudPickler
- 首先定义一个
dispatch_table
确定各个类型的处理方式如classmethod
使用_classmethod_reduce
处理,注意此处没有包括function
- 根据
pickle.HIGHEST_PROTOCOL
版本不同,有两种行为模式pickle.HIGHEST_PROTOCOL
>=5在初始化父类
Pickler
的时候,会多传入一个buffer_callback
参数(buffer_callback
可以让我们取回原始对象,请参考下面例子)1 2 3 4 5 6 7 8 9 10
b = ZeroCopyByteArray(b"abc") buffers = [] data = pickle.dumps(b, protocol=5) new_b = pickle.loads(data) print(b == new_b) # True print(b is new_b) # False: a copy was made data = pickle.dumps(b, protocol=5, buffer_callback=buffers.append) new_b = pickle.loads(data, buffers=buffers) print(b == new_b) # True print(b is new_b) # True: no copy was made
重点X:
reducer_override
——优先级比dispatch table高,可以突破type-specific的限制(如exception),同时基于C _pickle.Pickler
效率更高。
其他情况(
pickle.HIGHEST_PROTOCOL
<=4)- 初始化父类的时候不会传入
buffer_callback
- dispatch_table中加入
save_global
:可处理types
- dispatch[types.FunctionType]加入
save_function
:可处理函数
- 初始化父类的时候不会传入
- 等待编辑
实战——函数
我们以这个函数作为序列化的对象:
|
|
- 用原生pickle序列化结果如下:只有module和函数名,反序列化后需要能import进这个函数才行。
|
|
- 用ray序列化结果如下,中间包含了参数信息和函数体,可以直接被其他地方反序列化并调用
|
|
实战——类及全局变量
以这个类作为序列化的对象
|
|
- 使用ray的cloudpickle序列化后,可以直接读取并执行函数
|
|
实战——嵌套函数/嵌套类
ray可以将依赖的函数/依赖的类一起序列化,形成一个package,打包了整个环境。
|
|
删除源代码(不然会优先import源码)反序列化后再执行
|
|