Ray Source-code Reading

This article introduces the source code of ray(1.8.x+).

Oh Sorry!

This blog has’t been translated to English, please wait for a little while…

  • 函数信息如何序列化? 答:使用cloudpickle序列化:详见下文cloudpickle

  • 如何维护函数池并实现全局可用 答:

    • 将函数信息序列化后放到redis处:详见下文run_function_on_all_workers
    • 在需要的时候取回来
  • 如何处理用户参数(args和kwargs)? 答:[key1, value1, key2, value2]以此类推;如果参数里有objectRef,则会先获取对应value然后取回。

  • 如何处理嵌套函数(一个函数里调用另一个函数) 答:通过序列化闭包直接完成

  • 如何处理函数依赖(一个函数的参数为另一个函数的结果) 答:即上文处理函数参数中objectRef的部分

  • 如何打包运行时环境 答:并没有打包,但可以通过runtime_env参数处理运行时环境,也可以传pip config等帮助初始化环境。

  • 如何处理函数中用到的全局变量 答:

    • 在序列化函数的时候,直接将全局变量作为global_ref并序列化到函数中,反序列化后直接拿到这个数值。详见下文类及全局变量
    • 同spark一样,ray的全局变量可读,但写不安全
  • 注意到第二个参数就是*,在此之后的参数将只能通过key-value形式传递

  • init支持三种模式

    • ray.init()——本地运行,启动所有关联的进程(包括Redis, a raylet(local scheduler), a plasma store, a plasma manager, and some workers)
    • ray.init(address="localhost:6379")——连接到一个本地的cluster
    • ray.init(address="ray://123.45.67.89:10001")——连接到一个remote ray集群
  • local_mode参数可以支持让代码串行,便于debug

  • include_dashboard参数为True可以让ray起一个本地的dashboard显示状态

  • _enable_object_reconstruction可以让ray在分布式环境下,丢失object后重新运行获得它的task来再次获得object

  • 重点X:在第三种模式中,会启动一个ray.client,解析目前参数并传给它

  • 初始化时还会尝试增加file descriptor limit,获取更大空间

  • 如果传进了runtime_env,启动ray.job_config.JobConfig()并将runtime_env传给它

  • 创建或连接初始结点

    • 在第一种模式中,会创建一个新集群,全局变量_global_node为头结点(传入head=True)
    • 在第二或者第三种模式中,全局变量_global_node为子结点(传入head=False),并且connect_only = True
  • 之后,如果有job_config的话,会调用working_dir_pkg.rewrite_runtime_env_uris来重写URI

  • 重点√:调用connect()function正式连接,详见下文。

  • 最后返回dict(_global_node.address_info, node_id=global_worker.core_worker.get_current_node_id().hex())

  • 连接当前node到raylet, Plasma, 和Redis
  • 调用node.create_redis_client()创建 redis client作为worker的属性
  • 调用_initialize_global_state lazy初始化global state(lazy即只是存储了redis的address和密码),并没有真的初始化
  • 如果是driver mode的话,初始化一个jobid
  • 检查当前结点ray、python版本和集群ray、python版本是否一致(需要后者在redis中有version数据),不一致会报错
  • 填充field到job_config
  • 重点X:使用当前参数,初始化一个coreworker——worker.core_worker = ray._raylet.CoreWorker(),并拿到gcs_client
  • 重点X:启动一个import thread——import_thread.ImportThread
  • 根据mode,启动listener threadlog thread
  • 重点X:如果非交互模式(正常脚本模式),调用worker.run_function_on_all_workers封装一个匿名函数sys.path.insert(1, script_directory)传进去,从这里我们可以看出ray其实是直接把脚本copy到working_dir的,然后用函数meta信息和上下文直接import对应函数执行。该函数具体信息见ray.worker。
  • 如果client模式,job_config没带有working_dir的,同上处理
  • 之后将所有worker.cached_functions_to_run调用run_function_on_all_workers
  • 最后加一个tracing来跟踪
  • 感想:ray的api设计还是很优雅的
  • ray在调用函数如func.remote后,返回ObjectRef对象。
  • 当ray.get()的时候,拿着ObjectRefObjectStore里面找,如果有就返回值,如果没有就阻塞直到返回值。
  • 可以支持function/actor,可以手动分配资源@ray.remote(num_gpus=1, max_calls=1, num_returns=2)
  • 当actor对象被 delete后,会完成当前任务再杀死,如果很急着关闭,可以调用ray.kill()
  • 首先拿到global worker(每个进程一个),然后进行判断,如果没有传参直接make_decorator(worker=worker)(args[0]),有传参则解析传参然后make_decorator并将参数传入。
  • 目前支持的参数有
