001/*
002 * Copyright 2019 Perihelios LLC
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *     http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package com.perihelios.aws.lambda.cloudwatch.dispatcher;
017
018import com.amazonaws.services.lambda.runtime.Context;
019import com.google.gson.Gson;
020import com.google.gson.GsonBuilder;
021import com.google.gson.JsonElement;
022import com.google.gson.JsonObject;
023import com.google.gson.JsonParser;
024import com.perihelios.aws.lambda.cloudwatch.dispatcher.event.CloudWatchEvent;
025import com.perihelios.aws.lambda.cloudwatch.dispatcher.event.DetailType;
026import com.perihelios.aws.lambda.cloudwatch.dispatcher.event.Header;
027
028import java.io.ByteArrayOutputStream;
029import java.io.IOException;
030import java.io.InputStream;
031import java.io.UncheckedIOException;
032import java.time.ZonedDateTime;
033import java.util.HashMap;
034import java.util.Map;
035import java.util.function.BiConsumer;
036
037import static java.nio.charset.StandardCharsets.UTF_8;
038
039/**
040 * Main entry point of the API—all users of this library will create and configure an instance of this class.
041 * <p>
042 * A typical usage of this class in a Lambda function is:
043 * </p>
044 * <pre>
045 *     ...
046 *     void myLambdaFunc(InputStream message, Context context) {
047 *         new CloudWatchEventDispatcher(message, context)
048 *             .withEventHandler(MyEvent1.class, new MyEvent1Handler())
049 *             .withEventHandler(MyEvent2.class, new MyEvent2Handler())
050 *             .logMessage()
051 *             .dispatch();
052 *     }
053 *     ...
054 * </pre>
055 */
056public class CloudWatchEventDispatcher {
057    private final String message;
058    private final Context context;
059    private final Map<String, BiConsumer<?, Context>> handlers;
060    private final Map<String, Class<? extends CloudWatchEvent>> eventTypes;
061
062    private boolean logRawMessage;
063
064    /**
065     * Creates a dispatcher for a CloudWatch event, ready for further configuration.
066     *
067     * @param message raw message stream, presumed to contain CloudWatch event JSON
068     * @param context AWS Lambda context, to be passed to handlers as they are invoked
069     */
070    public CloudWatchEventDispatcher(InputStream message, Context context) {
071        this.message = readQuickly(message).trim();
072        this.context = context;
073        this.handlers = new HashMap<>();
074        this.eventTypes = new HashMap<>();
075    }
076
077    /**
078     * Registers an event handler for a particular CloudWatch event type.
079     * <p>
080     * All classes passed in {@code eventType} must be annotated with {@link DetailType}. See the general description
081     * of that annotation, and of {@link CloudWatchEvent}, for details.
082     *
083     * @param eventType class to which events will be unmarshalled
084     * @param handler   consumer of events of {@code eventType} type
085     * @param <T>       type of event, with type bounds ensuring compatibility between {@code eventType} and
086     *                  {@code handler}
087     * @return a reference to this object
088     */
089    public <T extends CloudWatchEvent> CloudWatchEventDispatcher withEventHandler(
090            Class<T> eventType, BiConsumer<? super T, Context> handler) {
091
092        DetailType detailType = eventType.getAnnotation(DetailType.class);
093
094        if (detailType == null) {
095            throw new IllegalArgumentException(
096                    "Cannot register event handler for event type " + eventType.getName() +
097                            "; event type not annotated with " + DetailType.class.getName()
098            );
099        }
100
101        String typeDescription = detailType.value();
102
103        eventTypes.put(typeDescription, eventType);
104        handlers.put(typeDescription, handler);
105
106        return this;
107    }
108
109    /**
110     * Instructs the dispatcher to log the incoming message, before it is parsed as JSON.
111     * <p>
112     * Activating logging via this method is primarily useful for troubleshooting errors where an AWS Lambda may be
113     * receiving messages <em>other than</em> CloudWatch events. CloudWatch will only send valid JSON documents,
114     * but it is possible to (erroneously) connect a Lambda function expecting CloudWatch events to another event
115     * source.
116     * <p>
117     * Logging is done via the {@link com.amazonaws.services.lambda.runtime.LambdaLogger LambdaLogger} obtained from
118     * {@link Context#getLogger()}. CloudWatch Logs must be enabled for the Lambda, and the Lambda's role must have
119     * the necessary permissions to write to CloudWatch Logs.
120     *
121     * @return a reference to this object
122     */
123    public CloudWatchEventDispatcher logMessage() {
124        logRawMessage = true;
125
126        return this;
127    }
128
129    /**
130     * Dispatches the event to registered handlers.
131     * <p>
132     * This is the terminal operation of the dispatcher. All settings specified via the other methods of this class are
133     * applied at this time.
134     *
135     * @throws IllegalArgumentException if the message is not valid JSON, or if the message is missing
136     *                                  {@code detail-type} or {@code detail} properties, or if the message's
137     *                                  {@code detail-type} does not correspond to any event types registered via
138     *                                  {@link #withEventHandler(Class, BiConsumer) withEventHandler()}
139     */
140    public void dispatch() {
141        if (logRawMessage) {
142            context.getLogger().log("Raw message: " + message);
143        }
144
145        JsonObject jsonObject;
146        try {
147            jsonObject = new JsonParser().parse(message).getAsJsonObject();
148        } catch (Exception e) {
149            throw new IllegalArgumentException("Failed to parse message as JSON", e);
150        }
151
152        Gson gson = new GsonBuilder()
153                .registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeAdapter())
154                .create();
155
156        JsonElement detailType = jsonObject.get("detail-type");
157        if (detailType == null) {
158            throw new IllegalArgumentException(
159                    "Received message is not CloudWatch event (missing \"detail-type\" property)"
160            );
161        }
162
163        JsonElement detail = jsonObject.get("detail");
164        if (detail == null) {
165            throw new IllegalArgumentException(
166                    "Received message is not CloudWatch event (missing \"detail\" property)"
167            );
168        }
169
170        String typeDescription = detailType.getAsString();
171
172        Class<? extends CloudWatchEvent> eventType = eventTypes.get(typeDescription);
173        if (eventType == null) {
174            throw new IllegalArgumentException(
175                    "Received event of unknown type; detail-type field in message: " + typeDescription
176            );
177        }
178
179        Header header = gson.fromJson(jsonObject, Header.class);
180        CloudWatchEvent event = gson.fromJson(detail, eventType);
181        event.setHeader(header);
182
183        // The generic type bounds used on the method that stores key/value pairs in the map make this type-safe
184        @SuppressWarnings("unchecked")
185        BiConsumer<Object, Context> handler =
186                (BiConsumer<Object, Context>) handlers.get(typeDescription);
187
188        handler.accept(event, context);
189    }
190
191    private static String readQuickly(InputStream stream) {
192        try {
193            byte[] bytes = new byte[stream.available()];
194
195            //noinspection ResultOfMethodCallIgnored
196            stream.read(bytes);
197            int next = stream.read();
198
199            if (next < 0) {
200                return new String(bytes, UTF_8);
201            }
202
203            return readSlowly(bytes, next, stream);
204        } catch (IOException e) {
205            throw new UncheckedIOException(e);
206        } finally {
207            try {
208                stream.close();
209            } catch (IOException ignore) {
210            }
211        }
212    }
213
214    private static String readSlowly(byte[] initial, int next, InputStream stream) throws IOException {
215        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(initial.length + 1 + 65_536);
216        byte[] buffer = new byte[65_536];
217
218        outputStream.write(initial);
219        outputStream.write(next);
220
221        int read;
222        while ((read = stream.read(buffer)) >= 0) {
223            outputStream.write(buffer, 0, read);
224        }
225
226        return new String(outputStream.toByteArray(), UTF_8);
227    }
228}