import os
import datetime as dt
import pandas as pd
import ujson as json
from pyspark.sql.types import *

from moztelemetry import get_pings, get_pings_properties

%pylab inline

Take the set of pings, keep only those that have an actual clientId, and remove duplicates so that a single ping is kept for each documentId.

def dedupe_pings(rdd):
    """Return an RDD holding a single ping per document id.

    Pings whose "meta/clientId" is None are dropped first. The remaining
    pings are keyed by "meta/documentId" and collapsed so that one ping
    survives for each key; the key is then stripped off again.
    """
    def has_client_id(ping):
        return ping["meta/clientId"] is not None

    def key_by_document(ping):
        return (ping["meta/documentId"], ping)

    def keep_first(kept, _duplicate):
        # Duplicates share a documentId, so whichever ping the reducer
        # sees on the left-hand side is the one retained.
        return kept

    def strip_key(pair):
        return pair[1]

    identified = rdd.filter(has_client_id)
    keyed = identified.map(key_by_document)
    unique = keyed.reduceByKey(keep_first)
    return unique.map(strip_key)

Transform and sanitize the pings into arrays.

def transform(ping):
    """Flatten one ping into an ordered list of column values.

    Args:
        ping: Mapping from property path to value. The keys read are
            "meta/clientId", "meta/submissionDate", "meta/geoCountry",
            "profileDate", "created", "meta/appVersion", "meta/appBuildId",
            "locale", "os", "osversion", "device", "arch", "defaultSearch",
            "distributionId", "experiments", "defaultNewTabExperience",
            "defaultMailClient", "searches", "durations" and "sessions".

    Returns:
        A 20-element list, in this order: clientId, submissionDate,
        creationDate, profileDate, geoCountry, locale, os, osVersion,
        buildId, appVersion, device, arch, defaultSearch, distributionId,
        experiments (JSON-encoded string), defaultNewTabExperience,
        defaultMailClient, searches, durations, sessions.
        submissionDate is a datetime; creationDate and profileDate are
        datetimes, or None when the source value is missing or unusable.

    Raises:
        ValueError, TypeError: If "meta/submissionDate" is missing or is not
            a YYYYMMDD string. That parse is deliberately left unguarded.
    """
    # Should not be None since we filter those out.
    clientId = ping["meta/clientId"]

    # Added via the ingestion process so should not be None.
    submissionDate = dt.datetime.strptime(ping["meta/submissionDate"], "%Y%m%d")
    geoCountry = ping["meta/geoCountry"]

    # profileDate arrives as a day count that is added to 1970-01-01.
    profileDate = None
    profileDaynum = ping["profileDate"]
    if profileDaynum is not None:
        try:
            # Bad data (non-numeric, or a day count far outside the range
            # datetime supports) must not abort the whole job.
            profileDate = dt.datetime(1970, 1, 1) + dt.timedelta(int(profileDaynum))
        except (ValueError, TypeError, OverflowError):
            profileDate = None

    # Create date can be an improper string (~.03% of the time, so ignore)
    # Year can be < 2000 (~.005% of the time, so ignore)
    # Create date should already be in ISO format
    creationDate = ping["created"]
    if creationDate is not None:
        try:
            # Original author's note: this is only accurate because we know
            # the creation date is always in 'Z' (zulu) time.
            creationDate = dt.datetime.strptime(creationDate, "%Y-%m-%d")
        except (ValueError, TypeError):
            creationDate = None
        else:
            if creationDate.year < 2000:
                creationDate = None

    appVersion = ping["meta/appVersion"]
    buildId = ping["meta/appBuildId"]
    locale = ping["locale"]
    # Named osName so the local does not shadow the imported `os` module.
    osName = ping["os"]
    osVersion = ping["osversion"]
    device = ping["device"]
    arch = ping["arch"]
    defaultSearch = ping["defaultSearch"]
    distributionId = ping["distributionId"]

    # A missing experiments list is serialized as an empty JSON array.
    experiments = ping["experiments"]
    if experiments is None:
        experiments = []

    #bug 1315028
    defaultNewTabExperience = ping["defaultNewTabExperience"]
    defaultMailClient = ping["defaultMailClient"]

    #bug 1307419
    searches = ping["searches"]
    durations = ping["durations"]
    sessions = ping["sessions"]

    return [clientId, submissionDate, creationDate, profileDate, geoCountry, locale, osName,
            osVersion, buildId, appVersion, device, arch, defaultSearch, distributionId,
            json.dumps(experiments), defaultNewTabExperience, defaultMailClient, searches,
            durations, sessions]