1
2
3
4
5
6
valid_kwargs = [
    "num_returns", "num_cpus", "num_gpus", "memory", "object_store_memory",
    "resources", "accelerator_type", "max_calls", "max_restarts",
    "max_task_retries", "max_retries", "runtime_env", "retry_exceptions",
    "placement_group"
]
  • 用inspect判断是函数还是类,如果是函数检查参数,并返回ray.remote_function.RemoteFunction
  • 如果是类,检查参数,并返回ray.actor.make_actor()
  • __init__()

    • 首先检查是否协程,目前不支持async def function
    • 对于传入的function,加上_inject_tracing_into_function包裹以获取函数trace
    • 重点√:使用ParsedRuntimeEnv来parse本地pip/conda config文件,详细见下文
    • 重点√:使用ray._private.signature.extract_signature来extract(类)函数的signature,实际为inspect.signature获取function的参数和默认值。
    • 供用户调用的.remote_remote_proxy调用_remote生成
  • __call__():不允许直接调用,报错返回

  • options():给用户的调整参数的方式,会重新wrap一个RemoteFunction给用户

  • _remote

    • 首先判断是不是client_mode,如果是,调用client_mode_convert_function返回ClientRemoteFunction 重点X
    • 如果不是,拿到process的global_worker并调用worker.check_connected()检查连接
    • 之后检查这个函数是否被exported到这个process(使用_last_export_session_and_jobworker.current_session_and_job作比较)。如果没有,重点√:使用ray自定义的cloudpickle序列化该函数,标记_last_export_session_and_job_function_descriptor属性,并用worker.function_actor_manager.export(self)export。cloudpickle详见下文。
    • 整理参数,check_placement_group_index检查放置的group的index
    • 调用resources_from_resource_arguments整理所有参数为字典
    • 重点X,拿到当前worker的parent_env,连同当前task的parsed_env调用override_task_or_actor_runtime_env获取merged_env
    • 之后,根据是否cross_language 处理参数(如flatten_args,将signature和新传入的参数处理为单个list,如flatten_args([1, 2, 3], {"a": 4})处理为[None, 1, None, 2, None, 3, "a", 4]None为dummy,指没有keyword的positional param)
    • 最后,调用worker.core_worker.submit_task提交任务,拿到object_refs,流程结束。
  • ParsedRuntimeEnv

    • 主要是运行环境相关的处理和依赖,没有包括全局变量等,使用json进行序列化和反序列化

    • 目前支持的字段有:

      1
      2
      3
      4
      5
      6
      7
      8
      
      known_fields: Set[str] = {
          "working_dir", # 指定worker的工作目录,可以是一个目录也可以是一个archive压缩包,会将archive压缩包解压到指定工作目录
          "conda", "pip",   # pip requirements.txt或conda yaml
          "uris",        # 所需uri路径(uri是一个zip包)
          "containers",  # 支持docker镜像
          "env_vars",    # 环境变量
          "excludes", "_ray_release", "_ray_commit", "_inject_current_ray", "plugins"
      }
      
  • 重要属性
    • node (ray.node.Node):worker所在结点
    • mode: SCRIPT_MODE, LOCAL_MODE, WORKER_MODE中的一个
    • cached_functions_to_run (List):worker要运行的函数列表
    • memory_monitor: memory_monitor.MemoryMonitor()——在低内存环境处理报错信息
    • function_actor_manager重点X 管理functions/actors的export/load
  • 感想:类设计也很优雅,@property处理对外对内的变量,便于调用和维护
  • 注意这个函数并非我们直接的函数,而是封装过只接受一个worker info的参数。
  • 如果还没ray.init(),就加入到cache里
  • 如果已经init了,那就序列化(ray自带的cloudpickle)这个函数
  • 重点X:直接在driver尝试运行这个函数,function({"worker": self}),以校验这个函数是否正常。(注释写明:如果一个有问题的函数,最好在被exported之前被发现)
  • self.redis_client.setnx(b"Lock:" + key, 1)尝试set(set if not exist),如果返回1,则加key成功,如果返回0,则没有加key成功(已经被用过了)来判断函数是否已经exported。

例子:

1
2
3
4
5
6
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
  • 重点√check_oversized_function()校验序列化函数是否太大,如果太大发送一个warning。
  • 重点√:使用redis的hset(设置哈希表键值),保存相关信息,随后rpush(推送到队列尾)。
