import datetime as dt
import os
import pandas as pd
import ujson as json
from pyspark.sql.types import *

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

%pylab inline

Take the set of pings, make sure we have actual clientIds and remove duplicate pings.

def dedupe_pings(rdd):
    return rdd.filter(lambda p: p["meta/clientId"] is not None)\
              .map(lambda p: (p["meta/documentId"], p))\
              .reduceByKey(lambda x, y: x)\
              .map(lambda x: x[1])

We’re going to dump each event from the pings. Do a little empty data sanitization so we don’t get NoneType errors during the dump. We create a JSON array of active experiments as part of the dump.

def safe_str(obj):
    """ return the byte string representation of obj """
    if obj is None:
        return unicode("")
    return unicode(obj)

def transform(ping):    
    output = []

    # These should not be None since we filter those out & ingestion process adds the data
    clientId = ping["meta/clientId"]
    submissionDate = dt.datetime.strptime(ping["meta/submissionDate"], "%Y%m%d")

    events = ping["payload/UIMeasurements"]
    if events and isinstance(events, list):
        for event in events:
            if isinstance(event, dict) and "type" in event and event["type"] == "event":
                if "timestamp" not in event or "action" not in event or "method" not in event or "sessions" not in event:

                # Verify timestamp is a long, otherwise ignore the event
                timestamp = None
                    timestamp = long(event["timestamp"])

                # Force all fields to strings
                action = safe_str(event["action"])
                method = safe_str(event["method"])

                # The extras is an optional field
                extras = unicode("")
                if "extras" in event and safe_str(event["extras"]) is not None:
                    extras = safe_str(event["extras"])

                sessions = set()
                experiments = []

                    for session in event["sessions"]:
                        if "experiment.1:" in session:
                except TypeError:

                output.append([clientId, submissionDate, timestamp, action, method, extras, json.dumps(list(sessions)), json.dumps(experiments)])

    return output

The data can have duplicate events, due to a bug in the data collection that was fixed (bug 1246973). We still need to de-dupe the events. Because pings can be archived on device and submitted on later days, we can’t assume dupes only happen on the same submission day. We don’t use submission date when de-duping.

def dedupe_events(rdd):
    return p: (p[0] + safe_str(p[2]) + p[3] + p[4], p))\
              .reduceByKey(lambda x, y: x)\
              .map(lambda x: x[1])

Create a set of events from “saved-session” UI telemetry. Output the data to CSV or Parquet.

This script is designed to loop over a range of days and output a single day for the given channels. Use explicit date ranges for backfilling, or now() - ‘1day’ for automated runs.

channels = ["nightly", "aurora", "beta", "release"]

batch_date = os.environ.get('date')
if batch_date:
    start = end = dt.datetime.strptime(batch_date, '%Y%m%d')
    start = start = - dt.timedelta(1)

day = start
while day <= end:
    for channel in channels:
        print "\nchannel: " + channel + ", date: " + day.strftime("%Y%m%d")

        pings = get_pings(sc, app="Fennec", channel=channel,
                          submission_date=(day.strftime("%Y%m%d"), day.strftime("%Y%m%d")),
                          build_id=("20100101000000", "99999999999999"),

        subset = get_pings_properties(pings, ["meta/clientId",

        subset = dedupe_pings(subset)
        print subset.first()

        rawEvents = subset.flatMap(transform)
        print "\nRaw count: " + str(rawEvents.count())
        print rawEvents.first()

        uniqueEvents = dedupe_events(rawEvents)
        print "\nUnique count: " + str(uniqueEvents.count())
        print uniqueEvents.first()

        s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mobile/android_events"
        s3_output += "/v1/channel=" + channel + "/submission=" + day.strftime("%Y%m%d") 
        schema = StructType([
            StructField("clientid", StringType(), False),
            StructField("submissiondate", TimestampType(), False),
            StructField("ts", LongType(), True),
            StructField("action", StringType(), True),
            StructField("method", StringType(), True),
            StructField("extras", StringType(), True),
            StructField("sessions", StringType(), True),
            StructField("experiments", StringType(), True)
        grouped = sqlContext.createDataFrame(uniqueEvents, schema)
        grouped.coalesce(1).write.parquet(s3_output, mode="overwrite")

    day += dt.timedelta(1)