Create a set of pings from “core” to build a set of core client data. Output the data to CSV or Parquet.

This script is designed to loop over a range of days and output a single day for the given channels. Use explicit date ranges for backfilling, or now() - ‘1day’ for automated runs.

# Release channels processed for every day in the window.
channels = ["nightly", "aurora", "beta", "release"]

# An explicit day (YYYYMMDD) can be supplied through the "date" environment
# variable, e.g. for backfilling. Without it the job processes yesterday,
# which is what automated runs rely on.
batch_date = os.environ.get('date')
if batch_date:
    start = end = dt.datetime.strptime(batch_date, '%Y%m%d')
else:
    # Evaluate now() once so start and end are exactly the same instant.
    start = end = dt.datetime.now() - dt.timedelta(1)

# Column layout of the output table. The order matches the list returned by
# transform(). It does not depend on the day or channel, so build it once.
schema = StructType([
    StructField("clientid", StringType(), False),
    StructField("submissiondate", TimestampType(), False),
    StructField("creationdate", TimestampType(), True),
    StructField("profiledate", TimestampType(), True),
    StructField("geocountry", StringType(), True),
    StructField("locale", StringType(), True),
    StructField("os", StringType(), True),
    StructField("osversion", StringType(), True),
    StructField("buildid", StringType(), True),
    StructField("appversion", StringType(), True),
    StructField("device", StringType(), True),
    StructField("arch", StringType(), True),
    StructField("defaultsearch", StringType(), True),
    StructField("distributionid", StringType(), True),
    StructField("experiments", StringType(), True),
    StructField("default_new_tab_experience", StringType(), True),
    StructField("default_mail_client", StringType(), True),
    StructField("searches", StringType(), True),
    StructField("durations", StringType(), True),
    StructField("sessions", StringType(), True),
])

# Walk the window one day at a time, writing one output per (channel, day).
day = start
while day <= end:
    # The ping filter and the output path both use the YYYYMMDD form.
    day_str = day.strftime("%Y%m%d")

    for channel in channels:
        # Single-argument print(...) behaves the same as the print statement
        # on Python 2 and also parses on Python 3.
        print("\nchannel: " + channel + ", date: " + day_str)

        # NOTE(review): the original dict literal was cut off after
        # submission_date. doc_type and channel are restored from the
        # description of this job ("core" pings, one output per channel).
        # Confirm whether other filters (for example an app filter) were
        # also lost.
        kwargs = dict(
            doc_type="core",
            submission_date=(day_str, day_str),
            channel=channel,
        )

        # Grab all available source_version pings
        pings = get_pings(sc, source_version="*", **kwargs)

        # Every property that dedupe_pings() and transform() read.
        subset = get_pings_properties(pings, [
            "meta/clientId",
            "meta/documentId",
            "meta/submissionDate",
            "meta/geoCountry",
            "meta/appVersion",
            "meta/appBuildId",
            "profileDate",
            "created",
            "locale",
            "os",
            "osversion",
            "device",
            "arch",
            "defaultSearch",
            "distributionId",
            "experiments",
            "defaultNewTabExperience",
            "defaultMailClient",
            "searches",
            "durations",
            "sessions",
        ])

        subset = dedupe_pings(subset)
        print("\nDe-duped pings:" + str(subset.count()))
        print(subset.first())

        transformed = subset.map(transform)
        print("\nTransformed pings:" + str(transformed.count()))
        print(transformed.first())

        s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mobile/mobile_clients"
        s3_output += "/v2/channel=" + channel + "/submission=" + day_str

        # Make parquet partition file size large, but not too large for s3 to handle
        coalesce = 1
        if channel == "release":
            coalesce = 4
        grouped = sqlContext.createDataFrame(transformed, schema)

        # NOTE(review): the write step was missing from the original, which
        # left coalesce, s3_output and grouped unused. Spark's default save
        # mode is used here, so an existing partition is never overwritten;
        # confirm the intended mode before relying on re-runs or backfills.
        grouped.coalesce(coalesce).write.parquet(s3_output)

    day += dt.timedelta(1)