前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊PowerJob的RemoteEngine

聊聊PowerJob的RemoteEngine

原创
作者头像
code4it
发布于 2023-12-26 01:17:31
发布于 2023-12-26 01:17:31
24300
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下PowerJob的RemoteEngine

RemoteEngine

tech/powerjob/remote/framework/engine/RemoteEngine.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface RemoteEngine {

    EngineOutput start(EngineConfig engineConfig);

    void close() throws IOException;
}

RemoteEngine定义了start及close方法,其中start接收EngineConfig参数返回EngineOutput

EngineConfig

tech/powerjob/remote/framework/engine/EngineConfig.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Data
@Accessors(chain = true)
public class EngineConfig implements Serializable {

    /**
     * 服务类型
     */
    private ServerType serverType;
    /**
     * 需要启动的引擎类型
     */
    private String type;
    /**
     * 绑定的本地地址
     */
    private Address bindAddress;
    /**
     * actor实例,交由使用侧自己实例化以便自行注入各种 bean
     */
    private List<Object> actorList;
}

EngineConfig定义了serverType(SERVER、WORKER),type、bindAddress、actorList属性

EngineOutput

tech/powerjob/remote/framework/engine/EngineOutput.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Getter
@Setter
public class EngineOutput {
    private Transporter transporter;
}

EngineOutput定义了transporter

Transporter

tech/powerjob/remote/framework/transporter/Transporter.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface Transporter {

    /**
     * Protocol
     * @return return protocol
     */
    Protocol getProtocol();

    /**
     *send message
     * @param url url
     * @param request request
     */
    void tell(URL url, PowerSerializable request);

    /**
     * ask by request
     * @param url url
     * @param request request
     * @param clz response type
     * @return CompletionStage
     * @throws RemotingException remote exception
     */
    <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}

Transporter接口定义了getProtocol(AkkaProtocol、HttpProtocol)、tell、ask三个方法

PowerJobRemoteEngine

tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class PowerJobRemoteEngine implements RemoteEngine {

    private CSInitializer csInitializer;

    @Override
    public EngineOutput start(EngineConfig engineConfig) {

        final String engineType = engineConfig.getType();
        EngineOutput engineOutput = new EngineOutput();
        log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);

        List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
        csInitializer = CSInitializerFactory.build(engineType);

        String type = csInitializer.type();

        Stopwatch sw = Stopwatch.createStarted();
        log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);

        csInitializer.init(new CSInitializerConfig()
                .setBindAddress(engineConfig.getBindAddress())
                .setServerType(engineConfig.getServerType())
        );

        // 构建通讯器
        Transporter transporter = csInitializer.buildTransporter();
        engineOutput.setTransporter(transporter);

        log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
        actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));

        // 绑定 handler
        csInitializer.bindHandlers(actorInfos);

        log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);

        return engineOutput;
    }

    @Override
    public void close() throws IOException {
        csInitializer.close();
    }
}

PowerJobRemoteEngine定义了csInitializer,其start方法先通过ActorFactory.load(engineConfig.getActorList())加载actorInfos,再通过CSInitializerFactory.build(engineType)构建csInitializer,然后执行其init方法;接着通过csInitializer.buildTransporter()构建transporter;最后将actorInfos绑定到csInitializer;其close方法关闭csInitializer

ActorFactory.load

tech/powerjob/remote/framework/engine/impl/ActorFactory.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
class ActorFactory {

    static List<ActorInfo> load(List<Object> actorList) {

        List<ActorInfo> actorInfos = Lists.newArrayList();

        actorList.forEach(actor -> {
            final Class<?> clz = actor.getClass();
            try {
                final Actor anno = clz.getAnnotation(Actor.class);

                ActorInfo actorInfo = new ActorInfo().setActor(actor).setAnno(anno);
                actorInfo.setHandlerInfos(loadHandlerInfos4Actor(actorInfo));

                actorInfos.add(actorInfo);
            } catch (Throwable t) {
                log.error("[ActorFactory] process Actor[{}] failed!", clz);
                ExceptionUtils.rethrow(t);
            }
        });

        return actorInfos;
    }

    //......
}    

ActorFactory.load方法遍历actorList,获取其类上的@Actor注解,再收集其方法上的@Handler注解信息设置到actorInfo

CSInitializerFactory

tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
class CSInitializerFactory {

    static CSInitializer build(String targetType) {

        Reflections reflections = new Reflections(OmsConstant.PACKAGE);
        Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);

        log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);

        for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {
            try {
                CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();
                String type = csInitializer.type();
                log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);
                if (targetType.equalsIgnoreCase(type)) {
                    return csInitializer;
                }
            } catch (Exception e) {
                log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);
                ExceptionUtils.rethrow(e);
            }
        }

        throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob' and import the dependencies!", targetType));
    }
}

