• 注册
    • 今日签到
    • 连续签到

    暂没有数据

    暂没有数据

    • 总打赏排行
    • 今日收益排行

    暂没有数据

    暂没有数据

    BBS 实时线报 关注:1 内容:41

    用于CT实时重建程序的流水线架构设计

  • 查看作者
  • 打赏作者
  • 拉黑名单
  • 引言

    从事了CT的重建软件开发有大概三年了,前两年跟着某国际大厂的大神学习,一起搭建了一个用在大型医用CT的重建程序,最近半年在一家做牙科CT的公司,自己搭了一个牙科CT用的实时重建程序。之前做的大型医用CT重建程序用8核CPU+GTX1080Ti,对于128排CT能做到实时重建(边扫描边出图且重建速度高于采集速度),而近半年做的牙科CT重建程序在六核i5+GTX1060上大概能做到单例十秒以内(扫描时间大概15秒),由于牙科CT一般只扫描一圈,相当于大型医用CT的单圈轴扫,不可能做实时,但从扫描结束到重建完成也就几秒钟时间(主要看后处理的步骤,不做后处理的话扫描结束后1秒内完成重建)。

    本文将介绍我所搭建的CT重建程序架构(本文中称之为流水线架构),搭建框架时只用了C++17和STL,没有使用任何第三方库。具体的数据处理中有用Intel IPP、Intel MKL和CUDA等库,但在本文中没有体现。

    本人非软件科班出身,抛砖引玉,希望能得到各路大神的指点!

    什么是CT

    CT全称Computed Tomography,即计算机层析成像,主要是利用X射线对被扫描物体进行环绕扫描,然后通过计算机生成物体内部的横截面精确结构图。最常见的CT是大型医用CT,常用于观察肺部、脑部、骨骼等的结构和病变。此外,也有口腔、乳腺等专科医用CT、工业CT、安检CT等。

    CT采集到的数据是探测器和X光源在物体两侧环绕轴心周向旋转时探测器探测到的X射线强度值,而重建后的图像(不考虑三维渲染)是被扫描物体的横截面图,这中间需要很多个步骤的处理。首先,探测器的生数据需要经过去除本底响应、增益归一化、负对数变换等一系列处理转换为x射线穿过被扫描物体的衰减系数。然后,传统的线性代数重建方法(FBP或FDK)需要对数据进行斜坡滤波、反雷登变换才能形成横断面图。最后横断面图还需要经过各种后处理(伪影去除、图像增强等)。除了线性代数的重建方法外,如果想获得更好的图像质量,还可以使用迭代重建的方法,迭代的思路一般是先通过代数重建方法生成大致的重建图,然后模拟正投影的过程重新得到模拟的探测器数据,跟原始的探测器数据进行比较,然后不断修正重建图,直到模拟的探测器数据与实际的非常接近。迭代重建由于计算量大而且需要做正投影,所以无法实现边扫描边出图的实时重建。

    CT图像实时重建的特点

    • 数据流量大

    以牙科CT为例,假设探测器尺寸为800×800,探测器输出为两个字节的整形数据,一次扫描需要采集500帧,在10秒内完成,那么扫描数据的大小为2Bx800x800x500=640MB,约64MB每秒;而一个大型医用64排CT进行螺旋扫描,假设探测器大小为800×64,转速为0.5秒/圈,每圈采集1000帧数据,则数据流量约为100MB/s,而现在最新的CT已经有512排甚至640排(通过飞焦点技术实现),数据流量更是翻了好几倍。如此大的数据量即使只用一步处理完,也需要非常快的运算及较高的IO性能。

    • 实时性

    CT图像在医学上作为诊断的依据,而且大型医院通常CT资源非常紧张,每个小时可能要扫描十几乃至几十个病人,当然希望扫描完之后越快得到图像越好。CT实时性的更重要一方面是,有很多扫描需要一边扫描一边出图,而不是等扫描全部结束后才看图,例如在做穿刺的时候,医生需要通过CT图像知道探针的实时位置。另一方面,X射线对病人有一定的伤害,即使在一般的螺旋扫描中,医生也希望一边扫描一边能看到实时的预览图,从而判断扫描是否正常,发现异常时需要立即终止扫描以减少不必要的扫描。

    • 多处理步骤

    前面已经介绍,CT重建包括预处理、滤波反投影、后处理等几个步骤,而这些只是最基本的操作,实际上由于各种物理或机械的原因,还要增加很多处理,例如:由于不同物质对不同的能量的射线粒子的衰减系数不同,需要进行射束硬化校正;由于被扫描物体内部一些吸收特别强的物质会造成严重的硬化或光子饥饿,从而形成金属伪影,需要进行去金属伪影处理;由于X光的散射而造成一些低频的CT值偏移,需要进行散射校正;由于光源的散焦需要进行散焦校正……此外根据某些实际应用要求,也还要添加各种处理,例如想要提高空间分辨率,需要进行共轭偏置扫描,那么在重建时需要对共轭数据进行交织;为了更好地分辨不同的物质可能要进行两次或多次不同能量的扫描……各种处理加起来,CT图像的实时重建通常包含至少十几个步骤。这些步骤有些比较复杂,有些比较简单,有些可以并行,有些不可以并行,有些可以分为一个帧一个帧地处理,有些需要前后几个帧甚至半圈的数据一起处理,有些步骤适合在CPU上运行,有些比较适合在GPU上处理。

    流水线简介

    流水线(Pipeline, Assembly line)原本是工业生产的一个术语。现代化的生产线把生产的生产过程细分为多个步骤,把每个步骤分别分配给不同的生产单位,每个生产单位只专注处理一个步骤,生产单位之间用皮带等传送结构连接,每个生产单位都在自己的位置待命,当上一个单位的成品输送进来,就开始处理,处理完后立即放到传送带交给下一个生产单位。此链接是一个福特汽车最早的生产流水线的视频 https://v.m.autohome.com.cn/v-2162141.html 大家可以再次感受一下这一百多年前的生产技术。流水线的生产方式有很多优点,例如:可扩展性高、专业化程度高、可靠性高、效率高等。

    CT图像的实时重建非常适合使用流水线的架构。首先,实时性要求非常高,如前面所述,对于大型医用CT的,甚至要求一边扫描一边出图,扫描到某个地方,那个地方的图像就要在很短的延时后立即显示出来。如果采用每个步骤都先把所有数据都处理完再进行下一步的方式,那就没有办法做到实时显示,而流水线的方式只要第一个数据开始进入,整个流水线就开始流动,第一幅图像很快就会出来。其次,CT扫描的数据量非常大,如果每一步都要一次把所有数据处理完,那么,每一步的输入输出数据量都非常大,对于一些范围较大的扫描,数据量可能会超过计算机的物理内存,那么在处理数据的时候就要频繁进行磁盘读写;而使用流水线架构,每个处理单位每次处理一个或者几个帧,数据立即输出到管道然后马上被下一个处理单元利用,最后一个单元处理完才把数据写入磁盘或传给显示单元,内存占用会得到较大的优化,由于管道的数据一般是在内存上的,所以也就避免了频繁的磁盘读写。综上所述,CT图像实时重建程序非常适合使用类似工厂流水线的架构。

    流水线架构的软件设计

    用于CT实时重建程序的流水线架构设计
    流水线架构示意图

    上图是一个简单的生产流水线架构示意图。架构中有几个主要的部分。方框是四个不同的处理单元(Agent),对应于生产流水线的生产步骤,其中处理单元又分为序列处理单元(Sequential Agent)和并发处理单元(Concurrent Agent)。处理单元内有一个或多个处理核,处理核对应于生产流水线上没道工序需要用到的工具或工位,其中序列处理单元内只能有一个处理核,而并发处理单元内可以有一个或多个相同的处理核。连接在处理单元之间的箭头是管道(Pipe),对应于生产流水线上的传输履带。计算机的物理线程相当于流水线工厂中的工人,每个工人在同一时间能操作工厂中的任意一把工具,而每个计算机线程在同一时间内能运行任意一个处理核。

    管道Pipe及其读写标签ReadToken/WriteToken

    管道用于连接两个数据处理单元,生产者(Producer)把数据按顺序写入管道,消费者(Consumer)按写入的顺序从管道中读取数据。管道是流水线架构中最重要的环节,整个流水线处理过程是由管道来驱动的,管道中没有数据时,下一步的处理单元会被阻塞,而管道数据满时,上一步的处理单元会被阻塞,这一机制控制着每个数据处理单元的吞吐量均衡。如果某个处理单元比其他单元慢,那么它就会通过管道来阻塞其他的单元;相反,如果某个数据处理单元处理速度比其他单元快,它会经常被管道阻塞,等待过程中不消耗CPU资源,系统会把CPU资源分给其他处理慢的单元。在工厂里,如果某道工序特别慢,可以增加工位,让更多的工人参与这道工序。同理,对于处理速度较慢的单元,可以增加其线程数,让该单元可以容纳更多的CPU资源来并行处理。

    管道的设计需要满足以下需求:

    • 多线程读写
    • 高性能
    • 占用内存尽量少
    • 重叠读取(overlap reading)
    • 分段(shots)

    以下先放出主要的声明代码,然后再解析每一函数和变量是如何满足以上要求的。

    template<typename T>
    class Pipe: Noncopyable
    {
    public:
    
      void SetProducer(
        const std::string& name,
        const size_t numWriteThreads,
        const size_t numFramesPerWriter);
      void SetConsumer(
        const std::string& name,
        const size_t numReadThreads
        const size_t numFramesPerRead,
        const size_t overlapSize);
      void InitBuffer(T&& templateFrame);
      void InitBuffer(T&& templateFrame, const size_t specificBufferSize);
    
      PipeWriteToken<T> GetWriteToken();
      PipeWriteToken<T> GetWriteTokenForShotEnd(const size_t numFrames);
        
      PipeReadToken<T> GetReadToken();
    
      void Close();
      // ...
    }
    
    template<typename T>
    class PipeWriteToken
    {
    public:
      size_t GetStartIndex() const;
      size_t GetSize() const;
      bool IsShotStart() const;
      bool IsShotEnd() const;
      T& GetBuffer(size_t index) const;
      // ...
    }
    
    template<typename T>
    class PipeReadToken
    {
    public:
      size_t GetStartIndex() const;
      size_t GetSize() const;
      size_t GetOverlapSize() const;
      bool IsShotStart() const;
      bool IsShotEnd() const;
      const T& GetBuffer(size_t index) const;
      T& GetMutableBuffer(size_t index) const;
      // ...
    }
    

    管道初始化

    SetProducerSetConsumerInitBuffer函数都是用来初始化管道的。SetProducer设定了生产者的默认并发线程数、每次写入多少帧,而SetConsumer设定了消费者的默认并发线程数、每次读取多少帧以及有多少帧是重叠读取。InitBuffer有两个重载,其中一个指定了缓存大小而另一个不指定缓存大小,如果不指定缓存大小,则使用默认的缓存大小:

    defaultBufferSize = 
      numFramesPerWrite * numWriteThreads +
      numFramesPerRead * numReadThreads -
      overlapSize * (numReadThreads - 1);
    

    默认的大小是能保证多线程同时读写的最小值。templateFrame是每个缓存帧的模板,初始化时每个缓存帧都通过复制templateFrame来构造。这几个函数在初始化阶段调用,预先分配好内存,避免在正式开始处理数据时频繁动态管理内存,从而提高了性能。

    读写标签

    GetWriteToken,GetWriteTokenForShotEndGetReadToken三个函数用来获取读写标签。

    生产者在写入数据前调用GetWriteTokenGetWriteTokenForShotEnd获得一个写标签(WriteToken),写标签中包含了本次写入的每个缓存的地址,其中GetWriteToken获得的缓存地址数量是默认的numFramesPerWrite,而GetWriteTokenForShotEnd指定了本次写入的帧数并表示本次写入的是一个分段的结束。GetWriteTokenGetWriteTokenForShotEnd都是互斥的,确保多个写线程必须按顺序获得写标签,这两个函数也是阻塞的,当消费者还没读完之前写入的缓存,即可写缓存数不足时,GetWriteTokenGetWriteTokenForShotEnd的线程会被阻塞,等待的过程不消耗CPU资源,当可写缓存数量增多时,线程会得到通知并再次判断是否有足够缓存可供写入,不满足则继续等待,满足则函数会返回一个写标签。写标签的析构函数会通知管道,告诉管道这一段缓存已经写入完成,可以给消费者读取。

    同理,消费者在读取数据前调用GetReadToken来获得读标签(ReadToken),读标签中包含本次可读取的缓存的地址。GetReadToken也是互斥的,确保多个读线程按顺序获得标签。当缓存中没有足够的可读数据时,GetReadToken会被阻塞,等待过程也不消耗CPU资源。读标签的析构函数也会通知管道,缓存已读取完毕,不需要被重叠读取的帧所对应的内存可以被回收用作下一次写入。

    读写标签的机制使得管道的内存可以被重复利用,避免了动态申请和注销内存,同时也保证了多线程读写时的读写顺序正确性。

    处理单元AgentBase

    一个处理单元相当于流水线上的一道工序,一个处理单元可以连接多个管道。一个典型的处理单元连接两个管道,一个输入一个输出,当输入管道有数据可读,而输出管道有空位时,处理单元向输入管道获得读标签、向输出管道获得写标签,然后处理单元开始处理数据,处理完成后注销读写标签并等待下一组数据。一些特殊的处理单元也可以不连接管道,而是用其它的输入输出,例如用作读写的单元;而另一些特殊的处理单元也可以有多个输入或者多个输出。

    class AgentBase
    {
    public:
      AgentBase(const std::string& agentName);
      virtual void SetPipes(std::map<std::string, std::shared_ptr<PipeBase>>&){};
      virtual void GetReady(){};
      virtual void Start() = 0;
      virtual void Join() = 0;
    private:
      std::future<void> mainThread;
       //...
    }
    

    处理单元分两种:序列处理单元和并发处理单元。

    序列处理单元SequentialAgent

    序列处理单元用于处理一些不需要或不能进行CPU并发的任务,例如磁盘读写、GPU调用或其他一些时变系统(每一次的处理依赖于之前的处理结果)。序列处理单元只有一个线程,该线程循环执行获取数据、处理数据、输出数据等任务。主要代码如下:

    class SequentialAgentBase: AgentBase
    {
    public:
      SequentialAgentBase(const std::string& agentName);
      void Start() override final;
      void Join() voerde final;
    protected:
      virtual void MainThread() = 0;
      //...
    }
    
    void SequentialAgentBase::Start()
    {
      mainThread = std::async(
        std::launch::async,
        &SequentialAgentBase::MainThreadImpl,
        this);
    }
    
    void SequentialAgentBase::Join()
    {
      mainThread.get();
    }
    

    SequentialAgentBase的继承类需要实现自己的MainThreadMainThread内循环等待和执行任务。

    并发处理单元ConcurrentAgent

    并发处理单元可以并发工作,有一个主线程和多个分线程,主线程负责等待数据和打包任务,而多个分线程分别循环等待主线程打包的任务并执行。主要代码如下:

    class ConcurrentAgentBase: AgentBase
    {
    public:
      ConcurrentAgentBase(
        const std::string& agentName,
        const size_t numThreads);
      void Start() override final;
      void Join() override final;
    protected:
      virtual void MainThread() = 0;
      virtual void ConcurrentThread(const size_t threadIndex) = 0;
    private:
      std::vector<std::future<void>> concurrentThreads;
      //...
    }
    
    void ConcurrentAgentBase::Start()
    {
      mainThread = std::async(std::launch::async,
        &ConcurrentAgentBase::MainThreadImpl, this);
      for(size_t i  = 0; i<numThreads; ++i)
      {
        concurrentThreads.emplace_back(
          std::async( std::launch::async,
            &ConcurrentAgentBase::ConcurrentThreadImpl,
            this, i));
      }
    }
    
    void ConcurrentAgentBase::Join()
    {
      mainThread.get();
      for(auto& thread: concurrentThreads)
      {
        thread.get();
      }
    }
    
    

    其中MainThread内循环等待数据并打包任务分发给分线程,而每个ConcurrentThread则循环等待MainThread打包的任务并执行,MainThreadConcurrentThread的配合可以用信号量(Semaphore)、同步队列(SyncQueue)或互斥锁和条件变量(condition variable)的组合等方法来实现。主线程的角色是一个单线程执行的任务生成器,所以也是通过协程(coroutine)来实现,C++标准将在20版本中正式加入协程。

    SequentialAgentBaseConcurrentAgentBase都是基类,每个处理单元需要继承SequentialAgentBaseConcurrentAgentBase。继承的类除了必须实现自己的构造函数、MainThreadConcurrentThread外,有必要时要实现AgentBase中的GetReady,如果Agent有与管道连接,还需要实现SetPipes(std::map<std::string, std::shared_ptr<PipeBase>>& pipePtrs)其中,pipePtrs在调用前是输入的管道,调用后是输出的管道。

    下面用一个例子来说明如何实现SequentialAgentBaseConcurrentAgentBase的继承

    class SequentialAgentSample: SequentialAgentBase
    {
    private:
      std::shared_ptr<Pipe<DataType>>& pipeIn;
      std::shared_ptr<Pipe<DataType>>& pipeOut;
    
      void SetPipes(std::map<std::string, std::shared_ptr<PipeBase>>& pipes)
      {
        pipeIn = pipes.at("to SequentialAgentSample");
        pipes.erase("to SequentialAgentSample");
        pipeIn.InitBuffer(...);
        
        pipeOut = std::make_shared<Pipe<DataType>>(new Pipe<DataType>());
        pipes["from SequentialAgentSample"] = pipeOut;
      }
    
      void MainThreadImpl() override
      {
        while(true)
        {
          PipeReadToken<DataType> readToken = pipeIn->GetReadToken();
          PipeWriteToken<DataType> writeToken = readToken->IsShotEnd()?
            pipeOut.GetWriteTokenForShotEnd(readToken.GetSize()) :
            pipeOut.GetWriteToken();
    
          // ... process data from readToken and write output data to writeToken
    
          // readToken and writeToken will be destructed here.
        }
      } 
    }
    
    class ConcurrentAgentSample: ConcurrentAgentBase
    {
    private:
      std::shared_ptr<Pipe<DataType>>& pipeIn;
      std::shared_ptr<Pipe<DataType>>& pipeOut;
      struct Task{
        ReadToken<DataType> readToken;
        WriteToken<DataType> writeToken;
      };
    
      size_t WaitForStanbyThreadAndGetIndex();
      void NotifyThreadStanby();
      void SubmitTask(Task&&);
      Task AcceptTask();
    
      void SetPipes(std::map<std::string, std::shared_ptr<PipeBase>>& pipes)
      {
        pipeIn = pipes.at("to ConcurrentAgentSample");
        pipes.erase("to ConcurrentAgentSample");
        pipeIn.InitBuffer(...);
        
        pipeOut = std::make_shared<Pipe<DataType>>(new Pipe<DataType>());
        pipes["from ConcurrentAgentSample"] = pipeOut;
      }
    
      void MainThreadImpl() override
      {
        while(true)
        {
          WaitForStanbyThreadAndGetIndex();
    
          PipeReadToken<DataType> readToken = pipeIn->GetReadToken();
          PipeWriteToken<DataType> writeToken = readToken->IsShotEnd()?
            pipeOut.GetWriteTokenForShotEnd(readToken.GetSize()) :
            pipeOut.GetWriteToken();
    
          SubmitTask({readToken, writeToken});
        }
      } 
    
      void ConcurrentThreadImpl(const size_t threadIndex) override
      {
        while(true)
        {
          NotifyThreadStandby();
          Task task = AcceptTask();
          // process task;
        }
      }
    }
    

    其中WaitForStanbyThreadAndGetIndexNotifyThreadStandby以及SubmitTaskAcceptTask是两对对控制同步的函数,可以通过Semaphore或SyncQueue来实现。实际上,这两对函数我是在ConcurrentAgentBase里实现的,基类中定义一个TaskBase,继承ConcurrentAgentBase的类可以根据自身需要扩展TaskBase类。

    以下是一个main函数的实例,示范在处理单元(Agent)都设计好后如何把它们通过Pipe连接起来以及如何启动流水线“工厂”。

    int main()
    {
      std::vector<std::shared_ptr<AgentBase>> agents;
      {
        // ... some agents before A
        SequentialAgentSample agentA;
        agents.push_back(std::make_shared(agentA));
        // ... some other agents between A and B
        SequentialAgentSample agentB;
        agents.push_back(std::make_shared(agentB));
        // ... some agents after B
      }
    
      // Connect pipes and agents
      {
        std::map<std::string, std::shared_ptr<PipeBase>> pipes;
        for(auto& agent: agents)
        {
          agent->SetPipes(pipes);
        }
        ASSERT(pipes.empty());
      }
      
      // Get ready
      for(auto& agent: agents)
      {
        agent->GetReady();
      }
      
      // Start
      for(auto& agent: agents)
      {
        agent->Start();
      }
      
      // now all agents and pipes are working
    
      // Join
      for(auto& agent: agents)
      {
        agent->Join();
      }
      
      return 0;
    }
    

    流水线架构优点总结

    • 实时性强

    实时性强体现在两个方面。一个是本身的速度快,如果是一步步地处理,那么在做IO的时候,CPU和GPU可能会闲置,GPU工作的时候,CPU可能也会闲置,大大浪费了计算机资源,而流水线上由于是不同工序并行,所以磁盘IO、CPU、GPU能同时工作,调整得好的话,可以达到同时基本满载。另一个实时性体现在可以边采集边出图,而不用等到全部数据采集完才开始工作。

    • 高效利用内存

    管道在设计时使用的是循环内存,内存的大小之需要满足几个读写线程同时工作,所以每个管道通常之需要缓存几个帧的数据,内存占用极少。

    由于用了读写标签的机制,所有内存空间都在初始化阶段申请好,而不用在开始处理后才动态管理,极大的提升了内存效率。

    • 高效线程

    跟内存一样,线程也是在初始化阶段开辟好的,不需要在开始处理后去开辟。线程之间的切换用阻塞等待的方法,所以每个线程都是在接到任务时短暂工作一个时间片,然后进入等待,等待时基本不占用CPU资源。

    • 易用

    框架搭好后,后续开发每个处理步骤的时候只需要做一个单线程的处理核心,不需要考虑并发、不需要考虑顺序、不需要考虑输入输出的内存管理、不需要关心线程,这些都交给架构去处理。

    流水线架构性能优化技巧

    • 性能调优的辅助代码

    要想优化流水线的性能,就必须要先知道流水线的运行情况,了解哪些地方花费了多少时间、哪些时候CPU被限制了。为此,可以先在代码中添加一些辅助来测量性能,例如:记录下每个管道的平均数据滞留时间、GetReadTokenGetWriteToken的被阻塞时间,记录Agent处理每个任务的时间等。

    • 保证每个管道的通畅

    在理想的情况下,数据被写入管道后,应立即被下一个单元读取,这样才能使数据一直被处理,而不是在等待,而数据在管道中滞留会降低重建的实时性。

    • 控制好每个任务的处理时长

    可以通过改变每个任务包处理的数据帧数来控制每个任务的时长,每个任务的时长不是越大越好、也不是越小越好。任务的时间太小的话,那么在处理线程同步的时间占比就越大,如果任务过于简单,除了减少线程之外,还可以考虑增加每次处理的帧数。任务的时间过长的话,由于线程切换是操作系统控制的,处理过程中线程会频繁被打断,而线程切换也是会耗费资源的。根据经验,每个任务包的时长最好控制在比操作系统的线程时间片长度稍短最为适合,操作系统的线程切换周期(量子时间)与操作系统有关,在Windows中,有两种模式可以选择,后台模式下,线程切换时间大约200ms,而程序模式下大概是10-15ms。

    写在最后

    • 上面的示例代码并不完整,只是一些基础的部分,用来解释流水线架构。像循环内存的实现、同步锁、结束和退出机制这些都没有展示出来,有需要的话以后会慢慢补充或另写文章,欢迎大家指点。

    请登录之后再进行评论

    登录
  • 做任务
  • 帖子间隔 侧栏位置: