import datetime as dt
import os
import pandas as pd
import operator
import ujson as json
from pyspark.sql.types import *

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

%pylab inline

Take the set of pings, make sure we have actual clientIds and remove duplicate pings.

def safe_str(obj):
    """ return the byte string representation of obj """
    if obj is None:
        return unicode("")
    return unicode(obj)

def dedupe_pings(rdd):
    return rdd.filter(lambda p: p["meta/clientId"] is not None)\
              .map(lambda p: (p["meta/documentId"], p))\
              .reduceByKey(lambda x, y: x)\
              .map(lambda x: x[1])

def dedupe_addons(rdd):
    return p: (p[0] + safe_str(p[2]) + safe_str(p[3]), p))\
              .reduceByKey(lambda x, y: x)\
              .map(lambda x: x[1])

We’re going to dump each event from the pings. Do a little empty data sanitization so we don’t get NoneType errors during the dump. We create a JSON array of active experiments as part of the dump.

def clean(s):
        s = s.decode("ascii").strip()
        return s if len(s) > 0 else None
        return None

def transform(ping):    
    output = []

    # These should not be None since we filter those out & ingestion process adds the data
    clientId = ping["meta/clientId"]
    submissionDate = dt.datetime.strptime(ping["meta/submissionDate"], "%Y%m%d")

    addonset = {}
    addons = ping["environment/addons/activeAddons"]
    if addons is not None:
        for addon, desc in addons.iteritems():
            name = clean(desc.get("name", None))
            if name is not None:
                addonset[name] = 1

    persona = ping["environment/addons/persona"]

    if len(addonset) > 0 or persona is not None:
        addonarray = None
        if len(addonset) > 0:
            addonarray = json.dumps(addonset.keys())
        output.append([clientId, submissionDate, addonarray, persona])

    return output

Create a set of events from “saved-session” UI telemetry. Output the data to CSV or Parquet.

This script is designed to loop over a range of days and output a single day for the given channels. Use explicit date ranges for backfilling, or now() - ‘1day’ for automated runs.

channels = ["nightly", "aurora", "beta", "release"]

batch_date = os.environ.get('date')
if batch_date:
    start = end = dt.datetime.strptime(batch_date, '%Y%m%d')
    start = start = - dt.timedelta(1)

day = start
while day <= end:
    for channel in channels:
        print "\nchannel: " + channel + ", date: " + day.strftime("%Y%m%d")

        pings = get_pings(sc, app="Fennec", channel=channel,
                          submission_date=(day.strftime("%Y%m%d"), day.strftime("%Y%m%d")),
                          build_id=("20100101000000", "99999999999999"),

        subset = get_pings_properties(pings, ["meta/clientId",

        subset = dedupe_pings(subset)
        print subset.first()

        rawAddons = subset.flatMap(transform)
        print "\nrawAddons count: " + str(rawAddons.count())
        print rawAddons.first()

        uniqueAddons = dedupe_addons(rawAddons)
        print "\nuniqueAddons count: " + str(uniqueAddons.count())
        print uniqueAddons.first()

        s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mobile/android_addons"
        s3_output += "/v1/channel=" + channel + "/submission=" + day.strftime("%Y%m%d") 
        schema = StructType([
            StructField("clientid", StringType(), False),
            StructField("submissiondate", TimestampType(), False),
            StructField("addons", StringType(), True),
            StructField("lwt", StringType(), True)
        grouped = sqlContext.createDataFrame(uniqueAddons, schema)
        grouped.coalesce(1).write.parquet(s3_output, mode="overwrite")

    day += dt.timedelta(1)