CSInitializerFactory的build方法通过org.reflections.Reflections去扫描tech.powerjob包,获取CSInitializer的子类,之后通过反射进行实例化

CSInitializer

tech/powerjob/remote/framework/cs/CSInitializer.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface CSInitializer {

    /**
     * 类型名称,比如 akka, netty4,httpJson
     * @return 名称
     */
    String type();

    /**
     * initialize the framework
     * @param config config
     */
    void init(CSInitializerConfig config);

    /**
     * build a Transporter by based network framework
     * @return Transporter
     */
    Transporter buildTransporter();

    /**
     * bind Actor, publish handler's service
     * @param actorInfos actor infos
     */
    void bindHandlers(List<ActorInfo> actorInfos);

    void close() throws IOException;
}

CSInitializer接口定义了type、init、buildTransporter、close方法,它有两个实现类,分别是AkkaCSInitializer、HttpVertxCSInitializer

CSInitializerConfig

tech/powerjob/remote/framework/cs/CSInitializerConfig.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Getter
@Setter
@Accessors(chain = true)
public class CSInitializerConfig implements Serializable {

    private Address bindAddress;

    private ServerType serverType;
}

CSInitializerConfig定义了bindAddress、serverType两个属性

AkkaCSInitializer

tech/powerjob/remote/akka/AkkaCSInitializer.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class AkkaCSInitializer implements CSInitializer {

    private ActorSystem actorSystem;
    private CSInitializerConfig config;

    @Override
    public String type() {
        return tech.powerjob.common.enums.Protocol.AKKA.name();
    }

    @Override
    public void init(CSInitializerConfig config) {

        this.config = config;

        Address bindAddress = config.getBindAddress();
        log.info("[PowerJob-AKKA] bindAddress: {}", bindAddress);

        // 初始化 ActorSystem(macOS上 new ServerSocket 检测端口占用的方法并不生效,可能是AKKA是Scala写的缘故?没办法...只能靠异常重试了)
        Map<String, Object> overrideConfig = Maps.newHashMap();
        overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
        overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());

        Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
        Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);

        log.info("[PowerJob-AKKA] try to start AKKA System.");

        // 启动时绑定当前的 actorSystemName
        String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());
        this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);

        // 处理系统中产生的异常情况
        ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");
        actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);

        log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", actorSystem.name());
    }

    @Override
    public Transporter buildTransporter() {
        return new AkkaTransporter(actorSystem);
    }

    @Override
    public void bindHandlers(List<ActorInfo> actorInfos) {
        int cores = Runtime.getRuntime().availableProcessors();
        actorInfos.forEach(actorInfo -> {
            String rootPath = actorInfo.getAnno().path();
            AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);

            log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig));

            actorSystem.actorOf(AkkaProxyActor.props(actorInfo)
                    .withDispatcher("akka.".concat(actorConfig.getDispatcherName()))
                    .withRouter(new RoundRobinPool(cores)), actorConfig.getActorName());

        });
    }

    @Override
    public void close() throws IOException {
        actorSystem.terminate();
    }
}

AkkaCSInitializer其type方法返回的是AKKA类型,init方法先通过ConfigFactory.load(AkkaConstant.AKKA_CONFIG)加载akka基本配置,再覆盖hostname和port信息,最后通过ActorSystem.create(actorSystemName, akkaFinalConfig)创建actorSystem,并创建AkkaTroubleshootingActor,订阅DeadLetter消息;buildTransporter返回的是AkkaTransporter;其bindHandlers方法主要是根据ActorInfo信息来创建actor;其close方法执行actorSystem.terminate()

AkkaTransporter

tech/powerjob/remote/akka/AkkaTransporter.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class AkkaTransporter implements Transporter {

    private final ActorSystem actorSystem;


    /**
     * akka://<actor system>@<hostname>:<port>/<actor path>
     */
    private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";

    public AkkaTransporter(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    @Override
    public Protocol getProtocol() {
        return new AkkaProtocol();
    }

    @Override
    public void tell(URL url, PowerSerializable request) {
        ActorSelection actorSelection = fetchActorSelection(url);
        actorSelection.tell(request, null);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
        ActorSelection actorSelection = fetchActorSelection(url);
        return (CompletionStage<T>) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
    }

    private ActorSelection fetchActorSelection(URL url) {

        HandlerLocation location = url.getLocation();
        String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType());

        String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();

        CommonUtils.requireNonNull(targetActorName, "can't find actor by URL: " + location);

        String address = url.getAddress().toFullAddress();

        return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, targetActorName));
    }
}

