Create Ansible filters to work with Apache Flume configuration.
Initial notes
This example contains several Ansible filters that parse an Apache Flume configuration defined in YAML.
flume_agents – extract agents
flume_sources – extract sources, may be limited to a specific agent
flume_sinks – extract sinks, may be limited to a specific agent
flume_channels – extract channels, may be limited to a specific agent
flume_configuration – extract configuration rules, may be limited to a specific agent
Sample YAML configuration.
    agent:
      sources:
        seqGenSrc:
          type: seq
          channels: memoryChannel
      channels:
        memoryChannel:
          type: memory
          capacity: 100
      sinks:
        loggerSink:
          type: logger
          channel: memoryChannel
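Ansible hands variables to filter plugins as ordinary Python objects, so the sample above reaches the filters as nested dictionaries. A minimal sketch of that shape follows; the variable name flume_config matches the one used in the playbook, while the agents and groups names are only illustrative.

```python
# The sample YAML expressed as the nested dictionaries a filter receives.
flume_config = {
    "agent": {
        "sources": {"seqGenSrc": {"type": "seq", "channels": "memoryChannel"}},
        "channels": {"memoryChannel": {"type": "memory", "capacity": 100}},
        "sinks": {"loggerSink": {"type": "logger", "channel": "memoryChannel"}},
    }
}

# Top-level keys are agent names; second-level keys are component groups.
agents = list(flume_config)
groups = list(flume_config["agent"])

print(agents)  # ['agent']
print(groups)  # ['sources', 'channels', 'sinks']
```

Every filter in this example works by walking these dictionaries, which is why the agent name can be used directly as a key to narrow the data.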
The code
Define the filters and store them inside the filter_plugins directory.
    #!/usr/bin/env python3


    class FilterModule(object):
        def filters(self):
            # Map the filter names used in templates to their implementations.
            return {
                'flume_configuration': self.flume_configuration,
                'flume_sources': self.flume_sources,
                'flume_sinks': self.flume_sinks,
                'flume_channels': self.flume_channels,
                'flume_agents': self.flume_agents,
            }

        @staticmethod
        def extract_keys(data):
            # Return the keys of a dictionary, or an empty list for anything else.
            keys = []
            if isinstance(data, dict):
                for key in data:
                    keys.append(key)
            return keys

        def flume_configuration(self, data, agent=""):
            if agent != "":
                data = data[agent]
            return self.flume_configuration_internal(data)

        def flume_configuration_internal(self, data, path='', configuration=None):
            # Create the result per call; a mutable default ({}) would be
            # shared between calls and leak keys from one agent into the next.
            if configuration is None:
                configuration = {}
            if not isinstance(data, dict):
                # Leaf value: strip the trailing dot from the accumulated path.
                configuration[path[:-1]] = data
            else:
                for key in data:
                    self.flume_configuration_internal(data[key], path + key + '.', configuration)
            return configuration

        def flume_elements(self, data, element, elements=None):
            if elements is None:
                elements = []
            if isinstance(data, dict):
                for key in data:
                    if key == element:
                        elements = elements + self.extract_keys(data[key])
                    elements = self.flume_elements(data[key], element, elements)
            return elements

        def flume_sources(self, data, agent=""):
            if agent != "":
                data = data[agent]
            return self.flume_elements(data, "sources")

        def flume_sinks(self, data, agent=""):
            if agent != "":
                data = data[agent]
            return self.flume_elements(data, "sinks")

        def flume_channels(self, data, agent=""):
            if agent != "":
                data = data[agent]
            return self.flume_elements(data, "channels")

        def flume_agents(self, data):
            return self.extract_keys(data)
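The recursive flattening behind flume_configuration can be tried outside Ansible. The sketch below is a standalone illustration, not the plugin code: flatten is a hypothetical helper name, and it builds a fresh result dictionary on every call instead of passing one through the recursion.

```python
def flatten(data, prefix=""):
    """Flatten nested dictionaries into one dictionary with dotted keys."""
    result = {}
    if isinstance(data, dict):
        for key, value in data.items():
            # Descend one level, extending the dotted path.
            result.update(flatten(value, prefix + key + "."))
    else:
        # Leaf value: drop the trailing dot added by the parent call.
        result[prefix[:-1]] = data
    return result


sample = {"sinks": {"loggerSink": {"type": "logger", "channel": "memoryChannel"}}}
flat = flatten(sample)

print(flat)
# {'sinks.loggerSink.type': 'logger', 'sinks.loggerSink.channel': 'memoryChannel'}
```

Each dotted key maps directly to one line of a Flume properties file once the agent name is prepended.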
Create an Ansible playbook that will be used to parse the Apache Flume configuration template.
    - hosts: localhost
      vars:
        flume_config:
          agent:
            sources:
              seqGenSrc:
                type: seq
                channels: memoryChannel
            channels:
              memoryChannel:
                type: memory
                capacity: 100
            sinks:
              loggerSink:
                type: logger
                channel: memoryChannel
      tasks:
        - name: Parse and display configuration template
          debug:
            msg: "{{ rendered_flume_template.split('\n') }}"
          vars:
            rendered_flume_template: "{{ lookup('template', './template.j2', trim_blocks=True) }}"
Create a template that uses the defined filters.
    #jinja2: lstrip_blocks: True
    # Licensed to the Apache Software Foundation (ASF) under one
    # or more contributor license agreements. See the NOTICE file
    # distributed with this work for additional information
    # regarding copyright ownership. The ASF licenses this file
    # to you under the Apache License, Version 2.0 (the
    # "License"); you may not use this file except in compliance
    # with the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing,
    # software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    # KIND, either express or implied. See the License for the
    # specific language governing permissions and limitations
    # under the License.


    # The configuration file needs to define the sources,
    # the channels and the sinks.
    # Sources, channels and sinks are defined per agent,
    # in this case called 'agent'

    {% set flume_agents = flume_config | flume_agents %}
    {% for agent in flume_agents %}
    {{ agent }}.sources = {{ flume_config | flume_sources(agent) | join(" ") }}
    {{ agent }}.channels = {{ flume_config | flume_channels(agent) | join(" ") }}
    {{ agent }}.sinks = {{ flume_config | flume_sinks(agent) | join(" ") }}
    {% endfor %}

    {% for agent in flume_agents %}
      {% set configuration = flume_config | flume_configuration(agent) %}
      {% for key in configuration %}
        {{- agent }}.{{ key }} = {{ configuration[key] }}
      {% endfor %}
    {% endfor %}
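The first loop of the template emits one declaration line per component group. For a quick sanity check outside Ansible, roughly the same logic can be sketched in plain Python. Note the assumptions: declaration_lines is a hypothetical helper that is not part of the plugin, and it reads only the direct children of each agent, whereas the flume_sources, flume_channels and flume_sinks filters search the data recursively.

```python
flume_config = {
    "agent": {
        "sources": {"seqGenSrc": {"type": "seq", "channels": "memoryChannel"}},
        "channels": {"memoryChannel": {"type": "memory", "capacity": 100}},
        "sinks": {"loggerSink": {"type": "logger", "channel": "memoryChannel"}},
    }
}


def declaration_lines(config):
    """Build '<agent>.<group> = <names>' lines for every agent."""
    lines = []
    for agent, groups in config.items():
        for group in ("sources", "channels", "sinks"):
            # Joining a dictionary joins its keys, i.e. the component names.
            names = " ".join(groups.get(group, {}))
            lines.append(f"{agent}.{group} = {names}")
    return lines


lines = declaration_lines(flume_config)
print("\n".join(lines))
# agent.sources = seqGenSrc
# agent.channels = memoryChannel
# agent.sinks = loggerSink
```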
Execute the playbook to generate the configuration from the template.
    $ ansible-playbook playbook.yml
    [WARNING]: provided hosts list is empty, only localhost is available. Note that the implicit localhost does not match 'all'

    PLAY [localhost] *********************************************************************************************************************************************

    TASK [Gathering Facts] ***************************************************************************************************************************************
    ok: [localhost]

    TASK [Parse and display configuration template] **************************************************************************************************************
    ok: [localhost] => {
        "msg": [
            "# Licensed to the Apache Software Foundation (ASF) under one",
            "# or more contributor license agreements. See the NOTICE file",
            "# distributed with this work for additional information",
            "# regarding copyright ownership. The ASF licenses this file",
            "# to you under the Apache License, Version 2.0 (the",
            "# \"License\"); you may not use this file except in compliance",
            "# with the License. You may obtain a copy of the License at",
            "#",
            "# http://www.apache.org/licenses/LICENSE-2.0",
            "#",
            "# Unless required by applicable law or agreed to in writing,",
            "# software distributed under the License is distributed on an",
            "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY",
            "# KIND, either express or implied. See the License for the",
            "# specific language governing permissions and limitations",
            "# under the License.",
            "",
            "",
            "# The configuration file needs to define the sources,",
            "# the channels and the sinks.",
            "# Sources, channels and sinks are defined per agent,",
            "# in this case called 'agent'",
            "",
            "agent.sources = seqGenSrc",
            "agent.channels = memoryChannel",
            "agent.sinks = loggerSink",
            "",
            "agent.sources.seqGenSrc.type = seq",
            "agent.sources.seqGenSrc.channels = memoryChannel",
            "agent.channels.memoryChannel.type = memory",
            "agent.channels.memoryChannel.capacity = 100",
            "agent.sinks.loggerSink.type = logger",
            "agent.sinks.loggerSink.channel = memoryChannel",
            ""
        ]
    }

    PLAY RECAP ***************************************************************************************************************************************************
    localhost : ok=2 changed=0 unreachable=0 failed=0 skipped=0 rescued=0 ignored=0
Additional notes
The code used in this example is available on GitHub at milosz/ansible-flume-filter.
Play with it, as it is simpler than you think.