Create Ansible filters to work with Apache Flume configuration.

Initial notes

This example contains several Ansible filters that parse an Apache Flume configuration defined in YAML.

  • flume_agents – extract agents
  • flume_sources – extract sources, may be limited to a specific agent
  • flume_sinks – extract sinks, may be limited to a specific agent
  • flume_channels – extract channels, may be limited to a specific agent
  • flume_configuration – extract configuration rules, may be limited to a specific agent
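Conceptually, the source, sink, and channel filters walk the nested dictionary and collect the names defined under the requested section, optionally starting from a single agent. The following is a minimal standalone sketch of that idea, not the plugin itself; `find_elements` and the two-agent `config` are hypothetical names and data used only for illustration.

```python
# Hypothetical standalone sketch: collect the names defined under a section
# ("sources", "channels", "sinks"), optionally limited to a single agent.

def find_elements(data, section, agent=None):
    """Return the child keys of every mapping stored under `section`."""
    if agent is not None:
        data = data[agent]  # restrict the search to one agent
    found = []
    if isinstance(data, dict):
        for key, value in data.items():
            if key == section and isinstance(value, dict):
                found.extend(value.keys())
            # keep descending so nested sections are found as well
            found.extend(find_elements(value, section))
    return found


config = {
    "agent1": {"sources": {"src1": {"type": "seq", "channels": "ch1"}},
               "channels": {"ch1": {"type": "memory"}},
               "sinks": {"sink1": {"type": "logger", "channel": "ch1"}}},
    "agent2": {"sources": {"src2": {"type": "seq", "channels": "ch2"}},
               "channels": {"ch2": {"type": "memory"}},
               "sinks": {"sink2": {"type": "logger", "channel": "ch2"}}},
}

print(find_elements(config, "sources"))            # ['src1', 'src2']
print(find_elements(config, "sources", "agent2"))  # ['src2']
```

Starting the search from `config[agent]` is what makes the per-agent variants possible; without an agent, the names from every agent end up in one list.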

Sample YAML configuration.

agent:
  sources:
    seqGenSrc:
      type: seq
      channels: memoryChannel
  channels:
    memoryChannel:
      type: memory
      capacity: 100
  sinks:
    loggerSink:
      type: logger
      channel: memoryChannel
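Flume reads its configuration as a Java properties file in which every setting is a flat, dot-separated key, so the nested YAML above has to be flattened. A minimal sketch of that transformation, assuming the sample configuration has already been loaded into a Python dictionary (`flatten` is a hypothetical helper name):

```python
# Minimal sketch: map every leaf value of the nested configuration
# to its dot-separated path, e.g. 'agent.sinks.loggerSink.type'.

def flatten(data, prefix=""):
    if not isinstance(data, dict):
        return {prefix: data}  # leaf value
    result = {}
    for key, value in data.items():
        path = f"{prefix}.{key}" if prefix else key
        result.update(flatten(value, path))
    return result


sample = {
    "agent": {
        "sources": {"seqGenSrc": {"type": "seq", "channels": "memoryChannel"}},
        "channels": {"memoryChannel": {"type": "memory", "capacity": 100}},
        "sinks": {"loggerSink": {"type": "logger", "channel": "memoryChannel"}},
    }
}

for name, value in flatten(sample).items():
    print(f"{name} = {value}")
```

For the sample, this prints the same six `agent.…` property lines that appear at the end of the playbook output.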

The code

Define the filters and store them inside the filter_plugins directory.

#!/usr/bin/env python3

class FilterModule(object):
    """Ansible filters that turn a nested Apache Flume configuration into flat properties."""

    def filters(self):
        # map filter names used in templates to the methods implementing them
        return {
            'flume_configuration': self.flume_configuration,
            'flume_sources': self.flume_sources,
            'flume_sinks': self.flume_sinks,
            'flume_channels': self.flume_channels,
            'flume_agents': self.flume_agents,
        }

    @staticmethod
    def extract_keys(data):
        # return the keys of a mapping, or an empty list for anything else
        keys = []
        if isinstance(data, dict):
            for key in data:
                keys.append(key)
        return keys

    def flume_configuration(self, data, agent=""):
        # optionally limit the configuration to a single agent
        if agent != "":
            data = data[agent]
        return self.flume_configuration_internal(data)

    def flume_configuration_internal(self, data, path='', configuration=None):
        # create a fresh dict per top-level call; a mutable default ({})
        # would be shared between calls and leak keys from one agent to the next
        if configuration is None:
            configuration = {}
        if not isinstance(data, dict):
            # leaf value: store it under its dotted path without the trailing dot
            configuration[path[:-1]] = data
        else:
            for key in data:
                self.flume_configuration_internal(data[key], path + key + '.', configuration)
        return configuration

    def flume_elements(self, data, element, elements=None):
        # recursively collect the child keys of every mapping named `element`
        if elements is None:
            elements = []
        if isinstance(data, dict):
            for key in data:
                if key == element:
                    elements = elements + self.extract_keys(data[key])
                elements = self.flume_elements(data[key], element, elements)
        return elements

    def flume_sources(self, data, agent=""):
        if agent != "":
            data = data[agent]
        return self.flume_elements(data, "sources")

    def flume_sinks(self, data, agent=""):
        if agent != "":
            data = data[agent]
        return self.flume_elements(data, "sinks")

    def flume_channels(self, data, agent=""):
        if agent != "":
            data = data[agent]
        return self.flume_elements(data, "channels")

    def flume_agents(self, data):
        # agents are the top-level keys of the configuration
        return self.extract_keys(data)
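Recursive helpers that fill an accumulator in place, such as `flume_configuration_internal` with its `configuration` dictionary, are a classic place for a Python pitfall: a default value like `{}` is evaluated once, when the function is defined, and is then shared by every call that omits the argument. The sketch below, with hypothetical function names, shows how keys from one agent can leak into the result for the next one, and how a `None` default avoids it.

```python
# Sketch of the shared-mutable-default pitfall and its usual fix.

def collect_shared(data, result={}):
    # the same dict object is reused by every call that omits `result`
    result.update(data)
    return result


def collect_fresh(data, result=None):
    # a new dict is created on every call that omits `result`
    if result is None:
        result = {}
    result.update(data)
    return result


collect_shared({"agent1.sources": "src1"})
leaked = collect_shared({"agent2.sources": "src2"})
print(leaked)  # {'agent1.sources': 'src1', 'agent2.sources': 'src2'}

collect_fresh({"agent1.sources": "src1"})
clean = collect_fresh({"agent2.sources": "src2"})
print(clean)  # {'agent2.sources': 'src2'}
```

With a single agent the difference is invisible, but the template calls `flume_configuration` once per agent, so a shared default would repeat earlier agents' keys under later agents' names.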

Create an Ansible playbook that renders the Apache Flume configuration template and displays the result.

- hosts: localhost
  vars:
    flume_config:
      agent:
        sources:
          seqGenSrc:
            type: seq
            channels: memoryChannel
        channels:
          memoryChannel:
            type: memory
            capacity: 100
        sinks:
          loggerSink:
            type: logger
            channel: memoryChannel

  tasks:
    - name: Parse and display configuration template
      debug:
        msg: "{{ rendered_flume_template.split('\n') }}"
      vars:
        rendered_flume_template: "{{ lookup('template', './template.j2', trim_blocks=True) }}"

Create the template.j2 template that uses the defined filters.

#jinja2: lstrip_blocks: True
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'

{% set flume_agents = flume_config | flume_agents %}
{% for agent in flume_agents %}
{{ agent }}.sources = {{ flume_config | flume_sources(agent) | join(" ") }}
{{ agent }}.channels = {{ flume_config | flume_channels(agent) | join(" ") }}
{{ agent }}.sinks = {{ flume_config | flume_sinks(agent) | join(" ") }}
{% endfor %}

{% for agent in flume_agents %}
  {% set configuration = flume_config | flume_configuration(agent) %}
  {% for key in configuration %}
    {{- agent }}.{{ key }} = {{ configuration[key] }}
  {% endfor %}
{% endfor %}

Execute the playbook to render the configuration template.

$ ansible-playbook playbook.yml 
[WARNING]: provided hosts list is empty, only localhost is available. Note that the implicit localhost does not match 'all'

PLAY [localhost] *********************************************************************************************************************************************

TASK [Gathering Facts] ***************************************************************************************************************************************
ok: [localhost]

TASK [Parse and display configuration template] **************************************************************************************************************
ok: [localhost] => {
    "msg": [
        "# Licensed to the Apache Software Foundation (ASF) under one",
        "# or more contributor license agreements.  See the NOTICE file",
        "# distributed with this work for additional information",
        "# regarding copyright ownership.  The ASF licenses this file",
        "# to you under the Apache License, Version 2.0 (the",
        "# \"License\"); you may not use this file except in compliance",
        "# with the License.  You may obtain a copy of the License at",
        "#",
        "#  http://www.apache.org/licenses/LICENSE-2.0",
        "#",
        "# Unless required by applicable law or agreed to in writing,",
        "# software distributed under the License is distributed on an",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY",
        "# KIND, either express or implied.  See the License for the",
        "# specific language governing permissions and limitations",
        "# under the License.",
        "",
        "",
        "# The configuration file needs to define the sources,",
        "# the channels and the sinks.",
        "# Sources, channels and sinks are defined per agent,",
        "# in this case called 'agent'",
        "",
        "agent.sources = seqGenSrc",
        "agent.channels = memoryChannel",
        "agent.sinks = loggerSink",
        "",
        "agent.sources.seqGenSrc.type = seq",
        "agent.sources.seqGenSrc.channels = memoryChannel",
        "agent.channels.memoryChannel.type = memory",
        "agent.channels.memoryChannel.capacity = 100",
        "agent.sinks.loggerSink.type = logger",
        "agent.sinks.loggerSink.channel = memoryChannel",
        ""
    ]
}

PLAY RECAP ***************************************************************************************************************************************************
localhost                  : ok=2    changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0   

Additional notes

The code used in this example is available on GitHub at milosz/ansible-flume-filter.

Play with it; it is simpler than you think.