Data Feeds 过滤器Filters

此功能是对反向交易者的一个相对较晚的添加,必须适合已经存在的内部结构。这使得它不像希望的那样灵活和 100% 的功能完整,但在许多情况下它仍然可以达到目的。

尽管实现尝试允许即插即用过滤器链接,但预先存在的内部结构很难确保始终可以实现。因此,一些过滤器可能会被链接,而另一些可能不会。

目的

  • 转换数据馈送提供的值提供不同的数据馈送

开始实施是为了简化可以通过cerebro API 直接使用的两个明显过滤器的实现。这些是:

  • 重采样 ( cerebro.resampledata)

在这里,过滤器转换传入数据馈送timeframe的和。例如:compression

(Seconds, 1) -> (Days, 1)

这意味着原始数据馈送是分辨率为1 Second的交付条。重采样过滤器截取数据并对其进行缓冲,直到它可以提供1 天柱。当看到第二天的1 秒柱时,就会发生这种情况。

对于与上述相同的时间范围,过滤器将使用1 秒 分辨率柱来重建1 天柱

这意味着1 Day bar 的交付次数与看到的1 Second bar一样多次 ,并更新以包含最新信息。

例如,这模拟了实际交易日的发展情况。

len(data)只要天数不变,数据的长度以及策略的长度就保持不变。

 

工作中的过滤器

给定现有的数据馈送/源,您可以使用addfilter数据馈送的方法:

data = MyDataFeed(dataname=myname)
data.addfilter(filter, *args, **kwargs)
cerebro.addata(data)

即使它恰好与重采样/重放过滤器兼容,也可以执行以下操作:

data = MyDataFeed(dataname=myname)
data.addfilter(filter, *args, **kwargs)
cerebro.replaydata(data)

过滤器接口

Afilter必须符合给定的接口,即:

  • 接受此签名的可调用对象:
callable(data, *args, **kwargs)

或者

  • 可以实例化调用的类
    • 在实例化期间,该__init__方法必须支持签名:
def __init__(self, data, *args, **kwargs)
  • __call__方法带有以下签名:
def __call__(self, data, *args, **kwargs)

将为来自数据馈送的每个新传入值调用该实例。\*args\*kwargs是相同的传递给__init__

返回值

* `True`:数据提要的内部数据获取循环必须重新尝试,从提要中获取数据,因为流的长度被操纵

*`False` 即使数据可能已被编辑(例如:更改了`close`价格) ,流的长度保持不变

在基于类的过滤器的情况下,可以实现 2 种附加方法

  • last带有以下签名:
def last(self, data, *args, **kwargs)
  • 这将在数​​据馈送结束时调用,允许过滤器传递它可能已经缓冲的数据。一个典型的例子是 重采样,因为一个条被缓冲,直到看到下一个时间段的数据。当数据馈送结束时,没有新数据可以将缓冲的数据推出。last提供了将缓冲数据推出的机会。
def __init__(self, data, *args, **kwargs) -> def __init__(self, data)

例子:过滤器

一个非常快速的过滤器实现:

class SessionFilter(object):
    def __init__(self, data):
        pass

    def __call__(self, data):
        if data.p.sessionstart <= data.datetime.time() <= data.p.sessionend:
            # bar is in the session
            return False  # tell outer data loop the bar can be processed

        # bar outside of the regular session times
        data.backwards()  # remove bar from data stack
        return True  # tell outer data loop to fetch a new bar

这个过滤器

  • 使用data.p.sessionstartdata.p.sessionend(标准数据馈送参数)来决定一个栏是否在会话中。
  • 如果在会话中,返回值False表示什么都没有做,当前柱的处理可以继续
  • 如果not-in-the-session,则从流中移除柱并True返回以指示必须获取新柱。

    使用 接口data.backwards()LineBuffer这深入挖掘了backtrader的内部结构。

过滤器的使用:

  • 一些数据馈送包含非正常交易时间的数据,交易者可能不感兴趣。使用此过滤器,只会考虑会话中的酒吧。

过滤器的数据伪 API

在上面的示例中,已经显示了过滤器如何调用 data.backwards()以从流中删除当前柱。来自数据馈送对象的有用调用是过滤器的伪 API

  • data.backwards(size=1, force=False): 通过向后移动逻辑指针从数据流中删除大小条(默认为1)。如果 force=True,则物理存储也被删除。删除物理存储是一项微妙的操作,仅用于内部操作。
  • data.forward(value=float('NaN'), size=1): 将大小条向前移动存储,如果需要,增加物理存储并填充 value
  • data._addtostack(bar, stash=False): 添加bar到堆栈以供以后处理。 是一个包含与数据馈送bar一样多的值的可迭代对象。lines如果stash=False添加到堆栈的条将在下一次迭代开始时立即被系统消耗。如果stash=Truebar 将经历整个循环处理,包括可能被过滤器重新解析
  • data._save2stack(erase=False, force=False):将当前数据栏保存到堆栈中以供以后处理。如果erase=Truethen data.backwards将被调用并接收参数force
  • data._updatebar(bar, forward=False, ago=0):使用可迭代对象bar中的值覆盖数据流ago 位置中的值。默认情况下ago=0,当前栏将更新。与 -1,前一个。

另一个例子:Pinkfish 过滤器

这是一个过滤器的示例,它可以链接到另一个过滤器,也就是重放过滤器Pinkfish的名称来自该库,该库在其主页中描述了这个想法:使用每日数据执行只有当日数据才能执行的操作。

实现效果:

  • 每日柱将分为 2 个部分:OHL然后C.
  • 这 2 部分与回放链接在一起,以在流中发生以下情况:
With Len X     -> OHL
With Len X     -> OHLC
With Len X + 1 -> OHL
With Len X + 1 -> OHLC
With Len X + 2 -> OHL
With Len X + 2 -> OHLC
...

逻辑:

  • OHLC收到一个 bar 时,它会被复制到一个 interable 中并分解为:
    • 一个OHL bar。因为这个概念实际上并不存在,所以收盘 价被开盘价取代真正形成一根OHLO 柱线。
    • 一个C也不存在的bar。现实情况是它会像滴答声一样交付CCCC
    • 成交量(如果分布在 2 个部分之间)
    • 当前栏从流中移除
    • OHLO放入堆栈以立即处理
    • CCCC被放入 stash 进行下一轮处理
    • 因为堆栈有一些东西可以立即处理,所以过滤器可以返回False来指示它。

过滤器与:

  • 重放过滤器将和部分放在一起最终OHLO传递CCCC一个OHLC酒吧。

用例:

  • 看到今天的最大值是否是过去 20 个会话中的最大值,则发出一个订单,该订单在第 2次交易Close时执行。

编码:

class DaySplitter_Close(bt.with_metaclass(bt.MetaParams, object)):
    '''
    Splits a daily bar in two parts simulating 2 ticks which will be used to
    replay the data:

      - First tick: ``OHLX``

        The ``Close`` will be replaced by the *average* of ``Open``, ``High``
        and ``Low``

        The session opening time is used for this tick

      and

      - Second tick: ``CCCC``

        The ``Close`` price will be used for the four components of the price

        The session closing time is used for this tick

    The volume will be split amongst the 2 ticks using the parameters:

      - ``closevol`` (default: ``0.5``) The value indicate which percentage, in
        absolute terms from 0.0 to 1.0, has to be assigned to the *closing*
        tick. The rest will be assigned to the ``OHLX`` tick.

    **This filter is meant to be used together with** ``cerebro.replaydata``

    '''
    params = (
        ('closevol', 0.5),  # 0 -> 1 amount of volume to keep for close
    )

    # replaying = True

    def __init__(self, data):
        self.lastdt = None

    def __call__(self, data):
        # Make a copy of the new bar and remove it from stream
        datadt = data.datetime.date()  # keep the date

        if self.lastdt == datadt:
            return False  # skip bars that come again in the filter

        self.lastdt = datadt  # keep ref to last seen bar

        # Make a copy of current data for ohlbar
        ohlbar = [data.lines[i][0] for i in range(data.size())]
        closebar = ohlbar[:]  # Make a copy for the close

        # replace close price with o-h-l average
        ohlprice = ohlbar[data.Open] + ohlbar[data.High] + ohlbar[data.Low]
        ohlbar[data.Close] = ohlprice / 3.0

        vol = ohlbar[data.Volume]  # adjust volume
        ohlbar[data.Volume] = vohl = int(vol * (1.0 - self.p.closevol))

        oi = ohlbar[data.OpenInterest]  # adjust open interst
        ohlbar[data.OpenInterest] = 0

        # Adjust times
        dt = datetime.datetime.combine(datadt, data.p.sessionstart)
        ohlbar[data.DateTime] = data.date2num(dt)

        # Ajust closebar to generate a single tick -> close price
        closebar[data.Open] = cprice = closebar[data.Close]
        closebar[data.High] = cprice
        closebar[data.Low] = cprice
        closebar[data.Volume] = vol - vohl
        ohlbar[data.OpenInterest] = oi

        # Adjust times
        dt = datetime.datetime.combine(datadt, data.p.sessionend)
        closebar[data.DateTime] = data.date2num(dt)

        # Update stream
        data.backwards(force=True)  # remove the copied bar from stream
        data._add2stack(ohlbar)  # add ohlbar to stack
        # Add 2nd part to stash to delay processing to next round
        data._add2stack(closebar, stash=True)

        return False  # initial tick can be further processed from stack

 

评论被关闭。