CangjieMagic框架:使用华为仓颉编程语言编写,专门用于开发AI Agent,支持鸿蒙、Windows、macOS、Linux等系统。

这篇文章剖析一下 CangjieMagic 框架中的 ReactExecutor。

这个执行器名字中的"React"代表"Reasoning and Acting"(推理和行动),它让Agent能够像人类一样思考问题、采取行动、观察结果,然后再思考下一步。

如果NaiveExecutor像是直接回答问题的学生,PlanReactExecutor像是制定详细计划的项目经理,那么ReactExecutor就像是一位善于边思考边行动的侦探,通过不断的观察和推理来解决问题!

1 ReactExecutor的工作原理

用户提问
Agent
ReactExecutor
循环执行
思考Thought
行动Action
观察Observation
有答案了吗?
返回答案

ReactExecutor采用了一种叫做"思考-行动-观察"的循环执行模式,就像我们在生活中解决问题的方式:

  1. 思考(Thought):分析问题,考虑可能的解决方案
  2. 行动(Action):采取一个具体行动,比如查询信息、使用工具
  3. 观察(Observation):查看行动的结果
  4. 继续思考:基于观察到的结果,进一步思考下一步行动

这就像我们烹饪一道从未做过的菜:先思考食谱,然后按步骤操作,观察结果(尝尝味道),再决定是否需要调整。

2 代码结构与初始化

让我们先看看ReactExecutor的构造函数:

public class ReactExecutor <: AgentExecutor {
    public ReactExecutor(
        private let loop!: Int64 = Config.maxReactNumber
    ) { }
    
    // 其他成员...
}

这个构造函数看起来很简单,只有一个参数loop,用于设置最大循环次数,默认值来自配置Config.maxReactNumber。这个参数非常重要,它决定了Agent最多可以进行多少轮"思考-行动-观察"循环。

默认值
自定义值
ReactExecutor构造
设置最大循环次数
Config.maxReactNumber
自定义循环次数

这就像给侦探设定一个破案时间限制,防止陷入无限的思考和调查中。在实际应用中,通常不需要手动设置这个参数,使用默认配置就足够了。

3 同步执行流程:像侦探破案一样

现在,让我们深入研究同步执行函数:

override public func run(agent: Agent, request: AgentRequest): AgentResponse {
    LogUtils.info("React executor runs ${agent.name}")
    let task = ReactTask(agent, request)
    for (_ in 0..this.loop) {
        if (let Some(resp) <- task.runOnce()) {
            return resp
        }
    }
    LogUtils.info("Exceed the max react loop")
    return task.summarize()
}

这段代码实现了ReactExecutor的核心功能。让我们用一个侦探破案的例子来理解这个过程:

用户 Agent ReactExecutor ReactTask 请帮我找出丢失的宝石 开始调查 创建案件档案 执行一轮调查 是否有结论? 返回调查结果 宝石在管家的房间里 继续下一轮 alt [有结论] [需继续调查] loop [调查循环] 达到最大调查轮数 生成调查总结 返回调查总结 虽然没有确切证据,但根据线索推测... 用户 Agent ReactExecutor ReactTask

代码解析:

  1. 记录日志:记录执行开始

    LogUtils.info("React executor runs ${agent.name}")
    

    这就像福尔摩斯在笔记本上写下"案件调查开始"。

  2. 创建任务:创建一个ReactTask对象

    let task = ReactTask(agent, request)
    

    这相当于福尔摩斯整理案件资料,准备开始调查。

  3. 循环执行:在最大循环次数内,不断尝试解决问题

    for (_ in 0..this.loop) {
        // 尝试执行一次
    }
    

    这就像福尔摩斯进行一系列的调查活动,直到找到答案或达到时间限制。

  4. 单次执行:每次循环调用task.runOnce()

    if (let Some(resp) <- task.runOnce()) {
        return resp
    }
    

    这相当于福尔摩斯进行一轮"思考-行动-观察",如果找到了答案,就立即返回;如果没有,继续下一轮。

  5. 达到限制:如果达到最大循环次数仍未解决

    LogUtils.info("Exceed the max react loop")
    return task.summarize()
    

    这就像福尔摩斯在规定时间内未能完全破案,但仍然需要提供一个基于已知线索的最佳推理。

这个过程既优雅又实用,通过不断的思考和行动,逐步接近问题的答案,就像侦探通过收集线索、推理和验证来解开谜团一样。

4 异步执行流程:实时追踪侦探的思路

有时候,我们不仅想知道侦探的最终结论,还想实时跟踪他的思考过程。这就是异步执行的作用:

override public func asyncRun(agent: Agent, request: AgentRequest): AsyncAgentResponse {
    LogUtils.info("React executor async runs ${agent.name}")
    let task = ReactTask(agent, request)
    // Create a thread to execute the react task
    let fut: Future<Iterator<String>> = spawn {
        for (_ in 0..this.loop) {
            if (let Some(asyncAnswer) <- task.asyncRunOnce()) {
                return asyncAnswer
            }
        }
        LogUtils.info("Exceed the max react loop")
        return task.asyncSummarize()
    }
    return AsyncAgentResponse(IteratorWrapper(task, fut), execInfo: task.execInfo)
}

这段代码实现了异步执行的功能。想象一下,这就像福尔摩斯边调查边通过对讲机实时向你汇报进展:

用户请求
创建ReactTask
创建新线程
异步执行循环
找到答案?
返回异步答案
达到最大循环?
生成摘要
返回AsyncAgentResponse
用户界面

代码解析:

  1. 创建任务:与同步函数类似,创建ReactTask

    let task = ReactTask(agent, request)
    
  2. 创建工作线程:使用spawn创建新线程

    let fut: Future<Iterator<String>> = spawn {
        // 线程内的执行代码
    }
    

    这就像福尔摩斯委派一名助手进行调查,同时自己可以处理其他事情。

  3. 线程内执行循环:在新线程中执行与同步函数类似的循环

    for (_ in 0..this.loop) {
        if (let Some(asyncAnswer) <- task.asyncRunOnce()) {
            return asyncAnswer
        }
    }
    

    这个循环与同步函数类似,但使用asyncRunOnce()函数,返回的是一个可以实时获取内容的迭代器。

  4. 封装返回:将Future包装成AsyncAgentResponse

    return AsyncAgentResponse(IteratorWrapper(task, fut), execInfo: task.execInfo)
    

    这就像给用户一个特殊的收音机,可以实时听到福尔摩斯的调查进展。

这种异步模式非常适合需要实时反馈的场景,用户可以看到Agent的思考过程,而不必等待最终结果。

5 深入IteratorWrapper:信息流的管理者

react_executor.cj中还定义了一个类IteratorWrapper,它负责管理异步返回的信息流:

protected class IteratorWrapper <: Iterator<String> {
    protected IteratorWrapper(
        private let task: AgentTask,
        private let workerFut: Future<Iterator<String>>
    ) { }

    /**
     * Concatenate all response content and log it finally
     */
    private let buffer = StringBuilder()

    override public func next(): Option<String> {
        let asyncAnswer = workerFut.get()
        let data = asyncAnswer.next()
        match (data) {
            case Some(v) => buffer.append(v)
            case None =>
                LogUtils.info(this.task.agent.name, buffer.toString().withTag(ReactTag.ANSWER))
        }
        return data
    }
}

这个类看起来有点复杂,但它的作用很简单:收集和传递异步执行中产生的数据片段。它就像一位记者,实时记录并转播福尔摩斯的调查过程:

用户 IteratorWrapper 工作线程 日志系统 请求下一块数据 获取迭代器 返回迭代器 请求下一块数据 返回数据片段 添加到缓冲区 返回数据片段 返回None 记录完整回答 返回None alt [有数据] [没有更多数据] 用户 IteratorWrapper 工作线程 日志系统

代码解析:

  1. 构造函数:接收任务和Future对象

    protected IteratorWrapper(
        private let task: AgentTask,
        private let workerFut: Future<Iterator<String>>
    ) { }
    

    这就像记者接收到一个特殊的通讯设备和采访对象。

  2. 缓冲区:使用StringBuilder收集所有数据

    private let buffer = StringBuilder()
    

    这就像记者随身携带的笔记本,记录所有听到的信息。

  3. next函数:获取并传递下一块数据

    override public func next(): Option<String> {
        let asyncAnswer = workerFut.get()
        let data = asyncAnswer.next()
        // ...处理数据...
        return data
    }
    

    这就像记者不断询问:“然后呢?发生了什么?”,并将听到的内容转播给观众。

  4. 数据处理:根据有无数据采取不同行动

    match (data) {
        case Some(v) => buffer.append(v)
        case None =>
            LogUtils.info(this.task.agent.name, buffer.toString().withTag(ReactTag.ANSWER))
    }
    

    当有新数据时,记录到笔记本;当没有更多数据时,整理笔记并提交完整报道。

这个类使得异步执行的结果能够以流式的方式传递给用户,同时还能保存完整的响应用于日志记录。这就像一场实况转播,既能让观众实时了解情况,又能在赛后提供完整回放。

6 总结

ReactExecutor就像一位善于动手实践的探索者,通过不断尝试和观察,逐步接近问题的答案。它特别适合那些需要工具使用、信息查询和多步推理的场景,能够让你的Agent表现得更像一个真实的人类助手。它平衡了简单直接的NaiveExecutor和复杂全面的PlanReactExecutor,为大多数实际应用场景提供了理想的解决方案。

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