1
2
3
4
5
6
7
8
self.redis_client.hset(
                key,
                mapping={
                    "job_id": self.current_job_id.binary(),
                    "function_id": function_to_run_id,
                    "function": pickled_function,
                    "run_on_other_drivers": str(run_on_other_drivers),
                })
  • 目的:序列化lambda和嵌套函数;正确处理mainmodule;处理其他不能被序列化的objects
  • 没有反序列化部分,这一部分用原生pickle做
  • 非常非常厉害的序列化工具:可以直接序列化类、类变量和类方法,全局变量也直接被序列化,甚至能够直接嵌套序列化,将用到的东西一起序列化,形成完整的package。
  • 首先定义一个dispatch_table确定各个类型的处理方式如classmethod使用_classmethod_reduce处理,注意此处没有包括function
  • 根据pickle.HIGHEST_PROTOCOL版本不同,有两种行为模式
    • pickle.HIGHEST_PROTOCOL>=5

      • 在初始化父类Pickler的时候,会多传入一个buffer_callback参数(buffer_callback可以让我们取回原始对象,请参考下面例子)

         1
         2
         3
         4
         5
         6
         7
         8
         9
        10
        
        b = ZeroCopyByteArray(b"abc")
        buffers = []
        data = pickle.dumps(b, protocol=5)
        new_b = pickle.loads(data)
        print(b == new_b)  # True
        print(b is new_b)  # False: a copy was made
        data = pickle.dumps(b, protocol=5, buffer_callback=buffers.append)
        new_b = pickle.loads(data, buffers=buffers)
        print(b == new_b)  # True
        print(b is new_b)  # True: no copy was made
        
      • 重点Xreducer_override——优先级比dispatch table高,可以突破type-specific的限制(如exception),同时基于C _pickle.Pickler效率更高。

    • 其他情况(pickle.HIGHEST_PROTOCOL<=4)

      • 初始化父类的时候不会传入buffer_callback
      • dispatch_table中加入save_global:可处理types
      • dispatch[types.FunctionType]加入save_function:可处理函数
  • 等待编辑

我们以这个函数作为序列化的对象:

1
2
def hello_world(name: str, age: int):
    print(f'hello world, {name} at {age}!')
  • 用原生pickle序列化结果如下:只有module和函数名,反序列化后需要能import进这个函数才行。
1
b'\x80\x03c__main__\nhello_world\nq\x00.'
  • 用ray序列化结果如下,中间包含了参数信息和函数体,可以直接被其他地方反序列化并调用
1
b'\x80\x05\x95@\x02\x00\x00\x00\x00\x00\x00\x8c\x1bray.cloudpickle.cloudpickle\x94\x8c\r_builtin_type\x94\x93\x94\x8c\nLambdaType\x94\x85\x94R\x94(h\x02\x8c\x08CodeType\x94\x85\x94R\x94(K\x02K\x00K\x02K\x06KCC\x1at\x00d\x01|\x00\x9b\x00d\x02|\x01\x9b\x00d\x03\x9d\x05\x83\x01\x01\x00d\x00S\x00\x94(N\x8c\rhello world, \x94\x8c\x04 at \x94\x8c\x01!\x94t\x94\x8c\x05print\x94\x85\x94\x8c\x04name\x94\x8c\x03age\x94\x86\x94\x8c%c:\\Users\\Peter\\Desktop\\test_pickle.py\x94\x8c\x0bhello_world\x94K\x05C\x02\x00\x01\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94\x8c\x00\x94\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94h\x13uNNNt\x94R\x94\x8c ray.cloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h\x1f}\x94}\x94(h\x1bh\x14\x8c\x0c__qualname__\x94h\x14\x8c\x0f__annotations__\x94}\x94(h\x10\x8c\x08builtins\x94\x8c\x03str\x94\x93\x94h\x11h(\x8c\x03int\x94\x93\x94u\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x1c\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.'

以这个类作为序列化的对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
from ray import cloudpickle as pickle

global_value = 10086

class TestPickle:

    def __init__(self) -> None:
        self.variable = 123

    def hello_world(self, name: str, age: int):
        print(f'hello world, {name} at {age}!')
        print(self.variable)
        print(global_value)
    

with open('temp.pickle', 'wb') as f:
    pickle.dump(TestPickle, f)
  • 使用ray的cloudpickle序列化后,可以直接读取并执行函数
1
2
3
4
5
6
7
from ray import cloudpickle as pickle

with open('temp.pickle', 'rb') as f:
    cls = pickle.load(f)

c = cls()
c.hello_world('peter', 246)

ray可以将依赖的函数/依赖的类一起序列化,形成一个package,打包了整个环境。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from ray import cloudpickle as pickle

def inner_function():
    print('inner function!')

class Inner_class:

    def __init__(self) -> None:
        print('inner class!')
    
    @staticmethod
    def class_function():
        print("inner class's function!")
        inner_function()


global_value = 10086

class TestPickle:

    def __init__(self) -> None:
        self.variable = 123

    def hello_world(self, name: str, age: int):
        print(f'hello world, {name} at {age}!')
        print(self.variable)
        print(global_value)
        inner_function()
        c = Inner_class()
        c.class_function()

def outer_function():
    c = TestPickle()
    c.hello_world('peter', 22)

with open('temp.pickle', 'wb') as f:
    pickle.dump(outer_function, f)

print(pickle.dumps(outer_function))

删除源代码(不然会优先import源码)反序列化后再执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
with open('temp.pickle', 'rb') as f:
    func = pickle.load(f)

func()
'''输出
hello world, peter at 22!
123
10086
inner function!
inner class!
inner class's function!
inner function!
'''