AkkaTransporter其protocol为AkkaProtocol;其tell方法根据url找到actorSelection,通过actorSelection的tell发送请求;ask方法使用的是Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS))

HttpVertxCSInitializer

tech/powerjob/remote/http/HttpVertxCSInitializer.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {

    private Vertx vertx;
    private HttpServer httpServer;
    private HttpClient httpClient;

    private CSInitializerConfig config;

    @Override
    public String type() {
        return tech.powerjob.common.enums.Protocol.HTTP.name();
    }

    @Override
    public void init(CSInitializerConfig config) {
        this.config = config;
        vertx = VertxInitializer.buildVertx();
        httpServer = VertxInitializer.buildHttpServer(vertx);
        httpClient = VertxInitializer.buildHttpClient(vertx);
    }

    @Override
    public Transporter buildTransporter() {
        return new VertxTransporter(httpClient);
    }

    @Override
    @SneakyThrows
    public void bindHandlers(List<ActorInfo> actorInfos) {
        Router router = Router.router(vertx);
        // 处理请求响应
        router.route().handler(BodyHandler.create());
        actorInfos.forEach(actorInfo -> {
            Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
                String handlerHttpPath = handlerInfo.getLocation().toPath();
                ProcessType processType = handlerInfo.getAnno().processType();

                Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
                Route route = router.post(handlerHttpPath);
                if (processType == ProcessType.BLOCKING) {
                    route.blockingHandler(routingContextHandler, false);
                } else {
                    route.handler(routingContextHandler);
                }
            });
        });

        // 启动 vertx http server
        final int port = config.getBindAddress().getPort();
        final String host = config.getBindAddress().getHost();

        httpServer.requestHandler(router)
                .exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
                .listen(port, host)
                .toCompletionStage()
                .toCompletableFuture()
                .get(1, TimeUnit.MINUTES);

        log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");
    }

    private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {
        Method method = handlerInfo.getMethod();
        Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());

        // 内部框架,严格模式,绑定失败直接报错
        if (!powerSerializeClz.isPresent()) {
            throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());
        }

        return ctx -> {
            final RequestBody body = ctx.body();
            final Object convertResult = body.asPojo(powerSerializeClz.get());
            try {
                Object response = method.invoke(actorInfo.getActor(), convertResult);
                if (response != null) {
                    if (response instanceof String) {
                        ctx.end((String) response);
                    } else {
                        ctx.json(JsonObject.mapFrom(response));
                    }
                    return;
                }

                ctx.end();
            } catch (Throwable t) {
                // 注意这里是框架实际运行时,日志输出用标准 PowerJob 格式
                log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);
                ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);
            }
        };
    }


    @Override
    public void close() throws IOException {
        httpClient.close();
        httpServer.close();
        vertx.close();
    }
}

HttpVertxCSInitializer的type类型为HTTP,其init方法主要是通过VertxInitializer.buildVertx()构建vertx,并通过VertxInitializer.buildHttpServer(vertx)构建httpServer,通过VertxInitializer.buildHttpClient(vertx)构建httpClient;其buildTransporter返回的是VertxTransporter;其bindHandlers主要是通过actorInfo去注册vertx的路由及handler;其close方法依次关闭httpClient、httpServer、vertx

VertxTransporter

tech/powerjob/remote/http/vertx/VertxTransporter.java

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class VertxTransporter implements Transporter {

    private final HttpClient httpClient;

    private static final Protocol PROTOCOL = new HttpProtocol();

    public VertxTransporter(HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    @Override
    public Protocol getProtocol() {
        return PROTOCOL;
    }

    @Override
    public void tell(URL url, PowerSerializable request) {
        post(url, request, null);
    }

    @Override
    public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
        return post(url, request, clz);
    }

    @SuppressWarnings("unchecked")
    private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
        final String host = url.getAddress().getHost();
        final int port = url.getAddress().getPort();
        final String path = url.getLocation().toPath();
        RequestOptions requestOptions = new RequestOptions()
                .setMethod(HttpMethod.POST)
                .setHost(host)
                .setPort(port)
                .setURI(path);
        // 获取远程服务器的HTTP连接
        Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
        // 转换 -> 发送请求获取响应
        Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
            httpClientRequest
                .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                .send(JsonObject.mapFrom(request).toBuffer())
        );
        return responseFuture.compose(httpClientResponse -> {
            // throw exception
            final int statusCode = httpClientResponse.statusCode();
            if (statusCode != HttpResponseStatus.OK.code()) {
                // CompletableFuture.get() 时会传递抛出该异常
                throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
                       host, port, path, statusCode, httpClientResponse.statusMessage()
                        ));
            }

            return httpClientResponse.body().compose(x -> {

                if (clz == null) {
                    return Future.succeededFuture(null);
                }

                if (clz.equals(String.class)) {
                    return Future.succeededFuture((T) x.toString());
                }

                return Future.succeededFuture(x.toJsonObject().mapTo(clz));
            });
        }).toCompletionStage();
    }
}

