import datetime as dt
import os
import pandas as pd
import ujson as json
from pyspark.sql.types import *

from moztelemetry import get_pings, get_pings_properties

%pylab inline

Take the set of pings, drop any ping without a clientId, and remove duplicates (pings that share the same documentId), keeping one copy of each unique ping.

def dedupe_pings(rdd):
    """Drop pings without a client id and keep one ping per document id.

    Pings are keyed by ``meta/documentId``. When several pings share an id,
    ``reduceByKey`` keeps one of them; which one is not specified.
    Returns an RDD of ping dicts.
    """
    def has_client(ping):
        return ping["meta/clientId"] is not None

    def by_document(ping):
        return (ping["meta/documentId"], ping)

    def keep_one(kept, _duplicate):
        return kept

    def drop_key(pair):
        return pair[1]

    keyed = rdd.filter(has_client).map(by_document)
    unique = keyed.reduceByKey(keep_one)
    return unique.map(drop_key)

Transform and sanitize each ping into a list of column values for the output DataFrame.

# Bug 1362659: some integer fields overflowed the signed 32-bit range of
# Spark's IntegerType, so values above this bound are discarded (set to None).
MAX_INT = 2 ** 31 - 1

def transform(ping):
    # Should not be None since we filter those out.
    clientId = ping["meta/clientId"]

    profileDate = None
    profileDaynum = ping["environment/profile/creationDate"]
    if profileDaynum is not None:
            # Bad data could push profileDaynum > 32767 (size of a C int) and throw exception
            profileDate = dt.datetime(1970, 1, 1) + dt.timedelta(int(profileDaynum))
            profileDate = None

    # Create date should already be in ISO format
    creationDate = ping["creationDate"]
    if creationDate is not None:
        # This is only accurate because we know the creation date is always in 'Z' (zulu) time.
        creationDate = dt.datetime.strptime(ping["creationDate"], "%Y-%m-%dT%H:%M:%S.%fZ")

    # Added via the ingestion process so should not be None.
    submissionDate = dt.datetime.strptime(ping["meta/submissionDate"], "%Y%m%d")

    appVersion = ping["application/version"]
    osVersion = ping["environment/system/os/version"]
    if osVersion is not None:
        osVersion = int(osVersion) if int(osVersion) <= MAX_INT else None

    locale = ping["environment/settings/locale"]

    # Truncate to 32 characters
    defaultSearch = ping["environment/settings/defaultSearchEngine"]
    if defaultSearch is not None:
        defaultSearch = defaultSearch[0:32]

    # Build up the device string, truncating like we do in 'core' ping.
    device = ping["environment/system/device/manufacturer"]
    model = ping["environment/system/device/model"]
    if device is not None and model is not None:
        device = device[0:12] + "-" + model[0:19]

    xpcomABI = ping["application/xpcomAbi"]
    arch = "arm"
    if xpcomABI is not None and "x86" in xpcomABI:
        arch = "x86"

    # Bug 1337896
    as_topsites_loader_time = ping["payload/histograms/FENNEC_ACTIVITY_STREAM_TOPSITES_LOADER_TIME_MS"]
    topsites_loader_time = ping["payload/histograms/FENNEC_TOPSITES_LOADER_TIME_MS"]

    if as_topsites_loader_time is not None:
        as_topsites_loader_time = map(int, as_topsites_loader_time.tolist())
        if any([v > MAX_INT for v in as_topsites_loader_time]):
            as_topsites_loader_time = None

    if topsites_loader_time is not None:
        topsites_loader_time = map(int, topsites_loader_time.tolist())
        if any([v > MAX_INT for v in topsites_loader_time]):
            topsites_loader_time = None

    return [clientId,

Use "saved-session" pings to build a set of core client data, and write the data out as Parquet.

This script loops over a range of days and writes one output partition per day for each of the given channels. For backfilling, set an explicit date (the `date` environment variable, formatted `YYYYMMDD`); for automated runs, rely on the default of `now() - 1 day`.

# Release channels to process for each day.
channels = ["nightly", "aurora", "beta", "release"]

# Resolve the inclusive [start, end] range of submission days to process.
# An explicit `date` environment variable (YYYYMMDD) selects a single day,
# e.g. for backfills; otherwise default to yesterday for automated runs.
batch_date = os.environ.get('date')
if batch_date:
    start = end = dt.datetime.strptime(batch_date, '%Y%m%d')
else:
    start = end = dt.datetime.now() - dt.timedelta(1)

# Process each day in the inclusive range [start, end]. For every channel,
# fetch that day's pings, dedupe and transform them, and write one Parquet
# partition per (channel, submission day).
# NOTE(review): `sc` and `sqlContext` are not defined in this file; they are
# presumably provided by the notebook's Spark session -- confirm.
day = start
while day <= end:
    for channel in channels:
        print "\nchannel: " + channel + ", date: " + day.strftime("%Y%m%d")

        # Fennec pings for this channel submitted on `day`, across all build ids.
        # NOTE(review): this call is truncated in this export -- the remaining
        # arguments and the closing parenthesis are missing; restore them from
        # the original notebook.
        pings = get_pings(sc, app="Fennec", channel=channel,
                          submission_date=(day.strftime("%Y%m%d"), day.strftime("%Y%m%d")),
                          build_id=("20100101000000", "99999999999999"),

        # NOTE(review): the property list is truncated in this export; it must
        # include every key read by dedupe_pings and transform.
        subset = get_pings_properties(pings, ["meta/clientId",

        subset = dedupe_pings(subset)
        # NOTE(review): the right-hand side is missing in this export --
        # presumably `subset.map(transform)`; confirm against the original.
        transformed =

        # Output partition: .../android_clients/v2/channel=<channel>/submission=<YYYYMMDD>
        s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mobile/android_clients"
        s3_output += "/v2/channel=" + channel + "/submission=" + day.strftime("%Y%m%d") 
        # Column types for the output DataFrame. IntegerType is signed 32-bit,
        # which is why out-of-range integers are discarded before this point.
        # NOTE(review): the field list is truncated after "arch" in this export
        # and the closing `])` is missing.
        schema = StructType([
            StructField("clientid", StringType(), False),
            StructField("profiledate", TimestampType(), True),
            StructField("submissiondate", TimestampType(), False),
            StructField("creationdate", TimestampType(), True),
            StructField("appversion", StringType(), True),
            StructField("osversion", IntegerType(), True),
            StructField("locale", StringType(), True),
            StructField("defaultsearch", StringType(), True),
            StructField("device", StringType(), True),
            StructField("arch", StringType(), True),
        # Coalesce to one partition so each output directory gets a single
        # Parquet file; mode="overwrite" replaces any previous output there.
        grouped = sqlContext.createDataFrame(transformed, schema)
        grouped.coalesce(1).write.parquet(s3_output, mode="overwrite")

    day += dt.timedelta(1)