VertxTransporter的protocol为HttpProtocol,其tell方法使用的是不需要返回值的post,其ask方法也是调用post方法,只不过其设定了返回值类型

小结

PowerJob的RemoteEngine定义了start及close方法,其中start接收EngineConfig参数返回EngineOutput;PowerJobRemoteEngine定义了csInitializer,其start方法先通过ActorFactory.load(engineConfig.getActorList())加载actorInfos,再通过CSInitializerFactory.build(engineType)构建csInitializer,然后执行其init方法;接着通过csInitializer.buildTransporter()构建transporter;最后将actorInfos绑定到csInitializer;其close方法关闭csInitializer。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
Brain:阿尔茨海默病无创脑刺激的新兴领域
摘要:治疗认知障碍是现代临床神经科学的一个重要目标。在过去的几年里,无创脑刺激作为一种治疗方法,越来越多地用于改善认知障碍患者的表现,并作为一种辅助方法用于提高认知表现正常的个体的能力。在阿尔茨海默病(AD)患者中,对大脑连接性和功能的更深入理解促进了不同无创脑刺激方案的发展。最近的研究表明,多种模式的经颅刺激方法可以增强大脑可塑性,对认知功能产生有益影响。在转基因小鼠阿尔茨海默病病理行为的临床前研究和认知障碍严重程度不同的临床研究中都显示出了改善。虽然该领域仍在努力解决目标人群标准化、频率、强度、治疗持续时间和刺激区域等问题,但在认知功能和大脑病理学标志物方面已经取得了积极的成果。在这里,我们回顾了最有前景的方案,包括重复经颅磁刺激、经颅直流电刺激、经颅交流电刺激、视觉 - 听觉刺激、光生物调节和经颅聚焦超声,这些方案已证明能够增强阿尔茨海默病患者的认知功能或减缓认知衰退。无创脑刺激对认知功能的有益效果与特定大脑网络的调节有关。最有前景的结果是针对高级认知网络的关键中枢,如额顶网络和默认模式网络进行刺激。根据个体大脑特征个性化刺激参数,为未来应用优化无创脑刺激方案提供了新的思路。
悦影科技
2025/03/06
890
神经调控技术的2035年展望
神经调控装置在美国被批准用于治疗运动障碍、癫痫、疼痛和抑郁症,并在标签外用于其他神经系统适应症。到2035年,我们对神经解剖网络和刺激作用机制的理解的进步,加上材料科学、小型化、能量储存和输送的发展,将扩大神经调控装置的使用。神经调控方法是灵活和可修改的。刺激可以针对功能失调的大脑焦点、区域或网络,可以作为单次治疗、连续治疗、根据占空比或响应生理变化进行治疗。编程可以根据临床反应或生理生物标志物进行滴定和修改。除了跟上临床和技术发展的步伐外,2035年的神经科医生还需要考虑复杂的伦理和经济因素,以确保快速增长的患者群体能够获得神经调控技术。本文概述了目前使用的系统和预期的系统,并强调了未来的机遇和挑战,其中一些是技术性的,但大多数将通过学习大脑网络和快速增长的神经调控设备经验来解决。
悦影科技
2023/07/16
3970
​PNAS:alpha频率经颅电刺激调控大脑默认网络
默认模式网络(DMN)是最重要的内在连接网络,是大脑功能组织的关键架构。相反,失调的DMN是主要神经精神疾病的特征。然而,该领域仍然缺乏对DMN调控的机制和对DMN调控失调的有效干预。目前的研究通过操纵神经同步来解决这个问题,特别是α(8-12Hz)振荡,这是一种主要的内在振荡活动,在功能和生理上与DMN越来越相关。使用高分辨率α-频率经颅交流电流刺激(α-tACS)刺激皮层α振荡源,结合同时脑电图和功能MRI(EEG- fMRI),我们证明了α-tACS(与伪对照相比)不仅增强了脑电图α振荡,还增强了DMN核心内的fMRI和(源水平)α连接。重要的是,α振荡的增加介导了DMN连接的增强。因此,这些发现确定了α振荡和DMN功能之间的机制联系。经颅α调制可以上调DMN,进一步强调了一种有效的非侵入性干预,以使各种疾病中的DMN功能正常化。
悦影科技
2022/08/04
3830
综述:无创脑刺激技术改善阅读障碍患者的阅读困难
无创脑刺激(Noninvasive brain stimulation,NIBS)可以主动且非侵入性地调节脑活动。除了抑制特定的过程外,NIBS还可以增强认知功能,用于预防和干预阅读障碍等学习障碍。本文综述了NIBS在阅读障碍相关研究,以证明NIBS是否可以作为阅读障碍的可选治疗。综合15篇研究发现,重复的阅读训练结合不同的NIBS方案可长期改善儿童和成人患者的阅读障碍,特别是通过NIBS似乎最成功地调节了“经典”阅读区域,而简易的操作流程还可以改善各种与阅读相关的子过程。此外,本文还强调了调节听觉皮层功能作为一种干预方法的潜力,例如,可以阻止由阅读障碍而发展成听觉和语音困难。最后,本文呼吁未来的研究可以更关注NIBS改善阅读障碍的神经生物学基础。
悦影科技
2022/10/11
4170
通过脑电图/脑磁图观察到的大脑活动来指导经颅脑刺激
非侵入性经颅脑刺激(NTBS)技术的应用范围广泛,但也存在诸多局限性,主要问题是干预的特异性、效应大小不一。这些局限性促使最近的研究将NTBS与正在进行的大脑活动的结合。正在进行的神经元活动的时间模式,特别是大脑振荡及其波动,可以用脑电或脑磁图(EEG/MEG)跟踪,以指导NTBS的时间和刺激设置。在线脑电图/脑磁图已用于指导NTBS的时机(即刺激时间):通过考虑大脑振荡活动的瞬时相位或功率,NTBS可以与兴奋性状态的波动对齐。此外,干预前的离线脑电图/脑磁图记录可以告诉研究人员和临床医生如何刺激:通过调频NTBS到感兴趣的振荡区域,内在的大脑振荡可以被上调或下调。本文综述了脑电/脑磁图引导干预的现有方法和思路,以及它们的前景和注意事项。本文发表在Clinical Neurophysiology杂志。
用户1279583
2021/12/14
8890
通过脑电图/脑磁图观察到的大脑活动来指导经颅脑刺激
经颅磁刺激治疗老年抑郁症
背景:难治性老年抑郁症(geriatric depression, GD)的流行表明,需要在保持老年人认知功能和识别多药治疗的同时有效减轻症状负担。经颅磁刺激(Transcranial magnetic stimulation, TMS)是一种已被证实的治疗年轻人难治性抑郁症的干预手段,但TMS对老年人抑郁症的疗效仍不清楚。这篇综述提供了TMS治疗GD疗效的最新观点,讨论了TMS试验之间的方法学差异,并探索了在大脑老化背景下优化TMS治疗的途径。
用户1279583
2022/02/28
1.2K0
经颅磁刺激治疗老年抑郁症
BP综述:闭环脑刺激
就像美存在于观察者的眼中一样,刺激对大脑的影响不仅仅是由刺激的性质决定的,而是由在那一刻接受刺激的大脑的性质决定的。在过去的几十年里,治疗性脑刺激通常应用开环固定方案,而在很大程度上忽略了这一原则。只有最近的神经技术进步使我们能够使用应用于脑电图时间序列数据的前馈算法,在毫秒范围内以足够的时间精度预测大脑的性质(即下一个实例的脑电生理状态)。只要目标脑区处于预先设定的兴奋性或连接状态,就可以进行专门的刺激。临床前研究表明,在特定的大脑状态(例如高兴奋状态)期间而不是在其他状态期间进行的重复刺激会导致受刺激环路的持久修饰(例如长时程增强)。在本研究中,我们调查了使用脑电图通知的经颅磁刺激,在人类皮层的系统水平上这也是可能的证据。我们批判性地讨论了开发脑状态依赖性刺激,从而比传统固定方案更有效地长期修饰病理性脑网络(例如重度抑郁症)的机会和困难。同样基于实时脑电图的经颅磁刺激技术将允许通过记录刺激的效果来闭合环路。这一信息可能使刺激方案适应,使治疗反应最大化。通过这种方式,大脑状态控制大脑刺激,从而引入了从开环刺激到闭环刺激的范式转变。
悦影科技
2024/06/03
2520
PNAS:大脑区域间耦合的增加和减少会相应增加和减少人类大脑中的振荡活动
大脑中振荡活动的起源目前仍有争议,但许多假说都认为它们反映了大脑区域之间的相互作用。本文中,我们通过控制两个人类大脑区域之间的耦合强度来检验这种可能性,这两个区域是腹侧前运动皮层(ventral premotor cortex, PMv)和初级运动皮质(primary motor cortex, M1),并且检测其对运动系统中可测得(基于脑电图)的振荡活动的影响。我们增加或降低耦合强度,同时保持对通路中每个组分区域的影响不变。这是通过使用两种不同模式的经颅磁刺激PMv和M1成对脉冲刺激来实现的,其中只有一种方式增加了PMv对M1的影响。虽然刺激方案的时间模式不同,但它们由相同数量的M1和PMv脉冲组成。在一项运动任务中,我们测量了参与者做一个事先准备好的动作(Go)或不做(No-Go)时,对α、β和θ波段活动的影响。通过激发PMv - M1通路的同步突触前和突触后活动,增强了PMv和M1之间的皮层连接,分别增强了Go和No-Go试验中的β和θ振荡节律。α节律几乎没有变化。相比之下,在Go和No-Go试验中,PMv对M1影响的减弱分别降低了β和θ振荡节律。这表明PMv-M1通路中的皮质-皮质交流频率可以按照Hebbian棘波时间依赖可塑性被调节。本文发表在PNAS杂志。
用户1279583
2022/02/28
9720
PNAS:大脑区域间耦合的增加和减少会相应增加和减少人类大脑中的振荡活动
Nature子刊:卒中的可塑性调控:一种新的神经功能恢复模型
非侵入性脑刺激(NIBS)技术可以用来监测和调节皮层神经回路的兴奋性。长期的皮层刺激可对大脑功能产生持久影响,这为NIBS在慢性神经疾病中的治疗奠定了基础。
用户1279583
2019/11/01
5K0
Nature子刊:卒中的可塑性调控:一种新的神经功能恢复模型
Cerebral Cortex:调节γ振荡可以促进大脑连接性而改善认知障碍
老年痴呆症造成了巨大的全球经济负担,但目前还缺乏有效的治疗方法。最近的研究表明,脑电活动的伽马波段波,特别是40赫兹振荡,与高阶认知功能密切相关,可以激活小胶质细胞清除淀粉样蛋白-β沉积。本研究发现与假刺激相比,在可能性阿尔茨海默病(AD;n=37)患者的双侧角回上应用40赫兹高频重复经颅磁刺激(rTMS)可导致长达8周的认知功能显著改善。静息状态脑电图(EEG)的功率谱密度分析表明,40Hz的rTMS调制了左侧后颞顶叶区的伽马波段振荡。进一步测试磁共振成像和TMS-EEG显示:40hzrTMS可以1)防止灰质体积损失,2)增强在双侧角回局部功能整合,以及在双侧角回和左中额叶回全局功能整合,3)加强信息流从左后颞顶叶区至额叶区和加强前和后脑区之间的动态连接。这些发现表明,调节伽马波段振荡通过促进大脑内的局部和长期的动态连接,有效地改善了可能性AD患者的认知功能。
悦影科技
2022/11/28
9620
Brain Stimulation|利用双侧顶叶经颅交流电刺激(tACS)研究δ波相位在时间测中的作用
时间预测就是我们的大脑在处理信息时,能够根据已有的经验和模式,提前判断某件事情发生的时间。这种能力是我们进行各种活动、做出反应和做决策的重要基础。大脑通过神经振荡(尤其是delta波段,1-4 Hz)的相位对齐(phase alignment)预测未来事件的时间,顶叶、感觉区和前额叶构成分布式网络。先前研究(如Daume et al.)发现顶叶delta波段相位一致性(ITPC)与时间预测性能相关,但缺乏因果证据。在单模态(即视觉)和跨模态(即视觉 -触觉)任务设置中均观察到了这种相关性,这表明该过程并非特定于某种模态。因此本研究是就采用,经颅交流电刺激(tACS)特异性调节脑振荡的相位,以验证delta相位的功能。
悦影科技
2025/03/14
920
经颅直流电刺激对大脑网络的调制
经颅直流电刺激(tDCS)可以影响行为任务的表现,改善大脑状况的症状。然而,tDCS是如何影响大脑功能和连通性的还不清楚。在这里,我们测量了磁共振兼容tDCS全脑分析中获得的依赖于血氧水平(BOLD)的fMRI数据的功能连通性(FC)指标的变化,修正了错误发现率。志愿者(n = 64)接受了真tDCS、伪tDCS和休息(无刺激),使用三个预先建立的电极tDCS目标之一,以左背外侧前额叶皮层(DLPFC, n = 37)、侧颞顶区(LTA, n = 16)或颞上皮层(STC, n = 11)为目标。在每个电极中模拟E场最高的大脑网络中,真tDCS与远程节点的连通性降低。在DLPFC-tDCS活动期间,额顶叶网络和亚属ACC之间的连接减少,而在LTA-tDCS活动期间,听觉躯体运动网络和额盖之间的连接减少。活跃的DLPFC-tDCS也与重叠亚属ACC的眶额网络内的连接性增加有关。在不考虑目标的情况下,在真和假tDCS中,感觉运动和注意区域的FC指标均增加,这可能反映了tDCS的认知-知觉需求。综上所述,这些结果表明tDCS可能对正在进行的大脑活动具有预期的和非预期的影响,强调了在tDCS的基础科学和临床试验中包括假刺激、无刺激和真刺激的重要性。
悦影科技
2022/09/05
4050
NeuroImage|在视觉感知过程中对枕叶皮层的经颅电刺激改变了BOLD活动的幅度:一种tES-fMRI方法
本研究的目的是探讨视觉皮层中的血氧水平依赖(BOLD)信号变化,是否可以作为反映经颅电刺激(tES)在线和离线效应的生物标志物。研究采用阳极经颅直流电刺激(tDCS)和10 Hz经颅交流电刺激(tACS),刺激时间为10分钟,刺激部位为健康成年人枕叶皮层,在呈现不同视觉刺激时进行。研究设计为交叉、双盲实验。同时,也进行了控制实验,使用了假刺激以及另一种电极布置。
悦影科技
2025/01/10
1150
相位相关TMS对脑电皮层运动网络的影响
已有研究对经颅磁刺激(TMS)应用于大脑振荡,观察磁刺激对大脑状态的影响。然而,没有人研究相位相关的TMS是否可能调节属于同一网络的同源远端脑区连接。在网络靶向TMS的框架下,我们研究了对持续的大脑振荡的特定相位的刺激是否有利于刺激目标的远端网络节点出现更强的皮质-皮质(c-c)同步。在24名健康个体的实验中,TMS脉冲刺激刺激初级运动皮层(M1),间隔1个月,重复两次。考虑到TMS脉冲是在μ频率振荡的正(峰)或负(谷)相位时发出,刺激效应取决于在感觉运动网络的同源区域内c-c同步。扩散加权成像(DWI)用于研究感觉运动网络中的c-c连接,并识别与刺激点连接的对侧区域。根据应用TMS脉冲的时间(峰或谷),其对脑内神经网络同步性的影响有明显的变化。研究发现,谷刺激试次与峰值刺激试次相比,在μ频带进行TMS脉冲后(0-200ms)的M1-M1相位锁值同步更高。本文发表在The Journal of Physiology杂志。
用户1279583
2022/02/28
9500
相位相关TMS对脑电皮层运动网络的影响
BP综述|贯穿一生的TMS:发育和退行性过程的影响
经颅磁刺激(Transcranial magnetic stimulation, TMS)已成为一种重要的无创性技术,用于研究整个生命周期的皮质兴奋性和可塑性,为神经发育和神经变性过程提供了有价值的见解。在这篇综述中,我们探讨了TMS的应用对我们理解正常发育、健康老龄化、神经发育障碍和成人发病的神经退行性疾病的影响。通过介绍TMS测量中关键的发育里程碑和年龄相关的变化,我们为理解神经递质系统的成熟和一生中认知功能的轨迹提供了基础。在此基础上,本文深入探讨了神经发育障碍的病理生理机制,包括自闭症谱系障碍、注意缺陷多动障碍、Tourette综合征和青少年抑郁症。本文重点介绍了近年来TMS在神经递质环路改变和皮质可塑性障碍方面的研究结果,并强调了TMS作为一种有价值的工具来揭示潜在的机制和指导未来的治疗干预。我们也回顾了TMS在研究和治疗最常见的成人神经退行性疾病和迟发性抑郁症中的新作用。通过概述无创脑刺激技术在这些疾病中的治疗应用,我们讨论了越来越多的证据支持其作为症状管理和潜在减缓疾病进展的治疗工具。从TMS研究中获得的见解促进了我们对健康和疾病状态下潜在机制的理解,最终为开发更有针对性的诊断和治疗策略提供了信息。
悦影科技
2024/05/24
3060
从组水平到个体水平的精神分裂症谱系障碍无创脑刺激
精神分裂症谱系障碍(SSDs,schizophrenia spectrum disorders)患者经常经历持续的社会认知障碍,并伴有较差的功能结果。目前还没有已被批准的治疗方案来治疗这些症状,当前我们非常需要新的治疗策略。迄今为止,已经有许多工作阐明不同的社交过程中SSDs患者的认知障碍的潜在神经回路,此外,一些研究指出这些神经环路可以使用神经刺激手段予以调节。此外,功能连接映射和电场建模的进展可能被用来确定个体化的治疗目标,以最大限度地发挥脑刺激对社会认知网络的影响。
用户1279583
2022/02/28
5720
从组水平到个体水平的精神分裂症谱系障碍无创脑刺激
Nature子刊:无创电刺激逆转老年人衰退的工作记忆能力
随着年纪的增大,我们可能会有这样的体验:年轻的时候轻而易举记住的事情现在却常常忘记,不禁感叹,时间不仅“摧毁”了我们的容颜,而且也“摧毁”了我们的记忆力。如果你的记忆力被“摧毁”到一定的程度,可能你得了一种称之为阿尔兹海默症的病,俗称老年痴呆。老年痴呆最主要的临床症状之一是工作记忆能力的损伤。退一步说,即使正常的、没有患有老年痴呆的老年人,也会伴随着工作记忆能力的大幅衰退(和年轻人相比),因此,研究大脑的老化规律和机制,以及发展出能够提高老年人认知功能(包括工作记忆)的方法策略是神经科学研究的主要目的之一。近日,美国波士顿大学研究团队在Nature neuroscience 杂志发表研究成果,发现与年轻人相比,老年人颞叶皮层θ-γ频段的相-幅耦合(phase-amplitude coupling, PAC)以及前额叶和颞叶皮层之间的θ频段相同步出现异常;并且提出一种无创的大脑电刺激方案(经颅交流电刺激tACS),可逆转老年人衰退的工作记忆能力,这种逆转效应可以在电刺激停止后持续50分钟。
悦影科技
2020/11/26
4830
Nature子刊:无创电刺激逆转老年人衰退的工作记忆能力
BP综述|个性化和基于回路的经颅磁刺激:证据、争议和机遇
绘制脑连接图的神经影像学方法的发展改变了我们对精神疾病、脑刺激的分布效应以及如何最好地利用经颅磁刺激靶向和改善精神症状的理解。与此同时,神经影像学研究表明,像前额叶皮层这样的高阶脑区(代表了精神疾病最常见的脑刺激靶点)在大脑连接方面表现出了一些最高水平的个体差异。这些发现为基于个体特异性脑网络结构的个体化靶点选择提供了理论基础。最近的进展使确定可重复的个体化靶点成为可能,其精度为毫米,采集时间为临床可掌握。这些进展使空间个性化经颅磁刺激靶向的潜在优势得以评估并转化为基础和临床应用。在这篇综述中,我们概述了目标位点个性化的动机、初步支持(主要在抑郁症中)、来自其他脑刺激模式的聚合证据,以及抑郁症和前额叶皮层之外的普遍适用性。最后,我们将详细介绍方法学建议、争议和值得注意的替代方案。总体而言,虽然这一研究领域看起来很有前景,但个性化靶向的价值仍不清楚,使用经过验证的方法学进行专门的大型前瞻性随机临床试验是至关重要的。
悦影科技
2024/06/27
2580
Neuron:背侧流中θ振荡的选择性夹带可提高听觉工作记忆表现
已经证实背侧流(Dorsal Stream)在工作记忆中操作听觉信息的作用。然而,该网络中的振荡动力学及其与行为的因果关系仍未明确。通过同步使用MEG/EEG,我们发现在需要比较两种不同时间顺序模式差异的任务中,背侧流中θ振荡可以预测被试的操作能力。我们利用θ节律性TMS与EEG结合的方法,在两种刺激之间的静息态间隔内,对MEG识别目标(左侧顶内沟)进行脑振荡与行为之间的因果关系研究。节律性TMS引发了θ振荡并提高了被试的准确性。TMS诱发的振荡夹带随着行为增强而增加,而且这两种增强都随着被试的基线水平而产生变化。这些结果在旋律对比控制任务(melody-comparison control task)中没有观察到,在非节律性TMS中也没有观察到。这些数据表明,背侧流中的θ活动与记忆操作有因果关系。本文发表在Neuron杂志。
用户1279583
2022/02/28
6810
Neuron:背侧流中θ振荡的选择性夹带可提高听觉工作记忆表现
Science advances:非侵入性脑刺激技术的生理效应在整个皮层区域存在本质差异
经颅磁刺激(transcranial magnetic stimulation, TMS)是一种非侵入性调节大脑活动和行为的方法,但是其效果在不同的研究和个体之间仍然存在很大差异,从而限制其在研究或临床中的大规模应用。本文中作者发现低频TMS刺激对感知(sensory)区域和认知(cognitive)区域的功能连接有相反的影响,进而通过生物物理模型确定了这些区域特异性作用的神经机制:额叶皮层的TMS刺激减弱了局部抑制(local inhibition)并破坏了前馈(feedforward)和反馈(feedback)连接,而枕叶皮质的TMS刺激增强了局部抑制及正向信号传导。
用户1279583
2020/02/26
1.5K0
推荐阅读
相关推荐
Brain:阿尔茨海默病无创脑刺激的新兴领域
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档